Fix concerns

This commit is contained in:
Pierre Krieger 2017-12-18 12:40:16 +01:00
parent 1da2c5015c
commit 952e3926c8
3 changed files with 31 additions and 27 deletions

View File

@ -92,35 +92,38 @@ fn main() {
// This closure is called whenever a new connection has been received and successfully // This closure is called whenever a new connection has been received and successfully
// upgraded to use secio/plaintext and echo. // upgraded to use secio/plaintext and echo.
println!("Successfully negotiated protocol with {}", client_addr); println!("Successfully negotiated protocol with {}", client_addr);
let client_addr2 = client_addr.clone();
// We loop forever in order to handle all the messages sent by the client. // We loop forever in order to handle all the messages sent by the client.
let client_finished = loop_fn(socket, move |socket| { let client_finished = {
let client_addr = client_addr.clone(); let client_addr = client_addr.clone();
socket.into_future() loop_fn(socket, move |socket| {
.map_err(|(err, _)| err) let client_addr = client_addr.clone();
.and_then(move |(msg, rest)| { socket.into_future()
if let Some(msg) = msg { .map_err(|(err, _)| err)
// One message has been received. We send it back to the client. .and_then(move |(msg, rest)| {
println!("Received a message from {}: {:?}\n => Sending back \ if let Some(msg) = msg {
identical message to remote", client_addr, msg); // One message has been received. We send it back to the client.
Box::new(rest.send(msg).map(|m| Loop::Continue(m))) println!("Received a message from {}: {:?}\n => Sending back \
as Box<Future<Item = _, Error = _>> identical message to remote", client_addr, msg);
} else { Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
// End of stream. Connection closed. Breaking the loop. as Box<Future<Item = _, Error = _>>
println!("Received EOF from {}\n => Dropping connection", client_addr); } else {
Box::new(Ok(Loop::Break(())).into_future()) // End of stream. Connection closed. Breaking the loop.
as Box<Future<Item = _, Error = _>> println!("Received EOF from {}\n => Dropping connection",
} client_addr);
}) Box::new(Ok(Loop::Break(())).into_future())
}); as Box<Future<Item = _, Error = _>>
}
})
})
};
// We absorb errors from the `client_finished` future so that an error while processing // We absorb errors from the `client_finished` future so that an error while processing
// a client (eg. if the client unexpectedly disconnects) doesn't propagate and stop the // a client (eg. if the client unexpectedly disconnects) doesn't propagate and stop the
// entire server. // entire server.
client_finished.then(move |res| { client_finished.then(move |res| {
if let Err(err) = res { if let Err(err) = res {
println!("Error while processing client {}: {:?}", client_addr2, err); println!("Error while processing client {}: {:?}", client_addr, err);
} }
Ok(()) Ok(())
}) })

View File

@ -164,8 +164,9 @@ where
/// Implementation of `Stream<Item = (impl AsyncRead + AsyncWrite, Multiaddr)` for the /// Implementation of `Stream<Item = (impl AsyncRead + AsyncWrite, Multiaddr)` for the
/// `ConnectionReuse` struct. /// `ConnectionReuse` struct.
pub struct ConnectionReuseListener<S, M> pub struct ConnectionReuseListener<S, M>
where S: Stream<Item = (Result<M, IoError>, Multiaddr), Error = IoError>, where
M: StreamMuxer S: Stream<Item = (Result<M, IoError>, Multiaddr), Error = IoError>,
M: StreamMuxer
{ {
listener: StreamFuse<S>, listener: StreamFuse<S>,
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>, connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,

View File

@ -305,12 +305,12 @@ pub enum EitherListenStream<A, B> {
Second(B), Second(B),
} }
impl<A, B, Sa, Sb> Stream for EitherListenStream<A, B> impl<AStream, BStream, AInner, BInner> Stream for EitherListenStream<AStream, BStream>
where where
A: Stream<Item = (Result<Sa, IoError>, Multiaddr), Error = IoError>, AStream: Stream<Item = (Result<AInner, IoError>, Multiaddr), Error = IoError>,
B: Stream<Item = (Result<Sb, IoError>, Multiaddr), Error = IoError>, BStream: Stream<Item = (Result<BInner, IoError>, Multiaddr), Error = IoError>,
{ {
type Item = (Result<EitherSocket<Sa, Sb>, IoError>, Multiaddr); type Item = (Result<EitherSocket<AInner, BInner>, IoError>, Multiaddr);
type Error = IoError; type Error = IoError;
#[inline] #[inline]
@ -869,7 +869,7 @@ where
// Try to negotiate the protocol. // Try to negotiate the protocol.
// Note that failing to negotiate a protocol will never produce a future with an error. // Note that failing to negotiate a protocol will never produce a future with an error.
// Instead the `stream` will produce an `Ok(Err(...))`. // Instead the `stream` will produce `Ok(Err(...))`.
// `stream` can only produce an `Err` if `listening_stream` produces an `Err`. // `stream` can only produce an `Err` if `listening_stream` produces an `Err`.
let stream = listening_stream let stream = listening_stream
// Try to negotiate the protocol // Try to negotiate the protocol