diff --git a/example/examples/echo-dialer.rs b/example/examples/echo-dialer.rs index cf628c9d..06dea677 100644 --- a/example/examples/echo-dialer.rs +++ b/example/examples/echo-dialer.rs @@ -30,7 +30,7 @@ extern crate tokio_io; use bytes::BytesMut; use futures::{Future, Sink, Stream}; use std::env; -use swarm::{UpgradeExt, SimpleProtocol, Transport}; +use swarm::{UpgradeExt, SimpleProtocol, Transport, MuxedTransport}; use tcp::TcpConfig; use tokio_core::reactor::Core; use tokio_io::codec::length_delimited; @@ -68,8 +68,10 @@ fn main() { // `Transport` because the output of the upgrade is not a stream but a controller for // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // a `Transport`. - .into_connection_reuse() + .into_connection_reuse(); + let transport_with_echo = transport + .clone() // On top of plaintext or secio, we use the "echo" protocol, which is a custom protocol // just for this example. // For this purpose, we create a `SimpleProtocol` struct. @@ -86,34 +88,23 @@ fn main() { // of any opened stream. // We use it to dial the address. - let dialer = transport - .dial_and_listen(swarm::Multiaddr::new(&target_addr).expect("invalid multiaddr")) + let dialer = transport_with_echo + .dial(swarm::Multiaddr::new(&target_addr).expect("invalid multiaddr")) // If the multiaddr protocol exists but is not supported, then we get an error containing // the transport and the original multiaddress. Therefore we cannot directly use `unwrap()` // or `expect()`, but have to add a `map_err()` beforehand. .map_err(|(_, addr)| addr).expect("unsupported multiaddr") - .and_then(|(incoming, echo)| { + .and_then(|echo| { // `echo` is what the closure used when initializing "echo" returns. // Consequently, please note that the `send` method is available only because the type // `length_delimited::Framed` has a `send` method. println!("Sending \"hello world\" to listener"); - echo.and_then(|echo| echo.send("hello world".into()).map(Option::Some)) - .select( - incoming - .for_each(|_| { - println!("opened"); - Ok(()) - }) - .map(|()| None), - ) - .map(|(n, _)| n) - .map_err(|(e, _)| e) + echo.send("hello world".into()) }) .and_then(|echo| { // The message has been successfully sent. Now wait for an answer. - echo.unwrap() - .into_future() + echo.into_future() .map(|(msg, rest)| { println!("Received message from listener: {:?}", msg); rest @@ -124,5 +115,6 @@ fn main() { // `dialer` is a future that contains all the behaviour that we want, but nothing has actually // started yet. Because we created the `TcpConfig` with tokio, we need to run the future // through the tokio core. - core.run(dialer).unwrap(); + core.run(dialer.map(|_| ()).select(transport.incoming().for_each(|_| Ok(())))) + .unwrap_or_else(|_| panic!()); } diff --git a/libp2p-swarm/Cargo.toml b/libp2p-swarm/Cargo.toml index cf03e056..1439f839 100644 --- a/libp2p-swarm/Cargo.toml +++ b/libp2p-swarm/Cargo.toml @@ -8,6 +8,7 @@ bytes = "0.4" multiaddr = "0.2.0" multistream-select = { path = "../multistream-select" } futures = { version = "0.1", features = ["use_std"] } +parking_lot = "0.5.3" smallvec = "0.5" tokio-io = "0.1" diff --git a/libp2p-swarm/src/connection_reuse.rs b/libp2p-swarm/src/connection_reuse.rs index 17df7bda..906e1886 100644 --- a/libp2p-swarm/src/connection_reuse.rs +++ b/libp2p-swarm/src/connection_reuse.rs @@ -37,20 +37,23 @@ //! //! When called on a `ConnectionReuse`, the `dial` method will try to use a connection that has //! already been opened earlier, and open an outgoing substream on it. If none is available, it -//! will dial the given multiaddress. +//! will dial the given multiaddress. Dialed node can also spontaneously open new substreams with +//! us. In order to handle these new substreams you should use the `next_incoming` method of the +//! `MuxedTransport` trait. //! TODO: this raises several questions ^ //! //! TODO: this whole code is a dummy and should be rewritten after the design has been properly //! figured out. -use futures::{Async, Future, Poll, Stream}; -use futures::future::{IntoFuture, FutureResult}; +use futures::future::{self, IntoFuture, FutureResult}; +use futures::{stream, Async, Future, Poll, Stream, task}; use futures::stream::Fuse as StreamFuse; -use futures::stream; use multiaddr::Multiaddr; use muxing::StreamMuxer; +use parking_lot::Mutex; use smallvec::SmallVec; use std::io::Error as IoError; +use std::sync::Arc; use transport::{ConnectionUpgrade, MuxedTransport, Transport, UpgradedNode}; /// Allows reusing the same muxed connection multiple times. @@ -66,6 +69,14 @@ where { // Underlying transport and connection upgrade for when we need to dial or listen. inner: UpgradedNode, + shared: Arc>>, +} + +struct Shared { + // List of futures to dialed connections. + incoming: Vec>>>>>, + // Tasks to signal when an element is added to `incoming`. Only used when `incoming` is empty. + to_signal: Vec, } impl From> for ConnectionReuse @@ -75,7 +86,13 @@ where { #[inline] fn from(node: UpgradedNode) -> ConnectionReuse { - ConnectionReuse { inner: node } + ConnectionReuse { + inner: node, + shared: Arc::new(Mutex::new(Shared { + incoming: Vec::new(), + to_signal: Vec::new(), + })), + } } } @@ -96,7 +113,7 @@ where let (listener, new_addr) = match self.inner.listen_on(addr.clone()) { Ok((l, a)) => (l, a), Err((inner, addr)) => { - return Err((ConnectionReuse { inner: inner }, addr)); + return Err((ConnectionReuse { inner: inner, shared: self.shared }, addr)); } }; @@ -110,14 +127,30 @@ where } fn dial(self, addr: Multiaddr) -> Result { - let dial = match self.inner.dial(addr) { + let dial = match self.inner.dial(addr.clone()) { Ok(l) => l, Err((inner, addr)) => { - return Err((ConnectionReuse { inner: inner }, addr)); + return Err((ConnectionReuse { inner: inner, shared: self.shared }, addr)); } }; - let future = dial.and_then(|dial| dial.outbound()); + let dial = dial + .map_err:: Mutex>, _>(|err| Mutex::new(Some(err))) + .shared(); + + let ingoing = dial.clone() + .map(|muxer| stream::repeat(muxer)) + .flatten_stream() + .map(move |muxer| ((&*muxer).clone(), addr.clone())); + + let mut lock = self.shared.lock(); + lock.incoming.push(Box::new(ingoing) as Box<_>); + for task in lock.to_signal.drain(..) { task.notify(); } + drop(lock); + + let future = dial + .map_err(|err| err.lock().take().expect("error can only be extracted once")) + .and_then(|dial| (&*dial).clone().outbound()); Ok(Box::new(future) as Box<_>) } } @@ -130,29 +163,15 @@ where C::Output: StreamMuxer + Clone, C::NamesIter: Clone, // TODO: not elegant { - type Incoming = stream::AndThen< - stream::Repeat, - fn(C::Output) - -> <>::Output as StreamMuxer>::InboundSubstream, - <>::Output as StreamMuxer>::InboundSubstream, - >; - type Outgoing = - <>::Output as StreamMuxer>::OutboundSubstream; - type DialAndListen = Box>; + type Incoming = Box::Substream, Multiaddr), Error = IoError>>; - fn dial_and_listen(self, addr: Multiaddr) -> Result { - self.inner - .dial(addr) - .map_err(|(inner, addr)| (ConnectionReuse { inner: inner }, addr)) - .map(|fut| { - fut.map(|muxer| { - ( - stream::repeat(muxer.clone()).and_then(StreamMuxer::inbound as fn(_) -> _), - muxer.outbound(), - ) - }) - }) - .map(|fut| Box::new(fut) as _) + #[inline] + fn next_incoming(self) -> Self::Incoming { + let future = ConnectionReuseIncoming { shared: self.shared.clone() } + .and_then(|(out, addr)| { + out.inbound().map(|o| (o, addr)) + }); + Box::new(future) as Box<_> } } @@ -253,3 +272,50 @@ where S: Stream, Ok(Async::NotReady) } } + +/// Implementation of `Future { + shared: Arc>>, +} + +impl Future for ConnectionReuseIncoming + where O: Clone +{ + type Item = (O, Multiaddr); + type Error = IoError; + + fn poll(&mut self) -> Poll { + let mut lock = self.shared.lock(); + + let mut to_remove = SmallVec::<[_; 8]>::new(); + let mut ret_value = None; + + for (offset, future) in lock.incoming.iter_mut().enumerate() { + match future.poll() { + Ok(Async::Ready(Some((value, addr)))) => { + ret_value = Some((value.clone(), addr)); + break; + }, + Ok(Async::Ready(None)) => { + to_remove.push(offset); + }, + Ok(Async::NotReady) => {}, + Err(_) => { + to_remove.push(offset); + }, + } + } + + for offset in to_remove.into_iter().rev() { + lock.incoming.swap_remove(offset); + } + + if let Some(ret_value) = ret_value { + Ok(Async::Ready(ret_value)) + } else { + lock.to_signal.push(task::current()); + Ok(Async::NotReady) + } + } +} diff --git a/libp2p-swarm/src/lib.rs b/libp2p-swarm/src/lib.rs index 17850a5a..59e1fb6d 100644 --- a/libp2p-swarm/src/lib.rs +++ b/libp2p-swarm/src/lib.rs @@ -48,11 +48,12 @@ //! The `MuxedTransport` trait is an extension to the `Transport` trait, and is implemented on //! transports that can receive incoming connections on streams that have been opened with `dial()`. //! -//! The trait provides the `dial_and_listen()` method, which returns both a dialer and a stream of -//! incoming connections. +//! The trait provides the `next_incoming()` method, which returns a future that will resolve to +//! the next substream that arrives from a dialed node. //! //! > **Note**: This trait is mainly implemented for transports that provide stream muxing -//! > capabilities. +//! > capabilities, but it can also be implemented in a dummy way by returning an empty +//! > iterator. //! //! # Connection upgrades //! @@ -78,7 +79,7 @@ //! `Transport` trait. The return value of this method also implements the `Transport` trait, which //! means that you can call `dial()` and `listen_on()` on it in order to directly obtain an //! upgraded connection or a listener that will yield upgraded connections. Similarly, the -//! `dial_and_listen()` method will automatically apply the upgrade on both the dialer and the +//! `next_incoming()` method will automatically apply the upgrade on both the dialer and the //! listener. An error is produced if the remote doesn't support the protocol corresponding to the //! connection upgrade. //! @@ -123,7 +124,7 @@ //! transport. //! //! However the `UpgradedNode` struct returned by `with_upgrade` still provides methods named -//! `dial`, `listen_on`, and `dial_and_listen`, which will yield you a `Future` or a `Stream`, +//! `dial`, `listen_on`, and `next_incoming`, which will yield you a `Future` or a `Stream`, //! which you can use to obtain the `Output`. This `Output` can then be used in a protocol-specific //! way to use the protocol. //! @@ -167,6 +168,7 @@ extern crate bytes; #[macro_use] extern crate futures; extern crate multistream_select; +extern crate parking_lot; extern crate smallvec; extern crate tokio_io; diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index a18bb9d8..2e959eb8 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -31,7 +31,7 @@ use bytes::Bytes; use connection_reuse::ConnectionReuse; -use futures::{Async, Poll, Stream}; +use futures::{Async, Poll, stream, Stream}; use futures::future::{self, FromErr, Future, FutureResult, IntoFuture}; use multiaddr::Multiaddr; use multistream_select; @@ -123,21 +123,24 @@ pub trait Transport { /// Extension trait for `Transport`. Implemented on structs that provide a `Transport` on which /// the dialed node can dial you back. pub trait MuxedTransport: Transport { - /// Produces substreams on the dialed connection. - type Incoming: Stream; + /// Future resolving to an incoming connection. + type Incoming: Future; - /// Future resolving to an outgoing connection - type Outgoing: Future; + /// Returns the next incoming substream opened by a node that we dialed ourselves. + /// + /// > **Note**: Doesn't produce incoming substreams coming from addresses we are listening on. + /// > This only concerns nodes that we dialed with `dial()`. + fn next_incoming(self) -> Self::Incoming + where Self: Sized; - /// Future resolving to a tuple of `(Incoming, Outgoing)` - type DialAndListen: Future; - - /// Dial to the given multi-addr, and listen to incoming substreams on the dialed connection. - /// - /// Returns either a future which may resolve to a connection, or gives back the multiaddress. - fn dial_and_listen(self, addr: Multiaddr) -> Result - where - Self: Sized; + /// Returns a stream of incoming connections. + #[inline] + fn incoming(self) -> stream::AndThen, fn(Self) -> Self::Incoming, + Self::Incoming> + where Self: Sized + Clone + { + stream::repeat(self).and_then(|me| me.next_incoming()) + } } /// Dummy implementation of `Transport` that just denies every single attempt. @@ -163,14 +166,11 @@ impl Transport for DeniedTransport { } impl MuxedTransport for DeniedTransport { - // TODO: could use `!` once stable - type Incoming = Box>; - type Outgoing = Box>; - type DialAndListen = Box>; + type Incoming = future::Empty<(Self::RawConn, Multiaddr), IoError>; #[inline] - fn dial_and_listen(self, addr: Multiaddr) -> Result { - Err((DeniedTransport, addr)) + fn next_incoming(self) -> Self::Incoming { + future::empty() } } @@ -251,38 +251,19 @@ impl MuxedTransport for OrTransport where A: MuxedTransport, B: MuxedTransport, - A::DialAndListen: 'static, - B::DialAndListen: 'static, + A::Incoming: 'static, // TODO: meh :-/ + B::Incoming: 'static, // TODO: meh :-/ { - type Incoming = EitherIncomingStream; - type Outgoing = future::Either< - future::Map Self::RawConn>, - future::Map Self::RawConn>, - >; - type DialAndListen = Box>; + type Incoming = Box, Multiaddr), Error = IoError>>; - fn dial_and_listen(self, addr: Multiaddr) -> Result { - let (first, addr) = match self.0.dial_and_listen(addr) { - Ok(connec) => { - return Ok(Box::new(connec.map(|(inc, out)| { - ( - EitherIncomingStream::First(inc), - future::Either::A(out.map(EitherSocket::First as fn(_) -> _)), - ) - }))); - } - Err(err) => err, - }; - - match self.1.dial_and_listen(addr) { - Ok(connec) => Ok(Box::new(connec.map(|(inc, out)| { - ( - EitherIncomingStream::Second(inc), - future::Either::B(out.map(EitherSocket::Second as fn(_) -> _)), - ) - }))), - Err((second, addr)) => Err((OrTransport(first, second), addr)), - } + #[inline] + fn next_incoming(self) -> Self::Incoming { + let first = self.0.next_incoming().map(|(out, addr)| (EitherSocket::First(out), addr)); + let second = self.1.next_incoming().map(|(out, addr)| (EitherSocket::Second(out), addr)); + let future = first.select(second) + .map(|(i, _)| i) + .map_err(|(e, _)| e); + Box::new(future) as Box<_> } } @@ -765,93 +746,31 @@ where Ok(Box::new(future)) } - /// Tries to dial on the `Multiaddr` using the transport that was passed to `new`, then upgrade - /// the connection. Also listens to incoming substream requires on that dialed connection, and - /// automatically upgrades the incoming substreams. - /// - /// Note that this does the same as `MuxedTransport::dial_and_listen`, but with less - /// restrictions on the trait requirements. - pub fn dial_and_listen( - self, - addr: Multiaddr, - ) -> Result< - Box< - Future< - Item = ( - Box + 'a>, - Box + 'a>, - ), - Error = IoError, - > - + 'a, - >, - (Self, Multiaddr), - > - where - T: MuxedTransport, - C::NamesIter: Clone, // TODO: not elegant - C: Clone, + /// If the underlying transport is a `MuxedTransport`, then after calling `dial` we may receive + /// substreams opened by the dialed nodes. + /// + /// This function returns the next incoming substream. You are strongly encouraged to call it + /// if you have a muxed transport. + pub fn next_incoming(self) -> Box + 'a> + where T: MuxedTransport { let upgrade = self.upgrade; - let upgrade2 = upgrade.clone(); - self.transports - .dial_and_listen(addr) - .map_err(move |(trans, addr)| { - let builder = UpgradedNode { - transports: trans, - upgrade: upgrade, - }; + let future = self.transports.next_incoming() + // Try to negotiate the protocol. + .and_then(move |(connection, addr)| { + let iter = upgrade.protocol_names() + .map(|(name, id)| (name, ::eq, id)); + let negotiated = multistream_select::dialer_select_proto(connection, iter) + .map_err(|err| IoError::new(IoErrorKind::Other, err)); + negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr)) + }) + .and_then(|(upgrade_id, connection, upgrade, addr)| { + upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer) + .map(|u| (u, addr)) + }); - (builder, addr) - }) - .map(move |dialed_fut| { - let dialed_fut = dialed_fut - // Try to negotiate the protocol. - .map(move |(in_stream, dialer)| { - let upgrade = upgrade2.clone(); - - let dialer = { - let iter = upgrade2.protocol_names() - .map(|(name, id)| (name, ::eq, id)); - let negotiated = dialer.and_then(|dialer| { - multistream_select::dialer_select_proto(dialer, iter) - .map_err(|err| IoError::new(IoErrorKind::Other, err)) - }); - negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade2)) - } - .and_then(|(upgrade_id, connection, upgrade)| { - upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer) - }); - - let in_stream = in_stream - // Try to negotiate the protocol. - .and_then(move |connection| { - let upgrade = upgrade.clone(); - - let iter = upgrade.protocol_names() - .map((|(n, t)| { - (n, ::eq, t) - }) as fn(_) -> _); - let negotiated = multistream_select::listener_select_proto( - connection, - iter, - ); - negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade)) - .map_err(|err| IoError::new(IoErrorKind::Other, err)) - }) - .and_then(|(upgrade_id, connection, upgrade)| { - upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) - }); - - ( - Box::new(in_stream) as Box>, - Box::new(dialer) as Box>, - ) - }); - - Box::new(dialed_fut) as _ - }) + Box::new(future) as Box<_> } /// Start listening on the multiaddr using the transport that was passed to `new`. @@ -944,13 +863,10 @@ where C::NamesIter: Clone, // TODO: not elegant C: Clone, { - type Incoming = Box>; - type Outgoing = Box>; - type DialAndListen = Box>; + type Incoming = Box>; #[inline] - fn dial_and_listen(self, addr: Multiaddr) -> Result { - // Calls an inherent function above - self.dial_and_listen(addr) + fn next_incoming(self) -> Self::Incoming { + self.next_incoming() } }