From 1da2c5015c566f954e9601f485104c3677c3cad2 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 13 Dec 2017 12:08:40 +0100 Subject: [PATCH] Properly handle protocol negotiation errors --- example/examples/echo-dialer.rs | 3 +- example/examples/echo-server.rs | 34 +++++++++++--- libp2p-swarm/src/connection_reuse.rs | 32 +++++++++----- libp2p-swarm/src/transport.rs | 66 ++++++++++++++++------------ libp2p-tcp-transport/src/lib.rs | 6 +-- 5 files changed, 92 insertions(+), 49 deletions(-) diff --git a/example/examples/echo-dialer.rs b/example/examples/echo-dialer.rs index c247b28b..0fb77da6 100644 --- a/example/examples/echo-dialer.rs +++ b/example/examples/echo-dialer.rs @@ -80,6 +80,7 @@ fn main() { // `echo` is what the closure used when initializing "echo" returns. // Consequently, please note that the `send` method is available only because the type // `length_delimited::Framed` has a `send` method. + println!("Sending \"hello world\" to listener"); echo.and_then(|echo| echo.send("hello world".into()).map(Option::Some)) .select( incoming @@ -97,7 +98,7 @@ fn main() { echo.unwrap() .into_future() .map(|(msg, rest)| { - println!("received: {:?}", msg); + println!("Received message from listener: {:?}", msg); rest }) .map_err(|(err, _)| err) diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs index 5c8fb209..00795894 100644 --- a/example/examples/echo-server.rs +++ b/example/examples/echo-server.rs @@ -71,23 +71,44 @@ fn main() { let future = transport.listen_on(swarm::Multiaddr::new("/ip4/0.0.0.0/tcp/10333").unwrap()) .unwrap_or_else(|_| panic!("unsupported multiaddr protocol ; should never happen")).0 + .filter_map(|(socket, client_addr)| { + let client_addr = client_addr.to_string(); + + // This closure is called whenever a new connection has been received. The `socket` + // is a `Result<..., IoError>` which contains an error if for example protocol + // negotiation or the secio handshake failed. We handle this situation by printing a + // message on stderr and ignoring the connection. + match socket { + Ok(s) => Some((s, client_addr)), + Err(err) => { + eprintln!("Failed connection attempt from {}\n => Error: {:?}", + client_addr, err); + None + }, + } + }) + .for_each(|(socket, client_addr)| { // This closure is called whenever a new connection has been received and successfully // upgraded to use secio/plaintext and echo. - let client_addr = client_addr.to_string(); - println!("Received connection from {}", client_addr); + println!("Successfully negotiated protocol with {}", client_addr); + let client_addr2 = client_addr.clone(); // We loop forever in order to handle all the messages sent by the client. - let client_finished = loop_fn(socket, |socket| { + let client_finished = loop_fn(socket, move |socket| { + let client_addr = client_addr.clone(); socket.into_future() .map_err(|(err, _)| err) - .and_then(|(msg, rest)| { + .and_then(move |(msg, rest)| { if let Some(msg) = msg { // One message has been received. We send it back to the client. + println!("Received a message from {}: {:?}\n => Sending back \ + identical message to remote", client_addr, msg); Box::new(rest.send(msg).map(|m| Loop::Continue(m))) as Box> } else { // End of stream. Connection closed. Breaking the loop. + println!("Received EOF from {}\n => Dropping connection", client_addr); Box::new(Ok(Loop::Break(())).into_future()) as Box> } @@ -95,10 +116,11 @@ fn main() { }); // We absorb errors from the `client_finished` future so that an error while processing - // a client doesn't propagate and stop the entire server. + // a client (eg. if the client unexpectedly disconnects) doesn't propagate and stop the + // entire server. client_finished.then(move |res| { if let Err(err) = res { - println!("error while processing client {}: {:?}", client_addr, err); + println!("Error while processing client {}: {:?}", client_addr2, err); } Ok(()) }) diff --git a/libp2p-swarm/src/connection_reuse.rs b/libp2p-swarm/src/connection_reuse.rs index 04ecc894..19d17c47 100644 --- a/libp2p-swarm/src/connection_reuse.rs +++ b/libp2p-swarm/src/connection_reuse.rs @@ -88,7 +88,12 @@ where { type RawConn = ::Substream; type Listener = ConnectionReuseListener< - Box>, + Box< + Stream< + Item = (Result, Multiaddr), + Error = IoError, + >, + >, C::Output, >; type Dial = Box>; @@ -159,27 +164,32 @@ where /// Implementation of `Stream -where - S: Stream, - M: StreamMuxer, +where S: Stream, Multiaddr), Error = IoError>, + M: StreamMuxer { listener: StreamFuse, connections: Vec<(M, ::InboundSubstream, Multiaddr)>, } impl Stream for ConnectionReuseListener -where - S: Stream, - M: StreamMuxer + Clone + 'static, // TODO: 'static :( +where S: Stream, Multiaddr), Error = IoError>, + M: StreamMuxer + Clone + 'static // TODO: 'static :( { - type Item = (M::Substream, Multiaddr); + type Item = (Result, Multiaddr); type Error = IoError; fn poll(&mut self) -> Poll, Self::Error> { match self.listener.poll() { Ok(Async::Ready(Some((upgrade, client_addr)))) => { - let next_incoming = upgrade.clone().inbound(); - self.connections.push((upgrade, next_incoming, client_addr)); + match upgrade { + Ok(upgrade) => { + let next_incoming = upgrade.clone().inbound(); + self.connections.push((upgrade, next_incoming, client_addr)); + }, + Err(err) => { + return Ok(Async::Ready(Some((Err(err), client_addr)))); + }, + } } Ok(Async::NotReady) => (), Ok(Async::Ready(None)) => { @@ -206,7 +216,7 @@ where Ok(Async::Ready(incoming)) => { let mut new_next = muxer.clone().inbound(); *next_incoming = new_next; - return Ok(Async::Ready(Some((incoming, client_addr.clone())))); + return Ok(Async::Ready(Some((Ok(incoming), client_addr.clone())))); } Ok(Async::NotReady) => {} Err(_) => { diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index cc0e2819..ccf22e6e 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -55,7 +55,7 @@ pub trait Transport { type RawConn: AsyncRead + AsyncWrite; /// The listener produces incoming connections. - type Listener: Stream; + type Listener: Stream, Multiaddr), Error = IoError>; /// A future which indicates that we are currently dialing to a peer. type Dial: IntoFuture; @@ -137,7 +137,7 @@ pub struct DeniedTransport; impl Transport for DeniedTransport { // TODO: could use `!` for associated types once stable type RawConn = Cursor>; - type Listener = Box>; + type Listener = Box, Multiaddr), Error = IoError>>; type Dial = Box>; #[inline] @@ -307,19 +307,19 @@ pub enum EitherListenStream { impl Stream for EitherListenStream where - A: Stream, - B: Stream, + A: Stream, Multiaddr), Error = IoError>, + B: Stream, Multiaddr), Error = IoError>, { - type Item = (EitherSocket, Multiaddr); + type Item = (Result, IoError>, Multiaddr); type Error = IoError; #[inline] fn poll(&mut self) -> Poll, Self::Error> { match self { &mut EitherListenStream::First(ref mut a) => a.poll() - .map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::First(s), a)))), + .map(|i| i.map(|v| v.map(|(s, a)| (s.map(EitherSocket::First), a)))), &mut EitherListenStream::Second(ref mut a) => a.poll() - .map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::Second(s), a)))), + .map(|i| i.map(|v| v.map(|(s, a)| (s.map(EitherSocket::Second), a)))), } } } @@ -846,10 +846,7 @@ where self, addr: Multiaddr, ) -> Result< - ( - Box + 'a>, - Multiaddr, - ), + (Box, Multiaddr), Error = IoError> + 'a>, Multiaddr), (Self, Multiaddr), > where @@ -870,23 +867,36 @@ where } }; + // Try to negotiate the protocol. + // Note that failing to negotiate a protocol will never produce a future with an error. + // Instead the `stream` will produce an `Ok(Err(...))`. + // `stream` can only produce an `Err` if `listening_stream` produces an `Err`. let stream = listening_stream - // Try to negotiate the protocol. - .and_then(move |(connection, client_addr)| { - let upgrade = upgrade.clone(); - #[inline] - fn iter_map((n, t): (Bytes, T)) -> (Bytes, fn(&Bytes,&Bytes)->bool, T) { - (n, ::eq, t) - } - let iter = upgrade.protocol_names().map(iter_map); - let negotiated = multistream_select::listener_select_proto(connection, iter); - negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, client_addr)) - .map_err(|err| IoError::new(IoErrorKind::Other, err)) - }) - .and_then(|(upgrade_id, connection, upgrade, client_addr)| { - upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) - .map(|s| (s, client_addr)) - }); + // Try to negotiate the protocol + .and_then(move |(connection, client_addr)| { + // Turn the `Result` into + // a `Result, IoError>` + let connection = connection.map(|connection| { + let upgrade = upgrade.clone(); + let iter = upgrade.protocol_names() + .map::<_, fn(_) -> _>(|(n, t)| (n, ::eq, t)); + multistream_select::listener_select_proto(connection, iter) + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .and_then(|(upgrade_id, connection)| { + upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) + }) + }); + + connection + .into_future() + .flatten() + .then(move |nego_res| { + match nego_res { + Ok(upgraded) => Ok((Ok(upgraded), client_addr)), + Err(err) => Ok((Err(err), client_addr)), + } + }) + }); Ok((Box::new(stream), new_addr)) } @@ -901,7 +911,7 @@ where C: Clone, { type RawConn = C::Output; - type Listener = Box>; + type Listener = Box, Multiaddr), Error = IoError>>; type Dial = Box>; #[inline] diff --git a/libp2p-tcp-transport/src/lib.rs b/libp2p-tcp-transport/src/lib.rs index 78a7f68f..0e205a45 100644 --- a/libp2p-tcp-transport/src/lib.rs +++ b/libp2p-tcp-transport/src/lib.rs @@ -88,7 +88,7 @@ impl Transport for TcpConfig { type RawConn = TcpStream; /// The listener produces incoming connections. - type Listener = Box>; + type Listener = Box, Multiaddr), Error = IoError>>; /// A future which indicates currently dialing to a peer. type Dial = TcpStreamNew; @@ -114,7 +114,7 @@ impl Transport for TcpConfig { listener.incoming().map(|(sock, addr)| { let addr = addr.to_multiaddr() .expect("generating a multiaddr from a socket addr never fails"); - (sock, addr) + (Ok(sock), addr) }) }) .flatten_stream(); @@ -237,7 +237,7 @@ mod tests { let listener = tcp.listen_on(addr).unwrap().0.for_each(|(sock, _)| { // Define what to do with the socket that just connected to us // Which in this case is read 3 bytes - let handle_conn = tokio_io::io::read_exact(sock, [0; 3]) + let handle_conn = tokio_io::io::read_exact(sock.unwrap(), [0; 3]) .map(|(_, buf)| assert_eq!(buf, [1, 2, 3])) .map_err(|err| panic!("IO error {:?}", err));