Improve nodes state (#917)

* Improve the state consistency in src/nodes

* Add a user data parameter to tasks

* Remove the tasks HashMap in CollectionStream

* Add TODO
This commit is contained in:
Pierre Krieger 2019-02-14 13:46:52 +01:00 committed by GitHub
parent b7fa7f38b1
commit 1100325e63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 131 additions and 252 deletions

View File

@ -30,20 +30,21 @@ use crate::{
};
use fnv::FnvHashMap;
use futures::prelude::*;
use std::{collections::hash_map::Entry, error, fmt, hash::Hash, mem};
use std::{error, fmt, hash::Hash, mem};
mod tests;
/// Implementation of `Stream` that handles a collection of nodes.
pub struct CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId = PeerId> {
/// Object that handles the tasks.
inner: HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>,
///
/// The user data contains the state of the task. If `Connected`, then a corresponding entry
/// must be present in `nodes`.
inner: HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TaskState<TPeerId>, TPeerId>,
/// List of nodes, with the task id that handles this node. The corresponding entry in `tasks`
/// must always be in the `Connected` state.
nodes: FnvHashMap<TPeerId, TaskId>,
/// List of tasks and their state. If `Connected`, then a corresponding entry must be present
/// in `nodes`.
tasks: FnvHashMap<TaskId, TaskState<TPeerId>>,
}
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for
@ -52,18 +53,7 @@ where
TPeerId: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
let mut list = f.debug_list();
for (id, task) in &self.tasks {
match *task {
TaskState::Pending => {
list.entry(&format!("Pending({:?})", ReachAttemptId(*id)))
},
TaskState::Connected(ref peer_id) => {
list.entry(&format!("Connected({:?})", peer_id))
}
};
}
list.finish()
f.debug_tuple("CollectionStream").finish()
}
}
@ -200,20 +190,16 @@ where
pub fn accept(self) -> (CollectionNodeAccept, TPeerId) {
// Set the state of the task to `Connected`.
let former_task_id = self.parent.nodes.insert(self.peer_id.clone(), self.id);
let _former_state = self.parent.tasks.insert(self.id, TaskState::Connected(self.peer_id.clone()));
debug_assert!(_former_state == Some(TaskState::Pending));
*self.parent.inner.task(self.id)
.expect("A CollectionReachEvent is only ever created from a valid attempt; QED")
.user_data_mut() = TaskState::Connected(self.peer_id.clone());
// It is possible that we already have a task connected to the same peer. In this
// case, we need to emit a `NodeReplaced` event.
let ret_value = if let Some(former_task_id) = former_task_id {
self.parent.inner.task(former_task_id)
.expect("whenever we receive a TaskClosed event or close a node, we remove the \
corresponding entry from self.nodes; therefore all elements in \
self.nodes are valid tasks in the HandledNodesTasks; QED")
.close();
let _former_other_state = self.parent.tasks.remove(&former_task_id);
debug_assert!(_former_other_state == Some(TaskState::Connected(self.peer_id.clone())));
let tasks = &mut self.parent.inner;
let ret_value = if let Some(former_task) = former_task_id.and_then(|i| tasks.task(i)) {
debug_assert!(*former_task.user_data() == TaskState::Connected(self.peer_id.clone()));
former_task.close();
// TODO: we unfortunately have to clone the peer id here
(CollectionNodeAccept::ReplacedExisting, self.peer_id.clone())
} else {
@ -256,14 +242,13 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> Drop fo
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
{
fn drop(&mut self) {
let task_state = self.parent.tasks.remove(&self.id);
debug_assert!(if let Some(TaskState::Pending) = task_state { true } else { false });
self.parent.inner.task(self.id)
let task = self.parent.inner.task(self.id)
.expect("we create the CollectionReachEvent with a valid task id; the \
CollectionReachEvent mutably borrows the collection, therefore nothing \
can delete this task during the lifetime of the CollectionReachEvent; \
therefore the task is still valid when we delete it; QED")
.close();
therefore the task is still valid when we delete it; QED");
debug_assert!(if let TaskState::Pending = task.user_data() { true } else { false });
task.close();
}
}
@ -291,7 +276,6 @@ where
CollectionStream {
inner: HandledNodesTasks::new(),
nodes: Default::default(),
tasks: Default::default(),
}
}
@ -314,31 +298,22 @@ where
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
TPeerId: Send + 'static,
{
let id = self.inner.add_reach_attempt(future, handler);
self.tasks.insert(id, TaskState::Pending);
ReachAttemptId(id)
ReachAttemptId(self.inner.add_reach_attempt(future, TaskState::Pending, handler))
}
/// Interrupts a reach attempt.
///
/// Returns `Ok` if something was interrupted, and `Err` if the ID is not or no longer valid.
pub fn interrupt(&mut self, id: ReachAttemptId) -> Result<(), InterruptError> {
match self.tasks.entry(id.0) {
Entry::Vacant(_) => Err(InterruptError::ReachAttemptNotFound),
Entry::Occupied(entry) => {
match entry.get() {
match self.inner.task(id.0) {
None => Err(InterruptError::ReachAttemptNotFound),
Some(task) => {
match task.user_data() {
TaskState::Connected(_) => return Err(InterruptError::AlreadyReached),
TaskState::Pending => (),
};
entry.remove();
self.inner.task(id.0)
.expect("whenever we receive a TaskClosed event or interrupt a task, we \
remove the corresponding entry from self.tasks; therefore all \
elements in self.tasks are valid tasks in the \
HandledNodesTasks; QED")
.close();
task.close();
Ok(())
}
}
@ -366,7 +341,6 @@ where
match self.inner.task(task) {
Some(inner) => Some(PeerMut {
inner,
tasks: &mut self.tasks,
nodes: &mut self.nodes,
}),
None => None,
@ -401,31 +375,31 @@ where
};
match item {
HandledNodesEvent::TaskClosed { id, result, handler } => {
match (self.tasks.remove(&id), result, handler) {
(Some(TaskState::Pending), Err(TaskClosedEvent::Reach(err)), Some(handler)) => {
HandledNodesEvent::TaskClosed { id, result, handler, user_data } => {
match (user_data, result, handler) {
(TaskState::Pending, Err(TaskClosedEvent::Reach(err)), Some(handler)) => {
Async::Ready(CollectionEvent::ReachError {
id: ReachAttemptId(id),
error: err,
handler,
})
},
(Some(TaskState::Pending), Ok(()), _) => {
(TaskState::Pending, Ok(()), _) => {
panic!("The API of HandledNodesTasks guarantees that a task cannot \
gracefully closed before being connected to a node, in which case \
its state should be Connected and not Pending; QED");
},
(Some(TaskState::Pending), Err(TaskClosedEvent::Node(_)), _) => {
(TaskState::Pending, Err(TaskClosedEvent::Node(_)), _) => {
panic!("We switch the task state to Connected once we're connected, and \
a TaskClosedEvent::Node can only happen after we're \
connected; QED");
},
(Some(TaskState::Pending), Err(TaskClosedEvent::Reach(_)), None) => {
(TaskState::Pending, Err(TaskClosedEvent::Reach(_)), None) => {
// TODO: this could be improved in the API of HandledNodesTasks
panic!("The HandledNodesTasks is guaranteed to always return the handler \
when producing a TaskClosedEvent::Reach error");
},
(Some(TaskState::Connected(peer_id)), Ok(()), _handler) => {
(TaskState::Connected(peer_id), Ok(()), _handler) => {
debug_assert!(_handler.is_none());
let _node_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(_node_task_id, Some(id));
@ -433,7 +407,7 @@ where
peer_id,
})
},
(Some(TaskState::Connected(peer_id)), Err(TaskClosedEvent::Node(err)), _handler) => {
(TaskState::Connected(peer_id), Err(TaskClosedEvent::Node(err)), _handler) => {
debug_assert!(_handler.is_none());
let _node_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(_node_task_id, Some(id));
@ -442,28 +416,24 @@ where
error: err,
})
},
(Some(TaskState::Connected(_)), Err(TaskClosedEvent::Reach(_)), _) => {
(TaskState::Connected(_), Err(TaskClosedEvent::Reach(_)), _) => {
panic!("A TaskClosedEvent::Reach can only happen before we are connected \
to a node; therefore the TaskState won't be Connected; QED");
},
(None, _, _) => {
panic!("self.tasks is always kept in sync with the tasks in self.inner; \
when we add a task in self.inner we add a corresponding entry in \
self.tasks, and remove the entry only when the task is closed; \
QED")
},
}
},
HandledNodesEvent::NodeReached { id, peer_id } => {
HandledNodesEvent::NodeReached { task, peer_id } => {
let id = task.id();
drop(task);
Async::Ready(CollectionEvent::NodeReached(CollectionReachEvent {
parent: self,
id,
peer_id,
}))
},
HandledNodesEvent::NodeEvent { id, event } => {
let peer_id = match self.tasks.get(&id) {
Some(TaskState::Connected(peer_id)) => peer_id.clone(),
HandledNodesEvent::NodeEvent { task, event } => {
let peer_id = match task.user_data() {
TaskState::Connected(peer_id) => peer_id.clone(),
_ => panic!("we can only receive NodeEvent events from a task after we \
received a corresponding NodeReached event from that same task; \
when we receive a NodeReached event, we ensure that the entry in \
@ -507,8 +477,7 @@ impl error::Error for InterruptError {}
/// Access to a peer in the collection.
pub struct PeerMut<'a, TInEvent, TPeerId = PeerId> {
inner: HandledNodesTask<'a, TInEvent>,
tasks: &'a mut FnvHashMap<TaskId, TaskState<TPeerId>>,
inner: HandledNodesTask<'a, TInEvent, TaskState<TPeerId>>,
nodes: &'a mut FnvHashMap<TPeerId, TaskId>,
}
@ -526,13 +495,12 @@ where
///
/// No further event will be generated for this node.
pub fn close(self) {
let task_state = self.tasks.remove(&self.inner.id());
if let Some(TaskState::Connected(peer_id)) = task_state {
if let TaskState::Connected(peer_id) = self.inner.user_data() {
let old_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(old_task_id, Some(self.inner.id()));
} else {
panic!("a PeerMut can only be created if an entry is present in nodes; an entry in \
nodes always matched a Connected entry in tasks; QED");
nodes always matched a Connected entry in the tasks; QED");
};
self.inner.close();

View File

@ -107,7 +107,6 @@ fn accepting_a_node_yields_new_entry() {
}
2 => {
assert_matches!(event, Async::Ready(CollectionEvent::NodeReached(reach_ev)) => {
assert_matches!(reach_ev.parent, CollectionStream{..});
let (accept_ev, accepted_peer_id) = reach_ev.accept();
assert_eq!(accepted_peer_id, peer_id);
assert_matches!(accept_ev, CollectionNodeAccept::NewEntry);

View File

@ -36,7 +36,6 @@ use std::{
mem
};
use tokio_executor;
use void::Void;
mod tests;
@ -57,11 +56,11 @@ mod tests;
// conditions in the user's code. See similar comments in the documentation of `NodeStream`.
/// Implementation of `Stream` that handles a collection of nodes.
pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId = PeerId> {
pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId = PeerId> {
/// A map between active tasks to an unbounded sender, used to control the task. Closing the sender interrupts
/// the task. It is possible that we receive messages from tasks that used to be in this list
/// but no longer are, in which case we should ignore them.
tasks: FnvHashMap<TaskId, mpsc::UnboundedSender<TInEvent>>,
tasks: FnvHashMap<TaskId, (mpsc::UnboundedSender<TInEvent>, TUserData)>,
/// Identifier for the next task to spawn.
next_task_id: TaskId,
@ -76,12 +75,14 @@ pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THand
events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>, TaskId)>,
}
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId> fmt::Debug for
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId>
where
TUserData: fmt::Debug
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_list()
.entries(self.tasks.keys().cloned())
f.debug_map()
.entries(self.tasks.iter().map(|(id, (_, ud))| (id, ud)))
.finish()
}
}
@ -145,7 +146,7 @@ where T: NodeHandler
/// Event that can happen on the `HandledNodesTasks`.
#[derive(Debug)]
pub enum HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId = PeerId> {
pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId = PeerId> {
/// A task has been closed.
///
/// This happens once the node handler closes or an error happens.
@ -153,6 +154,8 @@ pub enum HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPee
TaskClosed {
/// Identifier of the task that closed.
id: TaskId,
/// The user data that was associated with the task.
user_data: TUserData,
/// What happened.
result: Result<(), TaskClosedEvent<TReachErr, THandlerErr>>,
/// If the task closed before reaching the node, this contains the handler that was passed
@ -162,16 +165,16 @@ pub enum HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPee
/// A task has successfully connected to a node.
NodeReached {
/// Identifier of the task that succeeded.
id: TaskId,
/// The task that succeeded.
task: Task<'a, TInEvent, TUserData>,
/// Identifier of the node.
peer_id: TPeerId,
},
/// A task has produced an event.
NodeEvent {
/// Identifier of the task that produced the event.
id: TaskId,
/// The task that produced the event.
task: Task<'a, TInEvent, TUserData>,
/// The produced event.
event: TOutEvent,
},
@ -181,8 +184,8 @@ pub enum HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPee
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TaskId(usize);
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId>
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId>
{
/// Creates a new empty collection.
#[inline]
@ -202,7 +205,7 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
///
/// This method spawns a task dedicated to resolving this future and processing the node's
/// events.
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: TIntoHandler) -> TaskId
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, user_data: TUserData, handler: TIntoHandler) -> TaskId
where
TFut: Future<Item = (TPeerId, TMuxer), Error = TReachErr> + Send + 'static,
TIntoHandler: IntoNodeHandler<TPeerId> + Send + 'static,
@ -220,7 +223,7 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
self.next_task_id.0 += 1;
let (tx, rx) = mpsc::unbounded();
self.tasks.insert(task_id, tx);
self.tasks.insert(task_id, (tx, user_data));
let task = Box::new(NodeTask {
inner: NodeTaskInner::Future {
@ -241,7 +244,7 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
pub fn broadcast_event(&mut self, event: &TInEvent)
where TInEvent: Clone,
{
for sender in self.tasks.values() {
for (sender, _) in self.tasks.values() {
// Note: it is possible that sending an event fails if the background task has already
// finished, but the local state hasn't reflected that yet because it hasn't been
// polled. This is not an error situation.
@ -253,7 +256,7 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
///
/// Returns `None` if the task id is invalid.
#[inline]
pub fn task(&mut self, id: TaskId) -> Option<Task<'_, TInEvent>> {
pub fn task(&mut self, id: TaskId) -> Option<Task<'_, TInEvent, TUserData>> {
match self.tasks.entry(id) {
Entry::Occupied(inner) => Some(Task { inner }),
Entry::Vacant(_) => None,
@ -267,39 +270,58 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
}
/// Provides an API similar to `Stream`, except that it cannot produce an error.
pub fn poll(&mut self) -> Async<HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>> {
pub fn poll(&mut self) -> Async<HandledNodesEvent<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId>> {
let (message, task_id) = match self.poll_inner() {
Async::Ready(r) => r,
Async::NotReady => return Async::NotReady,
};
Async::Ready(match message {
InToExtMessage::NodeEvent(event) => {
HandledNodesEvent::NodeEvent {
task: match self.tasks.entry(task_id) {
Entry::Occupied(inner) => Task { inner },
Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED")
},
event
}
},
InToExtMessage::NodeReached(peer_id) => {
HandledNodesEvent::NodeReached {
task: match self.tasks.entry(task_id) {
Entry::Occupied(inner) => Task { inner },
Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED")
},
peer_id
}
},
InToExtMessage::TaskClosed(result, handler) => {
let (_, user_data) = self.tasks.remove(&task_id)
.expect("poll_inner only returns valid TaskIds; QED");
HandledNodesEvent::TaskClosed {
id: task_id, result, handler, user_data,
}
},
})
}
/// Since non-lexical lifetimes still don't work very well in Rust at the moment, we have to
/// split `poll()` in two. This method returns an `InToExtMessage` that is guaranteed to come
/// from an alive task.
// TODO: look into merging with `poll()`
fn poll_inner(&mut self) -> Async<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>, TaskId)> {
for to_spawn in self.to_spawn.drain() {
tokio_executor::spawn(to_spawn);
}
loop {
match self.events_rx.poll() {
Ok(Async::Ready(Some((message, task_id)))) => {
// If the task id is no longer in `self.tasks`, that means that the user called
// `close()` on this task earlier. Therefore no new event should be generated
// for this task.
if !self.tasks.contains_key(&task_id) {
continue;
};
match message {
InToExtMessage::NodeEvent(event) => {
break Async::Ready(HandledNodesEvent::NodeEvent {
id: task_id,
event,
});
},
InToExtMessage::NodeReached(peer_id) => {
break Async::Ready(HandledNodesEvent::NodeReached {
id: task_id,
peer_id,
});
},
InToExtMessage::TaskClosed(result, handler) => {
let _ = self.tasks.remove(&task_id);
break Async::Ready(HandledNodesEvent::TaskClosed {
id: task_id, result, handler
});
},
if self.tasks.contains_key(&task_id) {
break Async::Ready((message, task_id));
}
}
Ok(Async::NotReady) => {
@ -316,11 +338,11 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
}
/// Access to a task in the collection.
pub struct Task<'a, TInEvent> {
inner: OccupiedEntry<'a, TaskId, mpsc::UnboundedSender<TInEvent>>,
pub struct Task<'a, TInEvent, TUserData> {
inner: OccupiedEntry<'a, TaskId, (mpsc::UnboundedSender<TInEvent>, TUserData)>,
}
impl<'a, TInEvent> Task<'a, TInEvent> {
impl<'a, TInEvent, TUserData> Task<'a, TInEvent, TUserData> {
/// Sends an event to the given node.
// TODO: report back on delivery
#[inline]
@ -328,7 +350,17 @@ impl<'a, TInEvent> Task<'a, TInEvent> {
// It is possible that the sender is closed if the background task has already finished
// but the local state hasn't been updated yet because we haven't been polled in the
// meanwhile.
let _ = self.inner.get_mut().unbounded_send(event);
let _ = self.inner.get_mut().0.unbounded_send(event);
}
/// Returns the user data associated with the task.
pub fn user_data(&self) -> &TUserData {
&self.inner.get().1
}
/// Returns the user data associated with the task.
pub fn user_data_mut(&mut self) -> &mut TUserData {
&mut self.inner.get_mut().1
}
/// Returns the task id.
@ -345,26 +377,18 @@ impl<'a, TInEvent> Task<'a, TInEvent> {
}
}
impl<'a, TInEvent> fmt::Debug for Task<'a, TInEvent> {
impl<'a, TInEvent, TUserData> fmt::Debug for Task<'a, TInEvent, TUserData>
where
TUserData: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_tuple("Task")
.field(&self.id())
.field(self.user_data())
.finish()
}
}
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId> Stream for
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
{
type Item = HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>;
type Error = Void; // TODO: use ! once stable
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Ok(self.poll().map(Option::Some))
}
}
/// Message to transmit from a task to the public API.
#[derive(Debug)]
enum InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId> {

View File

@ -27,10 +27,8 @@ use std::io;
use assert_matches::assert_matches;
use futures::future::{self, FutureResult};
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use crate::nodes::handled_node::NodeHandlerEvent;
use crate::tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent, TestHandledNode};
use crate::tests::dummy_handler::{Handler, InEvent, OutEvent, TestHandledNode};
use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState};
use tokio::runtime::Builder;
use tokio::runtime::current_thread::Runtime;
use void::Void;
use crate::PeerId;
@ -99,7 +97,7 @@ impl NodeTaskTestBuilder {
}
}
type TestHandledNodesTasks = HandledNodesTasks<InEvent, OutEvent, Handler, io::Error, io::Error>;
type TestHandledNodesTasks = HandledNodesTasks<InEvent, OutEvent, Handler, io::Error, io::Error, ()>;
struct HandledNodeTaskTestBuilder {
muxer: DummyMuxer,
@ -120,22 +118,6 @@ impl HandledNodeTaskTestBuilder {
self.task_count = amt;
self
}
fn with_muxer_inbound_state(&mut self, state: DummyConnectionState) -> &mut Self {
self.muxer.set_inbound_connection_state(state);
self
}
fn with_muxer_outbound_state(&mut self, state: DummyConnectionState) -> &mut Self {
self.muxer.set_outbound_connection_state(state);
self
}
fn with_handler_state(&mut self, state: HandlerState) -> &mut Self {
self.handler.state = Some(state);
self
}
fn with_handler_states(&mut self, states: Vec<HandlerState>) -> &mut Self {
self.handler.next_states = states;
self
}
fn handled_nodes_tasks(&mut self) -> (TestHandledNodesTasks, Vec<TaskId>) {
let mut handled_nodes = HandledNodesTasks::new();
let peer_id = PeerId::random();
@ -143,7 +125,7 @@ impl HandledNodeTaskTestBuilder {
for _i in 0..self.task_count {
let fut = future::ok((peer_id.clone(), self.muxer.clone()));
task_ids.push(
handled_nodes.add_reach_attempt(fut, self.handler.clone())
handled_nodes.add_reach_attempt(fut, (), self.handler.clone())
);
}
(handled_nodes, task_ids)
@ -249,29 +231,6 @@ fn query_for_tasks() {
assert!(handled_nodes.task(TaskId(545534)).is_none());
}
#[test]
fn send_event_to_task() {
let (mut handled_nodes, _) = HandledNodeTaskTestBuilder::new()
.with_tasks(1)
.handled_nodes_tasks();
let task_id = {
let mut task = handled_nodes.task(TaskId(0)).expect("can fetch a Task");
task.send_event(InEvent::Custom("banana"));
task.id()
};
let mut rt = Builder::new().core_threads(1).build().unwrap();
let mut events = rt.block_on(handled_nodes.into_future()).unwrap();
assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeReached{..});
events = rt.block_on(events.1.into_future()).unwrap();
assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: event_task_id, event} => {
assert_eq!(event_task_id, task_id);
assert_matches!(event, OutEvent::Custom("banana"));
});
}
#[test]
fn iterate_over_all_tasks() {
let (handled_nodes, task_ids) = HandledNodeTaskTestBuilder::new()
@ -286,83 +245,12 @@ fn iterate_over_all_tasks() {
#[test]
fn add_reach_attempt_prepares_a_new_task() {
let mut handled_nodes: HandledNodesTasks<_, _, _, _, _> = HandledNodesTasks::new();
let mut handled_nodes: HandledNodesTasks<_, _, _, _, _, _> = HandledNodesTasks::new();
assert_eq!(handled_nodes.tasks().count(), 0);
assert_eq!(handled_nodes.to_spawn.len(), 0);
handled_nodes.add_reach_attempt( future::empty::<_, Void>(), Handler::default() );
handled_nodes.add_reach_attempt(future::empty::<_, Void>(), (), Handler::default());
assert_eq!(handled_nodes.tasks().count(), 1);
assert_eq!(handled_nodes.to_spawn.len(), 1);
}
#[test]
fn running_handled_tasks_reaches_the_nodes() {
let (mut handled_nodes_tasks, _) = HandledNodeTaskTestBuilder::new()
.with_tasks(5)
.with_muxer_inbound_state(DummyConnectionState::Closed)
.with_muxer_outbound_state(DummyConnectionState::Closed)
.with_handler_state(HandlerState::Err) // stop the loop
.handled_nodes_tasks();
let mut rt = Runtime::new().unwrap();
let mut events: (Option<HandledNodesEvent<_, _, _, _>>, TestHandledNodesTasks);
// we're running on a single thread so events are sequential: first
// we get a NodeReached, then a TaskClosed
for i in 0..5 {
events = rt.block_on(handled_nodes_tasks.into_future()).unwrap();
assert_matches!(events, (Some(HandledNodesEvent::NodeReached{..}), ref hnt) => {
assert_matches!(hnt, HandledNodesTasks{..});
assert_eq!(hnt.tasks().count(), 5 - i);
});
handled_nodes_tasks = events.1;
events = rt.block_on(handled_nodes_tasks.into_future()).unwrap();
assert_matches!(events, (Some(HandledNodesEvent::TaskClosed{..}), _));
handled_nodes_tasks = events.1;
}
}
#[test]
fn events_in_tasks_are_emitted() {
// States are pop()'d so they are set in reverse order by the Handler
let handler_states = vec![
HandlerState::Err,
HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler2") ))),
HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler") ))),
];
let (mut handled_nodes_tasks, _) = HandledNodeTaskTestBuilder::new()
.with_tasks(1)
.with_muxer_inbound_state(DummyConnectionState::Pending)
.with_muxer_outbound_state(DummyConnectionState::Opened)
.with_handler_states(handler_states)
.handled_nodes_tasks();
let tx = {
let mut task0 = handled_nodes_tasks.task(TaskId(0)).unwrap();
let tx = task0.inner.get_mut();
tx.clone()
};
let mut rt = Builder::new().core_threads(1).build().unwrap();
let mut events = rt.block_on(handled_nodes_tasks.into_future()).unwrap();
assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeReached{..});
tx.unbounded_send(InEvent::NextState).expect("send works");
events = rt.block_on(events.1.into_future()).unwrap();
assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: _, event} => {
assert_matches!(event, OutEvent::Custom("from handler"));
});
tx.unbounded_send(InEvent::NextState).expect("send works");
events = rt.block_on(events.1.into_future()).unwrap();
assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: _, event} => {
assert_matches!(event, OutEvent::Custom("from handler2"));
});
tx.unbounded_send(InEvent::NextState).expect("send works");
events = rt.block_on(events.1.into_future()).unwrap();
assert_matches!(events.0.unwrap(), HandledNodesEvent::TaskClosed{id: _, result, handler: _} => {
assert_matches!(result, Err(_));
});
}

View File

@ -30,7 +30,7 @@ use tokio_timer::{self, Delay};
use void::{Void, unreachable};
/// Delay between the moment we connect and the first time we identify.
const DELAY_TO_FIRST_ID: Duration = Duration::from_millis(500);
const DELAY_TO_FIRST_ID: Duration = Duration::from_secs(3600);
/// After an identification succeeded, wait this long before the next time.
const DELAY_TO_NEXT_ID: Duration = Duration::from_secs(5 * 60);
/// After we failed to identify the remote, try again after the given delay.