Return TCP listener errors. (#1218)

* Add listener ID and error event.

Report listener errors to client code so they are aware that an error
occurred within a listener. By default we continue to poll listeners
which produced an error, but clients can remove listeners by ID.

* tcp: Report errors.

Instead of silently waiting after errors we return all errors, but pause
after each error, before continuing.

* Add a test.

To ease testing, `Listener` is made generic and we test that no values
and errors are lost. Elapsed time between item generation is not
measured.

* Support the new methods in core-derive.

* Address review concerns.

* Remove `Display` impl of `ListenerId`.

* Add 'static bound to `on_listener_error` error.
This commit is contained in:
Toralf Wittner
2019-08-13 15:41:12 +02:00
committed by Pierre Krieger
parent ec22688f96
commit 2c4b52a545
8 changed files with 294 additions and 94 deletions

View File

@ -53,17 +53,20 @@ use void::Void;
/// // The `listeners` will now generate events when polled.
/// let future = listeners.for_each(move |event| {
/// match event {
/// ListenersEvent::NewAddress { listen_addr } => {
/// println!("Listener is listening at address {}", listen_addr);
/// ListenersEvent::NewAddress { listener_id, listen_addr } => {
/// println!("Listener {:?} is listening at address {}", listener_id, listen_addr);
/// },
/// ListenersEvent::AddressExpired { listen_addr } => {
/// println!("Listener is no longer listening at address {}", listen_addr);
/// ListenersEvent::AddressExpired { listener_id, listen_addr } => {
/// println!("Listener {:?} is no longer listening at address {}", listener_id, listen_addr);
/// },
/// ListenersEvent::Closed { result, .. } => {
/// println!("Listener has been closed: {:?}", result);
/// ListenersEvent::Closed { listener_id, .. } => {
/// println!("Listener {:?} has been closed", listener_id);
/// },
/// ListenersEvent::Incoming { upgrade, listen_addr, .. } => {
/// println!("A connection has arrived on {}", listen_addr);
/// ListenersEvent::Error { listener_id, error } => {
/// println!("Listener {:?} has experienced an error: {}", listener_id, error);
/// },
/// ListenersEvent::Incoming { listener_id, upgrade, listen_addr, .. } => {
/// println!("Listener {:?} has a new connection on {}", listener_id, listen_addr);
/// // We don't do anything with the newly-opened connection, but in a real-life
/// // program you probably want to use it!
/// drop(upgrade);
@ -83,15 +86,26 @@ where
/// Transport used to spawn listeners.
transport: TTrans,
/// All the active listeners.
listeners: VecDeque<Listener<TTrans>>
listeners: VecDeque<Listener<TTrans>>,
/// The next listener ID to assign.
next_id: ListenerId
}
/// The ID of a single listener.
///
/// It is part of most [`ListenersEvent`]s and can be used to remove
/// individual listeners from the [`ListenersStream`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ListenerId(u64);
/// A single active listener.
#[derive(Debug)]
struct Listener<TTrans>
where
TTrans: Transport,
{
/// The ID of this listener.
id: ListenerId,
/// The object that actually listens.
listener: TTrans::Listener,
/// Addresses it is listening on.
@ -105,16 +119,22 @@ where
{
/// A new address is being listened on.
NewAddress {
/// The listener that is listening on the new address.
listener_id: ListenerId,
/// The new address that is being listened on.
listen_addr: Multiaddr
},
/// An address is no longer being listened on.
AddressExpired {
/// The listener that is no longer listening on the address.
listener_id: ListenerId,
/// The new address that is being listened on.
listen_addr: Multiaddr
},
/// A connection is incoming on one of the listeners.
Incoming {
/// The listener that produced the upgrade.
listener_id: ListenerId,
/// The produced upgrade.
upgrade: TTrans::ListenerUpgrade,
/// Address of the listener which received the connection.
@ -122,13 +142,23 @@ where
/// Address used to send back data to the incoming client.
send_back_addr: Multiaddr,
},
/// A listener has closed, either gracefully or with an error.
/// A listener closed.
Closed {
/// The ID of the listener that closed.
listener_id: ListenerId,
/// The listener that closed.
listener: TTrans::Listener,
/// The error that happened. `Ok` if gracefully closed.
result: Result<(), <TTrans::Listener as Stream>::Error>,
},
/// A listener errored.
///
/// The listener will continue to be polled for new events and the event
/// is for informational purposes only.
Error {
/// The ID of the listener that errored.
listener_id: ListenerId,
/// The error value.
error: <TTrans::Listener as Stream>::Error
}
}
impl<TTrans> ListenersStream<TTrans>
@ -136,34 +166,49 @@ where
TTrans: Transport,
{
/// Starts a new stream of listeners.
#[inline]
pub fn new(transport: TTrans) -> Self {
ListenersStream {
transport,
listeners: VecDeque::new()
listeners: VecDeque::new(),
next_id: ListenerId(1)
}
}
/// Same as `new`, but pre-allocates enough memory for the given number of
/// simultaneous listeners.
#[inline]
pub fn with_capacity(transport: TTrans, capacity: usize) -> Self {
ListenersStream {
transport,
listeners: VecDeque::with_capacity(capacity)
listeners: VecDeque::with_capacity(capacity),
next_id: ListenerId(1)
}
}
/// Start listening on a multiaddress.
///
/// Returns an error if the transport doesn't support the given multiaddress.
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<(), TransportError<TTrans::Error>>
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<TTrans::Error>>
where
TTrans: Clone,
{
let listener = self.transport.clone().listen_on(addr)?;
self.listeners.push_back(Listener { listener, addresses: SmallVec::new() });
Ok(())
self.listeners.push_back(Listener {
id: self.next_id,
listener,
addresses: SmallVec::new()
});
let id = self.next_id;
self.next_id = ListenerId(self.next_id.0 + 1);
Ok(id)
}
/// Remove the listener matching the given `ListenerId`.
pub fn remove_listener(&mut self, id: ListenerId) -> Option<TTrans::Listener> {
if let Some(i) = self.listeners.iter().position(|l| l.id == id) {
self.listeners.remove(i).map(|l| l.listener)
} else {
None
}
}
/// Returns the transport passed when building this object.
@ -191,8 +236,10 @@ where
debug_assert!(listener.addresses.contains(&listen_addr),
"Transport reported listen address {} not in the list: {:?}",
listen_addr, listener.addresses);
let id = listener.id;
self.listeners.push_front(listener);
return Async::Ready(ListenersEvent::Incoming {
listener_id: id,
upgrade,
listen_addr,
send_back_addr: remote_addr
@ -204,24 +251,32 @@ where
if !listener.addresses.contains(&a) {
listener.addresses.push(a.clone());
}
let id = listener.id;
self.listeners.push_front(listener);
return Async::Ready(ListenersEvent::NewAddress { listen_addr: a })
return Async::Ready(ListenersEvent::NewAddress {
listener_id: id,
listen_addr: a
})
}
Ok(Async::Ready(Some(ListenerEvent::AddressExpired(a)))) => {
listener.addresses.retain(|x| x != &a);
let id = listener.id;
self.listeners.push_front(listener);
return Async::Ready(ListenersEvent::AddressExpired { listen_addr: a })
return Async::Ready(ListenersEvent::AddressExpired {
listener_id: id,
listen_addr: a
})
}
Ok(Async::Ready(None)) => {
return Async::Ready(ListenersEvent::Closed {
listener: listener.listener,
result: Ok(()),
listener_id: listener.id,
listener: listener.listener
})
}
Err(err) => {
return Async::Ready(ListenersEvent::Closed {
listener: listener.listener,
result: Err(err),
return Async::Ready(ListenersEvent::Error {
listener_id: listener.id,
error: err
})
}
}
@ -239,7 +294,6 @@ where
type Item = ListenersEvent<TTrans>;
type Error = Void; // TODO: use ! once stable
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Ok(self.poll().map(Option::Some))
}
@ -264,22 +318,30 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
ListenersEvent::NewAddress { listen_addr } => f
ListenersEvent::NewAddress { listener_id, listen_addr } => f
.debug_struct("ListenersEvent::NewAddress")
.field("listener_id", listener_id)
.field("listen_addr", listen_addr)
.finish(),
ListenersEvent::AddressExpired { listen_addr } => f
ListenersEvent::AddressExpired { listener_id, listen_addr } => f
.debug_struct("ListenersEvent::AddressExpired")
.field("listener_id", listener_id)
.field("listen_addr", listen_addr)
.finish(),
ListenersEvent::Incoming { listen_addr, .. } => f
ListenersEvent::Incoming { listener_id, listen_addr, .. } => f
.debug_struct("ListenersEvent::Incoming")
.field("listener_id", listener_id)
.field("listen_addr", listen_addr)
.finish(),
ListenersEvent::Closed { result, .. } => f
ListenersEvent::Closed { listener_id, .. } => f
.debug_struct("ListenersEvent::Closed")
.field("result", result)
.field("listener_id", listener_id)
.finish(),
ListenersEvent::Error { listener_id, error } => f
.debug_struct("ListenersEvent::Error")
.field("listener_id", listener_id)
.field("error", error)
.finish()
}
}
}
@ -478,7 +540,7 @@ mod tests {
}
#[test]
fn listener_stream_poll_with_erroring_listener_emits_closed_event() {
fn listener_stream_poll_with_erroring_listener_emits_error_event() {
let mut t = DummyTransport::new();
let peer_id = PeerId::random();
let muxer = DummyMuxer::new();
@ -493,7 +555,7 @@ mod tests {
ls.listen_on(addr).expect("listen_on failed");
set_listener_state(&mut ls, 0, ListenerState::Error); // simulate an error on the socket
assert_matches!(ls.by_ref().wait().next(), Some(Ok(listeners_event)) => {
assert_matches!(listeners_event, ListenersEvent::Closed{..})
assert_matches!(listeners_event, ListenersEvent::Error{..})
});
assert_eq!(ls.listeners.len(), 0); // it's gone
}

View File

@ -37,3 +37,5 @@ pub use collection::ConnectionInfo;
pub use node::Substream;
pub use handled_node::{NodeHandlerEvent, NodeHandlerEndpoint};
pub use network::{Peer, Network, NetworkEvent};
pub use listeners::ListenerId;

View File

@ -38,7 +38,7 @@ use crate::{
handled_node::IntoNodeHandler,
node::Substream
},
nodes::listeners::{ListenersEvent, ListenersStream},
nodes::listeners::{ListenersEvent, ListenerId, ListenersStream},
transport::{Transport, TransportError}
};
use fnv::FnvHashMap;
@ -162,20 +162,32 @@ where
{
/// One of the listeners gracefully closed.
ListenerClosed {
/// The listener ID that closed.
listener_id: ListenerId,
/// The listener which closed.
listener: TTrans::Listener,
/// The error that happened. `Ok` if gracefully closed.
result: Result<(), <TTrans::Listener as Stream>::Error>,
},
/// One of the listeners errored.
ListenerError {
/// The listener that errored.
listener_id: ListenerId,
/// The listener error.
error: <TTrans::Listener as Stream>::Error
},
/// One of the listeners is now listening on an additional address.
NewListenerAddress {
/// The listener that is listening on the new address.
listener_id: ListenerId,
/// The new address the listener is now also listening on.
listen_addr: Multiaddr
},
/// One of the listeners is no longer listening on some address.
ExpiredListenerAddress {
/// The listener that is no longer listening on some address.
listener_id: ListenerId,
/// The expired address.
listen_addr: Multiaddr
},
@ -277,46 +289,50 @@ where
TPeerId: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match *self {
NetworkEvent::NewListenerAddress { ref listen_addr } => {
match self {
NetworkEvent::NewListenerAddress { listener_id, listen_addr } => {
f.debug_struct("NewListenerAddress")
.field("listener_id", listener_id)
.field("listen_addr", listen_addr)
.finish()
}
NetworkEvent::ExpiredListenerAddress { ref listen_addr } => {
NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr } => {
f.debug_struct("ExpiredListenerAddress")
.field("listener_id", listener_id)
.field("listen_addr", listen_addr)
.finish()
}
NetworkEvent::ListenerClosed { ref result, .. } => {
NetworkEvent::ListenerClosed { listener_id, .. } => {
f.debug_struct("ListenerClosed")
.field("result", result)
.field("listener_id", listener_id)
.finish()
}
NetworkEvent::IncomingConnection(ref event) => {
NetworkEvent::ListenerError { listener_id, error } => {
f.debug_struct("ListenerError")
.field("listener_id", listener_id)
.field("error", error)
.finish()
}
NetworkEvent::IncomingConnection(event) => {
f.debug_struct("IncomingConnection")
.field("listen_addr", &event.listen_addr)
.field("send_back_addr", &event.send_back_addr)
.finish()
}
NetworkEvent::IncomingConnectionError {
ref listen_addr,
ref send_back_addr,
ref error
} => {
NetworkEvent::IncomingConnectionError { listen_addr, send_back_addr, error } => {
f.debug_struct("IncomingConnectionError")
.field("listen_addr", listen_addr)
.field("send_back_addr", send_back_addr)
.field("error", error)
.finish()
}
NetworkEvent::Connected { ref conn_info, ref endpoint } => {
NetworkEvent::Connected { conn_info, endpoint } => {
f.debug_struct("Connected")
.field("conn_info", conn_info)
.field("endpoint", endpoint)
.finish()
}
NetworkEvent::Replaced { ref new_info, ref old_info, ref closed_endpoint, ref endpoint } => {
NetworkEvent::Replaced { new_info, old_info, closed_endpoint, endpoint } => {
f.debug_struct("Replaced")
.field("new_info", new_info)
.field("old_info", old_info)
@ -324,14 +340,14 @@ where
.field("endpoint", endpoint)
.finish()
}
NetworkEvent::NodeClosed { ref conn_info, ref endpoint, ref error } => {
NetworkEvent::NodeClosed { conn_info, endpoint, error } => {
f.debug_struct("NodeClosed")
.field("conn_info", conn_info)
.field("endpoint", endpoint)
.field("error", error)
.finish()
}
NetworkEvent::DialError { ref new_state, ref peer_id, ref multiaddr, ref error } => {
NetworkEvent::DialError { new_state, peer_id, multiaddr, error } => {
f.debug_struct("DialError")
.field("new_state", new_state)
.field("peer_id", peer_id)
@ -339,13 +355,13 @@ where
.field("error", error)
.finish()
}
NetworkEvent::UnknownPeerDialError { ref multiaddr, ref error, .. } => {
NetworkEvent::UnknownPeerDialError { multiaddr, error, .. } => {
f.debug_struct("UnknownPeerDialError")
.field("multiaddr", multiaddr)
.field("error", error)
.finish()
}
NetworkEvent::NodeEvent { ref conn_info, ref event } => {
NetworkEvent::NodeEvent { conn_info, event } => {
f.debug_struct("NodeEvent")
.field("conn_info", conn_info)
.field("event", event)
@ -536,6 +552,8 @@ where TTransErr: error::Error + 'static
pub struct IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
where TTrans: Transport
{
/// The listener who received the connection.
listener_id: ListenerId,
/// The produced upgrade.
upgrade: TTrans::ListenerUpgrade,
/// PeerId of the local node.
@ -568,6 +586,11 @@ where
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TPeerId: Eq + Hash + Clone + Send + 'static,
{
/// The ID of the listener with the incoming connection.
pub fn listener_id(&self) -> ListenerId {
self.listener_id
}
/// Starts processing the incoming connection and sets the handler to use for it.
pub fn accept(self, handler: THandler) {
self.accept_with_builder(|_| handler)
@ -700,10 +723,15 @@ where
}
/// Start listening on the given multiaddress.
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<(), TransportError<TTrans::Error>> {
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<TTrans::Error>> {
self.listeners.listen_on(addr)
}
/// Remove a previously added listener.
pub fn remove_listener(&mut self, id: ListenerId) -> Option<TTrans::Listener> {
self.listeners.remove_listener(id)
}
/// Returns an iterator that produces the list of addresses we are listening on.
pub fn listen_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
self.listeners.listen_addrs()
@ -977,8 +1005,9 @@ where
_ => {
match self.listeners.poll() {
Async::NotReady => (),
Async::Ready(ListenersEvent::Incoming { upgrade, listen_addr, send_back_addr }) => {
Async::Ready(ListenersEvent::Incoming { listener_id, upgrade, listen_addr, send_back_addr }) => {
let event = IncomingConnectionEvent {
listener_id,
upgrade,
local_peer_id: self.reach_attempts.local_peer_id.clone(),
listen_addr,
@ -988,14 +1017,17 @@ where
};
return Async::Ready(NetworkEvent::IncomingConnection(event));
}
Async::Ready(ListenersEvent::NewAddress { listen_addr }) => {
return Async::Ready(NetworkEvent::NewListenerAddress { listen_addr })
Async::Ready(ListenersEvent::NewAddress { listener_id, listen_addr }) => {
return Async::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr })
}
Async::Ready(ListenersEvent::AddressExpired { listen_addr }) => {
return Async::Ready(NetworkEvent::ExpiredListenerAddress { listen_addr })
Async::Ready(ListenersEvent::AddressExpired { listener_id, listen_addr }) => {
return Async::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr })
}
Async::Ready(ListenersEvent::Closed { listener, result }) => {
return Async::Ready(NetworkEvent::ListenerClosed { listener, result })
Async::Ready(ListenersEvent::Closed { listener_id, listener }) => {
return Async::Ready(NetworkEvent::ListenerClosed { listener_id, listener })
}
Async::Ready(ListenersEvent::Error { listener_id, error }) => {
return Async::Ready(NetworkEvent::ListenerError { listener_id, error })
}
}
}

View File

@ -56,6 +56,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
let into_proto_select_ident = quote!{::libp2p::swarm::IntoProtocolsHandlerSelect};
let peer_id = quote!{::libp2p::core::PeerId};
let connected_point = quote!{::libp2p::core::ConnectedPoint};
let listener_id = quote!{::libp2p::core::nodes::ListenerId};
// Name of the type parameter that represents the substream.
let substream_generic = {
@ -284,6 +285,32 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
})
};
// Build the list of statements to put in the body of `inject_listener_error()`.
let inject_listener_error_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
if is_ignored(&field) {
return None
}
Some(match field.ident {
Some(ref i) => quote!(self.#i.inject_listener_error(id, err);),
None => quote!(self.#field_n.inject_listener_error(id, err);)
})
})
};
// Build the list of statements to put in the body of `inject_listener_closed()`.
let inject_listener_closed_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
if is_ignored(&field) {
return None
}
Some(match field.ident {
Some(ref i) => quote!(self.#i.inject_listener_closed(id);),
None => quote!(self.#field_n.inject_listener_closed(id);)
})
})
};
// Build the list of variants to put in the body of `inject_node_event()`.
//
// The event type is a construction of nested `#either_ident`s of the events of the children.
@ -467,6 +494,14 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
#(#inject_new_external_addr_stmts);*
}
fn inject_listener_error(&mut self, id: #listener_id, err: &(dyn std::error::Error + 'static)) {
#(#inject_listener_error_stmts);*
}
fn inject_listener_closed(&mut self, id: #listener_id) {
#(#inject_listener_closed_stmts);*
}
fn inject_node_event(
&mut self,
peer_id: #peer_id,

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use crate::protocols_handler::{IntoProtocolsHandler, ProtocolsHandler};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, nodes::ListenerId};
use futures::prelude::*;
use std::error;
@ -121,6 +121,14 @@ pub trait NetworkBehaviour {
fn inject_new_external_addr(&mut self, _addr: &Multiaddr) {
}
/// A listener experienced an error.
fn inject_listener_error(&mut self, _id: ListenerId, _err: &(dyn std::error::Error + 'static)) {
}
/// A listener closed.
fn inject_listener_closed(&mut self, _id: ListenerId) {
}
/// Polls for things that swarm should do.
///
/// This API mimics the API of the `Stream` trait. The method may register the current task in

View File

@ -83,6 +83,7 @@ use libp2p_core::{
Transport, Multiaddr, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo, ProtocolName,
muxing::StreamMuxer,
nodes::{
ListenerId,
collection::ConnectionInfo,
handled_node::NodeHandler,
node::Substream,
@ -218,10 +219,15 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
/// Starts listening on the given address.
///
/// Returns an error if the address is not supported.
pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<(), TransportError<TTransport::Error>> {
pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<ListenerId, TransportError<TTransport::Error>> {
me.network.listen_on(addr)
}
/// Remove some listener.
pub fn remove_listener(me: &mut Self, id: ListenerId) -> Option<TTransport::Listener> {
me.network.remove_listener(id)
}
/// Tries to dial the given address.
///
/// Returns an error if the address is not supported.
@ -366,17 +372,20 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
let handler = self.behaviour.new_handler();
incoming.accept(handler.into_node_handler_builder());
},
Async::Ready(NetworkEvent::NewListenerAddress { listen_addr }) => {
Async::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) => {
if !self.listened_addrs.contains(&listen_addr) {
self.listened_addrs.push(listen_addr.clone())
}
self.behaviour.inject_new_listen_addr(&listen_addr);
}
Async::Ready(NetworkEvent::ExpiredListenerAddress { listen_addr }) => {
Async::Ready(NetworkEvent::ExpiredListenerAddress { listen_addr, .. }) => {
self.listened_addrs.retain(|a| a != &listen_addr);
self.behaviour.inject_expired_listen_addr(&listen_addr);
}
Async::Ready(NetworkEvent::ListenerClosed { .. }) => {},
Async::Ready(NetworkEvent::ListenerClosed { listener_id, .. }) =>
self.behaviour.inject_listener_closed(listener_id),
Async::Ready(NetworkEvent::ListenerError { listener_id, error }) =>
self.behaviour.inject_listener_error(listener_id, &error),
Async::Ready(NetworkEvent::IncomingConnectionError { .. }) => {},
Async::Ready(NetworkEvent::DialError { peer_id, multiaddr, error, new_state }) => {
self.behaviour.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error);

View File

@ -16,8 +16,8 @@ ipnet = "2.0.0"
libp2p-core = { version = "0.11.0", path = "../../core" }
log = "0.4.1"
futures = "0.1"
tk-listen = "0.2.0"
tokio-io = "0.1"
tokio-timer = "0.2"
tokio-tcp = "0.1"
[dev-dependencies]

View File

@ -53,15 +53,14 @@ use libp2p_core::{
use log::{debug, trace};
use std::{
collections::VecDeque,
fmt,
io::{self, Read, Write},
iter::{self, FromIterator},
net::{IpAddr, SocketAddr},
time::Duration,
time::{Duration, Instant},
vec::IntoIter
};
use tk_listen::{ListenExt, SleepOnError};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
use tokio_tcp::{ConnectFuture, Incoming, TcpStream};
/// Represents the configuration for a TCP/IP transport capability for libp2p.
@ -178,7 +177,7 @@ impl Transport for TcpConfig {
};
let stream = TcpListenStream {
inner: Ok(listener.incoming().sleep_on_error(self.sleep_on_error)),
inner: Listener::new(listener.incoming(), self.sleep_on_error),
port,
addrs,
pending: VecDeque::new(),
@ -353,10 +352,59 @@ enum Addresses {
type Buffer = VecDeque<ListenerEvent<FutureResult<TcpTransStream, io::Error>>>;
/// Incoming connection stream which pauses after errors.
#[derive(Debug)]
struct Listener<S> {
/// The incoming connections.
stream: S,
/// The current pause if any.
pause: Option<Delay>,
/// How long to pause after an error.
pause_duration: Duration
}
impl<S> Listener<S>
where
S: Stream,
S::Error: std::fmt::Display
{
fn new(stream: S, duration: Duration) -> Self {
Listener { stream, pause: None, pause_duration: duration }
}
}
impl<S> Stream for Listener<S>
where
S: Stream,
S::Error: std::fmt::Display
{
type Item = S::Item;
type Error = S::Error;
/// Polls for incoming connections, pausing if an error is encountered.
fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
match self.pause.as_mut().map(|p| p.poll()) {
Some(Ok(Async::NotReady)) => return Ok(Async::NotReady),
Some(Ok(Async::Ready(()))) | Some(Err(_)) => { self.pause.take(); }
None => ()
}
match self.stream.poll() {
Ok(x) => Ok(x),
Err(e) => {
debug!("error accepting incoming connection: {}", e);
self.pause = Some(Delay::new(Instant::now() + self.pause_duration));
Err(e)
}
}
}
}
/// Stream that listens on an TCP/IP address.
#[derive(Debug)]
pub struct TcpListenStream {
/// Stream of incoming sockets.
inner: Result<SleepOnError<Incoming>, Option<io::Error>>,
inner: Listener<Incoming>,
/// The port which we use as our listen port in listener event addresses.
port: u16,
/// The set of known addresses.
@ -428,21 +476,16 @@ impl Stream for TcpListenStream {
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
let inner = match self.inner {
Ok(ref mut inc) => inc,
Err(ref mut err) => return Err(err.take().expect("poll called again after error"))
};
loop {
if let Some(event) = self.pending.pop_front() {
return Ok(Async::Ready(Some(event)))
}
let sock = match inner.poll() {
let sock = match self.inner.poll() {
Ok(Async::Ready(Some(sock))) => sock,
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(()) => unreachable!("sleep_on_error never produces an error")
Err(e) => return Err(e)
};
let sock_addr = match sock.peer_addr() {
@ -485,16 +528,6 @@ impl Stream for TcpListenStream {
}
}
impl fmt::Debug for TcpListenStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.inner {
Ok(_) => write!(f, "TcpListenStream"),
Err(None) => write!(f, "TcpListenStream(Errored)"),
Err(Some(ref err)) => write!(f, "TcpListenStream({:?})", err),
}
}
}
/// Wraps around a `TcpStream` and adds logging for important events.
#[derive(Debug)]
pub struct TcpTransStream {
@ -545,13 +578,32 @@ impl Drop for TcpTransStream {
#[cfg(test)]
mod tests {
use futures::prelude::*;
use futures::{prelude::*, future::{self, Loop}, stream};
use libp2p_core::{Transport, multiaddr::{Multiaddr, Protocol}, transport::ListenerEvent};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use super::{multiaddr_to_socketaddr, TcpConfig};
use tokio::runtime::current_thread::Runtime;
use std::{net::{IpAddr, Ipv4Addr, SocketAddr}, time::Duration};
use super::{multiaddr_to_socketaddr, TcpConfig, Listener};
use tokio::runtime::current_thread::{self, Runtime};
use tokio_io;
#[test]
fn pause_on_error() {
// We create a stream of values and errors and continue polling even after errors
// have been encountered. We count the number of items (including errors) and assert
// that no item has been missed.
let rs = stream::iter_result(vec![Ok(1), Err(1), Ok(1), Err(1)]);
let ls = Listener::new(rs, Duration::from_secs(1));
let sum = future::loop_fn((0, ls), |(acc, ls)| {
ls.into_future().then(move |item| {
match item {
Ok((None, _)) => Ok::<_, std::convert::Infallible>(Loop::Break(acc)),
Ok((Some(n), rest)) => Ok(Loop::Continue((acc + n, rest))),
Err((n, rest)) => Ok(Loop::Continue((acc + n, rest)))
}
})
});
assert_eq!(4, current_thread::block_on_all(sum).unwrap())
}
#[test]
fn wildcard_expansion() {
let mut listener = TcpConfig::new()