Add some Send implementations to Futures (#412)

This commit is contained in:
Pierre Krieger
2018-08-14 15:23:30 +02:00
committed by Benjamin Kampmann
parent e5afab104a
commit bd73f60358
8 changed files with 64 additions and 95 deletions

View File

@ -32,10 +32,10 @@ pub struct DeniedTransport;
impl Transport for DeniedTransport { impl Transport for DeniedTransport {
// TODO: could use `!` for associated types once stable // TODO: could use `!` for associated types once stable
type Output = Cursor<Vec<u8>>; type Output = Cursor<Vec<u8>>;
type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = io::Error>>; type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = io::Error> + Send + Sync>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = io::Error>>; type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = io::Error> + Send + Sync>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = io::Error>>; type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = io::Error> + Send + Sync>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = io::Error>>; type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = io::Error> + Send + Sync>;
#[inline] #[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use bytes::Bytes; use bytes::Bytes;
use futures::prelude::*; use futures::{prelude::*, future};
use multistream_select; use multistream_select;
use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
@ -29,19 +29,18 @@ use upgrade::{ConnectionUpgrade, Endpoint};
/// ///
/// Returns a `Future` that returns the outcome of the connection upgrade. /// Returns a `Future` that returns the outcome of the connection upgrade.
#[inline] #[inline]
pub fn apply<'a, C, U, Maf>( pub fn apply<C, U, Maf>(
connection: C, connection: C,
upgrade: U, upgrade: U,
endpoint: Endpoint, endpoint: Endpoint,
remote_addr: Maf, remote_addr: Maf,
) -> Box<Future<Item = (U::Output, U::MultiaddrFuture), Error = IoError> + 'a> ) -> impl Future<Item = (U::Output, U::MultiaddrFuture), Error = IoError>
where where
U: ConnectionUpgrade<C, Maf> + 'a, U: ConnectionUpgrade<C, Maf>,
U::NamesIter: Clone, // TODO: not elegant U::NamesIter: Clone, // TODO: not elegant
C: AsyncRead + AsyncWrite + 'a, C: AsyncRead + AsyncWrite,
Maf: 'a,
{ {
let future = negotiate(connection, &upgrade, endpoint) negotiate(connection, &upgrade, endpoint)
.and_then(move |(upgrade_id, connection)| { .and_then(move |(upgrade_id, connection)| {
upgrade.upgrade(connection, upgrade_id, endpoint, remote_addr) upgrade.upgrade(connection, upgrade_id, endpoint, remote_addr)
}) })
@ -49,29 +48,25 @@ where
.then(|val| { .then(|val| {
match val { match val {
Ok(_) => debug!("Successfully applied negotiated protocol"), Ok(_) => debug!("Successfully applied negotiated protocol"),
Err(_) => debug!("Failed to apply negotiated protocol"), Err(ref err) => debug!("Failed to apply negotiated protocol: {:?}", err),
} }
val val
}); })
Box::new(future)
} }
/// Negotiates a protocol on a stream. /// Negotiates a protocol on a stream.
/// ///
/// Returns a `Future` that returns the negotiated protocol and the stream. /// Returns a `Future` that returns the negotiated protocol and the stream.
#[inline] #[inline]
pub fn negotiate<'a, C, I, U, Maf>( pub fn negotiate<C, I, U, Maf>(
connection: C, connection: C,
upgrade: &U, upgrade: &U,
endpoint: Endpoint, endpoint: Endpoint,
) -> Box<Future<Item = (U::UpgradeIdentifier, C), Error = IoError> + 'a> ) -> impl Future<Item = (U::UpgradeIdentifier, C), Error = IoError>
where where
U: ConnectionUpgrade<I, Maf> + 'a, U: ConnectionUpgrade<I, Maf>,
U::NamesIter: Clone, // TODO: not elegant U::NamesIter: Clone, // TODO: not elegant
C: AsyncRead + AsyncWrite + 'a, C: AsyncRead + AsyncWrite,
Maf: 'a,
I: 'a,
{ {
let iter = upgrade let iter = upgrade
.protocol_names() .protocol_names()
@ -79,11 +74,11 @@ where
debug!("Starting protocol negotiation"); debug!("Starting protocol negotiation");
let negotiation = match endpoint { let negotiation = match endpoint {
Endpoint::Listener => multistream_select::listener_select_proto(connection, iter), Endpoint::Listener => future::Either::A(multistream_select::listener_select_proto(connection, iter)),
Endpoint::Dialer => multistream_select::dialer_select_proto(connection, iter), Endpoint::Dialer => future::Either::B(multistream_select::dialer_select_proto(connection, iter)),
}; };
let future = negotiation negotiation
.map_err(|err| IoError::new(IoErrorKind::Other, err)) .map_err(|err| IoError::new(IoErrorKind::Other, err))
.then(move |negotiated| { .then(move |negotiated| {
match negotiated { match negotiated {
@ -91,7 +86,5 @@ where
Err(ref err) => debug!("Error while negotiated protocol upgrade: {:?}", err), Err(ref err) => debug!("Error while negotiated protocol upgrade: {:?}", err),
}; };
negotiated negotiated
}); })
Box::new(future)
} }

View File

@ -36,8 +36,8 @@ where
type NamesIter = iter::Empty<(Bytes, ())>; type NamesIter = iter::Empty<(Bytes, ())>;
type UpgradeIdentifier = (); // TODO: could use `!` type UpgradeIdentifier = (); // TODO: could use `!`
type Output = (); // TODO: could use `!` type Output = (); // TODO: could use `!`
type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = io::Error>>; // TODO: could use `!` type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = io::Error> + Send + Sync>; // TODO: could use `!`
type Future = Box<Future<Item = ((), Self::MultiaddrFuture), Error = io::Error>>; // TODO: could use `!` type Future = Box<Future<Item = ((), Self::MultiaddrFuture), Error = io::Error> + Send + Sync>; // TODO: could use `!`
#[inline] #[inline]
fn protocol_names(&self) -> Self::NamesIter { fn protocol_names(&self) -> Self::NamesIter {

View File

@ -22,7 +22,7 @@
//! `multistream-select` for the dialer. //! `multistream-select` for the dialer.
use bytes::Bytes; use bytes::Bytes;
use futures::future::{loop_fn, result, Loop}; use futures::future::{loop_fn, result, Loop, Either};
use futures::{Future, Sink, Stream}; use futures::{Future, Sink, Stream};
use ProtocolChoiceError; use ProtocolChoiceError;
@ -42,23 +42,23 @@ use tokio_io::{AsyncRead, AsyncWrite};
/// remote, and the protocol name that we passed (so that you don't have to clone the name). On /// remote, and the protocol name that we passed (so that you don't have to clone the name). On
/// success, the function returns the identifier (of type `P`), plus the socket which now uses that /// success, the function returns the identifier (of type `P`), plus the socket which now uses that
/// chosen protocol. /// chosen protocol.
// TODO: remove the Box once -> impl Trait lands
#[inline] #[inline]
pub fn dialer_select_proto<'a, R, I, M, P>( pub fn dialer_select_proto<R, I, M, P>(
inner: R, inner: R,
protocols: I, protocols: I,
) -> Box<Future<Item = (P, R), Error = ProtocolChoiceError> + 'a> ) -> impl Future<Item = (P, R), Error = ProtocolChoiceError>
where where
R: AsyncRead + AsyncWrite + 'a, R: AsyncRead + AsyncWrite,
I: Iterator<Item = (Bytes, M, P)> + 'a, I: Iterator<Item = (Bytes, M, P)>,
M: FnMut(&Bytes, &Bytes) -> bool + 'a, M: FnMut(&Bytes, &Bytes) -> bool,
P: 'a,
{ {
// We choose between the "serial" and "parallel" strategies based on the number of protocols. // We choose between the "serial" and "parallel" strategies based on the number of protocols.
if protocols.size_hint().1.map(|n| n <= 3).unwrap_or(false) { if protocols.size_hint().1.map(|n| n <= 3).unwrap_or(false) {
dialer_select_proto_serial(inner, protocols.map(|(n, _, id)| (n, id))) let fut = dialer_select_proto_serial(inner, protocols.map(|(n, _, id)| (n, id)));
Either::A(fut)
} else { } else {
dialer_select_proto_parallel(inner, protocols) let fut = dialer_select_proto_parallel(inner, protocols);
Either::B(fut)
} }
} }
@ -66,17 +66,15 @@ where
/// ///
/// Same as `dialer_select_proto`. Tries protocols one by one. The iterator doesn't need to produce /// Same as `dialer_select_proto`. Tries protocols one by one. The iterator doesn't need to produce
/// match functions, because it's not needed. /// match functions, because it's not needed.
// TODO: remove the Box once -> impl Trait lands pub fn dialer_select_proto_serial<R, I, P>(
pub fn dialer_select_proto_serial<'a, R, I, P>(
inner: R, inner: R,
mut protocols: I, mut protocols: I,
) -> Box<Future<Item = (P, R), Error = ProtocolChoiceError> + 'a> ) -> impl Future<Item = (P, R), Error = ProtocolChoiceError>
where where
R: AsyncRead + AsyncWrite + 'a, R: AsyncRead + AsyncWrite,
I: Iterator<Item = (Bytes, P)> + 'a, I: Iterator<Item = (Bytes, P)>,
P: 'a,
{ {
let future = Dialer::new(inner).from_err().and_then(move |dialer| { Dialer::new(inner).from_err().and_then(move |dialer| {
// Similar to a `loop` keyword. // Similar to a `loop` keyword.
loop_fn(dialer, move |dialer| { loop_fn(dialer, move |dialer| {
result(protocols.next().ok_or(ProtocolChoiceError::NoProtocolFound)) result(protocols.next().ok_or(ProtocolChoiceError::NoProtocolFound))
@ -116,28 +114,23 @@ where
} }
}) })
}) })
}); })
// The "Rust doesn't have impl Trait yet" tax.
Box::new(future)
} }
/// Helps selecting a protocol amongst the ones supported. /// Helps selecting a protocol amongst the ones supported.
/// ///
/// Same as `dialer_select_proto`. Queries the list of supported protocols from the remote, then /// Same as `dialer_select_proto`. Queries the list of supported protocols from the remote, then
/// chooses the most appropriate one. /// chooses the most appropriate one.
// TODO: remove the Box once -> impl Trait lands pub fn dialer_select_proto_parallel<R, I, M, P>(
pub fn dialer_select_proto_parallel<'a, R, I, M, P>(
inner: R, inner: R,
protocols: I, protocols: I,
) -> Box<Future<Item = (P, R), Error = ProtocolChoiceError> + 'a> ) -> impl Future<Item = (P, R), Error = ProtocolChoiceError>
where where
R: AsyncRead + AsyncWrite + 'a, R: AsyncRead + AsyncWrite,
I: Iterator<Item = (Bytes, M, P)> + 'a, I: Iterator<Item = (Bytes, M, P)>,
M: FnMut(&Bytes, &Bytes) -> bool + 'a, M: FnMut(&Bytes, &Bytes) -> bool,
P: 'a,
{ {
let future = Dialer::new(inner) Dialer::new(inner)
.from_err() .from_err()
.and_then(move |dialer| { .and_then(move |dialer| {
trace!("requesting protocols list"); trace!("requesting protocols list");
@ -193,8 +186,5 @@ where
} }
_ => Err(ProtocolChoiceError::UnexpectedMessage), _ => Err(ProtocolChoiceError::UnexpectedMessage),
} }
}); })
// The "Rust doesn't have impl Trait yet" tax.
Box::new(future)
} }

View File

@ -22,7 +22,7 @@
//! `multistream-select` for the listener. //! `multistream-select` for the listener.
use bytes::Bytes; use bytes::Bytes;
use futures::future::{err, loop_fn, Loop}; use futures::future::{err, loop_fn, Loop, Either};
use futures::{Future, Sink, Stream}; use futures::{Future, Sink, Stream};
use ProtocolChoiceError; use ProtocolChoiceError;
@ -45,18 +45,16 @@ use tokio_io::{AsyncRead, AsyncWrite};
/// ///
/// On success, returns the socket and the identifier of the chosen protocol (of type `P`). The /// On success, returns the socket and the identifier of the chosen protocol (of type `P`). The
/// socket now uses this protocol. /// socket now uses this protocol.
// TODO: remove the Box once -> impl Trait lands pub fn listener_select_proto<R, I, M, P>(
pub fn listener_select_proto<'a, R, I, M, P>(
inner: R, inner: R,
protocols: I, protocols: I,
) -> Box<Future<Item = (P, R), Error = ProtocolChoiceError> + 'a> ) -> impl Future<Item = (P, R), Error = ProtocolChoiceError>
where where
R: AsyncRead + AsyncWrite + 'a, R: AsyncRead + AsyncWrite,
I: Iterator<Item = (Bytes, M, P)> + Clone + 'a, I: Iterator<Item = (Bytes, M, P)> + Clone,
M: FnMut(&Bytes, &Bytes) -> bool + 'a, M: FnMut(&Bytes, &Bytes) -> bool,
P: 'a,
{ {
let future = Listener::new(inner).from_err().and_then(move |listener| { Listener::new(inner).from_err().and_then(move |listener| {
loop_fn(listener, move |listener| { loop_fn(listener, move |listener| {
let protocols = protocols.clone(); let protocols = protocols.clone();
@ -73,7 +71,7 @@ where
.send(msg) .send(msg)
.from_err() .from_err()
.map(move |listener| (None, listener)); .map(move |listener| (None, listener));
Box::new(fut) as Box<Future<Item = _, Error = ProtocolChoiceError>> Either::A(Either::A(fut))
} }
Some(DialerToListenerMessage::ProtocolRequest { name }) => { Some(DialerToListenerMessage::ProtocolRequest { name }) => {
let mut outcome = None; let mut outcome = None;
@ -91,11 +89,11 @@ where
.send(send_back) .send(send_back)
.from_err() .from_err()
.map(move |listener| (outcome, listener)); .map(move |listener| (outcome, listener));
Box::new(fut) as Box<Future<Item = _, Error = ProtocolChoiceError>> Either::A(Either::B(fut))
} }
None => { None => {
debug!("no protocol request received"); debug!("no protocol request received");
Box::new(err(ProtocolChoiceError::NoProtocolFound)) as Box<_> Either::B(err(ProtocolChoiceError::NoProtocolFound))
} }
}) })
.map(|(outcome, listener): (_, Listener<R>)| match outcome { .map(|(outcome, listener): (_, Listener<R>)| match outcome {
@ -103,8 +101,5 @@ where
None => Loop::Continue(listener), None => Loop::Continue(listener),
}) })
}) })
}); })
// The "Rust doesn't have impl Trait yet" tax.
Box::new(future)
} }

View File

@ -46,23 +46,19 @@ where
{ {
/// Takes ownership of a socket and starts the handshake. If the handshake succeeds, the /// Takes ownership of a socket and starts the handshake. If the handshake succeeds, the
/// future returns a `Dialer`. /// future returns a `Dialer`.
pub fn new<'a>(inner: R) -> Box<Future<Item = Dialer<R>, Error = MultistreamSelectError> + 'a> pub fn new(inner: R) -> impl Future<Item = Dialer<R>, Error = MultistreamSelectError> {
where
R: 'a,
{
let write = LengthDelimitedBuilder::new() let write = LengthDelimitedBuilder::new()
.length_field_length(1) .length_field_length(1)
.new_write(inner); .new_write(inner);
let inner = LengthDelimitedFramedRead::new(write); let inner = LengthDelimitedFramedRead::new(write);
let future = inner inner
.send(BytesMut::from(MULTISTREAM_PROTOCOL_WITH_LF)) .send(BytesMut::from(MULTISTREAM_PROTOCOL_WITH_LF))
.from_err() .from_err()
.map(|inner| Dialer { .map(|inner| Dialer {
inner, inner,
handshake_finished: false, handshake_finished: false,
}); })
Box::new(future)
} }
/// Grants back the socket. Typically used after a `ProtocolAck` has been received. /// Grants back the socket. Typically used after a `ProtocolAck` has been received.

View File

@ -44,16 +44,13 @@ where
{ {
/// Takes ownership of a socket and starts the handshake. If the handshake succeeds, the /// Takes ownership of a socket and starts the handshake. If the handshake succeeds, the
/// future returns a `Listener`. /// future returns a `Listener`.
pub fn new<'a>(inner: R) -> Box<Future<Item = Listener<R>, Error = MultistreamSelectError> + 'a> pub fn new(inner: R) -> impl Future<Item = Listener<R>, Error = MultistreamSelectError> {
where
R: 'a,
{
let write = LengthDelimitedBuilder::new() let write = LengthDelimitedBuilder::new()
.length_field_length(1) .length_field_length(1)
.new_write(inner); .new_write(inner);
let inner = LengthDelimitedFramedRead::<Bytes, _>::new(write); let inner = LengthDelimitedFramedRead::<Bytes, _>::new(write);
let future = inner inner
.into_future() .into_future()
.map_err(|(e, _)| e.into()) .map_err(|(e, _)| e.into())
.and_then(|(msg, rest)| { .and_then(|(msg, rest)| {
@ -69,9 +66,7 @@ where
.send(BytesMut::from(MULTISTREAM_PROTOCOL_WITH_LF)) .send(BytesMut::from(MULTISTREAM_PROTOCOL_WITH_LF))
.from_err() .from_err()
}) })
.map(|inner| Listener { inner }); .map(|inner| Listener { inner })
Box::new(future)
} }
/// Grants back the socket. Typically used after a `ProtocolRequest` has been received and a /// Grants back the socket. Typically used after a `ProtocolRequest` has been received and a

View File

@ -86,10 +86,10 @@ impl UdsConfig {
impl Transport for UdsConfig { impl Transport for UdsConfig {
type Output = UnixStream; type Output = UnixStream;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>; type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send + Sync>;
type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
type MultiaddrFuture = FutureResult<Multiaddr, IoError>; type MultiaddrFuture = FutureResult<Multiaddr, IoError>;
type Dial = Box<Future<Item = (UnixStream, Self::MultiaddrFuture), Error = IoError>>; type Dial = Box<Future<Item = (UnixStream, Self::MultiaddrFuture), Error = IoError> + Send + Sync>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
if let Ok(path) = multiaddr_to_path(&addr) { if let Ok(path) = multiaddr_to_path(&addr) {