diff --git a/core/Cargo.toml b/core/Cargo.toml index 69936846..048c464e 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -31,3 +31,5 @@ tokio = "0.1" tokio-codec = "0.1" tokio-current-thread = "0.1" tokio-timer = "0.2" +assert_matches = "1.3" +tokio-mock-task = "0.1" diff --git a/core/src/lib.rs b/core/src/lib.rs index 23023712..469b09b4 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -235,6 +235,11 @@ extern crate tokio_codec; extern crate tokio_current_thread; #[cfg(test)] extern crate tokio_timer; +#[cfg(test)] +#[macro_use] +extern crate assert_matches; +#[cfg(test)] +extern crate tokio_mock_task; /// Multi-address re-export. pub extern crate multiaddr; @@ -245,6 +250,9 @@ mod peer_id; mod public_key; mod unique; +#[cfg(test)] +mod tests; + pub mod either; pub mod muxing; pub mod nodes; diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 9313f856..b3c0cc8a 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -23,6 +23,7 @@ use futures::{future, prelude::*}; use parking_lot::Mutex; use std::io::{Error as IoError, Read, Write}; use std::ops::Deref; +use std::fmt; use std::sync::atomic::{AtomicUsize, Ordering}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -257,6 +258,17 @@ where substream: Option<::Substream>, } +impl

fmt::Debug for SubstreamRef

+where + P: Deref, + P::Target: StreamMuxer, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "Substream({:?})", self) + } +} + + impl

Read for SubstreamRef

where P: Deref, diff --git a/core/src/nodes/node.rs b/core/src/nodes/node.rs index 75620390..c5eafbaf 100644 --- a/core/src/nodes/node.rs +++ b/core/src/nodes/node.rs @@ -84,6 +84,7 @@ enum Addr { pub type Substream = muxing::SubstreamRef>; /// Event that can happen on the `NodeStream`. +#[derive(Debug)] pub enum NodeEvent where TMuxer: muxing::StreamMuxer, @@ -344,3 +345,219 @@ where TTrans: Transport, } } }*/ + +#[cfg(test)] +mod node_stream { + use multiaddr::Multiaddr; + use super::NodeStream; + use futures::{future::self, prelude::*, Future}; + use tokio_mock_task::MockTask; + use super::NodeEvent; + use tests::dummy_muxer::{DummyMuxer, DummyConnectionState}; + use std::io::Error as IoError; + + + fn build_node_stream() -> NodeStream, Vec> { + let addr = future::ok("/ip4/127.0.0.1/tcp/1234".parse::().expect("bad maddr")); + let muxer = DummyMuxer::new(); + NodeStream::<_, _, Vec>::new(muxer, addr) + } + + #[test] + fn multiaddr_is_available_once_polled() { + let mut node_stream = build_node_stream(); + assert!(node_stream.multiaddr().is_none()); + match node_stream.poll() { + Ok(Async::Ready(Some(NodeEvent::Multiaddr(Ok(addr))))) => { + assert_eq!(addr.to_string(), "/ip4/127.0.0.1/tcp/1234") + } + _ => panic!("unexpected poll return value" ) + } + assert!(node_stream.multiaddr().is_some()); + } + + #[test] + fn can_open_outbound_substreams_until_an_outbound_channel_is_closed() { + let addr = future::ok("/ip4/127.0.0.1/tcp/1234".parse::().expect("bad maddr")); + let mut muxer = DummyMuxer::new(); + muxer.set_outbound_connection_state(DummyConnectionState::Closed); + let mut ns = NodeStream::<_, _, Vec>::new(muxer, addr); + + // open first substream works + assert!(ns.open_substream(vec![1,2,3]).is_ok()); + + // Given the state we set on the DummyMuxer, once we poll() we'll get an + // `OutboundClosed` which will make subsequent calls to `open_substream` fail + let out = ns.poll(); + assert_matches!(out, Ok(Async::Ready(Some(node_event))) => { + assert_matches!(node_event, NodeEvent::OutboundClosed{user_data} => { + assert_eq!(user_data, vec![1,2,3]) + }) + }); + + // Opening a second substream fails because `outbound_finished` is now true + assert_matches!(ns.open_substream(vec![22]), Err(user_data) => { + assert_eq!(user_data, vec![22]); + }); + } + + #[test] + fn query_inbound_outbound_state() { + let ns = build_node_stream(); + assert_eq!(ns.is_inbound_closed(), false); + assert_eq!(ns.is_outbound_closed(), false); + } + + #[test] + fn query_inbound_state() { + let addr = future::ok("/ip4/127.0.0.1/tcp/1234".parse::().expect("bad maddr")); + let mut muxer = DummyMuxer::new(); + muxer.set_inbound_connection_state(DummyConnectionState::Closed); + let mut ns = NodeStream::<_, _, Vec>::new(muxer, addr); + + assert_matches!(ns.poll(), Ok(Async::Ready(Some(node_event))) => { + assert_matches!(node_event, NodeEvent::InboundClosed) + }); + + assert_eq!(ns.is_inbound_closed(), true); + } + + #[test] + fn query_outbound_state() { + let addr = future::ok("/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr")); + let mut muxer = DummyMuxer::new(); + muxer.set_outbound_connection_state(DummyConnectionState::Closed); + let mut ns = NodeStream::<_, _, Vec>::new(muxer, addr); + + assert_eq!(ns.is_outbound_closed(), false); + + ns.open_substream(vec![1]).unwrap(); + let poll_result = ns.poll(); + + assert_matches!(poll_result, Ok(Async::Ready(Some(node_event))) => { + assert_matches!(node_event, NodeEvent::OutboundClosed{user_data} => { + assert_eq!(user_data, vec![1]) + }) + }); + + assert_eq!(ns.is_outbound_closed(), true, "outbound connection should be closed after polling"); + } + + #[test] + fn closing_a_node_stream_destroys_substreams_and_returns_submitted_user_data() { + let mut ns = build_node_stream(); + ns.open_substream(vec![2]).unwrap(); + ns.open_substream(vec![3]).unwrap(); + ns.open_substream(vec![5]).unwrap(); + let user_data_submitted = ns.close(); + assert_eq!(user_data_submitted, vec![ + vec![2], vec![3], vec![5] + ]); + } + + #[test] + fn poll_returns_not_ready_when_there_is_nothing_to_do() { + let mut task = MockTask::new(); + task.enter(|| { + // ensure the address never resolves + let addr = future::empty(); + let mut muxer = DummyMuxer::new(); + // ensure muxer.poll_inbound() returns Async::NotReady + muxer.set_inbound_connection_state(DummyConnectionState::Pending); + // ensure muxer.poll_outbound() returns Async::NotReady + muxer.set_outbound_connection_state(DummyConnectionState::Pending); + let mut ns = NodeStream::<_, _, Vec>::new(muxer, addr); + + assert_matches!(ns.poll(), Ok(Async::NotReady)); + }); + } + + #[test] + fn poll_closes_the_node_stream_when_no_more_work_can_be_done() { + let addr = future::ok("/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr")); + let mut muxer = DummyMuxer::new(); + // ensure muxer.poll_inbound() returns Async::Ready(None) + muxer.set_inbound_connection_state(DummyConnectionState::Closed); + // ensure muxer.poll_outbound() returns Async::Ready(None) + muxer.set_outbound_connection_state(DummyConnectionState::Closed); + let mut ns = NodeStream::<_, _, Vec>::new(muxer, addr); + ns.open_substream(vec![]).unwrap(); + ns.poll().unwrap(); // poll_inbound() + ns.poll().unwrap(); // poll_outbound() + ns.poll().unwrap(); // resolve the address + // Nothing more to do, the NodeStream should be closed + assert_matches!(ns.poll(), Ok(Async::Ready(None))); + } + + #[test] + fn poll_resolves_the_address() { + let addr = future::ok("/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr")); + let mut muxer = DummyMuxer::new(); + // ensure muxer.poll_inbound() returns Async::Ready(None) + muxer.set_inbound_connection_state(DummyConnectionState::Closed); + // ensure muxer.poll_outbound() returns Async::Ready(None) + muxer.set_outbound_connection_state(DummyConnectionState::Closed); + let mut ns = NodeStream::<_, _, Vec>::new(muxer, addr); + ns.open_substream(vec![]).unwrap(); + ns.poll().unwrap(); // poll_inbound() + ns.poll().unwrap(); // poll_outbound() + assert_matches!(ns.poll(), Ok(Async::Ready(Some(node_event))) => { + assert_matches!(node_event, NodeEvent::Multiaddr(Ok(_))) + }); + } + + #[test] + fn poll_sets_up_substreams_yielding_them_in_reverse_order() { + let addr = future::ok("/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr")); + let mut muxer = DummyMuxer::new(); + // ensure muxer.poll_inbound() returns Async::Ready(None) + muxer.set_inbound_connection_state(DummyConnectionState::Closed); + // ensure muxer.poll_outbound() returns Async::Ready(Some(substream)) + muxer.set_outbound_connection_state(DummyConnectionState::Opened); + let mut ns = NodeStream::<_, _, Vec>::new(muxer, addr); + ns.open_substream(vec![1]).unwrap(); + ns.open_substream(vec![2]).unwrap(); + ns.poll().unwrap(); // poll_inbound() + + // poll() sets up second outbound substream + assert_matches!(ns.poll(), Ok(Async::Ready(Some(node_event))) => { + assert_matches!(node_event, NodeEvent::OutboundSubstream{ user_data, substream:_ } => { + assert_eq!(user_data, vec![2]); + }) + }); + // Next poll() sets up first outbound substream + assert_matches!(ns.poll(), Ok(Async::Ready(Some(node_event))) => { + assert_matches!(node_event, NodeEvent::OutboundSubstream{ user_data, substream: _ } => { + assert_eq!(user_data, vec![1]); + }) + }); + } + + #[test] + fn poll_keeps_outbound_substreams_when_the_outgoing_connection_is_not_ready() { + let addr = future::ok("/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr")); + let mut muxer = DummyMuxer::new(); + // ensure muxer.poll_inbound() returns Async::Ready(None) + muxer.set_inbound_connection_state(DummyConnectionState::Closed); + // ensure muxer.poll_outbound() returns Async::NotReady + muxer.set_outbound_connection_state(DummyConnectionState::Pending); + let mut ns = NodeStream::<_, _, Vec>::new(muxer, addr); + ns.open_substream(vec![1]).unwrap(); + ns.poll().unwrap(); // poll past inbound + ns.poll().unwrap(); // poll outbound + assert_eq!(ns.is_outbound_closed(), false); + assert!(format!("{:?}", ns).contains("outbound_substreams: 1")); + } + + #[test] + fn poll_returns_incoming_substream() { + let addr = future::ok("/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr")); + let mut muxer = DummyMuxer::new(); + // ensure muxer.poll_inbound() returns Async::Ready(Some(subs)) + muxer.set_inbound_connection_state(DummyConnectionState::Opened); + let mut ns = NodeStream::<_, _, Vec>::new(muxer, addr); + assert_matches!(ns.poll(), Ok(Async::Ready(Some(node_event))) => { + assert_matches!(node_event, NodeEvent::InboundSubstream{ substream: _ }); + }); + } +} diff --git a/core/src/tests/dummy_muxer.rs b/core/src/tests/dummy_muxer.rs new file mode 100644 index 00000000..bfd2852f --- /dev/null +++ b/core/src/tests/dummy_muxer.rs @@ -0,0 +1,102 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! `DummyMuxer` is a `StreamMuxer` to be used in tests. It implements a bare-bones +//! version of the trait along with a way to setup the muxer to behave in the +//! desired way when testing other components. + +extern crate futures; + +use std::io::Error as IoError; +use muxing::StreamMuxer; +use futures::prelude::*; + +/// Substream type +pub struct DummySubstream {} + +/// OutboundSubstream type +pub struct DummyOutboundSubstream {} + +/// Control the muxer state by setting the "connection" state as to set up a mock +/// muxer for higher level components. +#[derive(Debug, PartialEq)] +pub enum DummyConnectionState { + Pending, // use this to trigger the Async::NotReady code path + Closed, // use this to trigger the Async::Ready(None) code path + Opened, // use this to trigger the Async::Ready(Some(_)) code path +} +#[derive(Debug)] +struct DummyConnection { + state: DummyConnectionState +} + +/// `DummyMuxer` implements `StreamMuxer` and methods to control its behavior when used in tests +#[derive(Debug)] +pub struct DummyMuxer{ + in_connection: DummyConnection, + out_connection: DummyConnection, +} + +impl DummyMuxer { + /// Create a new `DummyMuxer` where the inbound substream is set to `Pending` + /// and the (single) outbound substream to `Closed`. + pub fn new() -> Self { + DummyMuxer { + in_connection: DummyConnection{ state: DummyConnectionState::Pending }, + out_connection: DummyConnection{ state: DummyConnectionState::Closed }, + } + } + /// Set the muxer state inbound "connection" state + pub fn set_inbound_connection_state(&mut self, state: DummyConnectionState) { + self.in_connection.state = state + } + /// Set the muxer state outbound "connection" state + pub fn set_outbound_connection_state(&mut self, state: DummyConnectionState) { + self.out_connection.state = state + } +} + +impl StreamMuxer for DummyMuxer { + type Substream = DummySubstream; + type OutboundSubstream = DummyOutboundSubstream; + fn poll_inbound(&self) -> Poll, IoError> { + match self.in_connection.state { + DummyConnectionState::Pending => Ok(Async::NotReady), + DummyConnectionState::Closed => Ok(Async::Ready(None)), + DummyConnectionState::Opened => Ok(Async::Ready(Some(Self::Substream{}))), + } + } + fn open_outbound(&self) -> Self::OutboundSubstream { Self::OutboundSubstream{} } + fn poll_outbound(&self, _substream: &mut Self::OutboundSubstream) -> Poll, IoError> { + match self.out_connection.state { + DummyConnectionState::Pending => Ok(Async::NotReady), + DummyConnectionState::Closed => Ok(Async::Ready(None)), + DummyConnectionState::Opened => Ok(Async::Ready(Some(Self::Substream{}))), + } + } + fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {} + fn read_substream(&self, _substream: &mut Self::Substream, _buf: &mut [u8]) -> Result { unreachable!() } + fn write_substream(&self, _substream: &mut Self::Substream, _buf: &[u8]) -> Result { unreachable!() } + fn flush_substream(&self, _substream: &mut Self::Substream) -> Result<(), IoError> { unreachable!() } + fn shutdown_substream(&self, _substream: &mut Self::Substream) -> Poll<(), IoError> { unreachable!() } + fn destroy_substream(&self, _substream: Self::Substream) {} + fn close_inbound(&self) {} + fn close_outbound(&self) {} +} diff --git a/core/src/tests/mod.rs b/core/src/tests/mod.rs new file mode 100644 index 00000000..f678169d --- /dev/null +++ b/core/src/tests/mod.rs @@ -0,0 +1,22 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#[cfg(test)] +pub(crate) mod dummy_muxer;