diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index 8e9d6d44..e5009d69 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -32,6 +32,8 @@ use fnv::FnvHashMap; use futures::prelude::*; use std::{collections::hash_map::Entry, error, fmt, mem}; +mod tests; + // TODO: make generic over PeerId /// Implementation of `Stream` that handles a collection of nodes. @@ -513,361 +515,3 @@ impl<'a, TInEvent> PeerMut<'a, TInEvent> { self.inner.close(); } } - - -#[cfg(test)] -mod tests { - use super::*; - use assert_matches::assert_matches; - use futures::future; - use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState}; - use crate::tests::dummy_handler::{Handler, InEvent, OutEvent, HandlerState}; - use tokio::runtime::current_thread::Runtime; - use tokio::runtime::Builder; - use crate::nodes::NodeHandlerEvent; - use std::{io, sync::Arc}; - use parking_lot::Mutex; - - type TestCollectionStream = CollectionStream; - - #[test] - fn has_connection_is_false_before_a_connection_has_been_made() { - let cs = TestCollectionStream::new(); - let peer_id = PeerId::random(); - assert!(!cs.has_connection(&peer_id)); - } - - #[test] - fn connections_is_empty_before_connecting() { - let cs = TestCollectionStream::new(); - assert!(cs.connections().next().is_none()); - } - - #[test] - fn retrieving_a_peer_is_none_if_peer_is_missing_or_not_connected() { - let mut cs = TestCollectionStream::new(); - let peer_id = PeerId::random(); - assert!(cs.peer_mut(&peer_id).is_none()); - - let handler = Handler::default(); - let fut = future::ok((peer_id.clone(), DummyMuxer::new())); - cs.add_reach_attempt(fut, handler); - assert!(cs.peer_mut(&peer_id).is_none()); // task is pending - } - - #[test] - fn collection_stream_reaches_the_nodes() { - let mut cs = TestCollectionStream::new(); - let peer_id = PeerId::random(); - - let mut muxer = DummyMuxer::new(); - muxer.set_inbound_connection_state(DummyConnectionState::Pending); - muxer.set_outbound_connection_state(DummyConnectionState::Opened); - - let fut = future::ok((peer_id, muxer)); - cs.add_reach_attempt(fut, Handler::default()); - let mut rt = Runtime::new().unwrap(); - let mut poll_count = 0; - let fut = future::poll_fn(move || -> Poll<(), ()> { - poll_count += 1; - let event = cs.poll(); - match poll_count { - 1 => assert_matches!(event, Async::NotReady), - 2 => { - assert_matches!(event, Async::Ready(CollectionEvent::NodeReached(_))); - return Ok(Async::Ready(())); // stop - } - _ => unreachable!() - } - Ok(Async::NotReady) - }); - rt.block_on(fut).unwrap(); - } - - #[test] - fn accepting_a_node_yields_new_entry() { - let mut cs = TestCollectionStream::new(); - let peer_id = PeerId::random(); - let fut = future::ok((peer_id.clone(), DummyMuxer::new())); - cs.add_reach_attempt(fut, Handler::default()); - - let mut rt = Runtime::new().unwrap(); - let mut poll_count = 0; - let fut = future::poll_fn(move || -> Poll<(), ()> { - poll_count += 1; - { - let event = cs.poll(); - match poll_count { - 1 => { - assert_matches!(event, Async::NotReady); - return Ok(Async::NotReady) - } - 2 => { - assert_matches!(event, Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { - assert_matches!(reach_ev.parent, CollectionStream{..}); - let (accept_ev, accepted_peer_id) = reach_ev.accept(); - assert_eq!(accepted_peer_id, peer_id); - assert_matches!(accept_ev, CollectionNodeAccept::NewEntry); - }); - } - _ => unreachable!() - } - } - assert!(cs.peer_mut(&peer_id).is_some(), "peer is not in the list"); - assert!(cs.has_connection(&peer_id), "peer is not connected"); - assert_eq!(cs.connections().collect::>(), vec![&peer_id]); - Ok(Async::Ready(())) - }); - rt.block_on(fut).expect("running the future works"); - } - - #[test] - fn events_in_a_node_reaches_the_collection_stream() { - let cs = Arc::new(Mutex::new(TestCollectionStream::new())); - let task_peer_id = PeerId::random(); - - let mut handler = Handler::default(); - handler.state = Some(HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("init"))))); - let handler_states = vec![ - HandlerState::Err, - HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 3") ))), - HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 2") ))), - HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 1") ))), - ]; - handler.next_states = handler_states; - - let mut muxer = DummyMuxer::new(); - muxer.set_inbound_connection_state(DummyConnectionState::Pending); - muxer.set_outbound_connection_state(DummyConnectionState::Opened); - - let fut = future::ok((task_peer_id.clone(), muxer)); - cs.lock().add_reach_attempt(fut, handler); - - let mut rt = Builder::new().core_threads(1).build().unwrap(); - - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - assert_matches!(cs.poll(), Async::NotReady); - Ok(Async::Ready(())) - })).expect("tokio works"); - - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - cs.broadcast_event(&InEvent::NextState); - assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { - reach_ev.accept(); - }); - Ok(Async::Ready(())) - })).expect("tokio works"); - - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - cs.broadcast_event(&InEvent::NextState); - assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => { - assert_matches!(event, OutEvent::Custom("init")); - }); - Ok(Async::Ready(())) - })).expect("tokio works"); - - - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - cs.broadcast_event(&InEvent::NextState); - assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => { - assert_matches!(event, OutEvent::Custom("from handler 1")); - }); - Ok(Async::Ready(())) - })).expect("tokio works"); - - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - cs.broadcast_event(&InEvent::NextState); - assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => { - assert_matches!(event, OutEvent::Custom("from handler 2")); - }); - Ok(Async::Ready(())) - })).expect("tokio works"); - } - - #[test] - fn task_closed_with_error_while_task_is_pending_yields_reach_error() { - let cs = Arc::new(Mutex::new(TestCollectionStream::new())); - let task_inner_fut = future::err(std::io::Error::new(std::io::ErrorKind::Other, "inner fut error")); - let reach_attempt_id = cs.lock().add_reach_attempt(task_inner_fut, Handler::default()); - - let mut rt = Builder::new().core_threads(1).build().unwrap(); - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - assert_matches!(cs.poll(), Async::NotReady); - Ok(Async::Ready(())) - })).expect("tokio works"); - - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - assert_matches!(cs.poll(), Async::Ready(collection_ev) => { - assert_matches!(collection_ev, CollectionEvent::ReachError {id, error, ..} => { - assert_eq!(id, reach_attempt_id); - assert_eq!(error.to_string(), "inner fut error"); - }); - - }); - Ok(Async::Ready(())) - })).expect("tokio works"); - - } - - #[test] - fn task_closed_with_error_when_task_is_connected_yields_node_error() { - let cs = Arc::new(Mutex::new(TestCollectionStream::new())); - let peer_id = PeerId::random(); - let muxer = DummyMuxer::new(); - let task_inner_fut = future::ok((peer_id.clone(), muxer)); - let mut handler = Handler::default(); - handler.next_states = vec![HandlerState::Err]; // triggered when sending a NextState event - - cs.lock().add_reach_attempt(task_inner_fut, handler); - let mut rt = Builder::new().core_threads(1).build().unwrap(); - - // Kick it off - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - assert_matches!(cs.poll(), Async::NotReady); - // send an event so the Handler errors in two polls - cs.broadcast_event(&InEvent::NextState); - Ok(Async::Ready(())) - })).expect("tokio works"); - - // Accept the new node - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - // NodeReached, accept the connection so the task transitions from Pending to Connected - assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { - reach_ev.accept(); - }); - Ok(Async::Ready(())) - })).expect("tokio works"); - - assert!(cs.lock().has_connection(&peer_id)); - - // Assert the node errored - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - assert_matches!(cs.poll(), Async::Ready(collection_ev) => { - assert_matches!(collection_ev, CollectionEvent::NodeError{..}); - }); - Ok(Async::Ready(())) - })).expect("tokio works"); - } - - #[test] - fn task_closed_ok_when_task_is_connected_yields_node_closed() { - let cs = Arc::new(Mutex::new(TestCollectionStream::new())); - let peer_id = PeerId::random(); - let muxer = DummyMuxer::new(); - let task_inner_fut = future::ok((peer_id.clone(), muxer)); - let mut handler = Handler::default(); - handler.next_states = vec![HandlerState::Ready(None)]; // triggered when sending a NextState event - - cs.lock().add_reach_attempt(task_inner_fut, handler); - let mut rt = Builder::new().core_threads(1).build().unwrap(); - - // Kick it off - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - assert_matches!(cs.poll(), Async::NotReady); - // send an event so the Handler errors in two polls - cs.broadcast_event(&InEvent::NextState); - Ok(Async::Ready(())) - })).expect("tokio works"); - - // Accept the new node - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - // NodeReached, accept the connection so the task transitions from Pending to Connected - assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { - reach_ev.accept(); - }); - Ok(Async::Ready(())) - })).expect("tokio works"); - - assert!(cs.lock().has_connection(&peer_id)); - - // Next poll, the Handler returns Async::Ready(None) because of the - // NextState message sent before. - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - // Node is closed normally: TaskClosed, Ok(()) - assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeClosed{ peer_id: peer_id_in_event }) => { - assert_eq!(peer_id_in_event, peer_id); - }); - Ok(Async::Ready(())) - })).expect("tokio works"); - } - - #[test] - fn interrupting_a_pending_connection_attempt_is_ok() { - let mut cs = TestCollectionStream::new(); - let fut = future::empty(); - let reach_id = cs.add_reach_attempt(fut, Handler::default()); - let interrupt = cs.interrupt(reach_id); - assert!(interrupt.is_ok()); - } - - #[test] - fn interrupting_a_connection_attempt_twice_is_err() { - let mut cs = TestCollectionStream::new(); - let fut = future::empty(); - let reach_id = cs.add_reach_attempt(fut, Handler::default()); - assert!(cs.interrupt(reach_id).is_ok()); - assert_matches!(cs.interrupt(reach_id), Err(InterruptError::ReachAttemptNotFound)) - } - - #[test] - fn interrupting_an_established_connection_is_err() { - let cs = Arc::new(Mutex::new(TestCollectionStream::new())); - let peer_id = PeerId::random(); - let muxer = DummyMuxer::new(); - let task_inner_fut = future::ok((peer_id.clone(), muxer)); - let handler = Handler::default(); - - let reach_id = cs.lock().add_reach_attempt(task_inner_fut, handler); - let mut rt = Builder::new().core_threads(1).build().unwrap(); - - // Kick it off - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - assert_matches!(cs.poll(), Async::NotReady); - // send an event so the Handler errors in two polls - Ok(Async::Ready(())) - })).expect("tokio works"); - - // Accept the new node - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - // NodeReached, accept the connection so the task transitions from Pending to Connected - assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { - reach_ev.accept(); - }); - Ok(Async::Ready(())) - })).expect("tokio works"); - - assert!(cs.lock().has_connection(&peer_id), "Connection was not established"); - - assert_matches!(cs.lock().interrupt(reach_id), Err(InterruptError::AlreadyReached)); - } -} diff --git a/core/src/nodes/collection/tests.rs b/core/src/nodes/collection/tests.rs new file mode 100644 index 00000000..e29b378f --- /dev/null +++ b/core/src/nodes/collection/tests.rs @@ -0,0 +1,375 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#![cfg(test)] + +use super::*; +use assert_matches::assert_matches; +use futures::future; +use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState}; +use crate::tests::dummy_handler::{Handler, InEvent, OutEvent, HandlerState}; +use tokio::runtime::current_thread::Runtime; +use tokio::runtime::Builder; +use crate::nodes::NodeHandlerEvent; +use std::{io, sync::Arc}; +use parking_lot::Mutex; + +type TestCollectionStream = CollectionStream; + +#[test] +fn has_connection_is_false_before_a_connection_has_been_made() { + let cs = TestCollectionStream::new(); + let peer_id = PeerId::random(); + assert!(!cs.has_connection(&peer_id)); +} + +#[test] +fn connections_is_empty_before_connecting() { + let cs = TestCollectionStream::new(); + assert!(cs.connections().next().is_none()); +} + +#[test] +fn retrieving_a_peer_is_none_if_peer_is_missing_or_not_connected() { + let mut cs = TestCollectionStream::new(); + let peer_id = PeerId::random(); + assert!(cs.peer_mut(&peer_id).is_none()); + + let handler = Handler::default(); + let fut = future::ok((peer_id.clone(), DummyMuxer::new())); + cs.add_reach_attempt(fut, handler); + assert!(cs.peer_mut(&peer_id).is_none()); // task is pending +} + +#[test] +fn collection_stream_reaches_the_nodes() { + let mut cs = TestCollectionStream::new(); + let peer_id = PeerId::random(); + + let mut muxer = DummyMuxer::new(); + muxer.set_inbound_connection_state(DummyConnectionState::Pending); + muxer.set_outbound_connection_state(DummyConnectionState::Opened); + + let fut = future::ok((peer_id, muxer)); + cs.add_reach_attempt(fut, Handler::default()); + let mut rt = Runtime::new().unwrap(); + let mut poll_count = 0; + let fut = future::poll_fn(move || -> Poll<(), ()> { + poll_count += 1; + let event = cs.poll(); + match poll_count { + 1 => assert_matches!(event, Async::NotReady), + 2 => { + assert_matches!(event, Async::Ready(CollectionEvent::NodeReached(_))); + return Ok(Async::Ready(())); // stop + } + _ => unreachable!() + } + Ok(Async::NotReady) + }); + rt.block_on(fut).unwrap(); +} + +#[test] +fn accepting_a_node_yields_new_entry() { + let mut cs = TestCollectionStream::new(); + let peer_id = PeerId::random(); + let fut = future::ok((peer_id.clone(), DummyMuxer::new())); + cs.add_reach_attempt(fut, Handler::default()); + + let mut rt = Runtime::new().unwrap(); + let mut poll_count = 0; + let fut = future::poll_fn(move || -> Poll<(), ()> { + poll_count += 1; + { + let event = cs.poll(); + match poll_count { + 1 => { + assert_matches!(event, Async::NotReady); + return Ok(Async::NotReady) + } + 2 => { + assert_matches!(event, Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { + assert_matches!(reach_ev.parent, CollectionStream{..}); + let (accept_ev, accepted_peer_id) = reach_ev.accept(); + assert_eq!(accepted_peer_id, peer_id); + assert_matches!(accept_ev, CollectionNodeAccept::NewEntry); + }); + } + _ => unreachable!() + } + } + assert!(cs.peer_mut(&peer_id).is_some(), "peer is not in the list"); + assert!(cs.has_connection(&peer_id), "peer is not connected"); + assert_eq!(cs.connections().collect::>(), vec![&peer_id]); + Ok(Async::Ready(())) + }); + rt.block_on(fut).expect("running the future works"); +} + +#[test] +fn events_in_a_node_reaches_the_collection_stream() { + let cs = Arc::new(Mutex::new(TestCollectionStream::new())); + let task_peer_id = PeerId::random(); + + let mut handler = Handler::default(); + handler.state = Some(HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("init"))))); + let handler_states = vec![ + HandlerState::Err, + HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 3") ))), + HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 2") ))), + HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 1") ))), + ]; + handler.next_states = handler_states; + + let mut muxer = DummyMuxer::new(); + muxer.set_inbound_connection_state(DummyConnectionState::Pending); + muxer.set_outbound_connection_state(DummyConnectionState::Opened); + + let fut = future::ok((task_peer_id.clone(), muxer)); + cs.lock().add_reach_attempt(fut, handler); + + let mut rt = Builder::new().core_threads(1).build().unwrap(); + + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + assert_matches!(cs.poll(), Async::NotReady); + Ok(Async::Ready(())) + })).expect("tokio works"); + + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + cs.broadcast_event(&InEvent::NextState); + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { + reach_ev.accept(); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + cs.broadcast_event(&InEvent::NextState); + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => { + assert_matches!(event, OutEvent::Custom("init")); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + + + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + cs.broadcast_event(&InEvent::NextState); + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => { + assert_matches!(event, OutEvent::Custom("from handler 1")); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + cs.broadcast_event(&InEvent::NextState); + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => { + assert_matches!(event, OutEvent::Custom("from handler 2")); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); +} + +#[test] +fn task_closed_with_error_while_task_is_pending_yields_reach_error() { + let cs = Arc::new(Mutex::new(TestCollectionStream::new())); + let task_inner_fut = future::err(std::io::Error::new(std::io::ErrorKind::Other, "inner fut error")); + let reach_attempt_id = cs.lock().add_reach_attempt(task_inner_fut, Handler::default()); + + let mut rt = Builder::new().core_threads(1).build().unwrap(); + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + assert_matches!(cs.poll(), Async::NotReady); + Ok(Async::Ready(())) + })).expect("tokio works"); + + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + assert_matches!(cs.poll(), Async::Ready(collection_ev) => { + assert_matches!(collection_ev, CollectionEvent::ReachError {id, error, ..} => { + assert_eq!(id, reach_attempt_id); + assert_eq!(error.to_string(), "inner fut error"); + }); + + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + +} + +#[test] +fn task_closed_with_error_when_task_is_connected_yields_node_error() { + let cs = Arc::new(Mutex::new(TestCollectionStream::new())); + let peer_id = PeerId::random(); + let muxer = DummyMuxer::new(); + let task_inner_fut = future::ok((peer_id.clone(), muxer)); + let mut handler = Handler::default(); + handler.next_states = vec![HandlerState::Err]; // triggered when sending a NextState event + + cs.lock().add_reach_attempt(task_inner_fut, handler); + let mut rt = Builder::new().core_threads(1).build().unwrap(); + + // Kick it off + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + assert_matches!(cs.poll(), Async::NotReady); + // send an event so the Handler errors in two polls + cs.broadcast_event(&InEvent::NextState); + Ok(Async::Ready(())) + })).expect("tokio works"); + + // Accept the new node + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + // NodeReached, accept the connection so the task transitions from Pending to Connected + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { + reach_ev.accept(); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + + assert!(cs.lock().has_connection(&peer_id)); + + // Assert the node errored + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + assert_matches!(cs.poll(), Async::Ready(collection_ev) => { + assert_matches!(collection_ev, CollectionEvent::NodeError{..}); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); +} + +#[test] +fn task_closed_ok_when_task_is_connected_yields_node_closed() { + let cs = Arc::new(Mutex::new(TestCollectionStream::new())); + let peer_id = PeerId::random(); + let muxer = DummyMuxer::new(); + let task_inner_fut = future::ok((peer_id.clone(), muxer)); + let mut handler = Handler::default(); + handler.next_states = vec![HandlerState::Ready(None)]; // triggered when sending a NextState event + + cs.lock().add_reach_attempt(task_inner_fut, handler); + let mut rt = Builder::new().core_threads(1).build().unwrap(); + + // Kick it off + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + assert_matches!(cs.poll(), Async::NotReady); + // send an event so the Handler errors in two polls + cs.broadcast_event(&InEvent::NextState); + Ok(Async::Ready(())) + })).expect("tokio works"); + + // Accept the new node + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + // NodeReached, accept the connection so the task transitions from Pending to Connected + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { + reach_ev.accept(); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + + assert!(cs.lock().has_connection(&peer_id)); + + // Next poll, the Handler returns Async::Ready(None) because of the + // NextState message sent before. + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + // Node is closed normally: TaskClosed, Ok(()) + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeClosed{ peer_id: peer_id_in_event }) => { + assert_eq!(peer_id_in_event, peer_id); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); +} + +#[test] +fn interrupting_a_pending_connection_attempt_is_ok() { + let mut cs = TestCollectionStream::new(); + let fut = future::empty(); + let reach_id = cs.add_reach_attempt(fut, Handler::default()); + let interrupt = cs.interrupt(reach_id); + assert!(interrupt.is_ok()); +} + +#[test] +fn interrupting_a_connection_attempt_twice_is_err() { + let mut cs = TestCollectionStream::new(); + let fut = future::empty(); + let reach_id = cs.add_reach_attempt(fut, Handler::default()); + assert!(cs.interrupt(reach_id).is_ok()); + assert_matches!(cs.interrupt(reach_id), Err(InterruptError::ReachAttemptNotFound)) +} + +#[test] +fn interrupting_an_established_connection_is_err() { + let cs = Arc::new(Mutex::new(TestCollectionStream::new())); + let peer_id = PeerId::random(); + let muxer = DummyMuxer::new(); + let task_inner_fut = future::ok((peer_id.clone(), muxer)); + let handler = Handler::default(); + + let reach_id = cs.lock().add_reach_attempt(task_inner_fut, handler); + let mut rt = Builder::new().core_threads(1).build().unwrap(); + + // Kick it off + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + assert_matches!(cs.poll(), Async::NotReady); + // send an event so the Handler errors in two polls + Ok(Async::Ready(())) + })).expect("tokio works"); + + // Accept the new node + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + // NodeReached, accept the connection so the task transitions from Pending to Connected + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { + reach_ev.accept(); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + + assert!(cs.lock().has_connection(&peer_id), "Connection was not established"); + + assert_matches!(cs.lock().interrupt(reach_id), Err(InterruptError::AlreadyReached)); +} diff --git a/core/src/nodes/handled_node.rs b/core/src/nodes/handled_node.rs index 97c923b3..c753a541 100644 --- a/core/src/nodes/handled_node.rs +++ b/core/src/nodes/handled_node.rs @@ -23,6 +23,8 @@ use crate::nodes::node::{NodeEvent, NodeStream, Substream}; use futures::{prelude::*, stream::Fuse}; use std::{error, fmt, io}; +mod tests; + /// Handler for the substreams of a node. // TODO: right now it is possible for a node handler to be built, then shut down right after if we // realize we dialed the wrong peer for example; this could be surprising and should either @@ -354,353 +356,3 @@ where THandlerErr: error::Error + 'static } } } - -#[cfg(test)] -mod tests { - use super::*; - use assert_matches::assert_matches; - use tokio::runtime::current_thread; - use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState}; - use crate::tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent, TestHandledNode}; - use std::{io, marker::PhantomData}; - - struct TestBuilder { - muxer: DummyMuxer, - handler: Handler, - want_open_substream: bool, - substream_user_data: usize, - } - - impl TestBuilder { - fn new() -> Self { - TestBuilder { - muxer: DummyMuxer::new(), - handler: Handler::default(), - want_open_substream: false, - substream_user_data: 0, - } - } - - fn with_muxer_inbound_state(&mut self, state: DummyConnectionState) -> &mut Self { - self.muxer.set_inbound_connection_state(state); - self - } - - fn with_muxer_outbound_state(&mut self, state: DummyConnectionState) -> &mut Self { - self.muxer.set_outbound_connection_state(state); - self - } - - fn with_handler_state(&mut self, state: HandlerState) -> &mut Self { - self.handler.state = Some(state); - self - } - - fn with_open_substream(&mut self, user_data: usize) -> &mut Self { - self.want_open_substream = true; - self.substream_user_data = user_data; - self - } - - fn handled_node(&mut self) -> TestHandledNode { - let mut h = HandledNode::new(self.muxer.clone(), self.handler.clone()); - if self.want_open_substream { - h.node.get_mut().open_substream(self.substream_user_data).expect("open substream should work"); - } - h - } - } - - // Set the state of the `Handler` after `inject_outbound_closed` is called - fn set_next_handler_outbound_state( handled_node: &mut TestHandledNode, next_state: HandlerState) { - handled_node.handler.next_outbound_state = Some(next_state); - } - - #[test] - fn proper_shutdown() { - struct ShutdownHandler { - did_substream_attempt: bool, - inbound_closed: bool, - substream_attempt_cancelled: bool, - shutdown_called: bool, - marker: PhantomData - } - impl NodeHandler for ShutdownHandler { - type InEvent = (); - type OutEvent = (); - type Substream = T; - type Error = io::Error; - type OutboundOpenInfo = (); - fn inject_substream(&mut self, _: Self::Substream, _: NodeHandlerEndpoint) { panic!() } - fn inject_inbound_closed(&mut self) { - assert!(!self.inbound_closed); - self.inbound_closed = true; - } - fn inject_outbound_closed(&mut self, _: ()) { - assert!(!self.substream_attempt_cancelled); - self.substream_attempt_cancelled = true; - } - fn inject_event(&mut self, _: Self::InEvent) { panic!() } - fn shutdown(&mut self) { - assert!(self.inbound_closed); - assert!(self.substream_attempt_cancelled); - self.shutdown_called = true; - } - fn poll(&mut self) -> Poll, io::Error> { - if self.shutdown_called { - Ok(Async::Ready(NodeHandlerEvent::Shutdown)) - } else if !self.did_substream_attempt { - self.did_substream_attempt = true; - Ok(Async::Ready(NodeHandlerEvent::OutboundSubstreamRequest(()))) - } else { - Ok(Async::NotReady) - } - } - } - - impl Drop for ShutdownHandler { - fn drop(&mut self) { - if self.did_substream_attempt { - assert!(self.shutdown_called); - } - } - } - - // Test that `shutdown()` is properly called on the handler once a node stops. - let mut muxer = DummyMuxer::new(); - muxer.set_inbound_connection_state(DummyConnectionState::Closed); - muxer.set_outbound_connection_state(DummyConnectionState::Closed); - let handled = HandledNode::new(muxer, ShutdownHandler { - did_substream_attempt: false, - inbound_closed: false, - substream_attempt_cancelled: false, - shutdown_called: false, - marker: PhantomData, - }); - - current_thread::Runtime::new().unwrap().block_on(handled.for_each(|_| Ok(()))).unwrap(); - } - - #[test] - fn can_inject_event() { - let mut handled = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Closed) - .handled_node(); - - let event = InEvent::Custom("banana"); - handled.inject_event(event.clone()); - assert_eq!(handled.handler().events, vec![event]); - } - - #[test] - fn knows_if_inbound_is_closed() { - let mut handled = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Closed) - .with_handler_state(HandlerState::Ready(None)) // or we get into an infinite loop - .handled_node(); - handled.poll().expect("poll failed"); - assert!(!handled.is_inbound_open()) - } - - #[test] - fn knows_if_outbound_is_closed() { - let mut handled = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Pending) - .with_muxer_outbound_state(DummyConnectionState::Closed) - .with_handler_state(HandlerState::Ready(None)) // or we get into an infinite loop - .with_open_substream(987) // without at least one substream we do not poll_outbound so we never get the event - .handled_node(); - - handled.poll().expect("poll failed"); - assert!(!handled.is_outbound_open()); - } - - #[test] - fn is_shutting_down_is_true_when_called_shutdown_on_the_handled_node() { - let mut handled = TestBuilder::new() - .with_handler_state(HandlerState::Ready(None)) // Stop the loop towards the end of the first run - .handled_node(); - assert!(!handled.is_shutting_down()); - handled.poll().expect("poll should work"); - handled.shutdown(); - assert!(handled.is_shutting_down()); - } - - #[test] - fn is_shutting_down_is_true_when_in_and_outbounds_are_closed() { - let mut handled = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Closed) - .with_muxer_outbound_state(DummyConnectionState::Closed) - .with_open_substream(123) // avoid infinite loop - .handled_node(); - - handled.poll().expect("poll should work"); - - // Shutting down (in- and outbound are closed, and the handler is shutdown) - assert!(handled.is_shutting_down()); - } - - #[test] - fn is_shutting_down_is_true_when_handler_is_gone() { - // when in-/outbound NodeStreams are open or Async::Ready(None) we reach the handlers `poll()` and initiate shutdown. - let mut handled = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Pending) - .with_muxer_outbound_state(DummyConnectionState::Pending) - .with_handler_state(HandlerState::Ready(None)) // avoid infinite loop - .handled_node(); - - handled.poll().expect("poll should work"); - - assert!(handled.is_shutting_down()); - } - - #[test] - fn is_shutting_down_is_true_when_handler_is_gone_even_if_in_and_outbounds_are_open() { - let mut handled = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Opened) - .with_muxer_outbound_state(DummyConnectionState::Opened) - .with_open_substream(123) - .with_handler_state(HandlerState::Ready(None)) - .handled_node(); - - handled.poll().expect("poll should work"); - - assert!(handled.is_shutting_down()); - } - - #[test] - fn poll_with_unready_node_stream_polls_handler() { - let mut handled = TestBuilder::new() - // make NodeStream return NotReady - .with_muxer_inbound_state(DummyConnectionState::Pending) - // make Handler return return Ready(None) so we break the infinite loop - .with_handler_state(HandlerState::Ready(None)) - .handled_node(); - - assert_matches!(handled.poll(), Ok(Async::Ready(None))); - } - - #[test] - fn poll_with_unready_node_stream_and_handler_emits_custom_event() { - let expected_event = Some(NodeHandlerEvent::Custom(OutEvent::Custom("pineapple"))); - let mut handled = TestBuilder::new() - // make NodeStream return NotReady - .with_muxer_inbound_state(DummyConnectionState::Pending) - // make Handler return return Ready(Some(…)) - .with_handler_state(HandlerState::Ready(expected_event)) - .handled_node(); - - assert_matches!(handled.poll(), Ok(Async::Ready(Some(event))) => { - assert_matches!(event, OutEvent::Custom("pineapple")) - }); - } - - #[test] - fn handler_emits_outbound_closed_when_opening_new_substream_on_closed_node() { - let open_event = Some(NodeHandlerEvent::OutboundSubstreamRequest(456)); - let mut handled = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Pending) - .with_muxer_outbound_state(DummyConnectionState::Closed) - .with_handler_state(HandlerState::Ready(open_event)) - .handled_node(); - - set_next_handler_outbound_state( - &mut handled, - HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("pear")))) - ); - handled.poll().expect("poll works"); - assert_eq!(handled.handler().events, vec![InEvent::OutboundClosed]); - } - - #[test] - fn poll_returns_not_ready_when_node_stream_and_handler_is_not_ready() { - let mut handled = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Closed) - .with_muxer_outbound_state(DummyConnectionState::Closed) - .with_open_substream(12) - .with_handler_state(HandlerState::NotReady) - .handled_node(); - - // Under the hood, this is what happens when calling `poll()`: - // - we reach `node.poll_inbound()` and because the connection is - // closed, `inbound_finished` is set to true. - // - an Async::Ready(NodeEvent::InboundClosed) is yielded (also calls - // `inject_inbound_close`, but that's irrelevant here) - // - back in `poll()` we call `handler.poll()` which does nothing because - // `HandlerState` is `NotReady`: loop continues - // - polls the node again which now skips the inbound block because - // `inbound_finished` is true. - // - Now `poll_outbound()` is called which returns `Async::Ready(None)` - // and sets `outbound_finished` to true. …calls destroy_outbound and - // yields Ready(OutboundClosed) …so the HandledNode calls - // `inject_outbound_closed`. - // - Now we have `inbound_finished` and `outbound_finished` set (and no - // more outbound substreams). - // - Next we poll the handler again which again does nothing because - // HandlerState is NotReady (and the node is still there) - // - HandledNode polls the node again: we skip inbound and there are no - // more outbound substreams so we skip that too; the addr is now - // Resolved so that part is skipped too - // - We reach the last section and the NodeStream yields Async::Ready(None) - // - Back in HandledNode the Async::Ready(None) triggers a shutdown - // – …and causes the Handler to yield Async::Ready(None) - // – which in turn makes the HandledNode to yield Async::Ready(None) as well - assert_matches!(handled.poll(), Ok(Async::Ready(None))); - assert_eq!(handled.handler().events, vec![ - InEvent::InboundClosed, InEvent::OutboundClosed - ]); - } - - #[test] - fn poll_yields_inbound_closed_event() { - let mut h = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Closed) - .with_handler_state(HandlerState::Err) // stop the loop - .handled_node(); - - assert_eq!(h.handler().events, vec![]); - let _ = h.poll(); - assert_eq!(h.handler().events, vec![InEvent::InboundClosed]); - } - - #[test] - fn poll_yields_outbound_closed_event() { - let mut h = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Pending) - .with_open_substream(32) - .with_muxer_outbound_state(DummyConnectionState::Closed) - .with_handler_state(HandlerState::Err) // stop the loop - .handled_node(); - - assert_eq!(h.handler().events, vec![]); - let _ = h.poll(); - assert_eq!(h.handler().events, vec![InEvent::OutboundClosed]); - } - - #[test] - fn poll_yields_outbound_substream() { - let mut h = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Pending) - .with_muxer_outbound_state(DummyConnectionState::Opened) - .with_open_substream(1) - .with_handler_state(HandlerState::Err) // stop the loop - .handled_node(); - - assert_eq!(h.handler().events, vec![]); - let _ = h.poll(); - assert_eq!(h.handler().events, vec![InEvent::Substream(Some(1))]); - } - - #[test] - fn poll_yields_inbound_substream() { - let mut h = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Opened) - .with_muxer_outbound_state(DummyConnectionState::Pending) - .with_handler_state(HandlerState::Err) // stop the loop - .handled_node(); - - assert_eq!(h.handler().events, vec![]); - let _ = h.poll(); - assert_eq!(h.handler().events, vec![InEvent::Substream(None)]); - } -} diff --git a/core/src/nodes/handled_node/tests.rs b/core/src/nodes/handled_node/tests.rs new file mode 100644 index 00000000..337b71ff --- /dev/null +++ b/core/src/nodes/handled_node/tests.rs @@ -0,0 +1,368 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#![cfg(test)] + +use super::*; +use assert_matches::assert_matches; +use tokio::runtime::current_thread; +use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState}; +use crate::tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent, TestHandledNode}; +use std::{io, marker::PhantomData}; + +struct TestBuilder { + muxer: DummyMuxer, + handler: Handler, + want_open_substream: bool, + substream_user_data: usize, +} + +impl TestBuilder { + fn new() -> Self { + TestBuilder { + muxer: DummyMuxer::new(), + handler: Handler::default(), + want_open_substream: false, + substream_user_data: 0, + } + } + + fn with_muxer_inbound_state(&mut self, state: DummyConnectionState) -> &mut Self { + self.muxer.set_inbound_connection_state(state); + self + } + + fn with_muxer_outbound_state(&mut self, state: DummyConnectionState) -> &mut Self { + self.muxer.set_outbound_connection_state(state); + self + } + + fn with_handler_state(&mut self, state: HandlerState) -> &mut Self { + self.handler.state = Some(state); + self + } + + fn with_open_substream(&mut self, user_data: usize) -> &mut Self { + self.want_open_substream = true; + self.substream_user_data = user_data; + self + } + + fn handled_node(&mut self) -> TestHandledNode { + let mut h = HandledNode::new(self.muxer.clone(), self.handler.clone()); + if self.want_open_substream { + h.node.get_mut().open_substream(self.substream_user_data).expect("open substream should work"); + } + h + } +} + +// Set the state of the `Handler` after `inject_outbound_closed` is called +fn set_next_handler_outbound_state( handled_node: &mut TestHandledNode, next_state: HandlerState) { + handled_node.handler.next_outbound_state = Some(next_state); +} + +#[test] +fn proper_shutdown() { + struct ShutdownHandler { + did_substream_attempt: bool, + inbound_closed: bool, + substream_attempt_cancelled: bool, + shutdown_called: bool, + marker: PhantomData + } + impl NodeHandler for ShutdownHandler { + type InEvent = (); + type OutEvent = (); + type Substream = T; + type Error = io::Error; + type OutboundOpenInfo = (); + fn inject_substream(&mut self, _: Self::Substream, _: NodeHandlerEndpoint) { panic!() } + fn inject_inbound_closed(&mut self) { + assert!(!self.inbound_closed); + self.inbound_closed = true; + } + fn inject_outbound_closed(&mut self, _: ()) { + assert!(!self.substream_attempt_cancelled); + self.substream_attempt_cancelled = true; + } + fn inject_event(&mut self, _: Self::InEvent) { panic!() } + fn shutdown(&mut self) { + assert!(self.inbound_closed); + assert!(self.substream_attempt_cancelled); + self.shutdown_called = true; + } + fn poll(&mut self) -> Poll, io::Error> { + if self.shutdown_called { + Ok(Async::Ready(NodeHandlerEvent::Shutdown)) + } else if !self.did_substream_attempt { + self.did_substream_attempt = true; + Ok(Async::Ready(NodeHandlerEvent::OutboundSubstreamRequest(()))) + } else { + Ok(Async::NotReady) + } + } + } + + impl Drop for ShutdownHandler { + fn drop(&mut self) { + if self.did_substream_attempt { + assert!(self.shutdown_called); + } + } + } + + // Test that `shutdown()` is properly called on the handler once a node stops. + let mut muxer = DummyMuxer::new(); + muxer.set_inbound_connection_state(DummyConnectionState::Closed); + muxer.set_outbound_connection_state(DummyConnectionState::Closed); + let handled = HandledNode::new(muxer, ShutdownHandler { + did_substream_attempt: false, + inbound_closed: false, + substream_attempt_cancelled: false, + shutdown_called: false, + marker: PhantomData, + }); + + current_thread::Runtime::new().unwrap().block_on(handled.for_each(|_| Ok(()))).unwrap(); +} + +#[test] +fn can_inject_event() { + let mut handled = TestBuilder::new() + .with_muxer_inbound_state(DummyConnectionState::Closed) + .handled_node(); + + let event = InEvent::Custom("banana"); + handled.inject_event(event.clone()); + assert_eq!(handled.handler().events, vec![event]); +} + +#[test] +fn knows_if_inbound_is_closed() { + let mut handled = TestBuilder::new() + .with_muxer_inbound_state(DummyConnectionState::Closed) + .with_handler_state(HandlerState::Ready(None)) // or we get into an infinite loop + .handled_node(); + handled.poll().expect("poll failed"); + assert!(!handled.is_inbound_open()) +} + +#[test] +fn knows_if_outbound_is_closed() { + let mut handled = TestBuilder::new() + .with_muxer_inbound_state(DummyConnectionState::Pending) + .with_muxer_outbound_state(DummyConnectionState::Closed) + .with_handler_state(HandlerState::Ready(None)) // or we get into an infinite loop + .with_open_substream(987) // without at least one substream we do not poll_outbound so we never get the event + .handled_node(); + + handled.poll().expect("poll failed"); + assert!(!handled.is_outbound_open()); +} + +#[test] +fn is_shutting_down_is_true_when_called_shutdown_on_the_handled_node() { + let mut handled = TestBuilder::new() + .with_handler_state(HandlerState::Ready(None)) // Stop the loop towards the end of the first run + .handled_node(); + assert!(!handled.is_shutting_down()); + handled.poll().expect("poll should work"); + handled.shutdown(); + assert!(handled.is_shutting_down()); +} + +#[test] +fn is_shutting_down_is_true_when_in_and_outbounds_are_closed() { + let mut handled = TestBuilder::new() + .with_muxer_inbound_state(DummyConnectionState::Closed) + .with_muxer_outbound_state(DummyConnectionState::Closed) + .with_open_substream(123) // avoid infinite loop + .handled_node(); + + handled.poll().expect("poll should work"); + + // Shutting down (in- and outbound are closed, and the handler is shutdown) + assert!(handled.is_shutting_down()); +} + +#[test] +fn is_shutting_down_is_true_when_handler_is_gone() { + // when in-/outbound NodeStreams are open or Async::Ready(None) we reach the handlers `poll()` and initiate shutdown. + let mut handled = TestBuilder::new() + .with_muxer_inbound_state(DummyConnectionState::Pending) + .with_muxer_outbound_state(DummyConnectionState::Pending) + .with_handler_state(HandlerState::Ready(None)) // avoid infinite loop + .handled_node(); + + handled.poll().expect("poll should work"); + + assert!(handled.is_shutting_down()); +} + +#[test] +fn is_shutting_down_is_true_when_handler_is_gone_even_if_in_and_outbounds_are_open() { + let mut handled = TestBuilder::new() + .with_muxer_inbound_state(DummyConnectionState::Opened) + .with_muxer_outbound_state(DummyConnectionState::Opened) + .with_open_substream(123) + .with_handler_state(HandlerState::Ready(None)) + .handled_node(); + + handled.poll().expect("poll should work"); + + assert!(handled.is_shutting_down()); +} + +#[test] +fn poll_with_unready_node_stream_polls_handler() { + let mut handled = TestBuilder::new() + // make NodeStream return NotReady + .with_muxer_inbound_state(DummyConnectionState::Pending) + // make Handler return return Ready(None) so we break the infinite loop + .with_handler_state(HandlerState::Ready(None)) + .handled_node(); + + assert_matches!(handled.poll(), Ok(Async::Ready(None))); +} + +#[test] +fn poll_with_unready_node_stream_and_handler_emits_custom_event() { + let expected_event = Some(NodeHandlerEvent::Custom(OutEvent::Custom("pineapple"))); + let mut handled = TestBuilder::new() + // make NodeStream return NotReady + .with_muxer_inbound_state(DummyConnectionState::Pending) + // make Handler return return Ready(Some(…)) + .with_handler_state(HandlerState::Ready(expected_event)) + .handled_node(); + + assert_matches!(handled.poll(), Ok(Async::Ready(Some(event))) => { + assert_matches!(event, OutEvent::Custom("pineapple")) + }); +} + +#[test] +fn handler_emits_outbound_closed_when_opening_new_substream_on_closed_node() { + let open_event = Some(NodeHandlerEvent::OutboundSubstreamRequest(456)); + let mut handled = TestBuilder::new() + .with_muxer_inbound_state(DummyConnectionState::Pending) + .with_muxer_outbound_state(DummyConnectionState::Closed) + .with_handler_state(HandlerState::Ready(open_event)) + .handled_node(); + + set_next_handler_outbound_state( + &mut handled, + HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("pear")))) + ); + handled.poll().expect("poll works"); + assert_eq!(handled.handler().events, vec![InEvent::OutboundClosed]); +} + +#[test] +fn poll_returns_not_ready_when_node_stream_and_handler_is_not_ready() { + let mut handled = TestBuilder::new() + .with_muxer_inbound_state(DummyConnectionState::Closed) + .with_muxer_outbound_state(DummyConnectionState::Closed) + .with_open_substream(12) + .with_handler_state(HandlerState::NotReady) + .handled_node(); + + // Under the hood, this is what happens when calling `poll()`: + // - we reach `node.poll_inbound()` and because the connection is + // closed, `inbound_finished` is set to true. + // - an Async::Ready(NodeEvent::InboundClosed) is yielded (also calls + // `inject_inbound_close`, but that's irrelevant here) + // - back in `poll()` we call `handler.poll()` which does nothing because + // `HandlerState` is `NotReady`: loop continues + // - polls the node again which now skips the inbound block because + // `inbound_finished` is true. + // - Now `poll_outbound()` is called which returns `Async::Ready(None)` + // and sets `outbound_finished` to true. …calls destroy_outbound and + // yields Ready(OutboundClosed) …so the HandledNode calls + // `inject_outbound_closed`. + // - Now we have `inbound_finished` and `outbound_finished` set (and no + // more outbound substreams). + // - Next we poll the handler again which again does nothing because + // HandlerState is NotReady (and the node is still there) + // - HandledNode polls the node again: we skip inbound and there are no + // more outbound substreams so we skip that too; the addr is now + // Resolved so that part is skipped too + // - We reach the last section and the NodeStream yields Async::Ready(None) + // - Back in HandledNode the Async::Ready(None) triggers a shutdown + // – …and causes the Handler to yield Async::Ready(None) + // – which in turn makes the HandledNode to yield Async::Ready(None) as well + assert_matches!(handled.poll(), Ok(Async::Ready(None))); + assert_eq!(handled.handler().events, vec![ + InEvent::InboundClosed, InEvent::OutboundClosed + ]); +} + +#[test] +fn poll_yields_inbound_closed_event() { + let mut h = TestBuilder::new() + .with_muxer_inbound_state(DummyConnectionState::Closed) + .with_handler_state(HandlerState::Err) // stop the loop + .handled_node(); + + assert_eq!(h.handler().events, vec![]); + let _ = h.poll(); + assert_eq!(h.handler().events, vec![InEvent::InboundClosed]); +} + +#[test] +fn poll_yields_outbound_closed_event() { + let mut h = TestBuilder::new() + .with_muxer_inbound_state(DummyConnectionState::Pending) + .with_open_substream(32) + .with_muxer_outbound_state(DummyConnectionState::Closed) + .with_handler_state(HandlerState::Err) // stop the loop + .handled_node(); + + assert_eq!(h.handler().events, vec![]); + let _ = h.poll(); + assert_eq!(h.handler().events, vec![InEvent::OutboundClosed]); +} + +#[test] +fn poll_yields_outbound_substream() { + let mut h = TestBuilder::new() + .with_muxer_inbound_state(DummyConnectionState::Pending) + .with_muxer_outbound_state(DummyConnectionState::Opened) + .with_open_substream(1) + .with_handler_state(HandlerState::Err) // stop the loop + .handled_node(); + + assert_eq!(h.handler().events, vec![]); + let _ = h.poll(); + assert_eq!(h.handler().events, vec![InEvent::Substream(Some(1))]); +} + +#[test] +fn poll_yields_inbound_substream() { + let mut h = TestBuilder::new() + .with_muxer_inbound_state(DummyConnectionState::Opened) + .with_muxer_outbound_state(DummyConnectionState::Pending) + .with_handler_state(HandlerState::Err) // stop the loop + .handled_node(); + + assert_eq!(h.handler().events, vec![]); + let _ = h.poll(); + assert_eq!(h.handler().events, vec![InEvent::Substream(None)]); +} diff --git a/core/src/nodes/handled_node_tasks.rs b/core/src/nodes/handled_node_tasks.rs index 61fcff32..42c9bdfd 100644 --- a/core/src/nodes/handled_node_tasks.rs +++ b/core/src/nodes/handled_node_tasks.rs @@ -38,6 +38,8 @@ use std::{ use tokio_executor; use void::Void; +mod tests; + // TODO: make generic over PeerId // Implementor notes @@ -491,352 +493,3 @@ where } } } - -#[cfg(test)] -mod tests { - use super::*; - - use std::io; - - use assert_matches::assert_matches; - use futures::future::{self, FutureResult}; - use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender}; - use crate::nodes::handled_node::NodeHandlerEvent; - use crate::tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent, TestHandledNode}; - use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState}; - use tokio::runtime::Builder; - use tokio::runtime::current_thread::Runtime; - use void::Void; - use crate::PeerId; - - type TestNodeTask = NodeTask< - FutureResult<(PeerId, DummyMuxer), io::Error>, - DummyMuxer, - Handler, - InEvent, - OutEvent, - io::Error, - >; - - struct NodeTaskTestBuilder { - task_id: TaskId, - inner_node: Option, - inner_fut: Option>, - } - - impl NodeTaskTestBuilder { - fn new() -> Self { - NodeTaskTestBuilder { - task_id: TaskId(123), - inner_node: None, - inner_fut: { - let peer_id = PeerId::random(); - Some(future::ok((peer_id, DummyMuxer::new()))) - }, - } - } - - fn with_inner_fut(&mut self, fut: FutureResult<(PeerId, DummyMuxer), io::Error>) -> &mut Self{ - self.inner_fut = Some(fut); - self - } - - fn with_task_id(&mut self, id: usize) -> &mut Self { - self.task_id = TaskId(id); - self - } - - fn node_task(&mut self) -> ( - TestNodeTask, - UnboundedSender, - UnboundedReceiver<(InToExtMessage, TaskId)>, - ) { - let (events_from_node_task_tx, events_from_node_task_rx) = mpsc::unbounded::<(InToExtMessage, TaskId)>(); - let (events_to_node_task_tx, events_to_node_task_rx) = mpsc::unbounded::(); - let inner = if self.inner_node.is_some() { - NodeTaskInner::Node(self.inner_node.take().unwrap()) - } else { - NodeTaskInner::Future { - future: self.inner_fut.take().unwrap(), - handler: Handler::default(), - events_buffer: Vec::new(), - } - }; - let node_task = NodeTask { - inner: inner, - events_tx: events_from_node_task_tx.clone(), // events TO the outside - in_events_rx: events_to_node_task_rx.fuse(), // events FROM the outside - id: self.task_id, - }; - (node_task, events_to_node_task_tx, events_from_node_task_rx) - } - } - - type TestHandledNodesTasks = HandledNodesTasks; - - struct HandledNodeTaskTestBuilder { - muxer: DummyMuxer, - handler: Handler, - task_count: usize, - } - - impl HandledNodeTaskTestBuilder { - fn new() -> Self { - HandledNodeTaskTestBuilder { - muxer: DummyMuxer::new(), - handler: Handler::default(), - task_count: 0, - } - } - - fn with_tasks(&mut self, amt: usize) -> &mut Self { - self.task_count = amt; - self - } - fn with_muxer_inbound_state(&mut self, state: DummyConnectionState) -> &mut Self { - self.muxer.set_inbound_connection_state(state); - self - } - fn with_muxer_outbound_state(&mut self, state: DummyConnectionState) -> &mut Self { - self.muxer.set_outbound_connection_state(state); - self - } - fn with_handler_state(&mut self, state: HandlerState) -> &mut Self { - self.handler.state = Some(state); - self - } - fn with_handler_states(&mut self, states: Vec) -> &mut Self { - self.handler.next_states = states; - self - } - fn handled_nodes_tasks(&mut self) -> (TestHandledNodesTasks, Vec) { - let mut handled_nodes = HandledNodesTasks::new(); - let peer_id = PeerId::random(); - let mut task_ids = Vec::new(); - for _i in 0..self.task_count { - let fut = future::ok((peer_id.clone(), self.muxer.clone())); - task_ids.push( - handled_nodes.add_reach_attempt(fut, self.handler.clone()) - ); - } - (handled_nodes, task_ids) - } - } - - - // Tests for NodeTask - - #[test] - fn task_emits_event_when_things_happen_in_the_node() { - let (node_task, tx, mut rx) = NodeTaskTestBuilder::new() - .with_task_id(890) - .node_task(); - - tx.unbounded_send(InEvent::Custom("beef")).expect("send to NodeTask should work"); - let mut rt = Runtime::new().unwrap(); - rt.spawn(node_task); - let events = rt.block_on(rx.by_ref().take(2).collect()).expect("reading on rx should work"); - - assert_matches!(events[0], (InToExtMessage::NodeReached(_), TaskId(890))); - assert_matches!(events[1], (InToExtMessage::NodeEvent(ref outevent), TaskId(890)) => { - assert_matches!(outevent, OutEvent::Custom(beef) => { - assert_eq!(beef, &"beef"); - }) - }); - } - - #[test] - fn task_exits_when_node_errors() { - let mut rt = Runtime::new().unwrap(); - let (node_task, _tx, rx) = NodeTaskTestBuilder::new() - .with_inner_fut(future::err(io::Error::new(io::ErrorKind::Other, "nah"))) - .with_task_id(345) - .node_task(); - - rt.spawn(node_task); - let events = rt.block_on(rx.collect()).expect("rx failed"); - assert!(events.len() == 1); - assert_matches!(events[0], (InToExtMessage::TaskClosed{..}, TaskId(345))); - } - - #[test] - fn task_exits_when_node_is_done() { - let mut rt = Runtime::new().unwrap(); - let fut = { - let peer_id = PeerId::random(); - let mut muxer = DummyMuxer::new(); - muxer.set_inbound_connection_state(DummyConnectionState::Closed); - muxer.set_outbound_connection_state(DummyConnectionState::Closed); - future::ok((peer_id, muxer)) - }; - let (node_task, tx, rx) = NodeTaskTestBuilder::new() - .with_inner_fut(fut) - .with_task_id(345) - .node_task(); - - // Even though we set up the muxer outbound state to be `Closed` we - // still need to open a substream or the outbound state will never - // be checked (see https://github.com/libp2p/rust-libp2p/issues/609). - // We do not have a HandledNode yet, so we can't simply call - // `open_substream`. Instead we send a message to the NodeTask, - // which will be buffered until the inner future resolves, then - // it'll call `inject_event` on the handler. In the test Handler, - // inject_event will set the next state so that it yields an - // OutboundSubstreamRequest. - // Back up in the HandledNode, at the next iteration we'll - // open_substream() and iterate again. This time, node.poll() will - // poll the muxer inbound (closed) and also outbound (because now - // there's an entry in the outbound_streams) which will be Closed - // (because we set up the muxer state so) and thus yield - // Async::Ready(None) which in turn makes the NodeStream yield an - // Async::Ready(OutboundClosed) to the HandledNode. - // Now we're at the point where poll_inbound, poll_outbound and - // address are all skipped and there is nothing left to do: we yield - // Async::Ready(None) from the NodeStream. In the HandledNode, - // Async::Ready(None) triggers a shutdown of the Handler so that it - // also yields Async::Ready(None). Finally, the NodeTask gets a - // Async::Ready(None) and sends a TaskClosed and returns - // Async::Ready(()). QED. - - let create_outbound_substream_event = InEvent::Substream(Some(135)); - tx.unbounded_send(create_outbound_substream_event).expect("send msg works"); - rt.spawn(node_task); - let events = rt.block_on(rx.collect()).expect("rx failed"); - - assert_eq!(events.len(), 2); - assert_matches!(events[0].0, InToExtMessage::NodeReached(PeerId{..})); - assert_matches!(events[1].0, InToExtMessage::TaskClosed(Ok(()), _)); - } - - - // Tests for HandledNodeTasks - - #[test] - fn query_for_tasks() { - let (mut handled_nodes, task_ids) = HandledNodeTaskTestBuilder::new() - .with_tasks(3) - .handled_nodes_tasks(); - - assert_eq!(task_ids.len(), 3); - assert_eq!(handled_nodes.task(TaskId(2)).unwrap().id(), task_ids[2]); - assert!(handled_nodes.task(TaskId(545534)).is_none()); - } - - #[test] - fn send_event_to_task() { - let (mut handled_nodes, _) = HandledNodeTaskTestBuilder::new() - .with_tasks(1) - .handled_nodes_tasks(); - - let task_id = { - let mut task = handled_nodes.task(TaskId(0)).expect("can fetch a Task"); - task.send_event(InEvent::Custom("banana")); - task.id() - }; - - let mut rt = Builder::new().core_threads(1).build().unwrap(); - let mut events = rt.block_on(handled_nodes.into_future()).unwrap(); - assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeReached{..}); - - events = rt.block_on(events.1.into_future()).unwrap(); - assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: event_task_id, event} => { - assert_eq!(event_task_id, task_id); - assert_matches!(event, OutEvent::Custom("banana")); - }); - } - - #[test] - fn iterate_over_all_tasks() { - let (handled_nodes, task_ids) = HandledNodeTaskTestBuilder::new() - .with_tasks(3) - .handled_nodes_tasks(); - - let mut tasks: Vec = handled_nodes.tasks().collect(); - assert!(tasks.len() == 3); - tasks.sort_by_key(|t| t.0 ); - assert_eq!(tasks, task_ids); - } - - #[test] - fn add_reach_attempt_prepares_a_new_task() { - let mut handled_nodes = HandledNodesTasks::new(); - assert_eq!(handled_nodes.tasks().count(), 0); - assert_eq!(handled_nodes.to_spawn.len(), 0); - - handled_nodes.add_reach_attempt( future::empty::<_, Void>(), Handler::default() ); - - assert_eq!(handled_nodes.tasks().count(), 1); - assert_eq!(handled_nodes.to_spawn.len(), 1); - } - - #[test] - fn running_handled_tasks_reaches_the_nodes() { - let (mut handled_nodes_tasks, _) = HandledNodeTaskTestBuilder::new() - .with_tasks(5) - .with_muxer_inbound_state(DummyConnectionState::Closed) - .with_muxer_outbound_state(DummyConnectionState::Closed) - .with_handler_state(HandlerState::Err) // stop the loop - .handled_nodes_tasks(); - - let mut rt = Runtime::new().unwrap(); - let mut events: (Option>, TestHandledNodesTasks); - // we're running on a single thread so events are sequential: first - // we get a NodeReached, then a TaskClosed - for i in 0..5 { - events = rt.block_on(handled_nodes_tasks.into_future()).unwrap(); - assert_matches!(events, (Some(HandledNodesEvent::NodeReached{..}), ref hnt) => { - assert_matches!(hnt, HandledNodesTasks{..}); - assert_eq!(hnt.tasks().count(), 5 - i); - }); - handled_nodes_tasks = events.1; - events = rt.block_on(handled_nodes_tasks.into_future()).unwrap(); - assert_matches!(events, (Some(HandledNodesEvent::TaskClosed{..}), _)); - handled_nodes_tasks = events.1; - } - } - - #[test] - fn events_in_tasks_are_emitted() { - // States are pop()'d so they are set in reverse order by the Handler - let handler_states = vec![ - HandlerState::Err, - HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler2") ))), - HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler") ))), - ]; - - let (mut handled_nodes_tasks, _) = HandledNodeTaskTestBuilder::new() - .with_tasks(1) - .with_muxer_inbound_state(DummyConnectionState::Pending) - .with_muxer_outbound_state(DummyConnectionState::Opened) - .with_handler_states(handler_states) - .handled_nodes_tasks(); - - let tx = { - let mut task0 = handled_nodes_tasks.task(TaskId(0)).unwrap(); - let tx = task0.inner.get_mut(); - tx.clone() - }; - - let mut rt = Builder::new().core_threads(1).build().unwrap(); - let mut events = rt.block_on(handled_nodes_tasks.into_future()).unwrap(); - assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeReached{..}); - - tx.unbounded_send(InEvent::NextState).expect("send works"); - events = rt.block_on(events.1.into_future()).unwrap(); - assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: _, event} => { - assert_matches!(event, OutEvent::Custom("from handler")); - }); - - tx.unbounded_send(InEvent::NextState).expect("send works"); - events = rt.block_on(events.1.into_future()).unwrap(); - assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: _, event} => { - assert_matches!(event, OutEvent::Custom("from handler2")); - }); - - tx.unbounded_send(InEvent::NextState).expect("send works"); - events = rt.block_on(events.1.into_future()).unwrap(); - assert_matches!(events.0.unwrap(), HandledNodesEvent::TaskClosed{id: _, result, handler: _} => { - assert_matches!(result, Err(_)); - }); - } -} diff --git a/core/src/nodes/handled_node_tasks/tests.rs b/core/src/nodes/handled_node_tasks/tests.rs new file mode 100644 index 00000000..ac00a757 --- /dev/null +++ b/core/src/nodes/handled_node_tasks/tests.rs @@ -0,0 +1,367 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#![cfg(test)] + +use super::*; + +use std::io; + +use assert_matches::assert_matches; +use futures::future::{self, FutureResult}; +use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use crate::nodes::handled_node::NodeHandlerEvent; +use crate::tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent, TestHandledNode}; +use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState}; +use tokio::runtime::Builder; +use tokio::runtime::current_thread::Runtime; +use void::Void; +use crate::PeerId; + +type TestNodeTask = NodeTask< + FutureResult<(PeerId, DummyMuxer), io::Error>, + DummyMuxer, + Handler, + InEvent, + OutEvent, + io::Error, +>; + +struct NodeTaskTestBuilder { + task_id: TaskId, + inner_node: Option, + inner_fut: Option>, +} + +impl NodeTaskTestBuilder { + fn new() -> Self { + NodeTaskTestBuilder { + task_id: TaskId(123), + inner_node: None, + inner_fut: { + let peer_id = PeerId::random(); + Some(future::ok((peer_id, DummyMuxer::new()))) + }, + } + } + + fn with_inner_fut(&mut self, fut: FutureResult<(PeerId, DummyMuxer), io::Error>) -> &mut Self{ + self.inner_fut = Some(fut); + self + } + + fn with_task_id(&mut self, id: usize) -> &mut Self { + self.task_id = TaskId(id); + self + } + + fn node_task(&mut self) -> ( + TestNodeTask, + UnboundedSender, + UnboundedReceiver<(InToExtMessage, TaskId)>, + ) { + let (events_from_node_task_tx, events_from_node_task_rx) = mpsc::unbounded::<(InToExtMessage, TaskId)>(); + let (events_to_node_task_tx, events_to_node_task_rx) = mpsc::unbounded::(); + let inner = if self.inner_node.is_some() { + NodeTaskInner::Node(self.inner_node.take().unwrap()) + } else { + NodeTaskInner::Future { + future: self.inner_fut.take().unwrap(), + handler: Handler::default(), + events_buffer: Vec::new(), + } + }; + let node_task = NodeTask { + inner: inner, + events_tx: events_from_node_task_tx.clone(), // events TO the outside + in_events_rx: events_to_node_task_rx.fuse(), // events FROM the outside + id: self.task_id, + }; + (node_task, events_to_node_task_tx, events_from_node_task_rx) + } +} + +type TestHandledNodesTasks = HandledNodesTasks; + +struct HandledNodeTaskTestBuilder { + muxer: DummyMuxer, + handler: Handler, + task_count: usize, +} + +impl HandledNodeTaskTestBuilder { + fn new() -> Self { + HandledNodeTaskTestBuilder { + muxer: DummyMuxer::new(), + handler: Handler::default(), + task_count: 0, + } + } + + fn with_tasks(&mut self, amt: usize) -> &mut Self { + self.task_count = amt; + self + } + fn with_muxer_inbound_state(&mut self, state: DummyConnectionState) -> &mut Self { + self.muxer.set_inbound_connection_state(state); + self + } + fn with_muxer_outbound_state(&mut self, state: DummyConnectionState) -> &mut Self { + self.muxer.set_outbound_connection_state(state); + self + } + fn with_handler_state(&mut self, state: HandlerState) -> &mut Self { + self.handler.state = Some(state); + self + } + fn with_handler_states(&mut self, states: Vec) -> &mut Self { + self.handler.next_states = states; + self + } + fn handled_nodes_tasks(&mut self) -> (TestHandledNodesTasks, Vec) { + let mut handled_nodes = HandledNodesTasks::new(); + let peer_id = PeerId::random(); + let mut task_ids = Vec::new(); + for _i in 0..self.task_count { + let fut = future::ok((peer_id.clone(), self.muxer.clone())); + task_ids.push( + handled_nodes.add_reach_attempt(fut, self.handler.clone()) + ); + } + (handled_nodes, task_ids) + } +} + + +// Tests for NodeTask + +#[test] +fn task_emits_event_when_things_happen_in_the_node() { + let (node_task, tx, mut rx) = NodeTaskTestBuilder::new() + .with_task_id(890) + .node_task(); + + tx.unbounded_send(InEvent::Custom("beef")).expect("send to NodeTask should work"); + let mut rt = Runtime::new().unwrap(); + rt.spawn(node_task); + let events = rt.block_on(rx.by_ref().take(2).collect()).expect("reading on rx should work"); + + assert_matches!(events[0], (InToExtMessage::NodeReached(_), TaskId(890))); + assert_matches!(events[1], (InToExtMessage::NodeEvent(ref outevent), TaskId(890)) => { + assert_matches!(outevent, OutEvent::Custom(beef) => { + assert_eq!(beef, &"beef"); + }) + }); +} + +#[test] +fn task_exits_when_node_errors() { + let mut rt = Runtime::new().unwrap(); + let (node_task, _tx, rx) = NodeTaskTestBuilder::new() + .with_inner_fut(future::err(io::Error::new(io::ErrorKind::Other, "nah"))) + .with_task_id(345) + .node_task(); + + rt.spawn(node_task); + let events = rt.block_on(rx.collect()).expect("rx failed"); + assert!(events.len() == 1); + assert_matches!(events[0], (InToExtMessage::TaskClosed{..}, TaskId(345))); +} + +#[test] +fn task_exits_when_node_is_done() { + let mut rt = Runtime::new().unwrap(); + let fut = { + let peer_id = PeerId::random(); + let mut muxer = DummyMuxer::new(); + muxer.set_inbound_connection_state(DummyConnectionState::Closed); + muxer.set_outbound_connection_state(DummyConnectionState::Closed); + future::ok((peer_id, muxer)) + }; + let (node_task, tx, rx) = NodeTaskTestBuilder::new() + .with_inner_fut(fut) + .with_task_id(345) + .node_task(); + + // Even though we set up the muxer outbound state to be `Closed` we + // still need to open a substream or the outbound state will never + // be checked (see https://github.com/libp2p/rust-libp2p/issues/609). + // We do not have a HandledNode yet, so we can't simply call + // `open_substream`. Instead we send a message to the NodeTask, + // which will be buffered until the inner future resolves, then + // it'll call `inject_event` on the handler. In the test Handler, + // inject_event will set the next state so that it yields an + // OutboundSubstreamRequest. + // Back up in the HandledNode, at the next iteration we'll + // open_substream() and iterate again. This time, node.poll() will + // poll the muxer inbound (closed) and also outbound (because now + // there's an entry in the outbound_streams) which will be Closed + // (because we set up the muxer state so) and thus yield + // Async::Ready(None) which in turn makes the NodeStream yield an + // Async::Ready(OutboundClosed) to the HandledNode. + // Now we're at the point where poll_inbound, poll_outbound and + // address are all skipped and there is nothing left to do: we yield + // Async::Ready(None) from the NodeStream. In the HandledNode, + // Async::Ready(None) triggers a shutdown of the Handler so that it + // also yields Async::Ready(None). Finally, the NodeTask gets a + // Async::Ready(None) and sends a TaskClosed and returns + // Async::Ready(()). QED. + + let create_outbound_substream_event = InEvent::Substream(Some(135)); + tx.unbounded_send(create_outbound_substream_event).expect("send msg works"); + rt.spawn(node_task); + let events = rt.block_on(rx.collect()).expect("rx failed"); + + assert_eq!(events.len(), 2); + assert_matches!(events[0].0, InToExtMessage::NodeReached(PeerId{..})); + assert_matches!(events[1].0, InToExtMessage::TaskClosed(Ok(()), _)); +} + + +// Tests for HandledNodeTasks + +#[test] +fn query_for_tasks() { + let (mut handled_nodes, task_ids) = HandledNodeTaskTestBuilder::new() + .with_tasks(3) + .handled_nodes_tasks(); + + assert_eq!(task_ids.len(), 3); + assert_eq!(handled_nodes.task(TaskId(2)).unwrap().id(), task_ids[2]); + assert!(handled_nodes.task(TaskId(545534)).is_none()); +} + +#[test] +fn send_event_to_task() { + let (mut handled_nodes, _) = HandledNodeTaskTestBuilder::new() + .with_tasks(1) + .handled_nodes_tasks(); + + let task_id = { + let mut task = handled_nodes.task(TaskId(0)).expect("can fetch a Task"); + task.send_event(InEvent::Custom("banana")); + task.id() + }; + + let mut rt = Builder::new().core_threads(1).build().unwrap(); + let mut events = rt.block_on(handled_nodes.into_future()).unwrap(); + assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeReached{..}); + + events = rt.block_on(events.1.into_future()).unwrap(); + assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: event_task_id, event} => { + assert_eq!(event_task_id, task_id); + assert_matches!(event, OutEvent::Custom("banana")); + }); +} + +#[test] +fn iterate_over_all_tasks() { + let (handled_nodes, task_ids) = HandledNodeTaskTestBuilder::new() + .with_tasks(3) + .handled_nodes_tasks(); + + let mut tasks: Vec = handled_nodes.tasks().collect(); + assert!(tasks.len() == 3); + tasks.sort_by_key(|t| t.0 ); + assert_eq!(tasks, task_ids); +} + +#[test] +fn add_reach_attempt_prepares_a_new_task() { + let mut handled_nodes = HandledNodesTasks::new(); + assert_eq!(handled_nodes.tasks().count(), 0); + assert_eq!(handled_nodes.to_spawn.len(), 0); + + handled_nodes.add_reach_attempt( future::empty::<_, Void>(), Handler::default() ); + + assert_eq!(handled_nodes.tasks().count(), 1); + assert_eq!(handled_nodes.to_spawn.len(), 1); +} + +#[test] +fn running_handled_tasks_reaches_the_nodes() { + let (mut handled_nodes_tasks, _) = HandledNodeTaskTestBuilder::new() + .with_tasks(5) + .with_muxer_inbound_state(DummyConnectionState::Closed) + .with_muxer_outbound_state(DummyConnectionState::Closed) + .with_handler_state(HandlerState::Err) // stop the loop + .handled_nodes_tasks(); + + let mut rt = Runtime::new().unwrap(); + let mut events: (Option>, TestHandledNodesTasks); + // we're running on a single thread so events are sequential: first + // we get a NodeReached, then a TaskClosed + for i in 0..5 { + events = rt.block_on(handled_nodes_tasks.into_future()).unwrap(); + assert_matches!(events, (Some(HandledNodesEvent::NodeReached{..}), ref hnt) => { + assert_matches!(hnt, HandledNodesTasks{..}); + assert_eq!(hnt.tasks().count(), 5 - i); + }); + handled_nodes_tasks = events.1; + events = rt.block_on(handled_nodes_tasks.into_future()).unwrap(); + assert_matches!(events, (Some(HandledNodesEvent::TaskClosed{..}), _)); + handled_nodes_tasks = events.1; + } +} + +#[test] +fn events_in_tasks_are_emitted() { + // States are pop()'d so they are set in reverse order by the Handler + let handler_states = vec![ + HandlerState::Err, + HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler2") ))), + HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler") ))), + ]; + + let (mut handled_nodes_tasks, _) = HandledNodeTaskTestBuilder::new() + .with_tasks(1) + .with_muxer_inbound_state(DummyConnectionState::Pending) + .with_muxer_outbound_state(DummyConnectionState::Opened) + .with_handler_states(handler_states) + .handled_nodes_tasks(); + + let tx = { + let mut task0 = handled_nodes_tasks.task(TaskId(0)).unwrap(); + let tx = task0.inner.get_mut(); + tx.clone() + }; + + let mut rt = Builder::new().core_threads(1).build().unwrap(); + let mut events = rt.block_on(handled_nodes_tasks.into_future()).unwrap(); + assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeReached{..}); + + tx.unbounded_send(InEvent::NextState).expect("send works"); + events = rt.block_on(events.1.into_future()).unwrap(); + assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: _, event} => { + assert_matches!(event, OutEvent::Custom("from handler")); + }); + + tx.unbounded_send(InEvent::NextState).expect("send works"); + events = rt.block_on(events.1.into_future()).unwrap(); + assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: _, event} => { + assert_matches!(event, OutEvent::Custom("from handler2")); + }); + + tx.unbounded_send(InEvent::NextState).expect("send works"); + events = rt.block_on(events.1.into_future()).unwrap(); + assert_matches!(events.0.unwrap(), HandledNodesEvent::TaskClosed{id: _, result, handler: _} => { + assert_matches!(result, Err(_)); + }); +} diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index 5a091b3d..a1e63995 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -48,6 +48,8 @@ use std::{ io::{Error as IoError, ErrorKind as IoErrorKind} }; +mod tests; + /// Implementation of `Stream` that handles the nodes. #[derive(Debug)] pub struct RawSwarm @@ -1358,436 +1360,3 @@ where }) } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::tests::dummy_transport::DummyTransport; - use crate::tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent}; - use crate::tests::dummy_transport::ListenerState; - use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState}; - use crate::nodes::NodeHandlerEvent; - use assert_matches::assert_matches; - use parking_lot::Mutex; - use std::sync::Arc; - use tokio::runtime::{Builder, Runtime}; - - #[test] - fn query_transport() { - let transport = DummyTransport::new(); - let transport2 = transport.clone(); - let raw_swarm = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()); - assert_eq!(raw_swarm.transport(), &transport2); - } - - #[test] - fn starts_listening() { - let mut raw_swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random()); - let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); - let addr2 = addr.clone(); - assert!(raw_swarm.listen_on(addr).is_ok()); - let listeners = raw_swarm.listeners().collect::>(); - assert_eq!(listeners.len(), 1); - assert_eq!(listeners[0], &addr2); - } - - #[test] - fn nat_traversal_transforms_the_observed_address_according_to_the_transport_used() { - // the DummyTransport nat_traversal increments the port number by one for Ip4 addresses - let transport = DummyTransport::new(); - let mut raw_swarm = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()); - let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); - // An unrelated outside address is returned as-is, no transform - let outside_addr1 = "/memory".parse::().expect("bad multiaddr"); - - let addr2 = "/ip4/127.0.0.2/tcp/1234".parse::().expect("bad multiaddr"); - let outside_addr2 = "/ip4/127.0.0.2/tcp/1234".parse::().expect("bad multiaddr"); - - raw_swarm.listen_on(addr1).unwrap(); - raw_swarm.listen_on(addr2).unwrap(); - - let natted = raw_swarm - .nat_traversal(&outside_addr1) - .map(|a| a.to_string()) - .collect::>(); - - assert!(natted.is_empty()); - - let natted = raw_swarm - .nat_traversal(&outside_addr2) - .map(|a| a.to_string()) - .collect::>(); - - assert_eq!(natted, vec!["/ip4/127.0.0.2/tcp/1234"]) - } - - #[test] - fn successful_dial_reaches_a_node() { - let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random()); - let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); - let dial_res = swarm.dial(addr, Handler::default()); - assert!(dial_res.is_ok()); - - // Poll the swarm until we get a `NodeReached` then assert on the peer: - // it's there and it's connected. - let swarm = Arc::new(Mutex::new(swarm)); - - let mut rt = Runtime::new().unwrap(); - let mut peer_id : Option = None; - // Drive forward until we're Connected - while peer_id.is_none() { - let swarm_fut = swarm.clone(); - peer_id = rt.block_on(future::poll_fn(move || -> Poll, ()> { - let mut swarm = swarm_fut.lock(); - let poll_res = swarm.poll(); - match poll_res { - Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))), - _ => Ok(Async::Ready(None)) - } - })).expect("tokio works"); - } - - let mut swarm = swarm.lock(); - let peer = swarm.peer(peer_id.unwrap()); - assert_matches!(peer, Peer::Connected(PeerConnected{..})); - } - - #[test] - fn num_incoming_negotiated() { - let mut transport = DummyTransport::new(); - let peer_id = PeerId::random(); - let muxer = DummyMuxer::new(); - - // Set up listener to see an incoming connection - transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some((peer_id, muxer))))); - - let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()); - swarm.listen_on("/memory".parse().unwrap()).unwrap(); - - // no incoming yet - assert_eq!(swarm.num_incoming_negotiated(), 0); - - let mut rt = Runtime::new().unwrap(); - let swarm = Arc::new(Mutex::new(swarm)); - let swarm_fut = swarm.clone(); - let fut = future::poll_fn(move || -> Poll<_, ()> { - let mut swarm_fut = swarm_fut.lock(); - assert_matches!(swarm_fut.poll(), Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => { - incoming.accept(Handler::default()); - }); - - Ok(Async::Ready(())) - }); - rt.block_on(fut).expect("tokio works"); - let swarm = swarm.lock(); - // Now there's an incoming connection - assert_eq!(swarm.num_incoming_negotiated(), 1); - } - - #[test] - fn broadcasted_events_reach_active_nodes() { - let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random()); - let mut muxer = DummyMuxer::new(); - muxer.set_inbound_connection_state(DummyConnectionState::Pending); - muxer.set_outbound_connection_state(DummyConnectionState::Opened); - let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); - let mut handler = Handler::default(); - handler.next_states = vec![HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 1") ))),]; - let dial_result = swarm.dial(addr, handler); - assert!(dial_result.is_ok()); - - swarm.broadcast_event(&InEvent::NextState); - let swarm = Arc::new(Mutex::new(swarm)); - let mut rt = Runtime::new().unwrap(); - let mut peer_id : Option = None; - while peer_id.is_none() { - let swarm_fut = swarm.clone(); - peer_id = rt.block_on(future::poll_fn(move || -> Poll, ()> { - let mut swarm = swarm_fut.lock(); - let poll_res = swarm.poll(); - match poll_res { - Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))), - _ => Ok(Async::Ready(None)) - } - })).expect("tokio works"); - } - - let mut keep_polling = true; - while keep_polling { - let swarm_fut = swarm.clone(); - keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut swarm = swarm_fut.lock(); - match swarm.poll() { - Async::Ready(event) => { - assert_matches!(event, RawSwarmEvent::NodeEvent { peer_id: _, event: inner_event } => { - // The event we sent reached the node and triggered sending the out event we told it to return - assert_matches!(inner_event, OutEvent::Custom("from handler 1")); - }); - Ok(Async::Ready(false)) - }, - _ => Ok(Async::Ready(true)) - } - })).expect("tokio works"); - } - } - - #[test] - fn querying_for_pending_peer() { - let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random()); - let peer_id = PeerId::random(); - let peer = swarm.peer(peer_id.clone()); - assert_matches!(peer, Peer::NotConnected(PeerNotConnected{ .. })); - let addr = "/memory".parse().expect("bad multiaddr"); - let pending_peer = peer.as_not_connected().unwrap().connect(addr, Handler::default()); - assert!(pending_peer.is_ok()); - assert_matches!(pending_peer, Ok(PeerPendingConnect { .. } )); - } - - #[test] - fn querying_for_unknown_peer() { - let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random()); - let peer_id = PeerId::random(); - let peer = swarm.peer(peer_id.clone()); - assert_matches!(peer, Peer::NotConnected( PeerNotConnected { nodes: _, peer_id: node_peer_id }) => { - assert_eq!(node_peer_id, peer_id); - }); - } - - #[test] - fn querying_for_connected_peer() { - let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random()); - - // Dial a node - let addr = "/ip4/127.0.0.1/tcp/1234".parse().expect("bad multiaddr"); - swarm.dial(addr, Handler::default()).expect("dialing works"); - - let swarm = Arc::new(Mutex::new(swarm)); - let mut rt = Runtime::new().unwrap(); - // Drive it forward until we connect; extract the new PeerId. - let mut peer_id : Option = None; - while peer_id.is_none() { - let swarm_fut = swarm.clone(); - peer_id = rt.block_on(future::poll_fn(move || -> Poll, ()> { - let mut swarm = swarm_fut.lock(); - let poll_res = swarm.poll(); - match poll_res { - Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))), - _ => Ok(Async::Ready(None)) - } - })).expect("tokio works"); - } - - // We're connected. - let mut swarm = swarm.lock(); - let peer = swarm.peer(peer_id.unwrap()); - assert_matches!(peer, Peer::Connected( PeerConnected { .. } )); - } - - #[test] - fn poll_with_closed_listener() { - let mut transport = DummyTransport::new(); - // Set up listener to be closed - transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(None))); - - let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()); - swarm.listen_on("/memory".parse().unwrap()).unwrap(); - - let mut rt = Runtime::new().unwrap(); - let swarm = Arc::new(Mutex::new(swarm)); - - let swarm_fut = swarm.clone(); - let fut = future::poll_fn(move || -> Poll<_, ()> { - let mut swarm = swarm_fut.lock(); - assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::ListenerClosed { .. } )); - Ok(Async::Ready(())) - }); - rt.block_on(fut).expect("tokio works"); - } - - #[test] - fn unknown_peer_that_is_unreachable_yields_unknown_peer_dial_error() { - let mut transport = DummyTransport::new(); - transport.make_dial_fail(); - let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()); - let addr = "/memory".parse::().expect("bad multiaddr"); - let handler = Handler::default(); - let dial_result = swarm.dial(addr, handler); - assert!(dial_result.is_ok()); - - let swarm = Arc::new(Mutex::new(swarm)); - let mut rt = Runtime::new().unwrap(); - // Drive it forward until we hear back from the node. - let mut keep_polling = true; - while keep_polling { - let swarm_fut = swarm.clone(); - keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut swarm = swarm_fut.lock(); - match swarm.poll() { - Async::NotReady => Ok(Async::Ready(true)), - Async::Ready(event) => { - assert_matches!(event, RawSwarmEvent::UnknownPeerDialError { .. } ); - Ok(Async::Ready(false)) - }, - } - })).expect("tokio works"); - } - } - - #[test] - fn known_peer_that_is_unreachable_yields_dial_error() { - let mut transport = DummyTransport::new(); - let peer_id = PeerId::random(); - transport.set_next_peer_id(&peer_id); - transport.make_dial_fail(); - let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()))); - - { - let swarm1 = swarm.clone(); - let mut swarm1 = swarm1.lock(); - let peer = swarm1.peer(peer_id.clone()); - assert_matches!(peer, Peer::NotConnected(PeerNotConnected{ .. })); - let addr = "/memory".parse::().expect("bad multiaddr"); - let pending_peer = peer.as_not_connected().unwrap().connect(addr, Handler::default()); - assert!(pending_peer.is_ok()); - assert_matches!(pending_peer, Ok(PeerPendingConnect { .. } )); - } - let mut rt = Runtime::new().unwrap(); - // Drive it forward until we hear back from the node. - let mut keep_polling = true; - while keep_polling { - let swarm_fut = swarm.clone(); - let peer_id = peer_id.clone(); - keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut swarm = swarm_fut.lock(); - match swarm.poll() { - Async::NotReady => Ok(Async::Ready(true)), - Async::Ready(event) => { - let failed_peer_id = assert_matches!( - event, - RawSwarmEvent::DialError { remain_addrs_attempt: _, peer_id: failed_peer_id, .. } => failed_peer_id - ); - assert_eq!(peer_id, failed_peer_id); - Ok(Async::Ready(false)) - }, - } - })).expect("tokio works"); - } - } - - #[test] - fn yields_node_error_when_there_is_an_error_after_successful_connect() { - let mut transport = DummyTransport::new(); - let peer_id = PeerId::random(); - transport.set_next_peer_id(&peer_id); - let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()))); - - { - // Set up an outgoing connection with a PeerId we know - let swarm1 = swarm.clone(); - let mut swarm1 = swarm1.lock(); - let peer = swarm1.peer(peer_id.clone()); - let addr = "/unix/reachable".parse().expect("bad multiaddr"); - let mut handler = Handler::default(); - // Force an error - handler.next_states = vec![ HandlerState::Err ]; - peer.as_not_connected().unwrap().connect(addr, handler).expect("can connect unconnected peer"); - } - - // Ensure we run on a single thread - let mut rt = Builder::new().core_threads(1).build().unwrap(); - - // Drive it forward until we connect to the node. - let mut keep_polling = true; - while keep_polling { - let swarm_fut = swarm.clone(); - keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut swarm = swarm_fut.lock(); - // Push the Handler into an error state on the next poll - swarm.broadcast_event(&InEvent::NextState); - match swarm.poll() { - Async::NotReady => Ok(Async::Ready(true)), - Async::Ready(event) => { - assert_matches!(event, RawSwarmEvent::Connected { .. }); - // We're connected, we can move on - Ok(Async::Ready(false)) - }, - } - })).expect("tokio works"); - } - - // Poll again. It is going to be a NodeError because of how the - // handler's next state was set up. - let swarm_fut = swarm.clone(); - let expected_peer_id = peer_id.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut swarm = swarm_fut.lock(); - assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeError { peer_id, .. }) => { - assert_eq!(peer_id, expected_peer_id); - }); - Ok(Async::Ready(())) - })).expect("tokio works"); - } - - #[test] - fn yields_node_closed_when_the_node_closes_after_successful_connect() { - let mut transport = DummyTransport::new(); - let peer_id = PeerId::random(); - transport.set_next_peer_id(&peer_id); - let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()))); - - { - // Set up an outgoing connection with a PeerId we know - let swarm1 = swarm.clone(); - let mut swarm1 = swarm1.lock(); - let peer = swarm1.peer(peer_id.clone()); - let addr = "/unix/reachable".parse().expect("bad multiaddr"); - let mut handler = Handler::default(); - // Force handler to close - handler.next_states = vec![ HandlerState::Ready(None) ]; - peer.as_not_connected().unwrap().connect(addr, handler).expect("can connect unconnected peer"); - } - - // Ensure we run on a single thread - let mut rt = Builder::new().core_threads(1).build().unwrap(); - - // Drive it forward until we connect to the node. - let mut keep_polling = true; - while keep_polling { - let swarm_fut = swarm.clone(); - keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut swarm = swarm_fut.lock(); - // Push the Handler into the closed state on the next poll - swarm.broadcast_event(&InEvent::NextState); - match swarm.poll() { - Async::NotReady => Ok(Async::Ready(true)), - Async::Ready(event) => { - assert_matches!(event, RawSwarmEvent::Connected { .. }); - // We're connected, we can move on - Ok(Async::Ready(false)) - }, - } - })).expect("tokio works"); - } - - // Poll again. It is going to be a NodeClosed because of how the - // handler's next state was set up. - let swarm_fut = swarm.clone(); - let expected_peer_id = peer_id.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut swarm = swarm_fut.lock(); - assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeClosed { peer_id, .. }) => { - assert_eq!(peer_id, expected_peer_id); - }); - Ok(Async::Ready(())) - })).expect("tokio works"); - } - - #[test] - fn local_prio_equivalence_relation() { - for _ in 0..1000 { - let a = PeerId::random(); - let b = PeerId::random(); - assert_ne!(has_dial_prio(&a, &b), has_dial_prio(&b, &a)); - } - } -} diff --git a/core/src/nodes/raw_swarm/tests.rs b/core/src/nodes/raw_swarm/tests.rs new file mode 100644 index 00000000..53ad97a5 --- /dev/null +++ b/core/src/nodes/raw_swarm/tests.rs @@ -0,0 +1,451 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#![cfg(test)] + +use super::*; +use crate::tests::dummy_transport::DummyTransport; +use crate::tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent}; +use crate::tests::dummy_transport::ListenerState; +use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState}; +use crate::nodes::NodeHandlerEvent; +use assert_matches::assert_matches; +use parking_lot::Mutex; +use std::sync::Arc; +use tokio::runtime::{Builder, Runtime}; + +#[test] +fn query_transport() { + let transport = DummyTransport::new(); + let transport2 = transport.clone(); + let raw_swarm = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()); + assert_eq!(raw_swarm.transport(), &transport2); +} + +#[test] +fn starts_listening() { + let mut raw_swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random()); + let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); + let addr2 = addr.clone(); + assert!(raw_swarm.listen_on(addr).is_ok()); + let listeners = raw_swarm.listeners().collect::>(); + assert_eq!(listeners.len(), 1); + assert_eq!(listeners[0], &addr2); +} + +#[test] +fn nat_traversal_transforms_the_observed_address_according_to_the_transport_used() { + // the DummyTransport nat_traversal increments the port number by one for Ip4 addresses + let transport = DummyTransport::new(); + let mut raw_swarm = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()); + let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); + // An unrelated outside address is returned as-is, no transform + let outside_addr1 = "/memory".parse::().expect("bad multiaddr"); + + let addr2 = "/ip4/127.0.0.2/tcp/1234".parse::().expect("bad multiaddr"); + let outside_addr2 = "/ip4/127.0.0.2/tcp/1234".parse::().expect("bad multiaddr"); + + raw_swarm.listen_on(addr1).unwrap(); + raw_swarm.listen_on(addr2).unwrap(); + + let natted = raw_swarm + .nat_traversal(&outside_addr1) + .map(|a| a.to_string()) + .collect::>(); + + assert!(natted.is_empty()); + + let natted = raw_swarm + .nat_traversal(&outside_addr2) + .map(|a| a.to_string()) + .collect::>(); + + assert_eq!(natted, vec!["/ip4/127.0.0.2/tcp/1234"]) +} + +#[test] +fn successful_dial_reaches_a_node() { + let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random()); + let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); + let dial_res = swarm.dial(addr, Handler::default()); + assert!(dial_res.is_ok()); + + // Poll the swarm until we get a `NodeReached` then assert on the peer: + // it's there and it's connected. + let swarm = Arc::new(Mutex::new(swarm)); + + let mut rt = Runtime::new().unwrap(); + let mut peer_id : Option = None; + // Drive forward until we're Connected + while peer_id.is_none() { + let swarm_fut = swarm.clone(); + peer_id = rt.block_on(future::poll_fn(move || -> Poll, ()> { + let mut swarm = swarm_fut.lock(); + let poll_res = swarm.poll(); + match poll_res { + Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))), + _ => Ok(Async::Ready(None)) + } + })).expect("tokio works"); + } + + let mut swarm = swarm.lock(); + let peer = swarm.peer(peer_id.unwrap()); + assert_matches!(peer, Peer::Connected(PeerConnected{..})); +} + +#[test] +fn num_incoming_negotiated() { + let mut transport = DummyTransport::new(); + let peer_id = PeerId::random(); + let muxer = DummyMuxer::new(); + + // Set up listener to see an incoming connection + transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some((peer_id, muxer))))); + + let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()); + swarm.listen_on("/memory".parse().unwrap()).unwrap(); + + // no incoming yet + assert_eq!(swarm.num_incoming_negotiated(), 0); + + let mut rt = Runtime::new().unwrap(); + let swarm = Arc::new(Mutex::new(swarm)); + let swarm_fut = swarm.clone(); + let fut = future::poll_fn(move || -> Poll<_, ()> { + let mut swarm_fut = swarm_fut.lock(); + assert_matches!(swarm_fut.poll(), Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => { + incoming.accept(Handler::default()); + }); + + Ok(Async::Ready(())) + }); + rt.block_on(fut).expect("tokio works"); + let swarm = swarm.lock(); + // Now there's an incoming connection + assert_eq!(swarm.num_incoming_negotiated(), 1); +} + +#[test] +fn broadcasted_events_reach_active_nodes() { + let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random()); + let mut muxer = DummyMuxer::new(); + muxer.set_inbound_connection_state(DummyConnectionState::Pending); + muxer.set_outbound_connection_state(DummyConnectionState::Opened); + let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); + let mut handler = Handler::default(); + handler.next_states = vec![HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 1") ))),]; + let dial_result = swarm.dial(addr, handler); + assert!(dial_result.is_ok()); + + swarm.broadcast_event(&InEvent::NextState); + let swarm = Arc::new(Mutex::new(swarm)); + let mut rt = Runtime::new().unwrap(); + let mut peer_id : Option = None; + while peer_id.is_none() { + let swarm_fut = swarm.clone(); + peer_id = rt.block_on(future::poll_fn(move || -> Poll, ()> { + let mut swarm = swarm_fut.lock(); + let poll_res = swarm.poll(); + match poll_res { + Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))), + _ => Ok(Async::Ready(None)) + } + })).expect("tokio works"); + } + + let mut keep_polling = true; + while keep_polling { + let swarm_fut = swarm.clone(); + keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + match swarm.poll() { + Async::Ready(event) => { + assert_matches!(event, RawSwarmEvent::NodeEvent { peer_id: _, event: inner_event } => { + // The event we sent reached the node and triggered sending the out event we told it to return + assert_matches!(inner_event, OutEvent::Custom("from handler 1")); + }); + Ok(Async::Ready(false)) + }, + _ => Ok(Async::Ready(true)) + } + })).expect("tokio works"); + } +} + +#[test] +fn querying_for_pending_peer() { + let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random()); + let peer_id = PeerId::random(); + let peer = swarm.peer(peer_id.clone()); + assert_matches!(peer, Peer::NotConnected(PeerNotConnected{ .. })); + let addr = "/memory".parse().expect("bad multiaddr"); + let pending_peer = peer.as_not_connected().unwrap().connect(addr, Handler::default()); + assert!(pending_peer.is_ok()); + assert_matches!(pending_peer, Ok(PeerPendingConnect { .. } )); +} + +#[test] +fn querying_for_unknown_peer() { + let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random()); + let peer_id = PeerId::random(); + let peer = swarm.peer(peer_id.clone()); + assert_matches!(peer, Peer::NotConnected( PeerNotConnected { nodes: _, peer_id: node_peer_id }) => { + assert_eq!(node_peer_id, peer_id); + }); +} + +#[test] +fn querying_for_connected_peer() { + let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random()); + + // Dial a node + let addr = "/ip4/127.0.0.1/tcp/1234".parse().expect("bad multiaddr"); + swarm.dial(addr, Handler::default()).expect("dialing works"); + + let swarm = Arc::new(Mutex::new(swarm)); + let mut rt = Runtime::new().unwrap(); + // Drive it forward until we connect; extract the new PeerId. + let mut peer_id : Option = None; + while peer_id.is_none() { + let swarm_fut = swarm.clone(); + peer_id = rt.block_on(future::poll_fn(move || -> Poll, ()> { + let mut swarm = swarm_fut.lock(); + let poll_res = swarm.poll(); + match poll_res { + Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))), + _ => Ok(Async::Ready(None)) + } + })).expect("tokio works"); + } + + // We're connected. + let mut swarm = swarm.lock(); + let peer = swarm.peer(peer_id.unwrap()); + assert_matches!(peer, Peer::Connected( PeerConnected { .. } )); +} + +#[test] +fn poll_with_closed_listener() { + let mut transport = DummyTransport::new(); + // Set up listener to be closed + transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(None))); + + let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()); + swarm.listen_on("/memory".parse().unwrap()).unwrap(); + + let mut rt = Runtime::new().unwrap(); + let swarm = Arc::new(Mutex::new(swarm)); + + let swarm_fut = swarm.clone(); + let fut = future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::ListenerClosed { .. } )); + Ok(Async::Ready(())) + }); + rt.block_on(fut).expect("tokio works"); +} + +#[test] +fn unknown_peer_that_is_unreachable_yields_unknown_peer_dial_error() { + let mut transport = DummyTransport::new(); + transport.make_dial_fail(); + let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()); + let addr = "/memory".parse::().expect("bad multiaddr"); + let handler = Handler::default(); + let dial_result = swarm.dial(addr, handler); + assert!(dial_result.is_ok()); + + let swarm = Arc::new(Mutex::new(swarm)); + let mut rt = Runtime::new().unwrap(); + // Drive it forward until we hear back from the node. + let mut keep_polling = true; + while keep_polling { + let swarm_fut = swarm.clone(); + keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + match swarm.poll() { + Async::NotReady => Ok(Async::Ready(true)), + Async::Ready(event) => { + assert_matches!(event, RawSwarmEvent::UnknownPeerDialError { .. } ); + Ok(Async::Ready(false)) + }, + } + })).expect("tokio works"); + } +} + +#[test] +fn known_peer_that_is_unreachable_yields_dial_error() { + let mut transport = DummyTransport::new(); + let peer_id = PeerId::random(); + transport.set_next_peer_id(&peer_id); + transport.make_dial_fail(); + let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()))); + + { + let swarm1 = swarm.clone(); + let mut swarm1 = swarm1.lock(); + let peer = swarm1.peer(peer_id.clone()); + assert_matches!(peer, Peer::NotConnected(PeerNotConnected{ .. })); + let addr = "/memory".parse::().expect("bad multiaddr"); + let pending_peer = peer.as_not_connected().unwrap().connect(addr, Handler::default()); + assert!(pending_peer.is_ok()); + assert_matches!(pending_peer, Ok(PeerPendingConnect { .. } )); + } + let mut rt = Runtime::new().unwrap(); + // Drive it forward until we hear back from the node. + let mut keep_polling = true; + while keep_polling { + let swarm_fut = swarm.clone(); + let peer_id = peer_id.clone(); + keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + match swarm.poll() { + Async::NotReady => Ok(Async::Ready(true)), + Async::Ready(event) => { + let failed_peer_id = assert_matches!( + event, + RawSwarmEvent::DialError { remain_addrs_attempt: _, peer_id: failed_peer_id, .. } => failed_peer_id + ); + assert_eq!(peer_id, failed_peer_id); + Ok(Async::Ready(false)) + }, + } + })).expect("tokio works"); + } +} + +#[test] +fn yields_node_error_when_there_is_an_error_after_successful_connect() { + let mut transport = DummyTransport::new(); + let peer_id = PeerId::random(); + transport.set_next_peer_id(&peer_id); + let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()))); + + { + // Set up an outgoing connection with a PeerId we know + let swarm1 = swarm.clone(); + let mut swarm1 = swarm1.lock(); + let peer = swarm1.peer(peer_id.clone()); + let addr = "/unix/reachable".parse().expect("bad multiaddr"); + let mut handler = Handler::default(); + // Force an error + handler.next_states = vec![ HandlerState::Err ]; + peer.as_not_connected().unwrap().connect(addr, handler).expect("can connect unconnected peer"); + } + + // Ensure we run on a single thread + let mut rt = Builder::new().core_threads(1).build().unwrap(); + + // Drive it forward until we connect to the node. + let mut keep_polling = true; + while keep_polling { + let swarm_fut = swarm.clone(); + keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + // Push the Handler into an error state on the next poll + swarm.broadcast_event(&InEvent::NextState); + match swarm.poll() { + Async::NotReady => Ok(Async::Ready(true)), + Async::Ready(event) => { + assert_matches!(event, RawSwarmEvent::Connected { .. }); + // We're connected, we can move on + Ok(Async::Ready(false)) + }, + } + })).expect("tokio works"); + } + + // Poll again. It is going to be a NodeError because of how the + // handler's next state was set up. + let swarm_fut = swarm.clone(); + let expected_peer_id = peer_id.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeError { peer_id, .. }) => { + assert_eq!(peer_id, expected_peer_id); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); +} + +#[test] +fn yields_node_closed_when_the_node_closes_after_successful_connect() { + let mut transport = DummyTransport::new(); + let peer_id = PeerId::random(); + transport.set_next_peer_id(&peer_id); + let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()))); + + { + // Set up an outgoing connection with a PeerId we know + let swarm1 = swarm.clone(); + let mut swarm1 = swarm1.lock(); + let peer = swarm1.peer(peer_id.clone()); + let addr = "/unix/reachable".parse().expect("bad multiaddr"); + let mut handler = Handler::default(); + // Force handler to close + handler.next_states = vec![ HandlerState::Ready(None) ]; + peer.as_not_connected().unwrap().connect(addr, handler).expect("can connect unconnected peer"); + } + + // Ensure we run on a single thread + let mut rt = Builder::new().core_threads(1).build().unwrap(); + + // Drive it forward until we connect to the node. + let mut keep_polling = true; + while keep_polling { + let swarm_fut = swarm.clone(); + keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + // Push the Handler into the closed state on the next poll + swarm.broadcast_event(&InEvent::NextState); + match swarm.poll() { + Async::NotReady => Ok(Async::Ready(true)), + Async::Ready(event) => { + assert_matches!(event, RawSwarmEvent::Connected { .. }); + // We're connected, we can move on + Ok(Async::Ready(false)) + }, + } + })).expect("tokio works"); + } + + // Poll again. It is going to be a NodeClosed because of how the + // handler's next state was set up. + let swarm_fut = swarm.clone(); + let expected_peer_id = peer_id.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeClosed { peer_id, .. }) => { + assert_eq!(peer_id, expected_peer_id); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); +} + +#[test] +fn local_prio_equivalence_relation() { + for _ in 0..1000 { + let a = PeerId::random(); + let b = PeerId::random(); + assert_ne!(has_dial_prio(&a, &b), has_dial_prio(&b, &a)); + } +}