Change API to allow multiple simultaneous clients

This commit is contained in:
Pierre Krieger
2017-12-19 18:09:17 +01:00
parent ec2a8b3832
commit 3438640907
4 changed files with 141 additions and 131 deletions

View File

@ -95,62 +95,49 @@ fn main() {
println!("Now listening on {:?}", address);
let future = listener
.filter_map(|(socket, client_addr)| {
let client_addr = client_addr.to_string();
// This closure is called whenever a new connection has been received. The `socket`
// is a `Result<..., IoError>` which contains an error if for example protocol
// negotiation or the secio handshake failed. We handle this situation by printing a
// message on stderr and ignoring the connection.
match socket {
Ok(s) => Some((s, client_addr)),
Err(err) => {
eprintln!("Failed connection attempt from {}\n => Error: {:?}",
client_addr, err);
None
},
}
})
.for_each(|(socket, client_addr)| {
// This closure is called whenever a new connection has been received and successfully
// upgraded to use secio/plaintext and echo.
println!("Successfully negotiated protocol with {}", client_addr);
// This closure is called whenever a new connection has been received.
// `socket` is a future that will be triggered once the upgrade to secio, multiplex
// and echo is complete.
let client_addr = client_addr.to_string();
println!("Incoming connection from {}", client_addr);
// We loop forever in order to handle all the messages sent by the client.
let client_finished = {
let client_addr = client_addr.clone();
loop_fn(socket, move |socket| {
let client_addr = client_addr.clone();
socket.into_future()
.map_err(|(err, _)| err)
.and_then(move |(msg, rest)| {
if let Some(msg) = msg {
// One message has been received. We send it back to the client.
println!("Received a message from {}: {:?}\n => Sending back \
identical message to remote", client_addr, msg);
Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
as Box<Future<Item = _, Error = _>>
} else {
// End of stream. Connection closed. Breaking the loop.
println!("Received EOF from {}\n => Dropping connection",
client_addr);
Box::new(Ok(Loop::Break(())).into_future())
as Box<Future<Item = _, Error = _>>
}
})
socket
.and_then(move |socket| {
println!("Successfully negotiated protocol with {}", client_addr);
// We loop forever in order to handle all the messages sent by the client.
loop_fn(socket, move |socket| {
let client_addr = client_addr.clone();
socket.into_future()
.map_err(|(err, _)| err)
.and_then(move |(msg, rest)| {
if let Some(msg) = msg {
// One message has been received. We send it back to the client.
println!("Received a message from {}: {:?}\n => Sending back \
identical message to remote", client_addr, msg);
Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
as Box<Future<Item = _, Error = _>>
} else {
// End of stream. Connection closed. Breaking the loop.
println!("Received EOF from {}\n => Dropping connection",
client_addr);
Box::new(Ok(Loop::Break(())).into_future())
as Box<Future<Item = _, Error = _>>
}
})
})
})
};
// We absorb errors from the `client_finished` future so that an error while processing
// a client (eg. if the client unexpectedly disconnects) doesn't propagate and stop the
// entire server.
client_finished.then(move |res| {
if let Err(err) = res {
println!("Error while processing client {}: {:?}", client_addr, err);
}
Ok(())
})
// We absorb errors from the future so that an error while processing a client
// (eg. if the client unexpectedly disconnects) doesn't propagate and stop the
// entire server.
.then(move |res| {
if let Err(err) = res {
println!("Error while processing client: {:?}", err);
}
Ok(())
})
});
// `future` is a future that contains all the behaviour that we want, but nothing has actually