diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs index 9f803e58..4334bef0 100644 --- a/example/examples/echo-server.rs +++ b/example/examples/echo-server.rs @@ -95,62 +95,49 @@ fn main() { println!("Now listening on {:?}", address); let future = listener - .filter_map(|(socket, client_addr)| { - let client_addr = client_addr.to_string(); - - // This closure is called whenever a new connection has been received. The `socket` - // is a `Result<..., IoError>` which contains an error if for example protocol - // negotiation or the secio handshake failed. We handle this situation by printing a - // message on stderr and ignoring the connection. - match socket { - Ok(s) => Some((s, client_addr)), - Err(err) => { - eprintln!("Failed connection attempt from {}\n => Error: {:?}", - client_addr, err); - None - }, - } - }) - .for_each(|(socket, client_addr)| { - // This closure is called whenever a new connection has been received and successfully - // upgraded to use secio/plaintext and echo. - println!("Successfully negotiated protocol with {}", client_addr); + // This closure is called whenever a new connection has been received. + // `socket` is a future that will be triggered once the upgrade to secio, multiplex + // and echo is complete. + let client_addr = client_addr.to_string(); + println!("Incoming connection from {}", client_addr); - // We loop forever in order to handle all the messages sent by the client. - let client_finished = { - let client_addr = client_addr.clone(); - loop_fn(socket, move |socket| { - let client_addr = client_addr.clone(); - socket.into_future() - .map_err(|(err, _)| err) - .and_then(move |(msg, rest)| { - if let Some(msg) = msg { - // One message has been received. We send it back to the client. - println!("Received a message from {}: {:?}\n => Sending back \ - identical message to remote", client_addr, msg); - Box::new(rest.send(msg).map(|m| Loop::Continue(m))) - as Box> - } else { - // End of stream. Connection closed. Breaking the loop. - println!("Received EOF from {}\n => Dropping connection", - client_addr); - Box::new(Ok(Loop::Break(())).into_future()) - as Box> - } - }) + socket + .and_then(move |socket| { + println!("Successfully negotiated protocol with {}", client_addr); + + // We loop forever in order to handle all the messages sent by the client. + loop_fn(socket, move |socket| { + let client_addr = client_addr.clone(); + socket.into_future() + .map_err(|(err, _)| err) + .and_then(move |(msg, rest)| { + if let Some(msg) = msg { + // One message has been received. We send it back to the client. + println!("Received a message from {}: {:?}\n => Sending back \ + identical message to remote", client_addr, msg); + Box::new(rest.send(msg).map(|m| Loop::Continue(m))) + as Box> + } else { + // End of stream. Connection closed. Breaking the loop. + println!("Received EOF from {}\n => Dropping connection", + client_addr); + Box::new(Ok(Loop::Break(())).into_future()) + as Box> + } + }) + }) }) - }; - // We absorb errors from the `client_finished` future so that an error while processing - // a client (eg. if the client unexpectedly disconnects) doesn't propagate and stop the - // entire server. - client_finished.then(move |res| { - if let Err(err) = res { - println!("Error while processing client {}: {:?}", client_addr, err); - } - Ok(()) - }) + // We absorb errors from the future so that an error while processing a client + // (eg. if the client unexpectedly disconnects) doesn't propagate and stop the + // entire server. + .then(move |res| { + if let Err(err) = res { + println!("Error while processing client: {:?}", err); + } + Ok(()) + }) }); // `future` is a future that contains all the behaviour that we want, but nothing has actually diff --git a/libp2p-swarm/src/connection_reuse.rs b/libp2p-swarm/src/connection_reuse.rs index 60037db0..17df7bda 100644 --- a/libp2p-swarm/src/connection_reuse.rs +++ b/libp2p-swarm/src/connection_reuse.rs @@ -44,6 +44,7 @@ //! figured out. use futures::{Async, Future, Poll, Stream}; +use futures::future::{IntoFuture, FutureResult}; use futures::stream::Fuse as StreamFuse; use futures::stream; use multiaddr::Multiaddr; @@ -87,15 +88,8 @@ where C::NamesIter: Clone, // TODO: not elegant { type RawConn = ::Substream; - type Listener = ConnectionReuseListener< - Box< - Stream< - Item = (Result, Multiaddr), - Error = IoError, - >, - >, - C::Output, - >; + type Listener = Box>; + type ListenerUpgrade = FutureResult; type Dial = Box>; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { @@ -108,10 +102,11 @@ where let listener = ConnectionReuseListener { listener: listener.fuse(), + current_upgrades: Vec::new(), connections: Vec::new(), }; - Ok((listener, new_addr)) + Ok((Box::new(listener) as Box<_>, new_addr)) } fn dial(self, addr: Multiaddr) -> Result { @@ -163,34 +158,29 @@ where /// Implementation of `Stream +pub struct ConnectionReuseListener where - S: Stream, Multiaddr), Error = IoError>, - M: StreamMuxer + S: Stream, + F: Future, + M: StreamMuxer, { listener: StreamFuse, + current_upgrades: Vec<(F, Multiaddr)>, connections: Vec<(M, ::InboundSubstream, Multiaddr)>, } -impl Stream for ConnectionReuseListener -where S: Stream, Multiaddr), Error = IoError>, +impl Stream for ConnectionReuseListener +where S: Stream, + F: Future, M: StreamMuxer + Clone + 'static // TODO: 'static :( { - type Item = (Result, Multiaddr); + type Item = (FutureResult, Multiaddr); type Error = IoError; fn poll(&mut self) -> Poll, Self::Error> { match self.listener.poll() { Ok(Async::Ready(Some((upgrade, client_addr)))) => { - match upgrade { - Ok(upgrade) => { - let next_incoming = upgrade.clone().inbound(); - self.connections.push((upgrade, next_incoming, client_addr)); - }, - Err(err) => { - return Ok(Async::Ready(Some((Err(err), client_addr)))); - }, - } + self.current_upgrades.push((upgrade, client_addr)); } Ok(Async::NotReady) => (), Ok(Async::Ready(None)) => { @@ -204,11 +194,41 @@ where S: Stream, Multiaddr), Error = IoError>, } } }; - + // Most of the time, this array will contain 0 or 1 elements, but sometimes it may contain // more and we don't want to panic if that happens. With 8 elements, we can be pretty // confident that this is never going to spill into a `Vec`. - let mut connections_to_drop: SmallVec<[_; 8]> = SmallVec::new(); + let mut upgrades_to_drop: SmallVec<[_; 8]> = SmallVec::new(); + let mut early_ret = None; + + for (index, &mut (ref mut current_upgrade, ref mut client_addr)) in + self.current_upgrades.iter_mut().enumerate() + { + match current_upgrade.poll() { + Ok(Async::Ready(muxer)) => { + let next_incoming = muxer.clone().inbound(); + self.connections.push((muxer, next_incoming, client_addr.clone())); + upgrades_to_drop.push(index); + }, + Ok(Async::NotReady) => {}, + Err(err) => { + upgrades_to_drop.push(index); + early_ret = Some(Async::Ready(Some((Err(err).into_future(), client_addr.clone())))); + }, + } + } + + for &index in upgrades_to_drop.iter().rev() { + self.current_upgrades.swap_remove(index); + } + + if let Some(early_ret) = early_ret { + return Ok(early_ret); + } + + // We reuse `upgrades_to_drop`. + upgrades_to_drop.clear(); + let mut connections_to_drop = upgrades_to_drop; for (index, &mut (ref mut muxer, ref mut next_incoming, ref client_addr)) in self.connections.iter_mut().enumerate() @@ -217,7 +237,7 @@ where S: Stream, Multiaddr), Error = IoError>, Ok(Async::Ready(incoming)) => { let mut new_next = muxer.clone().inbound(); *next_incoming = new_next; - return Ok(Async::Ready(Some((Ok(incoming), client_addr.clone())))); + return Ok(Async::Ready(Some((Ok(incoming).into_future(), client_addr.clone())))); } Ok(Async::NotReady) => {} Err(_) => { diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index 2c513600..a18bb9d8 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -26,8 +26,8 @@ //! encryption middleware to the connection). //! //! Thanks to the `Transport::or_transport`, `Transport::with_upgrade` and -//! `UpgradeNode::or_upgrade` methods, you can combine multiple transports and/or upgrades together -//! in a complex chain of protocols negotiation. +//! `UpgradedNode::or_upgrade` methods, you can combine multiple transports and/or upgrades +//! together in a complex chain of protocols negotiation. use bytes::Bytes; use connection_reuse::ConnectionReuse; @@ -56,7 +56,16 @@ pub trait Transport { type RawConn: AsyncRead + AsyncWrite; /// The listener produces incoming connections. - type Listener: Stream, Multiaddr), Error = IoError>; + /// + /// An item should be produced whenever a connection is received at the lowest level of the + /// transport stack. The item is a `Future` that is signalled once some pre-processing has + /// taken place, and that connection has been upgraded to the wanted protocols. + type Listener: Stream; + + /// After a connection has been received, we may need to do some asynchronous pre-processing + /// on it (eg. an intermediary protocol negotiation). While this pre-processing takes place, we + /// want to be able to continue polling on the listener. + type ListenerUpgrade: Future; /// A future which indicates that we are currently dialing to a peer. type Dial: IntoFuture; @@ -138,7 +147,8 @@ pub struct DeniedTransport; impl Transport for DeniedTransport { // TODO: could use `!` for associated types once stable type RawConn = Cursor>; - type Listener = Box, Multiaddr), Error = IoError>>; + type Listener = Box>; + type ListenerUpgrade = Box>; type Dial = Box>; #[inline] @@ -175,6 +185,7 @@ where { type RawConn = EitherSocket; type Listener = EitherListenStream; + type ListenerUpgrade = EitherTransportFuture; type Dial = EitherTransportFuture<::Future, ::Future>; @@ -308,19 +319,19 @@ pub enum EitherListenStream { impl Stream for EitherListenStream where - AStream: Stream, Multiaddr), Error = IoError>, - BStream: Stream, Multiaddr), Error = IoError>, + AStream: Stream, + BStream: Stream, { - type Item = (Result, IoError>, Multiaddr); + type Item = (EitherTransportFuture, Multiaddr); type Error = IoError; #[inline] fn poll(&mut self) -> Poll, Self::Error> { match self { &mut EitherListenStream::First(ref mut a) => a.poll() - .map(|i| i.map(|v| v.map(|(s, a)| (s.map(EitherSocket::First), a)))), + .map(|i| i.map(|v| v.map(|(s, a)| (EitherTransportFuture::First(s), a)))), &mut EitherListenStream::Second(ref mut a) => a.poll() - .map(|i| i.map(|v| v.map(|(s, a)| (s.map(EitherSocket::Second), a)))), + .map(|i| i.map(|v| v.map(|(s, a)| (EitherTransportFuture::Second(s), a)))), } } } @@ -853,7 +864,7 @@ where self, addr: Multiaddr, ) -> Result< - (Box, Multiaddr), Error = IoError> + 'a>, Multiaddr), + (Box + 'a>, Multiaddr), Error = IoError> + 'a>, Multiaddr), (Self, Multiaddr), > where @@ -879,30 +890,22 @@ where // Instead the `stream` will produce `Ok(Err(...))`. // `stream` can only produce an `Err` if `listening_stream` produces an `Err`. let stream = listening_stream - // Try to negotiate the protocol - .and_then(move |(connection, client_addr)| { - // Turn the `Result` into - // a `Result, IoError>` - let connection = connection.map(|connection| { - let upgrade = upgrade.clone(); - let iter = upgrade.protocol_names() - .map::<_, fn(_) -> _>(|(n, t)| (n, ::eq, t)); - multistream_select::listener_select_proto(connection, iter) - .map_err(|err| IoError::new(IoErrorKind::Other, err)) - .and_then(|(upgrade_id, connection)| { - upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) - }) - }); + .map(move |(connection, client_addr)| { + let upgrade = upgrade.clone(); + let connection = connection + // Try to negotiate the protocol + .and_then(move |connection| { + let iter = upgrade.protocol_names() + .map::<_, fn(_) -> _>(|(n, t)| (n, ::eq, t)); + multistream_select::listener_select_proto(connection, iter) + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .and_then(|(upgrade_id, connection)| { + upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) + }) + .into_future() + }); - connection - .into_future() - .flatten() - .then(move |nego_res| { - match nego_res { - Ok(upgraded) => Ok((Ok(upgraded), client_addr)), - Err(err) => Ok((Err(err), client_addr)), - } - }) + (Box::new(connection) as Box<_>, client_addr) }); Ok((Box::new(stream), new_addr)) @@ -918,7 +921,8 @@ where C: Clone, { type RawConn = C::Output; - type Listener = Box, Multiaddr), Error = IoError>>; + type Listener = Box>; + type ListenerUpgrade = Box>; type Dial = Box>; #[inline] diff --git a/libp2p-tcp-transport/src/lib.rs b/libp2p-tcp-transport/src/lib.rs index c20dbf7c..cc311769 100644 --- a/libp2p-tcp-transport/src/lib.rs +++ b/libp2p-tcp-transport/src/lib.rs @@ -59,7 +59,7 @@ use std::io::Error as IoError; use std::net::SocketAddr; use tokio_core::reactor::Handle; use tokio_core::net::{TcpStream, TcpListener, TcpStreamNew}; -use futures::Future; +use futures::future::{self, Future, FutureResult, IntoFuture}; use futures::stream::Stream; use multiaddr::{Multiaddr, AddrComponent, ToMultiaddr}; use swarm::Transport; @@ -84,13 +84,9 @@ impl TcpConfig { } impl Transport for TcpConfig { - /// The raw connection. type RawConn = TcpStream; - - /// The listener produces incoming connections. - type Listener = Box, Multiaddr), Error = IoError>>; - - /// A future which indicates currently dialing to a peer. + type Listener = Box>; + type ListenerUpgrade = FutureResult; type Dial = TcpStreamNew; /// Listen on the given multi-addr. @@ -109,12 +105,13 @@ impl Transport for TcpConfig { } Err(_) => addr, }; - let future = futures::future::result(listener).map(|listener| { + + let future = future::result(listener).map(|listener| { // Pull out a stream of sockets for incoming connections listener.incoming().map(|(sock, addr)| { let addr = addr.to_multiaddr() .expect("generating a multiaddr from a socket addr never fails"); - (Ok(sock), addr) + (Ok(sock).into_future(), addr) }) }) .flatten_stream(); @@ -224,16 +221,18 @@ mod tests { let tcp = TcpConfig::new(core.handle()); let handle = core.handle(); let listener = tcp.listen_on(addr).unwrap().0.for_each(|(sock, _)| { - // Define what to do with the socket that just connected to us - // Which in this case is read 3 bytes - let handle_conn = tokio_io::io::read_exact(sock.unwrap(), [0; 3]) - .map(|(_, buf)| assert_eq!(buf, [1, 2, 3])) - .map_err(|err| panic!("IO error {:?}", err)); + sock.and_then(|sock| { + // Define what to do with the socket that just connected to us + // Which in this case is read 3 bytes + let handle_conn = tokio_io::io::read_exact(sock, [0; 3]) + .map(|(_, buf)| assert_eq!(buf, [1, 2, 3])) + .map_err(|err| panic!("IO error {:?}", err)); - // Spawn the future as a concurrent task - handle.spawn(handle_conn); + // Spawn the future as a concurrent task + handle.spawn(handle_conn); - Ok(()) + Ok(()) + }) }); core.run(listener).unwrap();