diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index 6ba78435..52f3f31a 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -469,3 +469,362 @@ impl<'a, TInEvent> PeerMut<'a, TInEvent> { self.inner.close(); } } + + +#[cfg(test)] +mod tests { + use super::*; + use futures::future::{self}; + use tests::dummy_muxer::{DummyMuxer, DummyConnectionState}; + use tests::dummy_handler::{Handler, InEvent, OutEvent, HandlerState}; + use PublicKey; + use tokio::runtime::current_thread::Runtime; + use tokio::runtime::Builder; + use nodes::NodeHandlerEvent; + use std::sync::Arc; + use parking_lot::Mutex; + use void::Void; + + type TestCollectionStream = CollectionStream; + + #[test] + fn has_connection_is_false_before_a_connection_has_been_made() { + let cs = TestCollectionStream::new(); + let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id(); + assert!(!cs.has_connection(&peer_id)); + } + + #[test] + fn connections_is_empty_before_connecting() { + let cs = TestCollectionStream::new(); + assert!(cs.connections().next().is_none()); + } + + #[test] + fn retrieving_a_peer_is_none_if_peer_is_missing_or_not_connected() { + let mut cs = TestCollectionStream::new(); + let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id(); + assert!(cs.peer_mut(&peer_id).is_none()); + + let handler = Handler::default(); + let fut = future::ok::<_, Void>((peer_id.clone(), DummyMuxer::new())); + cs.add_reach_attempt(fut, handler); + assert!(cs.peer_mut(&peer_id).is_none()); // task is pending + } + + #[test] + fn collection_stream_reaches_the_nodes() { + let mut cs = TestCollectionStream::new(); + let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id(); + + let mut muxer = DummyMuxer::new(); + muxer.set_inbound_connection_state(DummyConnectionState::Pending); + muxer.set_outbound_connection_state(DummyConnectionState::Opened); + + let fut = future::ok::<_, Void>((peer_id, muxer)); + cs.add_reach_attempt(fut, Handler::default()); + let mut rt = Runtime::new().unwrap(); + let mut poll_count = 0; + let fut = future::poll_fn(move || -> Poll<(), ()> { + poll_count += 1; + let event = cs.poll(); + match poll_count { + 1 => assert_matches!(event, Async::NotReady), + 2 => { + assert_matches!(event, Async::Ready(CollectionEvent::NodeReached(_))); + return Ok(Async::Ready(())); // stop + } + _ => unreachable!() + } + Ok(Async::NotReady) + }); + rt.block_on(fut).unwrap(); + } + + #[test] + fn accepting_a_node_yields_new_entry() { + let mut cs = TestCollectionStream::new(); + let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id(); + let fut = future::ok::<_, Void>((peer_id.clone(), DummyMuxer::new())); + cs.add_reach_attempt(fut, Handler::default()); + + let mut rt = Runtime::new().unwrap(); + let mut poll_count = 0; + let fut = future::poll_fn(move || -> Poll<(), ()> { + poll_count += 1; + { + let event = cs.poll(); + match poll_count { + 1 => { + assert_matches!(event, Async::NotReady); + return Ok(Async::NotReady) + } + 2 => { + assert_matches!(event, Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { + assert_matches!(reach_ev.parent, CollectionStream{..}); + let (accept_ev, accepted_peer_id) = reach_ev.accept(); + assert_eq!(accepted_peer_id, peer_id); + assert_matches!(accept_ev, CollectionNodeAccept::NewEntry); + }); + } + _ => unreachable!() + } + } + assert!(cs.peer_mut(&peer_id).is_some(), "peer is not in the list"); + assert!(cs.has_connection(&peer_id), "peer is not connected"); + assert_eq!(cs.connections().collect::>(), vec![&peer_id]); + Ok(Async::Ready(())) + }); + rt.block_on(fut).expect("running the future works"); + } + + #[test] + fn events_in_a_node_reaches_the_collection_stream() { + let cs = Arc::new(Mutex::new(TestCollectionStream::new())); + let task_peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id(); + + let mut handler = Handler::default(); + handler.state = Some(HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("init"))))); + let handler_states = vec![ + HandlerState::Err, + HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 3") ))), + HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 2") ))), + HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 1") ))), + ]; + handler.next_states = handler_states; + + let mut muxer = DummyMuxer::new(); + muxer.set_inbound_connection_state(DummyConnectionState::Pending); + muxer.set_outbound_connection_state(DummyConnectionState::Opened); + + let fut = future::ok::<_, Void>((task_peer_id.clone(), muxer)); + cs.lock().add_reach_attempt(fut, handler); + + let mut rt = Builder::new().core_threads(1).build().unwrap(); + + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + assert_matches!(cs.poll(), Async::NotReady); + Ok(Async::Ready(())) + })).expect("tokio works"); + + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + cs.broadcast_event(&InEvent::NextState); + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { + reach_ev.accept(); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + cs.broadcast_event(&InEvent::NextState); + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => { + assert_matches!(event, OutEvent::Custom("init")); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + + + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + cs.broadcast_event(&InEvent::NextState); + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => { + assert_matches!(event, OutEvent::Custom("from handler 1")); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + cs.broadcast_event(&InEvent::NextState); + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => { + assert_matches!(event, OutEvent::Custom("from handler 2")); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + } + + #[test] + fn task_closed_with_error_while_task_is_pending_yields_reach_error() { + let cs = Arc::new(Mutex::new(TestCollectionStream::new())); + let task_inner_fut = future::err(std::io::Error::new(std::io::ErrorKind::Other, "inner fut error")); + let reach_attempt_id = cs.lock().add_reach_attempt(task_inner_fut, Handler::default()); + + let mut rt = Builder::new().core_threads(1).build().unwrap(); + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + assert_matches!(cs.poll(), Async::NotReady); + Ok(Async::Ready(())) + })).expect("tokio works"); + + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + assert_matches!(cs.poll(), Async::Ready(collection_ev) => { + assert_matches!(collection_ev, CollectionEvent::ReachError {id, error, ..} => { + assert_eq!(id, reach_attempt_id); + assert_eq!(error.to_string(), "inner fut error"); + }); + + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + + } + + #[test] + fn task_closed_with_error_when_task_is_connected_yields_node_error() { + let cs = Arc::new(Mutex::new(TestCollectionStream::new())); + let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id(); + let muxer = DummyMuxer::new(); + let task_inner_fut = future::ok::<_, Void>((peer_id.clone(), muxer)); + let mut handler = Handler::default(); + handler.next_states = vec![HandlerState::Err]; // triggered when sending a NextState event + + cs.lock().add_reach_attempt(task_inner_fut, handler); + let mut rt = Builder::new().core_threads(1).build().unwrap(); + + // Kick it off + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + assert_matches!(cs.poll(), Async::NotReady); + // send an event so the Handler errors in two polls + cs.broadcast_event(&InEvent::NextState); + Ok(Async::Ready(())) + })).expect("tokio works"); + + // Accept the new node + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + // NodeReached, accept the connection so the task transitions from Pending to Connected + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { + reach_ev.accept(); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + + assert!(cs.lock().has_connection(&peer_id)); + + // Assert the node errored + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + assert_matches!(cs.poll(), Async::Ready(collection_ev) => { + assert_matches!(collection_ev, CollectionEvent::NodeError{..}); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + } + + #[test] + fn task_closed_ok_when_task_is_connected_yields_node_closed() { + let cs = Arc::new(Mutex::new(TestCollectionStream::new())); + let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id(); + let muxer = DummyMuxer::new(); + let task_inner_fut = future::ok::<_, Void>((peer_id.clone(), muxer)); + let mut handler = Handler::default(); + handler.next_states = vec![HandlerState::Ready(None)]; // triggered when sending a NextState event + + cs.lock().add_reach_attempt(task_inner_fut, handler); + let mut rt = Builder::new().core_threads(1).build().unwrap(); + + // Kick it off + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + assert_matches!(cs.poll(), Async::NotReady); + // send an event so the Handler errors in two polls + cs.broadcast_event(&InEvent::NextState); + Ok(Async::Ready(())) + })).expect("tokio works"); + + // Accept the new node + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + // NodeReached, accept the connection so the task transitions from Pending to Connected + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { + reach_ev.accept(); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + + assert!(cs.lock().has_connection(&peer_id)); + + // Next poll, the Handler returns Async::Ready(None) because of the + // NextState message sent before. + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + // Node is closed normally: TaskClosed, Ok(()) + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeClosed{ peer_id: peer_id_in_event }) => { + assert_eq!(peer_id_in_event, peer_id); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + } + + #[test] + fn interrupting_a_pending_connection_attempt_is_ok() { + let mut cs = TestCollectionStream::new(); + let fut = future::empty::<_, Void>(); + let reach_id = cs.add_reach_attempt(fut, Handler::default()); + let interrupt = cs.interrupt(reach_id); + assert!(interrupt.is_ok()); + } + + #[test] + fn interrupting_a_connection_attempt_twice_is_err() { + let mut cs = TestCollectionStream::new(); + let fut = future::empty::<_, Void>(); + let reach_id = cs.add_reach_attempt(fut, Handler::default()); + assert!(cs.interrupt(reach_id).is_ok()); + assert!(cs.interrupt(reach_id).is_err()); + } + + #[test] + fn interrupting_an_established_connection_is_err() { + let cs = Arc::new(Mutex::new(TestCollectionStream::new())); + let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id(); + let muxer = DummyMuxer::new(); + let task_inner_fut = future::ok::<_, Void>((peer_id.clone(), muxer)); + let handler = Handler::default(); + + let reach_id = cs.lock().add_reach_attempt(task_inner_fut, handler); + let mut rt = Builder::new().core_threads(1).build().unwrap(); + + // Kick it off + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + assert_matches!(cs.poll(), Async::NotReady); + // send an event so the Handler errors in two polls + Ok(Async::Ready(())) + })).expect("tokio works"); + + // Accept the new node + let cs_fut = cs.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut cs = cs_fut.lock(); + // NodeReached, accept the connection so the task transitions from Pending to Connected + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { + reach_ev.accept(); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + + assert!(cs.lock().has_connection(&peer_id), "Connection was not established"); + + assert!(cs.lock().interrupt(reach_id).is_err(), "Could interrupt a reach attempt that already completed"); + } +} diff --git a/core/src/nodes/handled_node.rs b/core/src/nodes/handled_node.rs index 225a5562..941f3fe1 100644 --- a/core/src/nodes/handled_node.rs +++ b/core/src/nodes/handled_node.rs @@ -321,12 +321,9 @@ mod tests { use super::*; use tokio::runtime::current_thread; use tests::dummy_muxer::{DummyMuxer, DummyConnectionState}; - use tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent}; + use tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent, TestHandledNode}; use std::marker::PhantomData; - // Concrete `HandledNode` - type TestHandledNode = HandledNode; - struct TestBuilder { muxer: DummyMuxer, handler: Handler,