diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index ff4d5802..2c0dc823 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -30,20 +30,21 @@ use crate::{ }; use fnv::FnvHashMap; use futures::prelude::*; -use std::{collections::hash_map::Entry, error, fmt, hash::Hash, mem}; +use std::{error, fmt, hash::Hash, mem}; mod tests; /// Implementation of `Stream` that handles a collection of nodes. pub struct CollectionStream { /// Object that handles the tasks. - inner: HandledNodesTasks, + /// + /// The user data contains the state of the task. If `Connected`, then a corresponding entry + /// must be present in `nodes`. + inner: HandledNodesTasks, TPeerId>, + /// List of nodes, with the task id that handles this node. The corresponding entry in `tasks` /// must always be in the `Connected` state. nodes: FnvHashMap, - /// List of tasks and their state. If `Connected`, then a corresponding entry must be present - /// in `nodes`. - tasks: FnvHashMap>, } impl fmt::Debug for @@ -52,18 +53,7 @@ where TPeerId: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - let mut list = f.debug_list(); - for (id, task) in &self.tasks { - match *task { - TaskState::Pending => { - list.entry(&format!("Pending({:?})", ReachAttemptId(*id))) - }, - TaskState::Connected(ref peer_id) => { - list.entry(&format!("Connected({:?})", peer_id)) - } - }; - } - list.finish() + f.debug_tuple("CollectionStream").finish() } } @@ -200,20 +190,16 @@ where pub fn accept(self) -> (CollectionNodeAccept, TPeerId) { // Set the state of the task to `Connected`. let former_task_id = self.parent.nodes.insert(self.peer_id.clone(), self.id); - let _former_state = self.parent.tasks.insert(self.id, TaskState::Connected(self.peer_id.clone())); - debug_assert!(_former_state == Some(TaskState::Pending)); + *self.parent.inner.task(self.id) + .expect("A CollectionReachEvent is only ever created from a valid attempt; QED") + .user_data_mut() = TaskState::Connected(self.peer_id.clone()); // It is possible that we already have a task connected to the same peer. In this // case, we need to emit a `NodeReplaced` event. - let ret_value = if let Some(former_task_id) = former_task_id { - self.parent.inner.task(former_task_id) - .expect("whenever we receive a TaskClosed event or close a node, we remove the \ - corresponding entry from self.nodes; therefore all elements in \ - self.nodes are valid tasks in the HandledNodesTasks; QED") - .close(); - let _former_other_state = self.parent.tasks.remove(&former_task_id); - debug_assert!(_former_other_state == Some(TaskState::Connected(self.peer_id.clone()))); - + let tasks = &mut self.parent.inner; + let ret_value = if let Some(former_task) = former_task_id.and_then(|i| tasks.task(i)) { + debug_assert!(*former_task.user_data() == TaskState::Connected(self.peer_id.clone())); + former_task.close(); // TODO: we unfortunately have to clone the peer id here (CollectionNodeAccept::ReplacedExisting, self.peer_id.clone()) } else { @@ -256,14 +242,13 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> Drop fo CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> { fn drop(&mut self) { - let task_state = self.parent.tasks.remove(&self.id); - debug_assert!(if let Some(TaskState::Pending) = task_state { true } else { false }); - self.parent.inner.task(self.id) + let task = self.parent.inner.task(self.id) .expect("we create the CollectionReachEvent with a valid task id; the \ CollectionReachEvent mutably borrows the collection, therefore nothing \ can delete this task during the lifetime of the CollectionReachEvent; \ - therefore the task is still valid when we delete it; QED") - .close(); + therefore the task is still valid when we delete it; QED"); + debug_assert!(if let TaskState::Pending = task.user_data() { true } else { false }); + task.close(); } } @@ -291,7 +276,6 @@ where CollectionStream { inner: HandledNodesTasks::new(), nodes: Default::default(), - tasks: Default::default(), } } @@ -314,31 +298,22 @@ where TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required TPeerId: Send + 'static, { - let id = self.inner.add_reach_attempt(future, handler); - self.tasks.insert(id, TaskState::Pending); - ReachAttemptId(id) + ReachAttemptId(self.inner.add_reach_attempt(future, TaskState::Pending, handler)) } /// Interrupts a reach attempt. /// /// Returns `Ok` if something was interrupted, and `Err` if the ID is not or no longer valid. pub fn interrupt(&mut self, id: ReachAttemptId) -> Result<(), InterruptError> { - match self.tasks.entry(id.0) { - Entry::Vacant(_) => Err(InterruptError::ReachAttemptNotFound), - Entry::Occupied(entry) => { - match entry.get() { + match self.inner.task(id.0) { + None => Err(InterruptError::ReachAttemptNotFound), + Some(task) => { + match task.user_data() { TaskState::Connected(_) => return Err(InterruptError::AlreadyReached), TaskState::Pending => (), }; - entry.remove(); - self.inner.task(id.0) - .expect("whenever we receive a TaskClosed event or interrupt a task, we \ - remove the corresponding entry from self.tasks; therefore all \ - elements in self.tasks are valid tasks in the \ - HandledNodesTasks; QED") - .close(); - + task.close(); Ok(()) } } @@ -366,7 +341,6 @@ where match self.inner.task(task) { Some(inner) => Some(PeerMut { inner, - tasks: &mut self.tasks, nodes: &mut self.nodes, }), None => None, @@ -401,31 +375,31 @@ where }; match item { - HandledNodesEvent::TaskClosed { id, result, handler } => { - match (self.tasks.remove(&id), result, handler) { - (Some(TaskState::Pending), Err(TaskClosedEvent::Reach(err)), Some(handler)) => { + HandledNodesEvent::TaskClosed { id, result, handler, user_data } => { + match (user_data, result, handler) { + (TaskState::Pending, Err(TaskClosedEvent::Reach(err)), Some(handler)) => { Async::Ready(CollectionEvent::ReachError { id: ReachAttemptId(id), error: err, handler, }) }, - (Some(TaskState::Pending), Ok(()), _) => { + (TaskState::Pending, Ok(()), _) => { panic!("The API of HandledNodesTasks guarantees that a task cannot \ gracefully closed before being connected to a node, in which case \ its state should be Connected and not Pending; QED"); }, - (Some(TaskState::Pending), Err(TaskClosedEvent::Node(_)), _) => { + (TaskState::Pending, Err(TaskClosedEvent::Node(_)), _) => { panic!("We switch the task state to Connected once we're connected, and \ a TaskClosedEvent::Node can only happen after we're \ connected; QED"); }, - (Some(TaskState::Pending), Err(TaskClosedEvent::Reach(_)), None) => { + (TaskState::Pending, Err(TaskClosedEvent::Reach(_)), None) => { // TODO: this could be improved in the API of HandledNodesTasks panic!("The HandledNodesTasks is guaranteed to always return the handler \ when producing a TaskClosedEvent::Reach error"); }, - (Some(TaskState::Connected(peer_id)), Ok(()), _handler) => { + (TaskState::Connected(peer_id), Ok(()), _handler) => { debug_assert!(_handler.is_none()); let _node_task_id = self.nodes.remove(&peer_id); debug_assert_eq!(_node_task_id, Some(id)); @@ -433,7 +407,7 @@ where peer_id, }) }, - (Some(TaskState::Connected(peer_id)), Err(TaskClosedEvent::Node(err)), _handler) => { + (TaskState::Connected(peer_id), Err(TaskClosedEvent::Node(err)), _handler) => { debug_assert!(_handler.is_none()); let _node_task_id = self.nodes.remove(&peer_id); debug_assert_eq!(_node_task_id, Some(id)); @@ -442,28 +416,24 @@ where error: err, }) }, - (Some(TaskState::Connected(_)), Err(TaskClosedEvent::Reach(_)), _) => { + (TaskState::Connected(_), Err(TaskClosedEvent::Reach(_)), _) => { panic!("A TaskClosedEvent::Reach can only happen before we are connected \ to a node; therefore the TaskState won't be Connected; QED"); }, - (None, _, _) => { - panic!("self.tasks is always kept in sync with the tasks in self.inner; \ - when we add a task in self.inner we add a corresponding entry in \ - self.tasks, and remove the entry only when the task is closed; \ - QED") - }, } }, - HandledNodesEvent::NodeReached { id, peer_id } => { + HandledNodesEvent::NodeReached { task, peer_id } => { + let id = task.id(); + drop(task); Async::Ready(CollectionEvent::NodeReached(CollectionReachEvent { parent: self, id, peer_id, })) }, - HandledNodesEvent::NodeEvent { id, event } => { - let peer_id = match self.tasks.get(&id) { - Some(TaskState::Connected(peer_id)) => peer_id.clone(), + HandledNodesEvent::NodeEvent { task, event } => { + let peer_id = match task.user_data() { + TaskState::Connected(peer_id) => peer_id.clone(), _ => panic!("we can only receive NodeEvent events from a task after we \ received a corresponding NodeReached event from that same task; \ when we receive a NodeReached event, we ensure that the entry in \ @@ -507,8 +477,7 @@ impl error::Error for InterruptError {} /// Access to a peer in the collection. pub struct PeerMut<'a, TInEvent, TPeerId = PeerId> { - inner: HandledNodesTask<'a, TInEvent>, - tasks: &'a mut FnvHashMap>, + inner: HandledNodesTask<'a, TInEvent, TaskState>, nodes: &'a mut FnvHashMap, } @@ -526,13 +495,12 @@ where /// /// No further event will be generated for this node. pub fn close(self) { - let task_state = self.tasks.remove(&self.inner.id()); - if let Some(TaskState::Connected(peer_id)) = task_state { + if let TaskState::Connected(peer_id) = self.inner.user_data() { let old_task_id = self.nodes.remove(&peer_id); debug_assert_eq!(old_task_id, Some(self.inner.id())); } else { panic!("a PeerMut can only be created if an entry is present in nodes; an entry in \ - nodes always matched a Connected entry in tasks; QED"); + nodes always matched a Connected entry in the tasks; QED"); }; self.inner.close(); diff --git a/core/src/nodes/collection/tests.rs b/core/src/nodes/collection/tests.rs index e29b378f..779d36fb 100644 --- a/core/src/nodes/collection/tests.rs +++ b/core/src/nodes/collection/tests.rs @@ -107,7 +107,6 @@ fn accepting_a_node_yields_new_entry() { } 2 => { assert_matches!(event, Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { - assert_matches!(reach_ev.parent, CollectionStream{..}); let (accept_ev, accepted_peer_id) = reach_ev.accept(); assert_eq!(accepted_peer_id, peer_id); assert_matches!(accept_ev, CollectionNodeAccept::NewEntry); diff --git a/core/src/nodes/handled_node_tasks.rs b/core/src/nodes/handled_node_tasks.rs index 428513cd..e4b8f072 100644 --- a/core/src/nodes/handled_node_tasks.rs +++ b/core/src/nodes/handled_node_tasks.rs @@ -36,7 +36,6 @@ use std::{ mem }; use tokio_executor; -use void::Void; mod tests; @@ -57,11 +56,11 @@ mod tests; // conditions in the user's code. See similar comments in the documentation of `NodeStream`. /// Implementation of `Stream` that handles a collection of nodes. -pub struct HandledNodesTasks { +pub struct HandledNodesTasks { /// A map between active tasks to an unbounded sender, used to control the task. Closing the sender interrupts /// the task. It is possible that we receive messages from tasks that used to be in this list /// but no longer are, in which case we should ignore them. - tasks: FnvHashMap>, + tasks: FnvHashMap, TUserData)>, /// Identifier for the next task to spawn. next_task_id: TaskId, @@ -76,12 +75,14 @@ pub struct HandledNodesTasks, TaskId)>, } -impl fmt::Debug for - HandledNodesTasks +impl fmt::Debug for + HandledNodesTasks +where + TUserData: fmt::Debug { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_list() - .entries(self.tasks.keys().cloned()) + f.debug_map() + .entries(self.tasks.iter().map(|(id, (_, ud))| (id, ud))) .finish() } } @@ -145,7 +146,7 @@ where T: NodeHandler /// Event that can happen on the `HandledNodesTasks`. #[derive(Debug)] -pub enum HandledNodesEvent { +pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId = PeerId> { /// A task has been closed. /// /// This happens once the node handler closes or an error happens. @@ -153,6 +154,8 @@ pub enum HandledNodesEvent>, /// If the task closed before reaching the node, this contains the handler that was passed @@ -162,16 +165,16 @@ pub enum HandledNodesEvent, /// Identifier of the node. peer_id: TPeerId, }, /// A task has produced an event. NodeEvent { - /// Identifier of the task that produced the event. - id: TaskId, + /// The task that produced the event. + task: Task<'a, TInEvent, TUserData>, /// The produced event. event: TOutEvent, }, @@ -181,8 +184,8 @@ pub enum HandledNodesEvent - HandledNodesTasks +impl + HandledNodesTasks { /// Creates a new empty collection. #[inline] @@ -202,7 +205,7 @@ impl /// /// This method spawns a task dedicated to resolving this future and processing the node's /// events. - pub fn add_reach_attempt(&mut self, future: TFut, handler: TIntoHandler) -> TaskId + pub fn add_reach_attempt(&mut self, future: TFut, user_data: TUserData, handler: TIntoHandler) -> TaskId where TFut: Future + Send + 'static, TIntoHandler: IntoNodeHandler + Send + 'static, @@ -220,7 +223,7 @@ impl self.next_task_id.0 += 1; let (tx, rx) = mpsc::unbounded(); - self.tasks.insert(task_id, tx); + self.tasks.insert(task_id, (tx, user_data)); let task = Box::new(NodeTask { inner: NodeTaskInner::Future { @@ -241,7 +244,7 @@ impl pub fn broadcast_event(&mut self, event: &TInEvent) where TInEvent: Clone, { - for sender in self.tasks.values() { + for (sender, _) in self.tasks.values() { // Note: it is possible that sending an event fails if the background task has already // finished, but the local state hasn't reflected that yet because it hasn't been // polled. This is not an error situation. @@ -253,7 +256,7 @@ impl /// /// Returns `None` if the task id is invalid. #[inline] - pub fn task(&mut self, id: TaskId) -> Option> { + pub fn task(&mut self, id: TaskId) -> Option> { match self.tasks.entry(id) { Entry::Occupied(inner) => Some(Task { inner }), Entry::Vacant(_) => None, @@ -267,39 +270,58 @@ impl } /// Provides an API similar to `Stream`, except that it cannot produce an error. - pub fn poll(&mut self) -> Async> { + pub fn poll(&mut self) -> Async> { + let (message, task_id) = match self.poll_inner() { + Async::Ready(r) => r, + Async::NotReady => return Async::NotReady, + }; + + Async::Ready(match message { + InToExtMessage::NodeEvent(event) => { + HandledNodesEvent::NodeEvent { + task: match self.tasks.entry(task_id) { + Entry::Occupied(inner) => Task { inner }, + Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED") + }, + event + } + }, + InToExtMessage::NodeReached(peer_id) => { + HandledNodesEvent::NodeReached { + task: match self.tasks.entry(task_id) { + Entry::Occupied(inner) => Task { inner }, + Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED") + }, + peer_id + } + }, + InToExtMessage::TaskClosed(result, handler) => { + let (_, user_data) = self.tasks.remove(&task_id) + .expect("poll_inner only returns valid TaskIds; QED"); + HandledNodesEvent::TaskClosed { + id: task_id, result, handler, user_data, + } + }, + }) + } + + /// Since non-lexical lifetimes still don't work very well in Rust at the moment, we have to + /// split `poll()` in two. This method returns an `InToExtMessage` that is guaranteed to come + /// from an alive task. + // TODO: look into merging with `poll()` + fn poll_inner(&mut self) -> Async<(InToExtMessage, TaskId)> { for to_spawn in self.to_spawn.drain() { tokio_executor::spawn(to_spawn); } + loop { match self.events_rx.poll() { Ok(Async::Ready(Some((message, task_id)))) => { // If the task id is no longer in `self.tasks`, that means that the user called // `close()` on this task earlier. Therefore no new event should be generated // for this task. - if !self.tasks.contains_key(&task_id) { - continue; - }; - - match message { - InToExtMessage::NodeEvent(event) => { - break Async::Ready(HandledNodesEvent::NodeEvent { - id: task_id, - event, - }); - }, - InToExtMessage::NodeReached(peer_id) => { - break Async::Ready(HandledNodesEvent::NodeReached { - id: task_id, - peer_id, - }); - }, - InToExtMessage::TaskClosed(result, handler) => { - let _ = self.tasks.remove(&task_id); - break Async::Ready(HandledNodesEvent::TaskClosed { - id: task_id, result, handler - }); - }, + if self.tasks.contains_key(&task_id) { + break Async::Ready((message, task_id)); } } Ok(Async::NotReady) => { @@ -316,11 +338,11 @@ impl } /// Access to a task in the collection. -pub struct Task<'a, TInEvent> { - inner: OccupiedEntry<'a, TaskId, mpsc::UnboundedSender>, +pub struct Task<'a, TInEvent, TUserData> { + inner: OccupiedEntry<'a, TaskId, (mpsc::UnboundedSender, TUserData)>, } -impl<'a, TInEvent> Task<'a, TInEvent> { +impl<'a, TInEvent, TUserData> Task<'a, TInEvent, TUserData> { /// Sends an event to the given node. // TODO: report back on delivery #[inline] @@ -328,7 +350,17 @@ impl<'a, TInEvent> Task<'a, TInEvent> { // It is possible that the sender is closed if the background task has already finished // but the local state hasn't been updated yet because we haven't been polled in the // meanwhile. - let _ = self.inner.get_mut().unbounded_send(event); + let _ = self.inner.get_mut().0.unbounded_send(event); + } + + /// Returns the user data associated with the task. + pub fn user_data(&self) -> &TUserData { + &self.inner.get().1 + } + + /// Returns the user data associated with the task. + pub fn user_data_mut(&mut self) -> &mut TUserData { + &mut self.inner.get_mut().1 } /// Returns the task id. @@ -345,26 +377,18 @@ impl<'a, TInEvent> Task<'a, TInEvent> { } } -impl<'a, TInEvent> fmt::Debug for Task<'a, TInEvent> { +impl<'a, TInEvent, TUserData> fmt::Debug for Task<'a, TInEvent, TUserData> +where + TUserData: fmt::Debug, +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_tuple("Task") .field(&self.id()) + .field(self.user_data()) .finish() } } -impl Stream for - HandledNodesTasks -{ - type Item = HandledNodesEvent; - type Error = Void; // TODO: use ! once stable - - #[inline] - fn poll(&mut self) -> Poll, Self::Error> { - Ok(self.poll().map(Option::Some)) - } -} - /// Message to transmit from a task to the public API. #[derive(Debug)] enum InToExtMessage { diff --git a/core/src/nodes/handled_node_tasks/tests.rs b/core/src/nodes/handled_node_tasks/tests.rs index d24d2a87..8a41c2a8 100644 --- a/core/src/nodes/handled_node_tasks/tests.rs +++ b/core/src/nodes/handled_node_tasks/tests.rs @@ -27,10 +27,8 @@ use std::io; use assert_matches::assert_matches; use futures::future::{self, FutureResult}; use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -use crate::nodes::handled_node::NodeHandlerEvent; -use crate::tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent, TestHandledNode}; +use crate::tests::dummy_handler::{Handler, InEvent, OutEvent, TestHandledNode}; use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState}; -use tokio::runtime::Builder; use tokio::runtime::current_thread::Runtime; use void::Void; use crate::PeerId; @@ -99,7 +97,7 @@ impl NodeTaskTestBuilder { } } -type TestHandledNodesTasks = HandledNodesTasks; +type TestHandledNodesTasks = HandledNodesTasks; struct HandledNodeTaskTestBuilder { muxer: DummyMuxer, @@ -120,22 +118,6 @@ impl HandledNodeTaskTestBuilder { self.task_count = amt; self } - fn with_muxer_inbound_state(&mut self, state: DummyConnectionState) -> &mut Self { - self.muxer.set_inbound_connection_state(state); - self - } - fn with_muxer_outbound_state(&mut self, state: DummyConnectionState) -> &mut Self { - self.muxer.set_outbound_connection_state(state); - self - } - fn with_handler_state(&mut self, state: HandlerState) -> &mut Self { - self.handler.state = Some(state); - self - } - fn with_handler_states(&mut self, states: Vec) -> &mut Self { - self.handler.next_states = states; - self - } fn handled_nodes_tasks(&mut self) -> (TestHandledNodesTasks, Vec) { let mut handled_nodes = HandledNodesTasks::new(); let peer_id = PeerId::random(); @@ -143,7 +125,7 @@ impl HandledNodeTaskTestBuilder { for _i in 0..self.task_count { let fut = future::ok((peer_id.clone(), self.muxer.clone())); task_ids.push( - handled_nodes.add_reach_attempt(fut, self.handler.clone()) + handled_nodes.add_reach_attempt(fut, (), self.handler.clone()) ); } (handled_nodes, task_ids) @@ -249,29 +231,6 @@ fn query_for_tasks() { assert!(handled_nodes.task(TaskId(545534)).is_none()); } -#[test] -fn send_event_to_task() { - let (mut handled_nodes, _) = HandledNodeTaskTestBuilder::new() - .with_tasks(1) - .handled_nodes_tasks(); - - let task_id = { - let mut task = handled_nodes.task(TaskId(0)).expect("can fetch a Task"); - task.send_event(InEvent::Custom("banana")); - task.id() - }; - - let mut rt = Builder::new().core_threads(1).build().unwrap(); - let mut events = rt.block_on(handled_nodes.into_future()).unwrap(); - assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeReached{..}); - - events = rt.block_on(events.1.into_future()).unwrap(); - assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: event_task_id, event} => { - assert_eq!(event_task_id, task_id); - assert_matches!(event, OutEvent::Custom("banana")); - }); -} - #[test] fn iterate_over_all_tasks() { let (handled_nodes, task_ids) = HandledNodeTaskTestBuilder::new() @@ -286,83 +245,12 @@ fn iterate_over_all_tasks() { #[test] fn add_reach_attempt_prepares_a_new_task() { - let mut handled_nodes: HandledNodesTasks<_, _, _, _, _> = HandledNodesTasks::new(); + let mut handled_nodes: HandledNodesTasks<_, _, _, _, _, _> = HandledNodesTasks::new(); assert_eq!(handled_nodes.tasks().count(), 0); assert_eq!(handled_nodes.to_spawn.len(), 0); - handled_nodes.add_reach_attempt( future::empty::<_, Void>(), Handler::default() ); + handled_nodes.add_reach_attempt(future::empty::<_, Void>(), (), Handler::default()); assert_eq!(handled_nodes.tasks().count(), 1); assert_eq!(handled_nodes.to_spawn.len(), 1); } - -#[test] -fn running_handled_tasks_reaches_the_nodes() { - let (mut handled_nodes_tasks, _) = HandledNodeTaskTestBuilder::new() - .with_tasks(5) - .with_muxer_inbound_state(DummyConnectionState::Closed) - .with_muxer_outbound_state(DummyConnectionState::Closed) - .with_handler_state(HandlerState::Err) // stop the loop - .handled_nodes_tasks(); - - let mut rt = Runtime::new().unwrap(); - let mut events: (Option>, TestHandledNodesTasks); - // we're running on a single thread so events are sequential: first - // we get a NodeReached, then a TaskClosed - for i in 0..5 { - events = rt.block_on(handled_nodes_tasks.into_future()).unwrap(); - assert_matches!(events, (Some(HandledNodesEvent::NodeReached{..}), ref hnt) => { - assert_matches!(hnt, HandledNodesTasks{..}); - assert_eq!(hnt.tasks().count(), 5 - i); - }); - handled_nodes_tasks = events.1; - events = rt.block_on(handled_nodes_tasks.into_future()).unwrap(); - assert_matches!(events, (Some(HandledNodesEvent::TaskClosed{..}), _)); - handled_nodes_tasks = events.1; - } -} - -#[test] -fn events_in_tasks_are_emitted() { - // States are pop()'d so they are set in reverse order by the Handler - let handler_states = vec![ - HandlerState::Err, - HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler2") ))), - HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler") ))), - ]; - - let (mut handled_nodes_tasks, _) = HandledNodeTaskTestBuilder::new() - .with_tasks(1) - .with_muxer_inbound_state(DummyConnectionState::Pending) - .with_muxer_outbound_state(DummyConnectionState::Opened) - .with_handler_states(handler_states) - .handled_nodes_tasks(); - - let tx = { - let mut task0 = handled_nodes_tasks.task(TaskId(0)).unwrap(); - let tx = task0.inner.get_mut(); - tx.clone() - }; - - let mut rt = Builder::new().core_threads(1).build().unwrap(); - let mut events = rt.block_on(handled_nodes_tasks.into_future()).unwrap(); - assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeReached{..}); - - tx.unbounded_send(InEvent::NextState).expect("send works"); - events = rt.block_on(events.1.into_future()).unwrap(); - assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: _, event} => { - assert_matches!(event, OutEvent::Custom("from handler")); - }); - - tx.unbounded_send(InEvent::NextState).expect("send works"); - events = rt.block_on(events.1.into_future()).unwrap(); - assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: _, event} => { - assert_matches!(event, OutEvent::Custom("from handler2")); - }); - - tx.unbounded_send(InEvent::NextState).expect("send works"); - events = rt.block_on(events.1.into_future()).unwrap(); - assert_matches!(events.0.unwrap(), HandledNodesEvent::TaskClosed{id: _, result, handler: _} => { - assert_matches!(result, Err(_)); - }); -} diff --git a/protocols/identify/src/periodic_id_handler.rs b/protocols/identify/src/periodic_id_handler.rs index 4f175600..21bfd3d2 100644 --- a/protocols/identify/src/periodic_id_handler.rs +++ b/protocols/identify/src/periodic_id_handler.rs @@ -30,7 +30,7 @@ use tokio_timer::{self, Delay}; use void::{Void, unreachable}; /// Delay between the moment we connect and the first time we identify. -const DELAY_TO_FIRST_ID: Duration = Duration::from_millis(500); +const DELAY_TO_FIRST_ID: Duration = Duration::from_secs(3600); /// After an identification succeeded, wait this long before the next time. const DELAY_TO_NEXT_ID: Duration = Duration::from_secs(5 * 60); /// After we failed to identify the remote, try again after the given delay.