Tests for CollectionStream (#588)

* Add unit tests for core::nodes::NodeStream

* Move DummyMuxer to core/tests

* Address grumbles

* Impl Debug for SubstreamRef<P>

* Add test for poll()

* Don't need to open a substream

* pretty printer test

* More tests for NodeStream poll()

* ListenerStream unit tests: transport() and listeners()

* Tests for nodes/listeners.rs

* Add a few tests to help illustrate the "drowning" behaviour of busy listeners

* Tests for HandledNode

* Address grumbles

* Remove non-project specific stuff

* Address grumbles

* Prefer freestanding function

* Untangle the code for old shutdown test from the new tests
Add HandlerState and use it in TestBuilder
Shorter test names

* WIP – tests pass

* Use a newtype to lighten up the function signatures a bit
Test NotReady case

* Cleanup Event enum
Track events as they reach the Handler
Describe complex test logic

* Assert on the event trace

* More tests for poll()

* Switch to using usize as the OutboundOpenInfo so we can assert on event contents
More tests for poll()

* whitespace

* whitespace and spelling

* WIP tests for handled_node_tasks:Task

* wip

* Move Handler related code to dummy_handler

* Sort out the events going to/from node

* WIP tests for poll

* Add a TestBuilder for NodeTask tests
More NodeTask tests

* Fixes broken test after upstream changes

* Clarify the behaviour of is_shutting_down

* Fix broken test

* Test for task exit when muxers in- and outbound are closed

* Add question about impossible Async::NotReady

* Fix tests after recent changes on master

* Upstream changes

* Tests for HandledNodesTasks

* Add test for HandledNodesTasks poll

* Test we reach all nodes and then closed all nodes

* Test event emission by HandledNodesTasks

* Test starting CollectionStream

* Test that we can reach a peer after a successful connection

* Assert collection contains what we expect

* Test handler events get to the collectionstream

* Remaining tests for CollectionStream

* Run tests on single threaded runtime to avoid intermittent failures

* Remove call to shutdown – not needed

* No need to specify tokio version

* Rustfmt

* More rustfmt

* Less noise

* cleanup

* Address grumbles

* Better debug impl for HandledNodesTasks

* Remove tests for Task we don't need
Test Task.send_event() and id() using a HandledNodesTasks

* Rename test builders

* Don't organise tests in submodules

* whitespace

* Revert changes to Debug impl for HandledNodesTasks

* Cleanup

* cleanup

* Address concerns

* Fix tests
This commit is contained in:
David
2018-11-16 12:38:23 +01:00
committed by Pierre Krieger
parent 413fb79208
commit 8b6bdd5554
2 changed files with 360 additions and 4 deletions

View File

@ -469,3 +469,362 @@ impl<'a, TInEvent> PeerMut<'a, TInEvent> {
self.inner.close();
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::future::{self};
use tests::dummy_muxer::{DummyMuxer, DummyConnectionState};
use tests::dummy_handler::{Handler, InEvent, OutEvent, HandlerState};
use PublicKey;
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Builder;
use nodes::NodeHandlerEvent;
use std::sync::Arc;
use parking_lot::Mutex;
use void::Void;
type TestCollectionStream = CollectionStream<InEvent, OutEvent, Handler>;
#[test]
fn has_connection_is_false_before_a_connection_has_been_made() {
let cs = TestCollectionStream::new();
let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id();
assert!(!cs.has_connection(&peer_id));
}
#[test]
fn connections_is_empty_before_connecting() {
let cs = TestCollectionStream::new();
assert!(cs.connections().next().is_none());
}
#[test]
fn retrieving_a_peer_is_none_if_peer_is_missing_or_not_connected() {
let mut cs = TestCollectionStream::new();
let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id();
assert!(cs.peer_mut(&peer_id).is_none());
let handler = Handler::default();
let fut = future::ok::<_, Void>((peer_id.clone(), DummyMuxer::new()));
cs.add_reach_attempt(fut, handler);
assert!(cs.peer_mut(&peer_id).is_none()); // task is pending
}
#[test]
fn collection_stream_reaches_the_nodes() {
let mut cs = TestCollectionStream::new();
let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id();
let mut muxer = DummyMuxer::new();
muxer.set_inbound_connection_state(DummyConnectionState::Pending);
muxer.set_outbound_connection_state(DummyConnectionState::Opened);
let fut = future::ok::<_, Void>((peer_id, muxer));
cs.add_reach_attempt(fut, Handler::default());
let mut rt = Runtime::new().unwrap();
let mut poll_count = 0;
let fut = future::poll_fn(move || -> Poll<(), ()> {
poll_count += 1;
let event = cs.poll();
match poll_count {
1 => assert_matches!(event, Async::NotReady),
2 => {
assert_matches!(event, Async::Ready(CollectionEvent::NodeReached(_)));
return Ok(Async::Ready(())); // stop
}
_ => unreachable!()
}
Ok(Async::NotReady)
});
rt.block_on(fut).unwrap();
}
#[test]
fn accepting_a_node_yields_new_entry() {
let mut cs = TestCollectionStream::new();
let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id();
let fut = future::ok::<_, Void>((peer_id.clone(), DummyMuxer::new()));
cs.add_reach_attempt(fut, Handler::default());
let mut rt = Runtime::new().unwrap();
let mut poll_count = 0;
let fut = future::poll_fn(move || -> Poll<(), ()> {
poll_count += 1;
{
let event = cs.poll();
match poll_count {
1 => {
assert_matches!(event, Async::NotReady);
return Ok(Async::NotReady)
}
2 => {
assert_matches!(event, Async::Ready(CollectionEvent::NodeReached(reach_ev)) => {
assert_matches!(reach_ev.parent, CollectionStream{..});
let (accept_ev, accepted_peer_id) = reach_ev.accept();
assert_eq!(accepted_peer_id, peer_id);
assert_matches!(accept_ev, CollectionNodeAccept::NewEntry);
});
}
_ => unreachable!()
}
}
assert!(cs.peer_mut(&peer_id).is_some(), "peer is not in the list");
assert!(cs.has_connection(&peer_id), "peer is not connected");
assert_eq!(cs.connections().collect::<Vec<&PeerId>>(), vec![&peer_id]);
Ok(Async::Ready(()))
});
rt.block_on(fut).expect("running the future works");
}
#[test]
fn events_in_a_node_reaches_the_collection_stream() {
let cs = Arc::new(Mutex::new(TestCollectionStream::new()));
let task_peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id();
let mut handler = Handler::default();
handler.state = Some(HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("init")))));
let handler_states = vec![
HandlerState::Err,
HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 3") ))),
HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 2") ))),
HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 1") ))),
];
handler.next_states = handler_states;
let mut muxer = DummyMuxer::new();
muxer.set_inbound_connection_state(DummyConnectionState::Pending);
muxer.set_outbound_connection_state(DummyConnectionState::Opened);
let fut = future::ok::<_, Void>((task_peer_id.clone(), muxer));
cs.lock().add_reach_attempt(fut, handler);
let mut rt = Builder::new().core_threads(1).build().unwrap();
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
assert_matches!(cs.poll(), Async::NotReady);
Ok(Async::Ready(()))
})).expect("tokio works");
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
cs.broadcast_event(&InEvent::NextState);
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => {
reach_ev.accept();
});
Ok(Async::Ready(()))
})).expect("tokio works");
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
cs.broadcast_event(&InEvent::NextState);
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => {
assert_matches!(event, OutEvent::Custom("init"));
});
Ok(Async::Ready(()))
})).expect("tokio works");
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
cs.broadcast_event(&InEvent::NextState);
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => {
assert_matches!(event, OutEvent::Custom("from handler 1"));
});
Ok(Async::Ready(()))
})).expect("tokio works");
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
cs.broadcast_event(&InEvent::NextState);
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => {
assert_matches!(event, OutEvent::Custom("from handler 2"));
});
Ok(Async::Ready(()))
})).expect("tokio works");
}
#[test]
fn task_closed_with_error_while_task_is_pending_yields_reach_error() {
let cs = Arc::new(Mutex::new(TestCollectionStream::new()));
let task_inner_fut = future::err(std::io::Error::new(std::io::ErrorKind::Other, "inner fut error"));
let reach_attempt_id = cs.lock().add_reach_attempt(task_inner_fut, Handler::default());
let mut rt = Builder::new().core_threads(1).build().unwrap();
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
assert_matches!(cs.poll(), Async::NotReady);
Ok(Async::Ready(()))
})).expect("tokio works");
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
assert_matches!(cs.poll(), Async::Ready(collection_ev) => {
assert_matches!(collection_ev, CollectionEvent::ReachError {id, error, ..} => {
assert_eq!(id, reach_attempt_id);
assert_eq!(error.to_string(), "inner fut error");
});
});
Ok(Async::Ready(()))
})).expect("tokio works");
}
#[test]
fn task_closed_with_error_when_task_is_connected_yields_node_error() {
let cs = Arc::new(Mutex::new(TestCollectionStream::new()));
let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id();
let muxer = DummyMuxer::new();
let task_inner_fut = future::ok::<_, Void>((peer_id.clone(), muxer));
let mut handler = Handler::default();
handler.next_states = vec![HandlerState::Err]; // triggered when sending a NextState event
cs.lock().add_reach_attempt(task_inner_fut, handler);
let mut rt = Builder::new().core_threads(1).build().unwrap();
// Kick it off
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
assert_matches!(cs.poll(), Async::NotReady);
// send an event so the Handler errors in two polls
cs.broadcast_event(&InEvent::NextState);
Ok(Async::Ready(()))
})).expect("tokio works");
// Accept the new node
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
// NodeReached, accept the connection so the task transitions from Pending to Connected
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => {
reach_ev.accept();
});
Ok(Async::Ready(()))
})).expect("tokio works");
assert!(cs.lock().has_connection(&peer_id));
// Assert the node errored
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
assert_matches!(cs.poll(), Async::Ready(collection_ev) => {
assert_matches!(collection_ev, CollectionEvent::NodeError{..});
});
Ok(Async::Ready(()))
})).expect("tokio works");
}
#[test]
fn task_closed_ok_when_task_is_connected_yields_node_closed() {
let cs = Arc::new(Mutex::new(TestCollectionStream::new()));
let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id();
let muxer = DummyMuxer::new();
let task_inner_fut = future::ok::<_, Void>((peer_id.clone(), muxer));
let mut handler = Handler::default();
handler.next_states = vec![HandlerState::Ready(None)]; // triggered when sending a NextState event
cs.lock().add_reach_attempt(task_inner_fut, handler);
let mut rt = Builder::new().core_threads(1).build().unwrap();
// Kick it off
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
assert_matches!(cs.poll(), Async::NotReady);
// send an event so the Handler errors in two polls
cs.broadcast_event(&InEvent::NextState);
Ok(Async::Ready(()))
})).expect("tokio works");
// Accept the new node
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
// NodeReached, accept the connection so the task transitions from Pending to Connected
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => {
reach_ev.accept();
});
Ok(Async::Ready(()))
})).expect("tokio works");
assert!(cs.lock().has_connection(&peer_id));
// Next poll, the Handler returns Async::Ready(None) because of the
// NextState message sent before.
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
// Node is closed normally: TaskClosed, Ok(())
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeClosed{ peer_id: peer_id_in_event }) => {
assert_eq!(peer_id_in_event, peer_id);
});
Ok(Async::Ready(()))
})).expect("tokio works");
}
#[test]
fn interrupting_a_pending_connection_attempt_is_ok() {
let mut cs = TestCollectionStream::new();
let fut = future::empty::<_, Void>();
let reach_id = cs.add_reach_attempt(fut, Handler::default());
let interrupt = cs.interrupt(reach_id);
assert!(interrupt.is_ok());
}
#[test]
fn interrupting_a_connection_attempt_twice_is_err() {
let mut cs = TestCollectionStream::new();
let fut = future::empty::<_, Void>();
let reach_id = cs.add_reach_attempt(fut, Handler::default());
assert!(cs.interrupt(reach_id).is_ok());
assert!(cs.interrupt(reach_id).is_err());
}
#[test]
fn interrupting_an_established_connection_is_err() {
let cs = Arc::new(Mutex::new(TestCollectionStream::new()));
let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 1 }).collect()).into_peer_id();
let muxer = DummyMuxer::new();
let task_inner_fut = future::ok::<_, Void>((peer_id.clone(), muxer));
let handler = Handler::default();
let reach_id = cs.lock().add_reach_attempt(task_inner_fut, handler);
let mut rt = Builder::new().core_threads(1).build().unwrap();
// Kick it off
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
assert_matches!(cs.poll(), Async::NotReady);
// send an event so the Handler errors in two polls
Ok(Async::Ready(()))
})).expect("tokio works");
// Accept the new node
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
// NodeReached, accept the connection so the task transitions from Pending to Connected
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => {
reach_ev.accept();
});
Ok(Async::Ready(()))
})).expect("tokio works");
assert!(cs.lock().has_connection(&peer_id), "Connection was not established");
assert!(cs.lock().interrupt(reach_id).is_err(), "Could interrupt a reach attempt that already completed");
}
}

View File

@ -321,12 +321,9 @@ mod tests {
use super::*;
use tokio::runtime::current_thread;
use tests::dummy_muxer::{DummyMuxer, DummyConnectionState};
use tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent};
use tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent, TestHandledNode};
use std::marker::PhantomData;
// Concrete `HandledNode`
type TestHandledNode = HandledNode<DummyMuxer, Handler>;
struct TestBuilder {
muxer: DummyMuxer,
handler: Handler,