mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-26 16:21:39 +00:00
Tests for HandledNodesTasks (#584)
* Add unit tests for core::nodes::NodeStream * Move DummyMuxer to core/tests * Address grumbles * Impl Debug for SubstreamRef<P> * Add test for poll() * Don't need to open a substream * pretty printer test * More tests for NodeStream poll() * ListenerStream unit tests: transport() and listeners() * Tests for nodes/listeners.rs * Add a few tests to help illustrate the "drowning" behaviour of busy listeners * Tests for HandledNode * Address grumbles * Remove non-project specific stuff * Address grumbles * Prefer freestanding function * Untangle the code for old shutdown test from the new tests Add HandlerState and use it in TestBuilder Shorter test names * WIP – tests pass * Use a newtype to lighten up the function signatures a bit Test NotReady case * Cleanup Event enum Track events as they reach the Handler Describe complex test logic * Assert on the event trace * More tests for poll() * Switch to using usize as the OutboundOpenInfo so we can assert on event contents More tests for poll() * whitespace * whitespace and spelling * WIP tests for handled_node_tasks:Task * wip * Move Handler related code to dummy_handler * Sort out the events going to/from node * WIP tests for poll * Add a TestBuilder for NodeTask tests More NodeTask tests * Fixes broken test after upstream changes * Clarify the behaviour of is_shutting_down * Fix broken test * Test for task exit when muxers in- and outbound are closed * Add question about impossible Async::NotReady * Fix tests after recent changes on master * Upstream changes * Tests for HandledNodesTasks * Add test for HandledNodesTasks poll * Test we reach all nodes and then closed all nodes * Test event emission by HandledNodesTasks * Rustfmt * More rustfmt * Less noise * cleanup * Address grumbles * Better debug impl for HandledNodesTasks * Remove tests for Task we don't need Test Task.send_event() and id() using a HandledNodesTasks * Rename test builders * Don't organise tests in submodules * whitespace * Revert changes to Debug impl for HandledNodesTasks
This commit is contained in:
@ -277,7 +277,9 @@ where
|
||||
if self.node.get_ref().is_outbound_open() {
|
||||
match self.node.get_mut().open_substream(user_data) {
|
||||
Ok(()) => (),
|
||||
Err(user_data) => self.handler.inject_outbound_closed(user_data),
|
||||
Err(user_data) => {
|
||||
self.handler.inject_outbound_closed(user_data)
|
||||
},
|
||||
}
|
||||
} else {
|
||||
self.handler.inject_outbound_closed(user_data);
|
||||
@ -305,7 +307,7 @@ mod tests {
|
||||
use super::*;
|
||||
use tokio::runtime::current_thread;
|
||||
use tests::dummy_muxer::{DummyMuxer, DummyConnectionState};
|
||||
use tests::dummy_handler::{Handler, HandlerState, Event};
|
||||
use tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent};
|
||||
use std::marker::PhantomData;
|
||||
|
||||
// Concrete `HandledNode`
|
||||
@ -433,7 +435,7 @@ mod tests {
|
||||
.with_muxer_inbound_state(DummyConnectionState::Closed)
|
||||
.handled_node();
|
||||
|
||||
let event = Event::Custom("banana");
|
||||
let event = InEvent::Custom("banana");
|
||||
handled.inject_event(event.clone());
|
||||
assert_eq!(handled.handler().events, vec![event]);
|
||||
}
|
||||
@ -528,7 +530,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn poll_with_unready_node_stream_and_handler_emits_custom_event() {
|
||||
let expected_event = Some(NodeHandlerEvent::Custom(Event::Custom("pineapple")));
|
||||
let expected_event = Some(NodeHandlerEvent::Custom(OutEvent::Custom("pineapple")));
|
||||
let mut handled = TestBuilder::new()
|
||||
// make NodeStream return NotReady
|
||||
.with_muxer_inbound_state(DummyConnectionState::Pending)
|
||||
@ -537,7 +539,7 @@ mod tests {
|
||||
.handled_node();
|
||||
|
||||
assert_matches!(handled.poll(), Ok(Async::Ready(Some(event))) => {
|
||||
assert_matches!(event, Event::Custom("pineapple"))
|
||||
assert_matches!(event, OutEvent::Custom("pineapple"))
|
||||
});
|
||||
}
|
||||
|
||||
@ -552,10 +554,10 @@ mod tests {
|
||||
|
||||
set_next_handler_outbound_state(
|
||||
&mut handled,
|
||||
HandlerState::Ready(Some(NodeHandlerEvent::Custom(Event::Custom("pear"))))
|
||||
HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("pear"))))
|
||||
);
|
||||
handled.poll().expect("poll works");
|
||||
assert_eq!(handled.handler().events, vec![Event::OutboundClosed]);
|
||||
assert_eq!(handled.handler().events, vec![InEvent::OutboundClosed]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -572,7 +574,7 @@ mod tests {
|
||||
// closed, `inbound_finished` is set to true.
|
||||
// - an Async::Ready(NodeEvent::InboundClosed) is yielded (also calls
|
||||
// `inject_inbound_close`, but that's irrelevant here)
|
||||
// - back in `poll()` we cal `handler.poll()` which does nothing because
|
||||
// - back in `poll()` we call `handler.poll()` which does nothing because
|
||||
// `HandlerState` is `NotReady`: loop continues
|
||||
// - polls the node again which now skips the inbound block because
|
||||
// `inbound_finished` is true.
|
||||
@ -593,7 +595,7 @@ mod tests {
|
||||
// – which in turn makes the HandledNode to yield Async::Ready(None) as well
|
||||
assert_matches!(handled.poll(), Ok(Async::Ready(None)));
|
||||
assert_eq!(handled.handler().events, vec![
|
||||
Event::InboundClosed, Event::OutboundClosed
|
||||
InEvent::InboundClosed, InEvent::OutboundClosed
|
||||
]);
|
||||
}
|
||||
|
||||
@ -606,7 +608,7 @@ mod tests {
|
||||
|
||||
assert_eq!(h.handler().events, vec![]);
|
||||
let _ = h.poll();
|
||||
assert_eq!(h.handler().events, vec![Event::InboundClosed]);
|
||||
assert_eq!(h.handler().events, vec![InEvent::InboundClosed]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -620,7 +622,7 @@ mod tests {
|
||||
|
||||
assert_eq!(h.handler().events, vec![]);
|
||||
let _ = h.poll();
|
||||
assert_eq!(h.handler().events, vec![Event::OutboundClosed]);
|
||||
assert_eq!(h.handler().events, vec![InEvent::OutboundClosed]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -634,7 +636,7 @@ mod tests {
|
||||
|
||||
assert_eq!(h.handler().events, vec![]);
|
||||
let _ = h.poll();
|
||||
assert_eq!(h.handler().events, vec![Event::Substream(Some(1))]);
|
||||
assert_eq!(h.handler().events, vec![InEvent::Substream(Some(1))]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -647,6 +649,6 @@ mod tests {
|
||||
|
||||
assert_eq!(h.handler().events, vec![]);
|
||||
let _ = h.poll();
|
||||
assert_eq!(h.handler().events, vec![Event::Substream(None)]);
|
||||
assert_eq!(h.handler().events, vec![InEvent::Substream(None)]);
|
||||
}
|
||||
}
|
||||
|
@ -39,8 +39,8 @@ use PeerId;
|
||||
// This collection of nodes spawns a task for each individual node to process. This means that
|
||||
// events happen on the background at the same time as the `HandledNodesTasks` is being polled.
|
||||
//
|
||||
// In order to make the API non-racy and avoid issues, we totally separate the state in the
|
||||
// `HandledNodesTasks` and the states that the task nodes can access. They are only allowed to
|
||||
// In order to make the API non-racy and avoid issues, we completely separate the state in the
|
||||
// `HandledNodesTasks` from the states that the task nodes can access. They are only allowed to
|
||||
// exchange messages. The state in the `HandledNodesTasks` is therefore delayed compared to the
|
||||
// tasks, and is updated only when `poll()` is called.
|
||||
//
|
||||
@ -56,6 +56,7 @@ pub struct HandledNodesTasks<TInEvent, TOutEvent, THandler> {
|
||||
/// the task. It is possible that we receive messages from tasks that used to be in this list
|
||||
/// but no longer are, in which case we should ignore them.
|
||||
tasks: FnvHashMap<TaskId, mpsc::UnboundedSender<TInEvent>>,
|
||||
|
||||
/// Identifier for the next task to spawn.
|
||||
next_task_id: TaskId,
|
||||
|
||||
@ -172,7 +173,7 @@ impl<TInEvent, TOutEvent, THandler> HandledNodesTasks<TInEvent, TOutEvent, THand
|
||||
{
|
||||
for sender in self.tasks.values() {
|
||||
// Note: it is possible that sending an event fails if the background task has already
|
||||
// finished, but the local state hasn't reflected that yet becaues it hasn't been
|
||||
// finished, but the local state hasn't reflected that yet because it hasn't been
|
||||
// polled. This is not an error situation.
|
||||
let _ = sender.unbounded_send(event.clone());
|
||||
}
|
||||
@ -200,7 +201,6 @@ impl<TInEvent, TOutEvent, THandler> HandledNodesTasks<TInEvent, TOutEvent, THand
|
||||
for to_spawn in self.to_spawn.drain() {
|
||||
tokio_executor::spawn(to_spawn);
|
||||
}
|
||||
|
||||
loop {
|
||||
match self.events_rx.poll() {
|
||||
Ok(Async::Ready(Some((message, task_id)))) => {
|
||||
@ -369,7 +369,6 @@ where
|
||||
Err(_) => unreachable!("An UnboundedReceiver never errors"),
|
||||
}
|
||||
}
|
||||
|
||||
// Check whether dialing succeeded.
|
||||
match future.poll() {
|
||||
Ok(Async::Ready((peer_id, muxer))) => {
|
||||
@ -404,7 +403,7 @@ where
|
||||
match self.in_events_rx.poll() {
|
||||
Ok(Async::NotReady) => break,
|
||||
Ok(Async::Ready(Some(event))) => {
|
||||
node.inject_event(event);
|
||||
node.inject_event(event)
|
||||
},
|
||||
Ok(Async::Ready(None)) => {
|
||||
// Node closed by the external API; start shutdown process.
|
||||
@ -450,3 +449,350 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use std::io;
|
||||
|
||||
use futures::future::{self, FutureResult};
|
||||
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
|
||||
use nodes::handled_node::NodeHandlerEvent;
|
||||
use rand::random;
|
||||
use tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent, TestHandledNode};
|
||||
use tests::dummy_muxer::{DummyMuxer, DummyConnectionState};
|
||||
use tokio::runtime::Builder;
|
||||
use tokio::runtime::current_thread::Runtime;
|
||||
use {PeerId, PublicKey};
|
||||
|
||||
type TestNodeTask = NodeTask<
|
||||
FutureResult<(PeerId, DummyMuxer), IoError>,
|
||||
DummyMuxer,
|
||||
Handler,
|
||||
InEvent,
|
||||
OutEvent,
|
||||
>;
|
||||
|
||||
struct NodeTaskTestBuilder {
|
||||
task_id: TaskId,
|
||||
inner_node: Option<TestHandledNode>,
|
||||
inner_fut: Option<FutureResult<(PeerId, DummyMuxer), IoError>>,
|
||||
}
|
||||
|
||||
impl NodeTaskTestBuilder {
|
||||
fn new() -> Self {
|
||||
NodeTaskTestBuilder {
|
||||
task_id: TaskId(123),
|
||||
inner_node: None,
|
||||
inner_fut: {
|
||||
let peer_id = PublicKey::Rsa((0 .. 2048).map(|_| -> u8 { random() }).collect()).into_peer_id();
|
||||
Some(future::ok((peer_id, DummyMuxer::new())))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn with_inner_fut(&mut self, fut: FutureResult<(PeerId, DummyMuxer), IoError>) -> &mut Self{
|
||||
self.inner_fut = Some(fut);
|
||||
self
|
||||
}
|
||||
|
||||
fn with_task_id(&mut self, id: usize) -> &mut Self {
|
||||
self.task_id = TaskId(id);
|
||||
self
|
||||
}
|
||||
|
||||
fn node_task(&mut self) -> (
|
||||
TestNodeTask,
|
||||
UnboundedSender<InEvent>,
|
||||
UnboundedReceiver<(InToExtMessage<OutEvent, Handler>, TaskId)>,
|
||||
) {
|
||||
let (events_from_node_task_tx, events_from_node_task_rx) = mpsc::unbounded::<(InToExtMessage<OutEvent, Handler>, TaskId)>();
|
||||
let (events_to_node_task_tx, events_to_node_task_rx) = mpsc::unbounded::<InEvent>();
|
||||
let inner = if self.inner_node.is_some() {
|
||||
NodeTaskInner::Node(self.inner_node.take().unwrap())
|
||||
} else {
|
||||
NodeTaskInner::Future {
|
||||
future: self.inner_fut.take().unwrap(),
|
||||
handler: Handler::default(),
|
||||
events_buffer: Vec::new(),
|
||||
}
|
||||
};
|
||||
let node_task = NodeTask {
|
||||
inner: inner,
|
||||
events_tx: events_from_node_task_tx.clone(), // events TO the outside
|
||||
in_events_rx: events_to_node_task_rx.fuse(), // events FROM the outside
|
||||
id: self.task_id,
|
||||
};
|
||||
(node_task, events_to_node_task_tx, events_from_node_task_rx)
|
||||
}
|
||||
}
|
||||
|
||||
type TestHandledNodesTasks = HandledNodesTasks<InEvent, OutEvent, Handler>;
|
||||
|
||||
struct HandledNodeTaskTestBuilder {
|
||||
muxer: DummyMuxer,
|
||||
handler: Handler,
|
||||
task_count: usize,
|
||||
}
|
||||
|
||||
impl HandledNodeTaskTestBuilder {
|
||||
fn new() -> Self {
|
||||
HandledNodeTaskTestBuilder {
|
||||
muxer: DummyMuxer::new(),
|
||||
handler: Handler::default(),
|
||||
task_count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn with_tasks(&mut self, amt: usize) -> &mut Self {
|
||||
self.task_count = amt;
|
||||
self
|
||||
}
|
||||
fn with_muxer_inbound_state(&mut self, state: DummyConnectionState) -> &mut Self {
|
||||
self.muxer.set_inbound_connection_state(state);
|
||||
self
|
||||
}
|
||||
fn with_muxer_outbound_state(&mut self, state: DummyConnectionState) -> &mut Self {
|
||||
self.muxer.set_outbound_connection_state(state);
|
||||
self
|
||||
}
|
||||
fn with_handler_state(&mut self, state: HandlerState) -> &mut Self {
|
||||
self.handler.state = Some(state);
|
||||
self
|
||||
}
|
||||
fn with_handler_states(&mut self, states: Vec<HandlerState>) -> &mut Self {
|
||||
self.handler.next_states = states;
|
||||
self
|
||||
}
|
||||
fn handled_nodes_tasks(&mut self) -> (TestHandledNodesTasks, Vec<TaskId>) {
|
||||
let mut handled_nodes = HandledNodesTasks::new();
|
||||
let peer_id = PublicKey::Rsa((0 .. 2048).map(|_| -> u8 { random() }).collect()).into_peer_id();
|
||||
let mut task_ids = Vec::new();
|
||||
for _i in 0..self.task_count {
|
||||
let fut = future::ok((peer_id.clone(), self.muxer.clone()));
|
||||
task_ids.push(
|
||||
handled_nodes.add_reach_attempt(fut, self.handler.clone())
|
||||
);
|
||||
}
|
||||
(handled_nodes, task_ids)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Tests for NodeTask
|
||||
|
||||
#[test]
|
||||
fn task_emits_event_when_things_happen_in_the_node() {
|
||||
let (node_task, tx, mut rx) = NodeTaskTestBuilder::new()
|
||||
.with_task_id(890)
|
||||
.node_task();
|
||||
|
||||
tx.unbounded_send(InEvent::Custom("beef")).expect("send to NodeTask should work");
|
||||
let mut rt = Runtime::new().unwrap();
|
||||
rt.spawn(node_task);
|
||||
let events = rt.block_on(rx.by_ref().take(2).collect()).expect("reading on rx should work");
|
||||
|
||||
assert_matches!(events[0], (InToExtMessage::NodeReached(_), TaskId(890)));
|
||||
assert_matches!(events[1], (InToExtMessage::NodeEvent(ref outevent), TaskId(890)) => {
|
||||
assert_matches!(outevent, OutEvent::Custom(beef) => {
|
||||
assert_eq!(beef, &"beef");
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn task_exits_when_node_errors() {
|
||||
let mut rt = Runtime::new().unwrap();
|
||||
let (node_task, _tx, rx) = NodeTaskTestBuilder::new()
|
||||
.with_inner_fut(future::err(io::Error::new(io::ErrorKind::Other, "nah")))
|
||||
.with_task_id(345)
|
||||
.node_task();
|
||||
|
||||
rt.spawn(node_task);
|
||||
let events = rt.block_on(rx.collect()).expect("rx failed");
|
||||
assert!(events.len() == 1);
|
||||
assert_matches!(events[0], (InToExtMessage::TaskClosed{..}, TaskId(345)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn task_exits_when_node_is_done() {
|
||||
let mut rt = Runtime::new().unwrap();
|
||||
let fut = {
|
||||
let peer_id = PublicKey::Rsa((0 .. 2048).map(|_| -> u8 { random() }).collect()).into_peer_id();
|
||||
let mut muxer = DummyMuxer::new();
|
||||
muxer.set_inbound_connection_state(DummyConnectionState::Closed);
|
||||
muxer.set_outbound_connection_state(DummyConnectionState::Closed);
|
||||
future::ok((peer_id, muxer))
|
||||
};
|
||||
let (node_task, tx, rx) = NodeTaskTestBuilder::new()
|
||||
.with_inner_fut(fut)
|
||||
.with_task_id(345)
|
||||
.node_task();
|
||||
|
||||
// Even though we set up the muxer outbound state to be `Closed` we
|
||||
// still need to open a substream or the outbound state will never
|
||||
// be checked (see https://github.com/libp2p/rust-libp2p/issues/609).
|
||||
// We do not have a HandledNode yet, so we can't simply call
|
||||
// `open_substream`. Instead we send a message to the NodeTask,
|
||||
// which will be buffered until the inner future resolves, then
|
||||
// it'll call `inject_event` on the handler. In the test Handler,
|
||||
// inject_event will set the next state so that it yields an
|
||||
// OutboundSubstreamRequest.
|
||||
// Back up in the HandledNode, at the next iteration we'll
|
||||
// open_substream() and iterate again. This time, node.poll() will
|
||||
// poll the muxer inbound (closed) and also outbound (because now
|
||||
// there's an entry in the outbound_streams) which will be Closed
|
||||
// (because we set up the muxer state so) and thus yield
|
||||
// Async::Ready(None) which in turn makes the NodeStream yield an
|
||||
// Async::Ready(OutboundClosed) to the HandledNode.
|
||||
// Now we're at the point where poll_inbound, poll_outbound and
|
||||
// address are all skipped and there is nothing left to do: we yield
|
||||
// Async::Ready(None) from the NodeStream. In the HandledNode,
|
||||
// Async::Ready(None) triggers a shutdown of the Handler so that it
|
||||
// also yields Async::Ready(None). Finally, the NodeTask gets a
|
||||
// Async::Ready(None) and sends a TaskClosed and returns
|
||||
// Async::Ready(()). Qed.
|
||||
|
||||
let create_outbound_substream_event = InEvent::Substream(Some(135));
|
||||
tx.unbounded_send(create_outbound_substream_event).expect("send msg works");
|
||||
rt.spawn(node_task);
|
||||
let events = rt.block_on(rx.collect()).expect("rx failed");
|
||||
|
||||
assert_eq!(events.len(), 2);
|
||||
assert_matches!(events[0].0, InToExtMessage::NodeReached(PeerId{..}));
|
||||
assert_matches!(events[1].0, InToExtMessage::TaskClosed(Ok(()), _));
|
||||
}
|
||||
|
||||
|
||||
// Tests for HandledNodeTasks
|
||||
|
||||
#[test]
|
||||
fn query_for_tasks() {
|
||||
let (mut handled_nodes, task_ids) = HandledNodeTaskTestBuilder::new()
|
||||
.with_tasks(3)
|
||||
.handled_nodes_tasks();
|
||||
|
||||
assert_eq!(task_ids.len(), 3);
|
||||
assert_eq!(handled_nodes.task(TaskId(2)).unwrap().id(), task_ids[2]);
|
||||
assert!(handled_nodes.task(TaskId(545534)).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn send_event_to_task() {
|
||||
let (mut handled_nodes, _) = HandledNodeTaskTestBuilder::new()
|
||||
.with_tasks(1)
|
||||
.handled_nodes_tasks();
|
||||
|
||||
let task_id = {
|
||||
let mut task = handled_nodes.task(TaskId(0)).expect("can fetch a Task");
|
||||
task.send_event(InEvent::Custom("banana"));
|
||||
task.id()
|
||||
};
|
||||
|
||||
let mut rt = Builder::new().core_threads(1).build().unwrap();
|
||||
let mut events = rt.block_on(handled_nodes.into_future()).unwrap();
|
||||
assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeReached{..});
|
||||
|
||||
events = rt.block_on(events.1.into_future()).unwrap();
|
||||
assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: event_task_id, event} => {
|
||||
assert_eq!(event_task_id, task_id);
|
||||
assert_matches!(event, OutEvent::Custom("banana"));
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iterate_over_all_tasks() {
|
||||
let (handled_nodes, task_ids) = HandledNodeTaskTestBuilder::new()
|
||||
.with_tasks(3)
|
||||
.handled_nodes_tasks();
|
||||
|
||||
let mut tasks: Vec<TaskId> = handled_nodes.tasks().collect();
|
||||
assert!(tasks.len() == 3);
|
||||
tasks.sort_by_key(|t| t.0 );
|
||||
assert_eq!(tasks, task_ids);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn add_reach_attempt_prepares_a_new_task() {
|
||||
let mut handled_nodes = HandledNodesTasks::new();
|
||||
assert_eq!(handled_nodes.tasks().count(), 0);
|
||||
assert_eq!(handled_nodes.to_spawn.len(), 0);
|
||||
|
||||
handled_nodes.add_reach_attempt( future::empty(), Handler::default() );
|
||||
|
||||
assert_eq!(handled_nodes.tasks().count(), 1);
|
||||
assert_eq!(handled_nodes.to_spawn.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn running_handled_tasks_reaches_the_nodes() {
|
||||
let (mut handled_nodes_tasks, _) = HandledNodeTaskTestBuilder::new()
|
||||
.with_tasks(5)
|
||||
.with_muxer_inbound_state(DummyConnectionState::Closed)
|
||||
.with_muxer_outbound_state(DummyConnectionState::Closed)
|
||||
.with_handler_state(HandlerState::Err) // stop the loop
|
||||
.handled_nodes_tasks();
|
||||
|
||||
let mut rt = Runtime::new().unwrap();
|
||||
let mut events: (Option<HandledNodesEvent<_,_>>, TestHandledNodesTasks);
|
||||
// we're running on a single thread so events are sequential: first
|
||||
// we get a NodeReached, then a TaskClosed
|
||||
for i in 0..5 {
|
||||
events = rt.block_on(handled_nodes_tasks.into_future()).unwrap();
|
||||
assert_matches!(events, (Some(HandledNodesEvent::NodeReached{..}), ref hnt) => {
|
||||
assert_matches!(hnt, HandledNodesTasks{..});
|
||||
assert_eq!(hnt.tasks().count(), 5 - i);
|
||||
});
|
||||
handled_nodes_tasks = events.1;
|
||||
events = rt.block_on(handled_nodes_tasks.into_future()).unwrap();
|
||||
assert_matches!(events, (Some(HandledNodesEvent::TaskClosed{..}), _));
|
||||
handled_nodes_tasks = events.1;
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn events_in_tasks_are_emitted() {
|
||||
// States are pop()'d so they are set in reverse order by the Handler
|
||||
let handler_states = vec![
|
||||
HandlerState::Err,
|
||||
HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler2") ))),
|
||||
HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler") ))),
|
||||
];
|
||||
|
||||
let (mut handled_nodes_tasks, _) = HandledNodeTaskTestBuilder::new()
|
||||
.with_tasks(1)
|
||||
.with_muxer_inbound_state(DummyConnectionState::Pending)
|
||||
.with_muxer_outbound_state(DummyConnectionState::Opened)
|
||||
.with_handler_states(handler_states)
|
||||
.handled_nodes_tasks();
|
||||
|
||||
let tx = {
|
||||
let mut task0 = handled_nodes_tasks.task(TaskId(0)).unwrap();
|
||||
let tx = task0.inner.get_mut();
|
||||
tx.clone()
|
||||
};
|
||||
|
||||
let mut rt = Builder::new().core_threads(1).build().unwrap();
|
||||
let mut events = rt.block_on(handled_nodes_tasks.into_future()).unwrap();
|
||||
assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeReached{..});
|
||||
|
||||
tx.unbounded_send(InEvent::NextState).expect("send works");
|
||||
events = rt.block_on(events.1.into_future()).unwrap();
|
||||
assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: _, event} => {
|
||||
assert_matches!(event, OutEvent::Custom("from handler"));
|
||||
});
|
||||
|
||||
tx.unbounded_send(InEvent::NextState).expect("send works");
|
||||
events = rt.block_on(events.1.into_future()).unwrap();
|
||||
assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: _, event} => {
|
||||
assert_matches!(event, OutEvent::Custom("from handler2"));
|
||||
});
|
||||
|
||||
tx.unbounded_send(InEvent::NextState).expect("send works");
|
||||
events = rt.block_on(events.1.into_future()).unwrap();
|
||||
assert_matches!(events.0.unwrap(), HandledNodesEvent::TaskClosed{id: _, result, handler: _} => {
|
||||
assert_matches!(result, Err(_));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -25,14 +25,19 @@ use std::io::{self, Error as IoError};
|
||||
use super::dummy_muxer::DummyMuxer;
|
||||
use futures::prelude::*;
|
||||
use muxing::SubstreamRef;
|
||||
use nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent};
|
||||
use nodes::handled_node::{HandledNode, NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub(crate) struct Handler {
|
||||
pub events: Vec<Event>,
|
||||
/// Inspect events passed through the Handler
|
||||
pub events: Vec<InEvent>,
|
||||
/// Current state of the Handler
|
||||
pub state: Option<HandlerState>,
|
||||
/// Next state for outbound streams of the Handler
|
||||
pub next_outbound_state: Option<HandlerState>,
|
||||
/// Vec of states the Handler will assume
|
||||
pub next_states: Vec<HandlerState>,
|
||||
}
|
||||
|
||||
impl Default for Handler {
|
||||
@ -40,6 +45,7 @@ impl Default for Handler {
|
||||
Handler {
|
||||
events: Vec::new(),
|
||||
state: None,
|
||||
next_states: Vec::new(),
|
||||
next_outbound_state: None,
|
||||
}
|
||||
}
|
||||
@ -48,21 +54,36 @@ impl Default for Handler {
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub(crate) enum HandlerState {
|
||||
NotReady,
|
||||
Ready(Option<NodeHandlerEvent<usize, Event>>),
|
||||
Ready(Option<NodeHandlerEvent<usize, OutEvent>>),
|
||||
Err,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub(crate) enum Event {
|
||||
pub(crate) enum InEvent {
|
||||
/// A custom inbound event
|
||||
Custom(&'static str),
|
||||
/// A substream request with a dummy payload
|
||||
Substream(Option<usize>),
|
||||
/// Request closing of the outbound substream
|
||||
OutboundClosed,
|
||||
/// Request closing of the inbound substreams
|
||||
InboundClosed,
|
||||
/// Request the handler to move to the next state
|
||||
NextState,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub(crate) enum OutEvent {
|
||||
/// A message from the Handler upwards in the stack
|
||||
Custom(&'static str),
|
||||
}
|
||||
|
||||
// Concrete `HandledNode` parametrised for the test helpers
|
||||
pub(crate) type TestHandledNode = HandledNode<DummyMuxer, Handler>;
|
||||
|
||||
impl NodeHandler for Handler {
|
||||
type InEvent = Event;
|
||||
type OutEvent = Event;
|
||||
type InEvent = InEvent;
|
||||
type OutEvent = OutEvent;
|
||||
type OutboundOpenInfo = usize;
|
||||
type Substream = SubstreamRef<Arc<DummyMuxer>>;
|
||||
fn inject_substream(
|
||||
@ -74,25 +95,42 @@ impl NodeHandler for Handler {
|
||||
NodeHandlerEndpoint::Dialer(user_data) => Some(user_data),
|
||||
NodeHandlerEndpoint::Listener => None,
|
||||
};
|
||||
self.events.push(Event::Substream(user_data));
|
||||
self.events.push(InEvent::Substream(user_data));
|
||||
}
|
||||
fn inject_inbound_closed(&mut self) {
|
||||
self.events.push(Event::InboundClosed);
|
||||
self.events.push(InEvent::InboundClosed);
|
||||
}
|
||||
fn inject_outbound_closed(&mut self, _: usize) {
|
||||
self.events.push(Event::OutboundClosed);
|
||||
self.events.push(InEvent::OutboundClosed);
|
||||
if let Some(ref state) = self.next_outbound_state {
|
||||
self.state = Some(state.clone());
|
||||
}
|
||||
}
|
||||
fn inject_event(&mut self, inevent: Self::InEvent) {
|
||||
self.events.push(inevent)
|
||||
self.events.push(inevent.clone());
|
||||
match inevent {
|
||||
InEvent::Custom(s) => {
|
||||
self.state = Some(HandlerState::Ready(Some(NodeHandlerEvent::Custom(
|
||||
OutEvent::Custom(s),
|
||||
))))
|
||||
}
|
||||
InEvent::Substream(Some(user_data)) => {
|
||||
self.state = Some(HandlerState::Ready(Some(
|
||||
NodeHandlerEvent::OutboundSubstreamRequest(user_data),
|
||||
)))
|
||||
}
|
||||
InEvent::NextState => {
|
||||
let next_state = self.next_states.pop();
|
||||
self.state = next_state
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
fn shutdown(&mut self) {
|
||||
self.state = Some(HandlerState::Ready(None));
|
||||
}
|
||||
fn poll(&mut self) -> Poll<Option<NodeHandlerEvent<usize, Event>>, IoError> {
|
||||
match self.state {
|
||||
fn poll(&mut self) -> Poll<Option<NodeHandlerEvent<usize, OutEvent>>, IoError> {
|
||||
match self.state.take() {
|
||||
Some(ref state) => match state {
|
||||
HandlerState::NotReady => Ok(Async::NotReady),
|
||||
HandlerState::Ready(None) => Ok(Async::Ready(None)),
|
||||
|
@ -22,9 +22,9 @@
|
||||
//! version of the trait along with a way to setup the muxer to behave in the
|
||||
//! desired way when testing other components.
|
||||
|
||||
use std::io::Error as IoError;
|
||||
use muxing::{StreamMuxer, Shutdown};
|
||||
use futures::prelude::*;
|
||||
use muxing::{Shutdown, StreamMuxer};
|
||||
use std::io::Error as IoError;
|
||||
|
||||
/// Substream type
|
||||
#[derive(Debug)]
|
||||
@ -39,17 +39,17 @@ pub struct DummyOutboundSubstream {}
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub enum DummyConnectionState {
|
||||
Pending, // use this to trigger the Async::NotReady code path
|
||||
Closed, // use this to trigger the Async::Ready(None) code path
|
||||
Opened, // use this to trigger the Async::Ready(Some(_)) code path
|
||||
Closed, // use this to trigger the Async::Ready(None) code path
|
||||
Opened, // use this to trigger the Async::Ready(Some(_)) code path
|
||||
}
|
||||
#[derive(Debug, Clone)]
|
||||
struct DummyConnection {
|
||||
state: DummyConnectionState
|
||||
state: DummyConnectionState,
|
||||
}
|
||||
|
||||
/// `DummyMuxer` implements `StreamMuxer` and methods to control its behavior when used in tests
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DummyMuxer{
|
||||
pub struct DummyMuxer {
|
||||
in_connection: DummyConnection,
|
||||
out_connection: DummyConnection,
|
||||
}
|
||||
@ -59,8 +59,12 @@ impl DummyMuxer {
|
||||
/// and the (single) outbound substream to `Closed`.
|
||||
pub fn new() -> Self {
|
||||
DummyMuxer {
|
||||
in_connection: DummyConnection{ state: DummyConnectionState::Pending },
|
||||
out_connection: DummyConnection{ state: DummyConnectionState::Closed },
|
||||
in_connection: DummyConnection {
|
||||
state: DummyConnectionState::Pending,
|
||||
},
|
||||
out_connection: DummyConnection {
|
||||
state: DummyConnectionState::Closed,
|
||||
},
|
||||
}
|
||||
}
|
||||
/// Set the muxer state inbound "connection" state
|
||||
@ -80,23 +84,40 @@ impl StreamMuxer for DummyMuxer {
|
||||
match self.in_connection.state {
|
||||
DummyConnectionState::Pending => Ok(Async::NotReady),
|
||||
DummyConnectionState::Closed => Ok(Async::Ready(None)),
|
||||
DummyConnectionState::Opened => Ok(Async::Ready(Some(Self::Substream{}))),
|
||||
DummyConnectionState::Opened => Ok(Async::Ready(Some(Self::Substream {}))),
|
||||
}
|
||||
}
|
||||
fn open_outbound(&self) -> Self::OutboundSubstream { Self::OutboundSubstream{} }
|
||||
fn poll_outbound(&self, _substream: &mut Self::OutboundSubstream) -> Poll<Option<Self::Substream>, IoError> {
|
||||
fn open_outbound(&self) -> Self::OutboundSubstream {
|
||||
Self::OutboundSubstream {}
|
||||
}
|
||||
fn poll_outbound(
|
||||
&self,
|
||||
_substream: &mut Self::OutboundSubstream,
|
||||
) -> Poll<Option<Self::Substream>, IoError> {
|
||||
match self.out_connection.state {
|
||||
DummyConnectionState::Pending => Ok(Async::NotReady),
|
||||
DummyConnectionState::Closed => Ok(Async::Ready(None)),
|
||||
DummyConnectionState::Opened => Ok(Async::Ready(Some(Self::Substream{}))),
|
||||
DummyConnectionState::Opened => Ok(Async::Ready(Some(Self::Substream {}))),
|
||||
}
|
||||
}
|
||||
fn destroy_outbound(&self, _: Self::OutboundSubstream) {}
|
||||
fn read_substream(&self, _: &mut Self::Substream, _buf: &mut [u8]) -> Poll<usize, IoError> { unreachable!() }
|
||||
fn write_substream(&self, _: &mut Self::Substream, _buf: &[u8]) -> Poll<usize, IoError> { unreachable!() }
|
||||
fn flush_substream(&self, _: &mut Self::Substream) -> Poll<(), IoError> { unreachable!() }
|
||||
fn shutdown_substream(&self, _: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> { unreachable!() }
|
||||
fn read_substream(&self, _: &mut Self::Substream, _buf: &mut [u8]) -> Poll<usize, IoError> {
|
||||
unreachable!()
|
||||
}
|
||||
fn write_substream(&self, _: &mut Self::Substream, _buf: &[u8]) -> Poll<usize, IoError> {
|
||||
unreachable!()
|
||||
}
|
||||
fn flush_substream(&self, _: &mut Self::Substream) -> Poll<(), IoError> {
|
||||
unreachable!()
|
||||
}
|
||||
fn shutdown_substream(&self, _: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> {
|
||||
unreachable!()
|
||||
}
|
||||
fn destroy_substream(&self, _: Self::Substream) {}
|
||||
fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> { Ok(Async::Ready(())) }
|
||||
fn flush_all(&self) -> Poll<(), IoError> { Ok(Async::Ready(())) }
|
||||
fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
fn flush_all(&self) -> Poll<(), IoError> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
|
@ -23,16 +23,18 @@
|
||||
//! an initial state to facilitate testing.
|
||||
|
||||
use futures::prelude::*;
|
||||
use futures::{future::{self, FutureResult}, stream};
|
||||
use {Multiaddr, Transport};
|
||||
use futures::{
|
||||
future::{self, FutureResult},
|
||||
stream,
|
||||
};
|
||||
use std::io;
|
||||
|
||||
use {Multiaddr, Transport};
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Copy)]
|
||||
pub(crate) enum ListenerState {
|
||||
/// The `usize` indexes items produced by the listener
|
||||
Ok(Async<Option<usize>>),
|
||||
Error
|
||||
Error,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Copy)]
|
||||
@ -40,20 +42,25 @@ pub(crate) struct DummyTransport {
|
||||
listener_state: ListenerState,
|
||||
}
|
||||
impl DummyTransport {
|
||||
pub(crate) fn new() -> Self { DummyTransport{ listener_state: ListenerState::Ok(Async::NotReady) }}
|
||||
pub(crate) fn new() -> Self {
|
||||
DummyTransport {
|
||||
listener_state: ListenerState::Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
pub(crate) fn set_initial_listener_state(&mut self, state: ListenerState) {
|
||||
self.listener_state = state;
|
||||
}
|
||||
}
|
||||
impl Transport for DummyTransport {
|
||||
type Output = usize;
|
||||
type Listener = Box<Stream<Item=(Self::ListenerUpgrade, Multiaddr), Error=io::Error> + Send>;
|
||||
type Listener =
|
||||
Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = io::Error> + Send>;
|
||||
type ListenerUpgrade = FutureResult<Self::Output, io::Error>;
|
||||
type Dial = Box<Future<Item=Self::Output, Error=io::Error> + Send>;
|
||||
type Dial = Box<Future<Item = Self::Output, Error = io::Error> + Send>;
|
||||
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
|
||||
where
|
||||
Self: Sized
|
||||
Self: Sized,
|
||||
{
|
||||
let addr2 = addr.clone();
|
||||
match self.listener_state {
|
||||
@ -63,24 +70,24 @@ impl Transport for DummyTransport {
|
||||
Async::NotReady => {
|
||||
let stream = stream::poll_fn(|| Ok(Async::NotReady)).map(tupelize);
|
||||
(Box::new(stream), addr2)
|
||||
},
|
||||
}
|
||||
Async::Ready(Some(n)) => {
|
||||
let stream = stream::iter_ok(n..).map(tupelize);
|
||||
(Box::new(stream), addr2)
|
||||
},
|
||||
}
|
||||
Async::Ready(None) => {
|
||||
let stream = stream::empty();
|
||||
(Box::new(stream), addr2)
|
||||
},
|
||||
}
|
||||
})
|
||||
}
|
||||
ListenerState::Error => Err( (self, addr2) )
|
||||
ListenerState::Error => Err((self, addr2)),
|
||||
}
|
||||
}
|
||||
|
||||
fn dial(self, _addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)>
|
||||
where
|
||||
Self: Sized
|
||||
Self: Sized,
|
||||
{
|
||||
unimplemented!();
|
||||
}
|
||||
|
Reference in New Issue
Block a user