Properly handle protocol negotiation errors

This commit is contained in:
Pierre Krieger
2017-12-13 12:08:40 +01:00
parent 6617059645
commit 1da2c5015c
5 changed files with 92 additions and 49 deletions

View File

@ -80,6 +80,7 @@ fn main() {
// `echo` is what the closure used when initializing "echo" returns. // `echo` is what the closure used when initializing "echo" returns.
// Consequently, please note that the `send` method is available only because the type // Consequently, please note that the `send` method is available only because the type
// `length_delimited::Framed` has a `send` method. // `length_delimited::Framed` has a `send` method.
println!("Sending \"hello world\" to listener");
echo.and_then(|echo| echo.send("hello world".into()).map(Option::Some)) echo.and_then(|echo| echo.send("hello world".into()).map(Option::Some))
.select( .select(
incoming incoming
@ -97,7 +98,7 @@ fn main() {
echo.unwrap() echo.unwrap()
.into_future() .into_future()
.map(|(msg, rest)| { .map(|(msg, rest)| {
println!("received: {:?}", msg); println!("Received message from listener: {:?}", msg);
rest rest
}) })
.map_err(|(err, _)| err) .map_err(|(err, _)| err)

View File

@ -71,23 +71,44 @@ fn main() {
let future = transport.listen_on(swarm::Multiaddr::new("/ip4/0.0.0.0/tcp/10333").unwrap()) let future = transport.listen_on(swarm::Multiaddr::new("/ip4/0.0.0.0/tcp/10333").unwrap())
.unwrap_or_else(|_| panic!("unsupported multiaddr protocol ; should never happen")).0 .unwrap_or_else(|_| panic!("unsupported multiaddr protocol ; should never happen")).0
.filter_map(|(socket, client_addr)| {
let client_addr = client_addr.to_string();
// This closure is called whenever a new connection has been received. The `socket`
// is a `Result<..., IoError>` which contains an error if for example protocol
// negotiation or the secio handshake failed. We handle this situation by printing a
// message on stderr and ignoring the connection.
match socket {
Ok(s) => Some((s, client_addr)),
Err(err) => {
eprintln!("Failed connection attempt from {}\n => Error: {:?}",
client_addr, err);
None
},
}
})
.for_each(|(socket, client_addr)| { .for_each(|(socket, client_addr)| {
// This closure is called whenever a new connection has been received and successfully // This closure is called whenever a new connection has been received and successfully
// upgraded to use secio/plaintext and echo. // upgraded to use secio/plaintext and echo.
let client_addr = client_addr.to_string(); println!("Successfully negotiated protocol with {}", client_addr);
println!("Received connection from {}", client_addr); let client_addr2 = client_addr.clone();
// We loop forever in order to handle all the messages sent by the client. // We loop forever in order to handle all the messages sent by the client.
let client_finished = loop_fn(socket, |socket| { let client_finished = loop_fn(socket, move |socket| {
let client_addr = client_addr.clone();
socket.into_future() socket.into_future()
.map_err(|(err, _)| err) .map_err(|(err, _)| err)
.and_then(|(msg, rest)| { .and_then(move |(msg, rest)| {
if let Some(msg) = msg { if let Some(msg) = msg {
// One message has been received. We send it back to the client. // One message has been received. We send it back to the client.
println!("Received a message from {}: {:?}\n => Sending back \
identical message to remote", client_addr, msg);
Box::new(rest.send(msg).map(|m| Loop::Continue(m))) Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
as Box<Future<Item = _, Error = _>> as Box<Future<Item = _, Error = _>>
} else { } else {
// End of stream. Connection closed. Breaking the loop. // End of stream. Connection closed. Breaking the loop.
println!("Received EOF from {}\n => Dropping connection", client_addr);
Box::new(Ok(Loop::Break(())).into_future()) Box::new(Ok(Loop::Break(())).into_future())
as Box<Future<Item = _, Error = _>> as Box<Future<Item = _, Error = _>>
} }
@ -95,10 +116,11 @@ fn main() {
}); });
// We absorb errors from the `client_finished` future so that an error while processing // We absorb errors from the `client_finished` future so that an error while processing
// a client doesn't propagate and stop the entire server. // a client (eg. if the client unexpectedly disconnects) doesn't propagate and stop the
// entire server.
client_finished.then(move |res| { client_finished.then(move |res| {
if let Err(err) = res { if let Err(err) = res {
println!("error while processing client {}: {:?}", client_addr, err); println!("Error while processing client {}: {:?}", client_addr2, err);
} }
Ok(()) Ok(())
}) })

View File

@ -88,7 +88,12 @@ where
{ {
type RawConn = <C::Output as StreamMuxer>::Substream; type RawConn = <C::Output as StreamMuxer>::Substream;
type Listener = ConnectionReuseListener< type Listener = ConnectionReuseListener<
Box<Stream<Item = (C::Output, Multiaddr), Error = IoError>>, Box<
Stream<
Item = (Result<C::Output, IoError>, Multiaddr),
Error = IoError,
>,
>,
C::Output, C::Output,
>; >;
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>; type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
@ -159,27 +164,32 @@ where
/// Implementation of `Stream<Item = (impl AsyncRead + AsyncWrite, Multiaddr)` for the /// Implementation of `Stream<Item = (impl AsyncRead + AsyncWrite, Multiaddr)` for the
/// `ConnectionReuse` struct. /// `ConnectionReuse` struct.
pub struct ConnectionReuseListener<S, M> pub struct ConnectionReuseListener<S, M>
where where S: Stream<Item = (Result<M, IoError>, Multiaddr), Error = IoError>,
S: Stream<Item = (M, Multiaddr), Error = IoError>, M: StreamMuxer
M: StreamMuxer,
{ {
listener: StreamFuse<S>, listener: StreamFuse<S>,
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>, connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,
} }
impl<S, M> Stream for ConnectionReuseListener<S, M> impl<S, M> Stream for ConnectionReuseListener<S, M>
where where S: Stream<Item = (Result<M, IoError>, Multiaddr), Error = IoError>,
S: Stream<Item = (M, Multiaddr), Error = IoError>, M: StreamMuxer + Clone + 'static // TODO: 'static :(
M: StreamMuxer + Clone + 'static, // TODO: 'static :(
{ {
type Item = (M::Substream, Multiaddr); type Item = (Result<M::Substream, IoError>, Multiaddr);
type Error = IoError; type Error = IoError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.listener.poll() { match self.listener.poll() {
Ok(Async::Ready(Some((upgrade, client_addr)))) => { Ok(Async::Ready(Some((upgrade, client_addr)))) => {
match upgrade {
Ok(upgrade) => {
let next_incoming = upgrade.clone().inbound(); let next_incoming = upgrade.clone().inbound();
self.connections.push((upgrade, next_incoming, client_addr)); self.connections.push((upgrade, next_incoming, client_addr));
},
Err(err) => {
return Ok(Async::Ready(Some((Err(err), client_addr))));
},
}
} }
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
@ -206,7 +216,7 @@ where
Ok(Async::Ready(incoming)) => { Ok(Async::Ready(incoming)) => {
let mut new_next = muxer.clone().inbound(); let mut new_next = muxer.clone().inbound();
*next_incoming = new_next; *next_incoming = new_next;
return Ok(Async::Ready(Some((incoming, client_addr.clone())))); return Ok(Async::Ready(Some((Ok(incoming), client_addr.clone()))));
} }
Ok(Async::NotReady) => {} Ok(Async::NotReady) => {}
Err(_) => { Err(_) => {

View File

@ -55,7 +55,7 @@ pub trait Transport {
type RawConn: AsyncRead + AsyncWrite; type RawConn: AsyncRead + AsyncWrite;
/// The listener produces incoming connections. /// The listener produces incoming connections.
type Listener: Stream<Item = (Self::RawConn, Multiaddr), Error = IoError>; type Listener: Stream<Item = (Result<Self::RawConn, IoError>, Multiaddr), Error = IoError>;
/// A future which indicates that we are currently dialing to a peer. /// A future which indicates that we are currently dialing to a peer.
type Dial: IntoFuture<Item = Self::RawConn, Error = IoError>; type Dial: IntoFuture<Item = Self::RawConn, Error = IoError>;
@ -137,7 +137,7 @@ pub struct DeniedTransport;
impl Transport for DeniedTransport { impl Transport for DeniedTransport {
// TODO: could use `!` for associated types once stable // TODO: could use `!` for associated types once stable
type RawConn = Cursor<Vec<u8>>; type RawConn = Cursor<Vec<u8>>;
type Listener = Box<Stream<Item = (Self::RawConn, Multiaddr), Error = IoError>>; type Listener = Box<Stream<Item = (Result<Self::RawConn, IoError>, Multiaddr), Error = IoError>>;
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>; type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
#[inline] #[inline]
@ -307,19 +307,19 @@ pub enum EitherListenStream<A, B> {
impl<A, B, Sa, Sb> Stream for EitherListenStream<A, B> impl<A, B, Sa, Sb> Stream for EitherListenStream<A, B>
where where
A: Stream<Item = (Sa, Multiaddr), Error = IoError>, A: Stream<Item = (Result<Sa, IoError>, Multiaddr), Error = IoError>,
B: Stream<Item = (Sb, Multiaddr), Error = IoError>, B: Stream<Item = (Result<Sb, IoError>, Multiaddr), Error = IoError>,
{ {
type Item = (EitherSocket<Sa, Sb>, Multiaddr); type Item = (Result<EitherSocket<Sa, Sb>, IoError>, Multiaddr);
type Error = IoError; type Error = IoError;
#[inline] #[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self { match self {
&mut EitherListenStream::First(ref mut a) => a.poll() &mut EitherListenStream::First(ref mut a) => a.poll()
.map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::First(s), a)))), .map(|i| i.map(|v| v.map(|(s, a)| (s.map(EitherSocket::First), a)))),
&mut EitherListenStream::Second(ref mut a) => a.poll() &mut EitherListenStream::Second(ref mut a) => a.poll()
.map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::Second(s), a)))), .map(|i| i.map(|v| v.map(|(s, a)| (s.map(EitherSocket::Second), a)))),
} }
} }
} }
@ -846,10 +846,7 @@ where
self, self,
addr: Multiaddr, addr: Multiaddr,
) -> Result< ) -> Result<
( (Box<Stream<Item = (Result<C::Output, IoError>, Multiaddr), Error = IoError> + 'a>, Multiaddr),
Box<Stream<Item = (C::Output, Multiaddr), Error = IoError> + 'a>,
Multiaddr,
),
(Self, Multiaddr), (Self, Multiaddr),
> >
where where
@ -870,22 +867,35 @@ where
} }
}; };
let stream = listening_stream
// Try to negotiate the protocol. // Try to negotiate the protocol.
// Note that failing to negotiate a protocol will never produce a future with an error.
// Instead the `stream` will produce an `Ok(Err(...))`.
// `stream` can only produce an `Err` if `listening_stream` produces an `Err`.
let stream = listening_stream
// Try to negotiate the protocol
.and_then(move |(connection, client_addr)| { .and_then(move |(connection, client_addr)| {
// Turn the `Result<impl AsyncRead + AsyncWrite, IoError>` into
// a `Result<impl Future<Item = impl AsyncRead + AsyncWrite, Error = IoError>, IoError>`
let connection = connection.map(|connection| {
let upgrade = upgrade.clone(); let upgrade = upgrade.clone();
#[inline] let iter = upgrade.protocol_names()
fn iter_map<T>((n, t): (Bytes, T)) -> (Bytes, fn(&Bytes,&Bytes)->bool, T) { .map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
(n, <Bytes as PartialEq>::eq, t) multistream_select::listener_select_proto(connection, iter)
}
let iter = upgrade.protocol_names().map(iter_map);
let negotiated = multistream_select::listener_select_proto(connection, iter);
negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, client_addr))
.map_err(|err| IoError::new(IoErrorKind::Other, err)) .map_err(|err| IoError::new(IoErrorKind::Other, err))
}) .and_then(|(upgrade_id, connection)| {
.and_then(|(upgrade_id, connection, upgrade, client_addr)| {
upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) upgrade.upgrade(connection, upgrade_id, Endpoint::Listener)
.map(|s| (s, client_addr)) })
});
connection
.into_future()
.flatten()
.then(move |nego_res| {
match nego_res {
Ok(upgraded) => Ok((Ok(upgraded), client_addr)),
Err(err) => Ok((Err(err), client_addr)),
}
})
}); });
Ok((Box::new(stream), new_addr)) Ok((Box::new(stream), new_addr))
@ -901,7 +911,7 @@ where
C: Clone, C: Clone,
{ {
type RawConn = C::Output; type RawConn = C::Output;
type Listener = Box<Stream<Item = (C::Output, Multiaddr), Error = IoError>>; type Listener = Box<Stream<Item = (Result<C::Output, IoError>, Multiaddr), Error = IoError>>;
type Dial = Box<Future<Item = C::Output, Error = IoError>>; type Dial = Box<Future<Item = C::Output, Error = IoError>>;
#[inline] #[inline]

View File

@ -88,7 +88,7 @@ impl Transport for TcpConfig {
type RawConn = TcpStream; type RawConn = TcpStream;
/// The listener produces incoming connections. /// The listener produces incoming connections.
type Listener = Box<Stream<Item = (Self::RawConn, Multiaddr), Error = IoError>>; type Listener = Box<Stream<Item = (Result<Self::RawConn, IoError>, Multiaddr), Error = IoError>>;
/// A future which indicates currently dialing to a peer. /// A future which indicates currently dialing to a peer.
type Dial = TcpStreamNew; type Dial = TcpStreamNew;
@ -114,7 +114,7 @@ impl Transport for TcpConfig {
listener.incoming().map(|(sock, addr)| { listener.incoming().map(|(sock, addr)| {
let addr = addr.to_multiaddr() let addr = addr.to_multiaddr()
.expect("generating a multiaddr from a socket addr never fails"); .expect("generating a multiaddr from a socket addr never fails");
(sock, addr) (Ok(sock), addr)
}) })
}) })
.flatten_stream(); .flatten_stream();
@ -237,7 +237,7 @@ mod tests {
let listener = tcp.listen_on(addr).unwrap().0.for_each(|(sock, _)| { let listener = tcp.listen_on(addr).unwrap().0.for_each(|(sock, _)| {
// Define what to do with the socket that just connected to us // Define what to do with the socket that just connected to us
// Which in this case is read 3 bytes // Which in this case is read 3 bytes
let handle_conn = tokio_io::io::read_exact(sock, [0; 3]) let handle_conn = tokio_io::io::read_exact(sock.unwrap(), [0; 3])
.map(|(_, buf)| assert_eq!(buf, [1, 2, 3])) .map(|(_, buf)| assert_eq!(buf, [1, 2, 3]))
.map_err(|err| panic!("IO error {:?}", err)); .map_err(|err| panic!("IO error {:?}", err));