From 952e3926c803b1df30a84f669d057d7f87c99e47 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 18 Dec 2017 12:40:16 +0100 Subject: [PATCH] Fix concerns --- example/examples/echo-server.rs | 43 +++++++++++++++------------- libp2p-swarm/src/connection_reuse.rs | 5 ++-- libp2p-swarm/src/transport.rs | 10 +++---- 3 files changed, 31 insertions(+), 27 deletions(-) diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs index 00795894..ceccdbab 100644 --- a/example/examples/echo-server.rs +++ b/example/examples/echo-server.rs @@ -92,35 +92,38 @@ fn main() { // This closure is called whenever a new connection has been received and successfully // upgraded to use secio/plaintext and echo. println!("Successfully negotiated protocol with {}", client_addr); - let client_addr2 = client_addr.clone(); // We loop forever in order to handle all the messages sent by the client. - let client_finished = loop_fn(socket, move |socket| { + let client_finished = { let client_addr = client_addr.clone(); - socket.into_future() - .map_err(|(err, _)| err) - .and_then(move |(msg, rest)| { - if let Some(msg) = msg { - // One message has been received. We send it back to the client. - println!("Received a message from {}: {:?}\n => Sending back \ - identical message to remote", client_addr, msg); - Box::new(rest.send(msg).map(|m| Loop::Continue(m))) - as Box> - } else { - // End of stream. Connection closed. Breaking the loop. - println!("Received EOF from {}\n => Dropping connection", client_addr); - Box::new(Ok(Loop::Break(())).into_future()) - as Box> - } - }) - }); + loop_fn(socket, move |socket| { + let client_addr = client_addr.clone(); + socket.into_future() + .map_err(|(err, _)| err) + .and_then(move |(msg, rest)| { + if let Some(msg) = msg { + // One message has been received. We send it back to the client. + println!("Received a message from {}: {:?}\n => Sending back \ + identical message to remote", client_addr, msg); + Box::new(rest.send(msg).map(|m| Loop::Continue(m))) + as Box> + } else { + // End of stream. Connection closed. Breaking the loop. + println!("Received EOF from {}\n => Dropping connection", + client_addr); + Box::new(Ok(Loop::Break(())).into_future()) + as Box> + } + }) + }) + }; // We absorb errors from the `client_finished` future so that an error while processing // a client (eg. if the client unexpectedly disconnects) doesn't propagate and stop the // entire server. client_finished.then(move |res| { if let Err(err) = res { - println!("Error while processing client {}: {:?}", client_addr2, err); + println!("Error while processing client {}: {:?}", client_addr, err); } Ok(()) }) diff --git a/libp2p-swarm/src/connection_reuse.rs b/libp2p-swarm/src/connection_reuse.rs index 19d17c47..60037db0 100644 --- a/libp2p-swarm/src/connection_reuse.rs +++ b/libp2p-swarm/src/connection_reuse.rs @@ -164,8 +164,9 @@ where /// Implementation of `Stream -where S: Stream, Multiaddr), Error = IoError>, - M: StreamMuxer +where + S: Stream, Multiaddr), Error = IoError>, + M: StreamMuxer { listener: StreamFuse, connections: Vec<(M, ::InboundSubstream, Multiaddr)>, diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index ccf22e6e..42a09f94 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -305,12 +305,12 @@ pub enum EitherListenStream { Second(B), } -impl Stream for EitherListenStream +impl Stream for EitherListenStream where - A: Stream, Multiaddr), Error = IoError>, - B: Stream, Multiaddr), Error = IoError>, + AStream: Stream, Multiaddr), Error = IoError>, + BStream: Stream, Multiaddr), Error = IoError>, { - type Item = (Result, IoError>, Multiaddr); + type Item = (Result, IoError>, Multiaddr); type Error = IoError; #[inline] @@ -869,7 +869,7 @@ where // Try to negotiate the protocol. // Note that failing to negotiate a protocol will never produce a future with an error. - // Instead the `stream` will produce an `Ok(Err(...))`. + // Instead the `stream` will produce `Ok(Err(...))`. // `stream` can only produce an `Err` if `listening_stream` produces an `Err`. let stream = listening_stream // Try to negotiate the protocol