diff --git a/README.md b/README.md index f065c1cb..edc4282a 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Right now everything including the crate organization is very much Work in Progr ## Documentation -This repository includes a facade crate named `libp2p`, which reexports the rest of the repository. +This repository includes a façade crate named `libp2p`, which reexports the rest of the repository. For documentation, you are encouraged to clone this repository or add `libp2p` as a dependency in your Cargo.toml and run `cargo doc`. diff --git a/core/src/nodes/handled_node.rs b/core/src/nodes/handled_node.rs index 3b51c55f..7035698a 100644 --- a/core/src/nodes/handled_node.rs +++ b/core/src/nodes/handled_node.rs @@ -45,21 +45,21 @@ pub trait NodeHandler { /// The handler is responsible for upgrading the substream to whatever protocol it wants. fn inject_substream(&mut self, substream: TSubstream, endpoint: NodeHandlerEndpoint); - /// Indicates the handler that the inbound part of the muxer has been closed, and that + /// Indicates to the handler that the inbound part of the muxer has been closed, and that /// therefore no more inbound substream will be produced. fn inject_inbound_closed(&mut self); - /// Indicates the handler that an outbound substream failed to open because the outbound + /// Indicates to the handler that an outbound substream failed to open because the outbound /// part of the muxer has been closed. fn inject_outbound_closed(&mut self, user_data: Self::OutboundOpenInfo); - /// Indicates the handler that the multiaddr future has resolved. + /// Indicates to the handler that the multiaddr future has resolved. fn inject_multiaddr(&mut self, multiaddr: Result); - /// Injects an event coming from the outside in the handler. + /// Injects an event coming from the outside into the handler. fn inject_event(&mut self, event: Self::InEvent); - /// Indicates the node that it should shut down. After that, it is expected that `poll()` + /// Indicates that the node that it should shut down. After that, it is expected that `poll()` /// returns `Ready(None)` as soon as possible. /// /// This method allows an implementation to perform a graceful shutdown of the substreams, and @@ -78,7 +78,7 @@ pub enum NodeHandlerEndpoint { Listener, } -/// Event produces by a handler. +/// Event produced by a handler. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum NodeHandlerEvent { /// Require a new outbound substream to be opened with the remote. @@ -88,7 +88,7 @@ pub enum NodeHandlerEvent { Custom(TCustom), } -/// Event produces by a handler. +/// Event produced by a handler. impl NodeHandlerEvent { /// If this is `OutboundSubstreamRequest`, maps the content to something else. #[inline] @@ -173,7 +173,7 @@ where self.node.is_none() } - /// Indicates the handled node that it should shut down. After calling this method, the + /// Indicates to the handled node that it should shut down. After calling this method, the /// `Stream` will end in the not-so-distant future. /// /// After this method returns, `is_shutting_down()` should return true. diff --git a/core/src/nodes/listeners.rs b/core/src/nodes/listeners.rs index 8e1011d1..51aebd5e 100644 --- a/core/src/nodes/listeners.rs +++ b/core/src/nodes/listeners.rs @@ -37,6 +37,7 @@ where } /// A single active listener. +#[derive(Debug)] struct Listener where TTrans: Transport, @@ -161,7 +162,7 @@ where } } - // We register the current task to be waken up if a new listener is added. + // We register the current task to be woken up if a new listener is added. Async::NotReady } } @@ -220,9 +221,42 @@ where #[cfg(test)] mod tests { extern crate libp2p_tcp_transport; + use super::*; use transport; use tokio::runtime::current_thread::Runtime; + use std::io; + use futures::{future::{self}, stream}; + use tests::dummy_transport::{DummyTransport, ListenerState}; + + fn set_listener_state(ls: &mut ListenersStream, idx: usize, state: ListenerState) { + let l = &mut ls.listeners[idx]; + l.listener = + match state { + ListenerState::Error => { + let stream = stream::poll_fn(|| future::err(io::Error::new(io::ErrorKind::Other, "oh noes")).poll() ); + Box::new(stream) + } + ListenerState::Ok(async) => { + match async { + Async::NotReady => { + let stream = stream::poll_fn(|| Ok(Async::NotReady)); + Box::new(stream) + } + Async::Ready(Some(n)) => { + let addr = l.address.clone(); + let stream = stream::iter_ok(n..) + .map(move |stream| future::ok( (stream, future::ok(addr.clone())) )); + Box::new(stream) + } + Async::Ready(None) => { + let stream = stream::empty(); + Box::new(stream) + } + } + } + }; + } #[test] fn incoming_event() { @@ -251,4 +285,161 @@ mod tests { let mut runtime = Runtime::new().unwrap(); runtime.block_on(future).unwrap(); } + + #[test] + fn listener_stream_returns_transport() { + let t = DummyTransport::new(); + let ls = ListenersStream::new(t); + assert_eq!(ls.transport(), &t); + } + + #[test] + fn listener_stream_can_iterate_over_listeners() { + let t = DummyTransport::new(); + let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); + let addr2 = "/ip4/127.0.0.1/tcp/4321".parse::().expect("bad multiaddr"); + let expected_addrs = vec![addr1.to_string(), addr2.to_string()]; + + let mut ls = ListenersStream::new(t); + ls.listen_on(addr1).expect("listen_on failed"); + ls.listen_on(addr2).expect("listen_on failed"); + + let listener_addrs = ls.listeners().map(|ma| ma.to_string() ).collect::>(); + assert_eq!(listener_addrs, expected_addrs); + } + + #[test] + fn listener_stream_poll_without_listeners_is_not_ready() { + let t = DummyTransport::new(); + let mut ls = ListenersStream::new(t); + assert_matches!(ls.poll(), Async::NotReady); + } + + #[test] + fn listener_stream_poll_with_listeners_that_arent_ready_is_not_ready() { + let t = DummyTransport::new(); + let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); + let mut ls = ListenersStream::new(t); + ls.listen_on(addr).expect("listen_on failed"); + set_listener_state(&mut ls, 0, ListenerState::Ok(Async::NotReady)); + assert_matches!(ls.poll(), Async::NotReady); + assert_eq!(ls.listeners.len(), 1); // listener is still there + } + + #[test] + fn listener_stream_poll_with_ready_listeners_is_ready() { + let mut t = DummyTransport::new(); + t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1)))); + let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); + let addr2 = "/ip4/127.0.0.2/tcp/4321".parse::().expect("bad multiaddr"); + let mut ls = ListenersStream::new(t); + ls.listen_on(addr1).expect("listen_on failed"); + ls.listen_on(addr2).expect("listen_on failed"); + + assert_matches!(ls.poll(), Async::Ready(Some(listeners_event)) => { + assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr} => { + assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.2/tcp/4321"); + assert_matches!(upgrade.poll().unwrap(), Async::Ready(tup) => { + assert_matches!(tup, (1, _)) + }); + }) + }); + // TODO: When several listeners are continuously Async::Ready – + // admittetdly a corner case – the last one is processed first and then + // put back *last* on the pile. This means that at the next poll() it + // will get polled again and if it always has data to yield, it will + // effectively block all other listeners from being "heard". One way + // around this is to switch to using a `VecDeque` to keep the listeners + // collection, and instead of pushing the processed item to the end of + // the list, stick it on top so that it'll be processed *last* instead + // during the next poll. This might also get us a performance win as + // even in the normal case, the most recently polled listener is more + // unlikely to have anything to yield than the others so we might avoid + // a few unneeded poll calls. + + // Make the second listener return NotReady so we get the first listener next poll() + set_listener_state(&mut ls, 1, ListenerState::Ok(Async::NotReady)); + assert_matches!(ls.poll(), Async::Ready(Some(listeners_event)) => { + assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr} => { + assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.1/tcp/1234"); + assert_matches!(upgrade.poll().unwrap(), Async::Ready(tup) => { + assert_matches!(tup, (1, _)) + }); + }) + }); + assert_eq!(ls.listeners.len(), 2); + } + + #[test] + fn listener_stream_poll_with_closed_listener_emits_closed_event() { + let t = DummyTransport::new(); + let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); + let mut ls = ListenersStream::new(t); + ls.listen_on(addr).expect("listen_on failed"); + set_listener_state(&mut ls, 0, ListenerState::Ok(Async::Ready(None))); + assert_matches!(ls.poll(), Async::Ready(Some(listeners_event)) => { + assert_matches!(listeners_event, ListenersEvent::Closed{..}) + }); + assert_eq!(ls.listeners.len(), 0); // it's gone + } + + #[test] + fn listener_stream_poll_with_erroring_listener_emits_closed_event() { + let mut t = DummyTransport::new(); + t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1)))); + let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); + let mut ls = ListenersStream::new(t); + ls.listen_on(addr).expect("listen_on failed"); + set_listener_state(&mut ls, 0, ListenerState::Error); // simulate an error on the socket + assert_matches!(ls.poll(), Async::Ready(Some(listeners_event)) => { + assert_matches!(listeners_event, ListenersEvent::Closed{..}) + }); + assert_eq!(ls.listeners.len(), 0); // it's gone + } + + #[test] + fn listener_stream_poll_chatty_listeners_may_drown_others() { + let mut t = DummyTransport::new(); + t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1)))); + let mut ls = ListenersStream::new(t); + for n in 0..4 { + let addr = format!("/ip4/127.0.0.{}/tcp/123{}", n, n).parse::().expect("bad multiaddr"); + ls.listen_on(addr).expect("listen_on failed"); + } + + // polling processes listeners in reverse order + // Only the last listener ever gets processed + for _n in 0..10 { + assert_matches!(ls.poll(), Async::Ready(Some(ListenersEvent::Incoming{listen_addr, ..})) => { + assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.3/tcp/1233") + }) + } + // Make last listener NotReady so now only the third listener is processed + set_listener_state(&mut ls, 3, ListenerState::Ok(Async::NotReady)); + for _n in 0..10 { + assert_matches!(ls.poll(), Async::Ready(Some(ListenersEvent::Incoming{listen_addr, ..})) => { + assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.2/tcp/1232") + }) + } + } + + #[test] + fn listener_stream_poll_processes_listeners_as_expected_if_they_are_not_yielding_continuously() { + let mut t = DummyTransport::new(); + t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1)))); + let mut ls = ListenersStream::new(t); + for n in 0..4 { + let addr = format!("/ip4/127.0.0.{}/tcp/123{}", n, n).parse::().expect("bad multiaddr"); + ls.listen_on(addr).expect("listen_on failed"); + } + // If the listeners do not yield items continuously (the normal case) we + // process them in the expected, reverse, order. + for n in (0..4).rev() { + assert_matches!(ls.poll(), Async::Ready(Some(ListenersEvent::Incoming{listen_addr, ..})) => { + assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/123{}", n, n)); + }); + // kick the last listener (current) to NotReady state + set_listener_state(&mut ls, 3, ListenerState::Ok(Async::NotReady)); + } + } } diff --git a/core/src/tests/dummy_transport.rs b/core/src/tests/dummy_transport.rs new file mode 100644 index 00000000..6241a4c1 --- /dev/null +++ b/core/src/tests/dummy_transport.rs @@ -0,0 +1,92 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! `DummyTransport` is a `Transport` used in tests. It implements a bare-bones +//! version of the trait along with a way to setup the transport listeners with +//! an initial state to facilitate testing. + +use futures::prelude::*; +use futures::{future::{self, FutureResult}, stream}; +use {Multiaddr, Transport}; +use std::io; + + +#[derive(Debug, PartialEq, Clone, Copy)] +pub(crate) enum ListenerState { + /// The `usize` indexes items produced by the listener + Ok(Async>), + Error +} + +#[derive(Debug, PartialEq, Clone, Copy)] +pub(crate) struct DummyTransport { + listener_state: ListenerState, +} +impl DummyTransport { + pub(crate) fn new() -> Self { DummyTransport{ listener_state: ListenerState::Ok(Async::NotReady) }} + pub(crate) fn set_initial_listener_state(&mut self, state: ListenerState) { + self.listener_state = state; + } +} +impl Transport for DummyTransport { + type Output = usize; + type Listener = Box + Send>; + type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), io::Error>; + type MultiaddrFuture = FutureResult; + type Dial = Box + Send>; + + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> + where + Self: Sized + { + let addr2 = addr.clone(); + match self.listener_state { + ListenerState::Ok(async) => { + let tupelize = move |stream| future::ok( (stream, future::ok(addr.clone())) ); + Ok(match async { + Async::NotReady => { + let stream = stream::poll_fn(|| Ok(Async::NotReady)).map(tupelize); + (Box::new(stream), addr2) + }, + Async::Ready(Some(n)) => { + let stream = stream::iter_ok(n..).map(tupelize); + (Box::new(stream), addr2) + }, + Async::Ready(None) => { + let stream = stream::empty(); + (Box::new(stream), addr2) + }, + }) + } + ListenerState::Error => Err( (self, addr2) ) + } + } + + fn dial(self, _addr: Multiaddr) -> Result + where + Self: Sized + { + unimplemented!(); + } + + fn nat_traversal(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { + unimplemented!(); + } +} diff --git a/core/src/tests/mod.rs b/core/src/tests/mod.rs index f678169d..97f3b6fe 100644 --- a/core/src/tests/mod.rs +++ b/core/src/tests/mod.rs @@ -20,3 +20,5 @@ #[cfg(test)] pub(crate) mod dummy_muxer; +#[cfg(test)] +pub(crate) mod dummy_transport;