mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-12 17:41:22 +00:00
Introduce several concrete future types. (#433)
* multisteam-select: introduce `DialerFuture`. * multistream-select: add more concrete futures. * multistream-select: add ListenerFuture. * multistream-select: add ListenerSelectFuture * Formatting. * Add DialerSelectFuture type alias. * Add UpgradeApplyFuture and NegotiationFuture. * In iterator wrappers also pass-through size_hint. * Minor refactoring. * Address review comments. * Add some comments. * Hide state enums in wrapping structs.
This commit is contained in:
@ -22,14 +22,16 @@
|
||||
//! `multistream-select` for the dialer.
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::future::{loop_fn, result, Loop, Either};
|
||||
use futures::{Future, Sink, Stream};
|
||||
use futures::{future::Either, prelude::*, sink, stream::StreamFuture};
|
||||
use protocol::{Dialer, DialerFuture, DialerToListenerMessage, ListenerToDialerMessage};
|
||||
use std::mem;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use ProtocolChoiceError;
|
||||
|
||||
use protocol::Dialer;
|
||||
use protocol::DialerToListenerMessage;
|
||||
use protocol::ListenerToDialerMessage;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
/// Future, returned by `dialer_select_proto`, which selects a protocol and dialer
|
||||
/// either sequentially of by considering all protocols in parallel.
|
||||
pub type DialerSelectFuture<R, I, P> =
|
||||
Either<DialerSelectSeq<R, IgnoreMatchFn<I>, P>, DialerSelectPar<R, I, P>>;
|
||||
|
||||
/// Helps selecting a protocol amongst the ones supported.
|
||||
///
|
||||
@ -43,148 +45,337 @@ use tokio_io::{AsyncRead, AsyncWrite};
|
||||
/// success, the function returns the identifier (of type `P`), plus the socket which now uses that
|
||||
/// chosen protocol.
|
||||
#[inline]
|
||||
pub fn dialer_select_proto<R, I, M, P>(
|
||||
inner: R,
|
||||
protocols: I,
|
||||
) -> impl Future<Item = (P, R), Error = ProtocolChoiceError>
|
||||
pub fn dialer_select_proto<R, I, M, P>(inner: R, protocols: I) -> DialerSelectFuture<R, I, P>
|
||||
where
|
||||
R: AsyncRead + AsyncWrite,
|
||||
I: Iterator<Item = (Bytes, M, P)>,
|
||||
I: Iterator<Item=(Bytes, M, P)>,
|
||||
M: FnMut(&Bytes, &Bytes) -> bool,
|
||||
{
|
||||
// We choose between the "serial" and "parallel" strategies based on the number of protocols.
|
||||
if protocols.size_hint().1.map(|n| n <= 3).unwrap_or(false) {
|
||||
let fut = dialer_select_proto_serial(inner, protocols.map(|(n, _, id)| (n, id)));
|
||||
Either::A(fut)
|
||||
Either::A(dialer_select_proto_serial(inner, IgnoreMatchFn(protocols)))
|
||||
} else {
|
||||
let fut = dialer_select_proto_parallel(inner, protocols);
|
||||
Either::B(fut)
|
||||
Either::B(dialer_select_proto_parallel(inner, protocols))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Iterator, which ignores match predicates of the iterator it wraps.
|
||||
pub struct IgnoreMatchFn<I>(I);
|
||||
|
||||
impl<I, M, P> Iterator for IgnoreMatchFn<I>
|
||||
where
|
||||
I: Iterator<Item=(Bytes, M, P)>
|
||||
{
|
||||
type Item = (Bytes, P);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.0.next().map(|(b, _, p)| (b, p))
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
self.0.size_hint()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Helps selecting a protocol amongst the ones supported.
|
||||
///
|
||||
/// Same as `dialer_select_proto`. Tries protocols one by one. The iterator doesn't need to produce
|
||||
/// match functions, because it's not needed.
|
||||
pub fn dialer_select_proto_serial<R, I, P>(
|
||||
inner: R,
|
||||
mut protocols: I,
|
||||
) -> impl Future<Item = (P, R), Error = ProtocolChoiceError>
|
||||
pub fn dialer_select_proto_serial<R, I, P>(inner: R, protocols: I,) -> DialerSelectSeq<R, I, P>
|
||||
where
|
||||
R: AsyncRead + AsyncWrite,
|
||||
I: Iterator<Item = (Bytes, P)>,
|
||||
{
|
||||
Dialer::new(inner).from_err().and_then(move |dialer| {
|
||||
// Similar to a `loop` keyword.
|
||||
loop_fn(dialer, move |dialer| {
|
||||
result(protocols.next().ok_or(ProtocolChoiceError::NoProtocolFound))
|
||||
// If the `protocols` iterator produced an element, send it to the dialer
|
||||
.and_then(|(proto_name, proto_value)| {
|
||||
let req = DialerToListenerMessage::ProtocolRequest {
|
||||
name: proto_name.clone()
|
||||
};
|
||||
trace!("sending {:?}", req);
|
||||
dialer.send(req)
|
||||
.map(|d| (d, proto_name, proto_value))
|
||||
.from_err()
|
||||
})
|
||||
// Once sent, read one element from `dialer`.
|
||||
.and_then(|(dialer, proto_name, proto_value)| {
|
||||
dialer
|
||||
.into_future()
|
||||
.map(|(msg, rest)| (msg, rest, proto_name, proto_value))
|
||||
.map_err(|(e, _)| e.into())
|
||||
})
|
||||
// Once read, analyze the response.
|
||||
.and_then(|(message, rest, proto_name, proto_value)| {
|
||||
trace!("received {:?}", message);
|
||||
let message = message.ok_or(ProtocolChoiceError::UnexpectedMessage)?;
|
||||
|
||||
match message {
|
||||
ListenerToDialerMessage::ProtocolAck { ref name }
|
||||
if name == &proto_name =>
|
||||
{
|
||||
// Satisfactory response, break the loop.
|
||||
Ok(Loop::Break((proto_value, rest.into_inner())))
|
||||
},
|
||||
ListenerToDialerMessage::NotAvailable => {
|
||||
Ok(Loop::Continue(rest))
|
||||
},
|
||||
_ => Err(ProtocolChoiceError::UnexpectedMessage),
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
DialerSelectSeq {
|
||||
inner: DialerSelectSeqState::AwaitDialer { dialer_fut: Dialer::new(inner), protocols }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Future, returned by `dialer_select_proto_serial` which selects a protocol
|
||||
/// and dialer sequentially.
|
||||
pub struct DialerSelectSeq<R: AsyncRead + AsyncWrite, I, P> {
|
||||
inner: DialerSelectSeqState<R, I, P>
|
||||
}
|
||||
|
||||
enum DialerSelectSeqState<R: AsyncRead + AsyncWrite, I, P> {
|
||||
AwaitDialer {
|
||||
dialer_fut: DialerFuture<R>,
|
||||
protocols: I
|
||||
},
|
||||
NextProtocol {
|
||||
dialer: Dialer<R>,
|
||||
protocols: I
|
||||
},
|
||||
SendProtocol {
|
||||
sender: sink::Send<Dialer<R>>,
|
||||
proto_name: Bytes,
|
||||
proto_value: P,
|
||||
protocols: I
|
||||
},
|
||||
AwaitProtocol {
|
||||
stream: StreamFuture<Dialer<R>>,
|
||||
proto_name: Bytes,
|
||||
proto_value: P,
|
||||
protocols: I
|
||||
},
|
||||
Undefined
|
||||
}
|
||||
|
||||
impl<R, I, P> Future for DialerSelectSeq<R, I, P>
|
||||
where
|
||||
I: Iterator<Item=(Bytes, P)>,
|
||||
R: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Item = (P, R);
|
||||
type Error = ProtocolChoiceError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
loop {
|
||||
match mem::replace(&mut self.inner, DialerSelectSeqState::Undefined) {
|
||||
DialerSelectSeqState::AwaitDialer { mut dialer_fut, protocols } => {
|
||||
let dialer = match dialer_fut.poll()? {
|
||||
Async::Ready(d) => d,
|
||||
Async::NotReady => {
|
||||
self.inner = DialerSelectSeqState::AwaitDialer { dialer_fut, protocols };
|
||||
return Ok(Async::NotReady)
|
||||
}
|
||||
};
|
||||
self.inner = DialerSelectSeqState::NextProtocol { dialer, protocols }
|
||||
}
|
||||
DialerSelectSeqState::NextProtocol { dialer, mut protocols } => {
|
||||
let (proto_name, proto_value) =
|
||||
protocols.next().ok_or(ProtocolChoiceError::NoProtocolFound)?;
|
||||
let req = DialerToListenerMessage::ProtocolRequest {
|
||||
name: proto_name.clone()
|
||||
};
|
||||
trace!("sending {:?}", req);
|
||||
let sender = dialer.send(req);
|
||||
self.inner = DialerSelectSeqState::SendProtocol {
|
||||
sender,
|
||||
proto_name,
|
||||
proto_value,
|
||||
protocols
|
||||
}
|
||||
}
|
||||
DialerSelectSeqState::SendProtocol { mut sender, proto_name, proto_value, protocols } => {
|
||||
let dialer = match sender.poll()? {
|
||||
Async::Ready(d) => d,
|
||||
Async::NotReady => {
|
||||
self.inner = DialerSelectSeqState::SendProtocol {
|
||||
sender,
|
||||
proto_name,
|
||||
proto_value,
|
||||
protocols
|
||||
};
|
||||
return Ok(Async::NotReady)
|
||||
}
|
||||
};
|
||||
let stream = dialer.into_future();
|
||||
self.inner = DialerSelectSeqState::AwaitProtocol {
|
||||
stream,
|
||||
proto_name,
|
||||
proto_value,
|
||||
protocols
|
||||
};
|
||||
}
|
||||
DialerSelectSeqState::AwaitProtocol { mut stream, proto_name, proto_value, protocols } => {
|
||||
let (m, r) = match stream.poll() {
|
||||
Ok(Async::Ready(x)) => x,
|
||||
Ok(Async::NotReady) => {
|
||||
self.inner = DialerSelectSeqState::AwaitProtocol {
|
||||
stream,
|
||||
proto_name,
|
||||
proto_value,
|
||||
protocols
|
||||
};
|
||||
return Ok(Async::NotReady)
|
||||
}
|
||||
Err((e, _)) => return Err(ProtocolChoiceError::from(e))
|
||||
};
|
||||
trace!("received {:?}", m);
|
||||
match m.ok_or(ProtocolChoiceError::UnexpectedMessage)? {
|
||||
ListenerToDialerMessage::ProtocolAck { ref name } if name == &proto_name => {
|
||||
return Ok(Async::Ready((proto_value, r.into_inner())))
|
||||
},
|
||||
ListenerToDialerMessage::NotAvailable => {
|
||||
self.inner = DialerSelectSeqState::NextProtocol { dialer: r, protocols }
|
||||
}
|
||||
_ => return Err(ProtocolChoiceError::UnexpectedMessage)
|
||||
}
|
||||
}
|
||||
DialerSelectSeqState::Undefined =>
|
||||
panic!("DialerSelectSeqState::poll called after completion")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Helps selecting a protocol amongst the ones supported.
|
||||
///
|
||||
/// Same as `dialer_select_proto`. Queries the list of supported protocols from the remote, then
|
||||
/// chooses the most appropriate one.
|
||||
pub fn dialer_select_proto_parallel<R, I, M, P>(
|
||||
inner: R,
|
||||
protocols: I,
|
||||
) -> impl Future<Item = (P, R), Error = ProtocolChoiceError>
|
||||
pub fn dialer_select_proto_parallel<R, I, M, P>(inner: R, protocols: I) -> DialerSelectPar<R, I, P>
|
||||
where
|
||||
R: AsyncRead + AsyncWrite,
|
||||
I: Iterator<Item = (Bytes, M, P)>,
|
||||
M: FnMut(&Bytes, &Bytes) -> bool,
|
||||
{
|
||||
Dialer::new(inner)
|
||||
.from_err()
|
||||
.and_then(move |dialer| {
|
||||
trace!("requesting protocols list");
|
||||
dialer
|
||||
.send(DialerToListenerMessage::ProtocolsListRequest)
|
||||
.from_err()
|
||||
})
|
||||
.and_then(move |dialer| dialer.into_future().map_err(|(e, _)| e.into()))
|
||||
.and_then(move |(msg, dialer)| {
|
||||
trace!("protocols list response: {:?}", msg);
|
||||
let list = match msg {
|
||||
Some(ListenerToDialerMessage::ProtocolsListResponse { list }) => list,
|
||||
_ => return Err(ProtocolChoiceError::UnexpectedMessage),
|
||||
};
|
||||
DialerSelectPar {
|
||||
inner: DialerSelectParState::AwaitDialer { dialer_fut: Dialer::new(inner), protocols }
|
||||
}
|
||||
}
|
||||
|
||||
let mut found = None;
|
||||
for (local_name, mut match_fn, ident) in protocols {
|
||||
for remote_name in &list {
|
||||
if match_fn(remote_name, &local_name) {
|
||||
found = Some((remote_name.clone(), ident));
|
||||
break;
|
||||
|
||||
/// Future, returned by `dialer_select_proto_parallel`, which selects a protocol and dialer in
|
||||
/// parellel, by first requesting the liste of protocols supported by the remote endpoint and
|
||||
/// then selecting the most appropriate one by applying a match predicate to the result.
|
||||
pub struct DialerSelectPar<R: AsyncRead + AsyncWrite, I, P> {
|
||||
inner: DialerSelectParState<R, I, P>
|
||||
}
|
||||
|
||||
enum DialerSelectParState<R: AsyncRead + AsyncWrite, I, P> {
|
||||
AwaitDialer {
|
||||
dialer_fut: DialerFuture<R>,
|
||||
protocols: I
|
||||
},
|
||||
SendRequest {
|
||||
sender: sink::Send<Dialer<R>>,
|
||||
protocols: I
|
||||
},
|
||||
AwaitResponse {
|
||||
stream: StreamFuture<Dialer<R>>,
|
||||
protocols: I
|
||||
},
|
||||
SendProtocol {
|
||||
sender: sink::Send<Dialer<R>>,
|
||||
proto_name: Bytes,
|
||||
proto_val: P
|
||||
},
|
||||
AwaitProtocol {
|
||||
stream: StreamFuture<Dialer<R>>,
|
||||
proto_name: Bytes,
|
||||
proto_val: P
|
||||
},
|
||||
Undefined
|
||||
}
|
||||
|
||||
impl<R, I, M, P> Future for DialerSelectPar<R, I, P>
|
||||
where
|
||||
I: Iterator<Item=(Bytes, M, P)>,
|
||||
M: FnMut(&Bytes, &Bytes) -> bool,
|
||||
R: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Item = (P, R);
|
||||
type Error = ProtocolChoiceError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
loop {
|
||||
match mem::replace(&mut self.inner, DialerSelectParState::Undefined) {
|
||||
DialerSelectParState::AwaitDialer { mut dialer_fut, protocols } => {
|
||||
let dialer = match dialer_fut.poll()? {
|
||||
Async::Ready(d) => d,
|
||||
Async::NotReady => {
|
||||
self.inner = DialerSelectParState::AwaitDialer { dialer_fut, protocols };
|
||||
return Ok(Async::NotReady)
|
||||
}
|
||||
};
|
||||
trace!("requesting protocols list");
|
||||
let sender = dialer.send(DialerToListenerMessage::ProtocolsListRequest);
|
||||
self.inner = DialerSelectParState::SendRequest { sender, protocols };
|
||||
}
|
||||
DialerSelectParState::SendRequest { mut sender, protocols } => {
|
||||
let dialer = match sender.poll()? {
|
||||
Async::Ready(d) => d,
|
||||
Async::NotReady => {
|
||||
self.inner = DialerSelectParState::SendRequest { sender, protocols };
|
||||
return Ok(Async::NotReady)
|
||||
}
|
||||
};
|
||||
let stream = dialer.into_future();
|
||||
self.inner = DialerSelectParState::AwaitResponse { stream, protocols };
|
||||
}
|
||||
DialerSelectParState::AwaitResponse { mut stream, protocols } => {
|
||||
let (m, d) = match stream.poll() {
|
||||
Ok(Async::Ready(x)) => x,
|
||||
Ok(Async::NotReady) => {
|
||||
self.inner = DialerSelectParState::AwaitResponse { stream, protocols };
|
||||
return Ok(Async::NotReady)
|
||||
}
|
||||
Err((e, _)) => return Err(ProtocolChoiceError::from(e))
|
||||
};
|
||||
trace!("protocols list response: {:?}", m);
|
||||
let list = match m {
|
||||
Some(ListenerToDialerMessage::ProtocolsListResponse { list }) => list,
|
||||
_ => return Err(ProtocolChoiceError::UnexpectedMessage),
|
||||
};
|
||||
let mut found = None;
|
||||
for (local_name, mut match_fn, ident) in protocols {
|
||||
for remote_name in &list {
|
||||
if match_fn(remote_name, &local_name) {
|
||||
found = Some((remote_name.clone(), ident));
|
||||
break;
|
||||
}
|
||||
}
|
||||
if found.is_some() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let (proto_name, proto_val) = found.ok_or(ProtocolChoiceError::NoProtocolFound)?;
|
||||
trace!("sending {:?}", proto_name);
|
||||
let sender = d.send(DialerToListenerMessage::ProtocolRequest {
|
||||
name: proto_name.clone(),
|
||||
});
|
||||
self.inner = DialerSelectParState::SendProtocol { sender, proto_name, proto_val };
|
||||
}
|
||||
DialerSelectParState::SendProtocol { mut sender, proto_name, proto_val } => {
|
||||
let dialer = match sender.poll()? {
|
||||
Async::Ready(d) => d,
|
||||
Async::NotReady => {
|
||||
self.inner = DialerSelectParState::SendProtocol {
|
||||
sender,
|
||||
proto_name,
|
||||
proto_val
|
||||
};
|
||||
return Ok(Async::NotReady)
|
||||
}
|
||||
};
|
||||
let stream = dialer.into_future();
|
||||
self.inner = DialerSelectParState::AwaitProtocol {
|
||||
stream,
|
||||
proto_name,
|
||||
proto_val
|
||||
};
|
||||
}
|
||||
DialerSelectParState::AwaitProtocol { mut stream, proto_name, proto_val } => {
|
||||
let (m, r) = match stream.poll() {
|
||||
Ok(Async::Ready(x)) => x,
|
||||
Ok(Async::NotReady) => {
|
||||
self.inner = DialerSelectParState::AwaitProtocol {
|
||||
stream,
|
||||
proto_name,
|
||||
proto_val
|
||||
};
|
||||
return Ok(Async::NotReady)
|
||||
}
|
||||
Err((e, _)) => return Err(ProtocolChoiceError::from(e))
|
||||
};
|
||||
trace!("received {:?}", m);
|
||||
match m {
|
||||
Some(ListenerToDialerMessage::ProtocolAck { ref name }) if name == &proto_name => {
|
||||
return Ok(Async::Ready((proto_val, r.into_inner())))
|
||||
}
|
||||
_ => return Err(ProtocolChoiceError::UnexpectedMessage)
|
||||
}
|
||||
}
|
||||
|
||||
if found.is_some() {
|
||||
break;
|
||||
}
|
||||
DialerSelectParState::Undefined =>
|
||||
panic!("DialerSelectParState::poll called after completion")
|
||||
}
|
||||
|
||||
let (proto_name, proto_val) = found.ok_or(ProtocolChoiceError::NoProtocolFound)?;
|
||||
Ok((proto_name, proto_val, dialer))
|
||||
})
|
||||
.and_then(|(proto_name, proto_val, dialer)| {
|
||||
trace!("sending {:?}", proto_name);
|
||||
dialer
|
||||
.send(DialerToListenerMessage::ProtocolRequest {
|
||||
name: proto_name.clone(),
|
||||
})
|
||||
.from_err()
|
||||
.map(|dialer| (proto_name, proto_val, dialer))
|
||||
})
|
||||
.and_then(|(proto_name, proto_val, dialer)| {
|
||||
dialer
|
||||
.into_future()
|
||||
.map(|(msg, rest)| (proto_name, proto_val, msg, rest))
|
||||
.map_err(|(err, _)| err.into())
|
||||
})
|
||||
.and_then(|(proto_name, proto_val, msg, dialer)| {
|
||||
trace!("received {:?}", msg);
|
||||
match msg {
|
||||
Some(ListenerToDialerMessage::ProtocolAck { ref name }) if name == &proto_name => {
|
||||
Ok((proto_val, dialer.into_inner()))
|
||||
}
|
||||
_ => Err(ProtocolChoiceError::UnexpectedMessage),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user