From ee9ff643a547f45113793315eaad7a4ed7fcb9b2 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 21 Sep 2018 17:31:52 +0200 Subject: [PATCH] Split collection.rs in two (#507) * Split collection.rs in two * Fix forgot to interrupt inner task * Another fix in collection.rs * More qed --- core/src/nodes/collection.rs | 549 +++++++-------------------- core/src/nodes/handled_node_tasks.rs | 436 +++++++++++++++++++++ core/src/nodes/mod.rs | 2 + 3 files changed, 576 insertions(+), 411 deletions(-) create mode 100644 core/src/nodes/handled_node_tasks.rs diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index c0578b50..45300882 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -19,80 +19,41 @@ // DEALINGS IN THE SOFTWARE. use fnv::FnvHashMap; -use futures::{prelude::*, sync::mpsc, sync::oneshot, task}; +use futures::prelude::*; use muxing::StreamMuxer; use nodes::node::Substream; -use nodes::handled_node::{HandledNode, NodeHandler}; -use smallvec::SmallVec; -use std::collections::hash_map::{Entry, OccupiedEntry}; -use std::io::Error as IoError; -use tokio_executor; +use nodes::handled_node_tasks::{HandledNodesEvent, HandledNodesTasks}; +use nodes::handled_node_tasks::{Task as HandledNodesTask, TaskId}; +use nodes::handled_node::NodeHandler; +use std::collections::hash_map::Entry; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use void::Void; use {Multiaddr, PeerId}; // TODO: make generic over PeerId -// Implementor notes -// ================= -// -// This collection of nodes spawns a task for each individual node to process. This means that -// events happen on the background at the same time as the `CollectionStream` is being polled. -// -// In order to make the API non-racy and avoid issues, we totally separate the state in the -// `CollectionStream` and the states that the task nodes can access. They are only allowed to -// exchange messages. The state in the `CollectionStream` is therefore delayed compared to the -// tasks, and is updated only when `poll()` is called. -// -// The only thing that we must be careful about is substreams, as they are "detached" from the -// state of the `CollectionStream` and allowed to process in parallel. This is why there is no -// "substream closed" event being reported, as it could potentially create confusions and race -// conditions in the user's code. See similar comments in the documentation of `NodeStream`. - /// Implementation of `Stream` that handles a collection of nodes. // TODO: implement Debug pub struct CollectionStream { - /// List of nodes, with a sender allowing to communicate messages. - nodes: FnvHashMap)>, - /// Known state of a task. Tasks are identified by the reach attempt ID. - tasks: FnvHashMap, - /// Identifier for the next task to spawn. - next_task_id: ReachAttemptId, - - /// List of node tasks to spawn. - // TODO: stronger typing? - to_spawn: SmallVec<[Box + Send>; 8]>, - /// Task to notify when an element is added to `to_spawn`. - to_notify: Option, - - /// Sender to emit events to the outside. Meant to be cloned and sent to tasks. - events_tx: mpsc::UnboundedSender<(InToExtMessage, ReachAttemptId)>, - /// Receiver side for the events. - events_rx: mpsc::UnboundedReceiver<(InToExtMessage, ReachAttemptId)>, + /// Object that handles the tasks. + inner: HandledNodesTasks, + /// List of nodes, with the task id that handles this node. The corresponding entry in `tasks` + /// must always be in the `Connected` state. 
+ nodes: FnvHashMap, + /// List of tasks and their state. If `Connected`, then a corresponding entry must be present + /// in `nodes`. + tasks: FnvHashMap, } -/// State of a task, as known by the frontend (the `ColletionStream`). Asynchronous compared to -/// the actual state. -enum TaskKnownState { +/// State of a task. +#[derive(Debug, Clone, PartialEq, Eq)] +enum TaskState { /// Task is attempting to reach a peer. - Pending { interrupt: oneshot::Sender<()> }, - /// The user interrupted this task. - Interrupted, + Pending, /// The task is connected to a peer. Connected(PeerId), } -impl TaskKnownState { - /// Returns `true` for `Pending`. - #[inline] - fn is_pending(&self) -> bool { - match *self { - TaskKnownState::Pending { .. } => true, - TaskKnownState::Interrupted => false, - TaskKnownState::Connected(_) => false, - } - } -} - /// Event that can happen on the `CollectionStream`. // TODO: implement Debug pub enum CollectionEvent { @@ -151,22 +112,16 @@ pub enum CollectionEvent { /// Identifier for a future that attempts to reach a node. #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct ReachAttemptId(usize); +pub struct ReachAttemptId(TaskId); impl CollectionStream { /// Creates a new empty collection. #[inline] pub fn new() -> Self { - let (events_tx, events_rx) = mpsc::unbounded(); - CollectionStream { + inner: HandledNodesTasks::new(), nodes: Default::default(), tasks: Default::default(), - next_task_id: ReachAttemptId(0), - to_spawn: SmallVec::new(), - to_notify: None, - events_tx, - events_rx, } } @@ -186,81 +141,62 @@ impl CollectionStream { TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required { - let reach_attempt_id = self.next_task_id; - self.next_task_id.0 += 1; - - let (interrupt_tx, interrupt_rx) = oneshot::channel(); - self.tasks.insert( - reach_attempt_id, - TaskKnownState::Pending { - interrupt: interrupt_tx, - }, - ); - - let task = Box::new(NodeTask { - inner: NodeTaskInner::Future { - future, - interrupt: interrupt_rx, - handler: Some(handler), - }, - events_tx: self.events_tx.clone(), - id: reach_attempt_id, - }); - - self.to_spawn.push(task); - - if let Some(task) = self.to_notify.take() { - task.notify(); - } - - reach_attempt_id + let id = self.inner.add_reach_attempt(future, handler); + self.tasks.insert(id, TaskState::Pending); + ReachAttemptId(id) } /// Interrupts a reach attempt. /// /// Returns `Ok` if something was interrupted, and `Err` if the ID is not or no longer valid. pub fn interrupt(&mut self, id: ReachAttemptId) -> Result<(), ()> { - match self.tasks.entry(id) { - Entry::Vacant(_) => return Err(()), - Entry::Occupied(mut entry) => { + match self.tasks.entry(id.0) { + Entry::Vacant(_) => Err(()), + Entry::Occupied(entry) => { match entry.get() { - &TaskKnownState::Connected(_) => return Err(()), - &TaskKnownState::Interrupted => return Err(()), - &TaskKnownState::Pending { .. 
} => (), + &TaskState::Connected(_) => return Err(()), + &TaskState::Pending => (), }; - match entry.insert(TaskKnownState::Interrupted) { - TaskKnownState::Pending { interrupt } => { - let _ = interrupt.send(()); - } - TaskKnownState::Interrupted | TaskKnownState::Connected(_) => unreachable!(), - }; + entry.remove(); + self.inner.task(id.0) + .expect("whenever we receive a TaskClosed event or interrupt a task, we \ + remove the corresponding entry from self.tasks ; therefore all \ + elements in self.tasks are valid tasks in the \ + HandledNodesTasks ; qed") + .close(); + + Ok(()) } } - - Ok(()) } /// Sends an event to all nodes. + #[inline] pub fn broadcast_event(&mut self, event: &TInEvent) where TInEvent: Clone, { - for &(_, ref sender) in self.nodes.values() { - let _ = sender.unbounded_send(event.clone()); // TODO: unwrap - } + // TODO: remove the ones we're not connected to? + self.inner.broadcast_event(event) } - /// Grants access to an object that allows controlling a node of the collection. + /// Grants access to an object that allows controlling a peer of the collection. /// /// Returns `None` if we don't have a connection to this peer. #[inline] pub fn peer_mut(&mut self, id: &PeerId) -> Option> { - match self.nodes.entry(id.clone()) { - Entry::Occupied(inner) => Some(PeerMut { + let task = match self.nodes.get(id) { + Some(&task) => task, + None => return None, + }; + + match self.inner.task(task) { + Some(inner) => Some(PeerMut { inner, tasks: &mut self.tasks, + nodes: &mut self.nodes, }), - Entry::Vacant(_) => None, + None => None, } } @@ -283,38 +219,32 @@ impl CollectionStream { /// Access to a peer in the collection. pub struct PeerMut<'a, TInEvent: 'a> { - inner: OccupiedEntry<'a, PeerId, (ReachAttemptId, mpsc::UnboundedSender)>, - tasks: &'a mut FnvHashMap, + inner: HandledNodesTask<'a, TInEvent>, + tasks: &'a mut FnvHashMap, + nodes: &'a mut FnvHashMap, } impl<'a, TInEvent> PeerMut<'a, TInEvent> { /// Sends an event to the given node. #[inline] pub fn send_event(&mut self, event: TInEvent) { - // It is possible that the sender is closed if the task has already finished but we - // haven't been polled in the meanwhile. - let _ = self.inner.get_mut().1.unbounded_send(event); + self.inner.send_event(event) } /// Closes the connections to this node. /// /// No further event will be generated for this node. pub fn close(self) { - let (peer_id, (task_id, _)) = self.inner.remove_entry(); - // Set the task to `Interrupted` so that we ignore further messages from this closed node. - match self.tasks.insert(task_id, TaskKnownState::Interrupted) { - Some(TaskKnownState::Connected(ref p)) if p == &peer_id => (), - None - | Some(TaskKnownState::Connected(_)) - | Some(TaskKnownState::Pending { .. }) - | Some(TaskKnownState::Interrupted) => { - panic!("The task_id we have was retreived from self.nodes. We insert in this \ - hashmap when we reach a node, in which case we also insert Connected in \ - self.tasks with the corresponding peer ID. Once a task is Connected, it \ - can no longer switch back to Pending. 
We switch the state to Interrupted \ - only when we remove from self.nodes at the same time.") - }, - } + let task_state = self.tasks.remove(&self.inner.id()); + if let Some(TaskState::Connected(peer_id)) = task_state { + let old_task_id = self.nodes.remove(&peer_id); + debug_assert_eq!(old_task_id, Some(self.inner.id())); + } else { + panic!("a PeerMut can only be created if an entry is present in nodes ; an entry in \ + nodes always matches a Connected entry in tasks ; qed"); + }; + + self.inner.close(); } } @@ -323,295 +253,92 @@ impl<TInEvent, TOutEvent> Stream for CollectionStream<TInEvent, TOutEvent> { type Item = CollectionEvent<TOutEvent>; type Error = Void; // TODO: use ! once stable fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - for to_spawn in self.to_spawn.drain() { - tokio_executor::spawn(to_spawn); - } + let item = try_ready!(self.inner.poll()); - loop { - return match self.events_rx.poll() { - Ok(Async::Ready(Some((InToExtMessage::NodeEvent(event), task_id)))) => { - let peer_id = match self.tasks.get(&task_id) { - Some(TaskKnownState::Connected(ref peer_id)) => peer_id.clone(), - Some(TaskKnownState::Interrupted) => continue, // Ignore messages from this task. - None | Some(TaskKnownState::Pending { .. }) => { - panic!("We insert Pending in self.tasks when a node is opened, and we \ - set it to Connected when we receive a NodeReached event from \ - this task. The way the task works, we are guaranteed to \ - a NodeReached event before any NodeEvent.") - } - }; - - Ok(Async::Ready(Some(CollectionEvent::NodeEvent { - peer_id, - event, - }))) - } - Ok(Async::Ready(Some((InToExtMessage::NodeReached(peer_id, sender), task_id)))) => { - { - let existing = match self.tasks.get_mut(&task_id) { - Some(state) => state, - None => panic!("We insert in self.tasks the corresponding task_id \ - when we create a task, and we only remove from it \ - if we receive the events NodeClosed, NodeError and \ - ReachError, that are produced only when a task ends. \ - After a task ends, we don't receive any more event \ - from it.") - }; - - match existing { - TaskKnownState::Pending { .. } => (), - TaskKnownState::Interrupted => continue, - TaskKnownState::Connected(_) => { - panic!("The only code that sets a task to the Connected state \ - is when we receive a NodeReached event.
If we are already \ - connected, that would mean we received NodeReached twice, \ - which is not possible as a task changes state after \ - sending this event.") - }, - } - - *existing = TaskKnownState::Connected(peer_id.clone()); - } - - let replaced_node = self.nodes.insert(peer_id.clone(), (task_id, sender)); - if let Some(replaced_node) = replaced_node { - let old = self.tasks.insert(replaced_node.0, TaskKnownState::Interrupted); - debug_assert_eq!(old.map(|s| s.is_pending()), Some(false)); - Ok(Async::Ready(Some(CollectionEvent::NodeReplaced { - peer_id, - id: task_id, + match item { + Some(HandledNodesEvent::TaskClosed { id, result }) => { + match (self.tasks.remove(&id), result) { + (Some(TaskState::Pending), Err(err)) => { + Ok(Async::Ready(Some(CollectionEvent::ReachError { + id: ReachAttemptId(id), + error: err, }))) - } else { - Ok(Async::Ready(Some(CollectionEvent::NodeReached { - peer_id, - id: task_id, + }, + (Some(TaskState::Pending), Ok(())) => { + // TODO: this variant shouldn't happen ; prove this + Ok(Async::Ready(Some(CollectionEvent::ReachError { + id: ReachAttemptId(id), + error: IoError::new(IoErrorKind::Other, "couldn't reach the node"), }))) - } + }, + (Some(TaskState::Connected(peer_id)), Ok(())) => { + let _node_task_id = self.nodes.remove(&peer_id); + debug_assert_eq!(_node_task_id, Some(id)); + Ok(Async::Ready(Some(CollectionEvent::NodeClosed { + peer_id, + }))) + }, + (Some(TaskState::Connected(peer_id)), Err(err)) => { + let _node_task_id = self.nodes.remove(&peer_id); + debug_assert_eq!(_node_task_id, Some(id)); + Ok(Async::Ready(Some(CollectionEvent::NodeError { + peer_id, + error: err, + }))) + }, + (None, _) => { + panic!("self.tasks is always kept in sync with the tasks in self.inner ; \ + when we add a task in self.inner we add a corresponding entry in \ + self.tasks, and remove the entry only when the task is closed ; \ + qed") + }, } - Ok(Async::Ready(Some((InToExtMessage::NodeClosed, task_id)))) => { - let peer_id = match self.tasks.remove(&task_id) { - Some(TaskKnownState::Connected(peer_id)) => peer_id.clone(), - Some(TaskKnownState::Interrupted) => continue, // Ignore messages from this task. - None | Some(TaskKnownState::Pending { .. }) => { - panic!("We insert Pending in self.tasks when a node is opened, and we \ - set it to Connected when we receive a NodeReached event from \ - this task. The way the task works, we are guaranteed to \ - a NodeReached event before a NodeClosed.") - } - }; + }, + Some(HandledNodesEvent::NodeReached { id, peer_id }) => { + // Set the state of the task to `Connected`. + let former_task_id = self.nodes.insert(peer_id.clone(), id); + let _former_state = self.tasks.insert(id, TaskState::Connected(peer_id.clone())); + debug_assert_eq!(_former_state, Some(TaskState::Pending)); - let val = self.nodes.remove(&peer_id); - debug_assert!(val.is_some()); - Ok(Async::Ready(Some(CollectionEvent::NodeClosed { peer_id }))) - } - Ok(Async::Ready(Some((InToExtMessage::NodeError(err), task_id)))) => { - let peer_id = match self.tasks.remove(&task_id) { - Some(TaskKnownState::Connected(peer_id)) => peer_id.clone(), - Some(TaskKnownState::Interrupted) => continue, // Ignore messages from this task. - None | Some(TaskKnownState::Pending { .. }) => { - panic!("We insert Pending in self.tasks when a node is opened, and we \ - set it to Connected when we receive a NodeReached event from \ - this task. 
The way the task works, we are guaranteed to \ - a NodeReached event before a NodeError.") - } - }; + // It is possible that we already have a task connected to the same peer. In this + // case, we need to emit a `NodeReplaced` event. + if let Some(former_task_id) = former_task_id { + self.inner.task(former_task_id) + .expect("whenever we receive a TaskClosed event or close a node, we \ + remove the corresponding entry from self.nodes ; therefore all \ + elements in self.nodes are valid tasks in the \ + HandledNodesTasks ; qed") + .close(); + let _former_other_state = self.tasks.remove(&former_task_id); + debug_assert_eq!(_former_other_state, Some(TaskState::Connected(peer_id.clone()))); - let val = self.nodes.remove(&peer_id); - debug_assert!(val.is_some()); - Ok(Async::Ready(Some(CollectionEvent::NodeError { + Ok(Async::Ready(Some(CollectionEvent::NodeReplaced { peer_id, - error: err, + id: ReachAttemptId(id), + }))) + + } else { + Ok(Async::Ready(Some(CollectionEvent::NodeReached { + peer_id, + id: ReachAttemptId(id), }))) } - Ok(Async::Ready(Some((InToExtMessage::ReachError(err), task_id)))) => { - match self.tasks.remove(&task_id) { - Some(TaskKnownState::Interrupted) => continue, - Some(TaskKnownState::Pending { .. }) => (), - None | Some(TaskKnownState::Connected(_)) => { - panic!("We insert Pending in self.tasks when a node is opened, and we \ - set it to Connected when we receive a NodeReached event from \ - this task. The way the task works, we are guaranteed to \ - a NodeReached event before a ReachError.") - } - }; - - Ok(Async::Ready(Some(CollectionEvent::ReachError { - id: task_id, - error: err, - }))) - } - Ok(Async::NotReady) => { - self.to_notify = Some(task::current()); - Ok(Async::NotReady) - } - Ok(Async::Ready(None)) => { - unreachable!("The sender is in self as well, therefore the receiver never \ - closes.") - }, - Err(()) => unreachable!("An unbounded receiver never errors"), - }; - } - } -} - -/// Message to transmit from a task to the public API. -enum InToExtMessage { - /// A connection to a node has succeeded. - /// Closing the returned sender will end the task. - NodeReached(PeerId, mpsc::UnboundedSender), - NodeClosed, - NodeError(IoError), - ReachError(IoError), - /// An event from the node. - NodeEvent(TOutEvent), -} - -/// Implementation of `Future` that handles a single node, and all the communications between -/// the various components of the `CollectionStream`. -struct NodeTask -where - TMuxer: StreamMuxer, - THandler: NodeHandler>, -{ - /// Sender to transmit events to the outside. - events_tx: mpsc::UnboundedSender<(InToExtMessage, ReachAttemptId)>, - /// Inner state of the `NodeTask`. - inner: NodeTaskInner, - /// Identifier of the attempt. - id: ReachAttemptId, -} - -enum NodeTaskInner -where - TMuxer: StreamMuxer, - THandler: NodeHandler>, -{ - /// Future to resolve to connect to the node. - Future { - /// The future that will attempt to reach the node. - future: TFut, - /// Allows interrupting the attempt. - interrupt: oneshot::Receiver<()>, - /// The handler that will be used to build the `HandledNode`. - handler: Option, - }, - - /// Fully functional node. - Node { - /// The object that is actually processing things. - node: HandledNode, - /// Receiving end for events sent from the main `CollectionStream`. `None` if closed. 
- in_events_rx: Option>, - }, -} - -impl Future for - NodeTask -where - TMuxer: StreamMuxer, - TFut: Future, - TAddrFut: Future, - THandler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent>, -{ - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - // Remember that this poll function is dedicated to a single node and is run - // asynchronously. - - // First, handle if we are still trying to reach a node. - let new_state = if let NodeTaskInner::Future { - ref mut future, - ref mut interrupt, - ref mut handler, - } = self.inner - { - match interrupt.poll() { - Ok(Async::NotReady) => (), - Ok(Async::Ready(())) | Err(_) => return Ok(Async::Ready(())), - } - - match future.poll() { - Ok(Async::Ready(((peer_id, muxer), addr_fut))) => { - let (sender, rx) = mpsc::unbounded(); - let event = InToExtMessage::NodeReached(peer_id, sender); - let _ = self.events_tx.unbounded_send((event, self.id)); - - let handler = handler.take() - .expect("The handler is only extracted right before we switch state"); - - Some(NodeTaskInner::Node { - node: HandledNode::new(muxer, addr_fut, handler), - in_events_rx: Some(rx), - }) - } - Ok(Async::NotReady) => { - return Ok(Async::NotReady); - } - Err(error) => { - // End the task - let event = InToExtMessage::ReachError(error); - let _ = self.events_tx.unbounded_send((event, self.id)); - return Ok(Async::Ready(())); - } - } - } else { - None - }; - - if let Some(new_state) = new_state { - self.inner = new_state; - } - - // Then handle if we're a node. - if let NodeTaskInner::Node { - ref mut node, - ref mut in_events_rx, - } = self.inner - { - // Start by handling commands received from the outside of the task. - if let Some(mut local_rx) = in_events_rx.take() { - *in_events_rx = loop { - match local_rx.poll() { - Ok(Async::Ready(Some(event))) => { - node.inject_event(event); - }, - Ok(Async::Ready(None)) => { - // Node closed by the external API ; start shutdown process. - node.shutdown(); - break None; - } - Ok(Async::NotReady) => break Some(local_rx), - Err(()) => unreachable!("An unbounded receiver never errors"), - } + }, + Some(HandledNodesEvent::NodeEvent { id, event }) => { + let peer_id = match self.tasks.get(&id) { + Some(TaskState::Connected(peer_id)) => peer_id.clone(), + _ => panic!("we can only receive NodeEvent events from a task after we \ + received a corresponding NodeReached event from that same task ; \ + when we receive a NodeReached event, we ensure that the entry in \ + self.tasks is switched to the Connected state ; qed"), }; - } - // Process the node. - loop { - match node.poll() { - Ok(Async::NotReady) => break, - Ok(Async::Ready(Some(event))) => { - let event = InToExtMessage::NodeEvent(event); - let _ = self.events_tx.unbounded_send((event, self.id)); - } - Ok(Async::Ready(None)) => { - let event = InToExtMessage::NodeClosed; - let _ = self.events_tx.unbounded_send((event, self.id)); - return Ok(Async::Ready(())); // End the task. - } - Err(err) => { - let event = InToExtMessage::NodeError(err); - let _ = self.events_tx.unbounded_send((event, self.id)); - return Ok(Async::Ready(())); // End the task. - } - } + Ok(Async::Ready(Some(CollectionEvent::NodeEvent { + peer_id, + event, + }))) } + None => Ok(Async::Ready(None)), } - - // Nothing's ready. The current task should have been registered by all of the inner - // handlers. 
- Ok(Async::NotReady) } } diff --git a/core/src/nodes/handled_node_tasks.rs b/core/src/nodes/handled_node_tasks.rs new file mode 100644 index 00000000..5f76d96e --- /dev/null +++ b/core/src/nodes/handled_node_tasks.rs @@ -0,0 +1,436 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use fnv::FnvHashMap; +use futures::{prelude::*, stream, sync::mpsc, task}; +use muxing::StreamMuxer; +use nodes::node::Substream; +use nodes::handled_node::{HandledNode, NodeHandler}; +use smallvec::SmallVec; +use std::collections::hash_map::{Entry, OccupiedEntry}; +use std::io::Error as IoError; +use std::mem; +use tokio_executor; +use void::Void; +use {Multiaddr, PeerId}; + +// TODO: make generic over PeerId + +// Implementor notes +// ================= +// +// This collection of nodes spawns a task for each individual node to process. This means that +// events happen in the background at the same time as the `HandledNodesTasks` is being polled. +// +// In order to make the API non-racy and avoid issues, we totally separate the state in the +// `HandledNodesTasks` and the states that the task nodes can access. They are only allowed to +// exchange messages. The state in the `HandledNodesTasks` is therefore delayed compared to the +// tasks, and is updated only when `poll()` is called. +// +// The only thing that we must be careful about is substreams, as they are "detached" from the +// state of the `HandledNodesTasks` and allowed to process in parallel. This is why there is no +// "substream closed" event being reported, as it could potentially create confusion and race +// conditions in the user's code. See similar comments in the documentation of `NodeStream`. + +/// Implementation of `Stream` that handles a collection of nodes. +// TODO: implement Debug +pub struct HandledNodesTasks<TInEvent, TOutEvent> { + /// For each active task, a sender allowing us to transmit messages. Closing the sender interrupts + /// the task. It is possible that we receive messages from tasks that used to be in this list + /// but no longer are, in which case we should ignore them. + tasks: FnvHashMap<TaskId, mpsc::UnboundedSender<TInEvent>>, + /// Identifier for the next task to spawn. + next_task_id: TaskId, + + /// List of node tasks to spawn. + // TODO: stronger typing? + to_spawn: SmallVec<[Box<Future<Item = (), Error = ()> + Send>; 8]>, + /// Task to notify when an element is added to `to_spawn`. + to_notify: Option<task::Task>, + + /// Sender to emit events to the outside. Meant to be cloned and sent to tasks.
+ events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent>, TaskId)>, + /// Receiver side for the events. + events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent>, TaskId)>, +} + +/// Event that can happen on the `HandledNodesTasks`. +#[derive(Debug)] +pub enum HandledNodesEvent<TOutEvent> { + /// A task has been closed. + /// + /// This happens once the node handler closes or an error happens. + TaskClosed { + /// Identifier of the task that closed. + id: TaskId, + /// What happened. + result: Result<(), IoError>, + }, + + /// A task has successfully connected to a node. + NodeReached { + /// Identifier of the task that succeeded. + id: TaskId, + /// Identifier of the node. + peer_id: PeerId, + }, + + /// A task has produced an event. + NodeEvent { + /// Identifier of the task that produced the event. + id: TaskId, + /// The produced event. + event: TOutEvent, + }, +} + +/// Identifier for a future that attempts to reach a node. +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct TaskId(usize); + +impl<TInEvent, TOutEvent> HandledNodesTasks<TInEvent, TOutEvent> { + /// Creates a new empty collection. + #[inline] + pub fn new() -> Self { + let (events_tx, events_rx) = mpsc::unbounded(); + + HandledNodesTasks { + tasks: Default::default(), + next_task_id: TaskId(0), + to_spawn: SmallVec::new(), + to_notify: None, + events_tx, + events_rx, + } + } + + /// Adds to the collection a future that tries to reach a node. + /// + /// This method spawns a task dedicated to resolving this future and processing the node's + /// events. + pub fn add_reach_attempt<TFut, TAddrFut, TMuxer, THandler>(&mut self, future: TFut, handler: THandler) + -> TaskId + where + TFut: Future<Item = ((PeerId, TMuxer), TAddrFut), Error = IoError> + Send + 'static, + TAddrFut: Future<Item = Multiaddr, Error = IoError> + Send + 'static, + THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static, + TInEvent: Send + 'static, + TOutEvent: Send + 'static, + THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required? + TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required + TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required + { + let task_id = self.next_task_id; + self.next_task_id.0 += 1; + + let (tx, rx) = mpsc::unbounded(); + self.tasks.insert(task_id, tx); + + let task = Box::new(NodeTask { + inner: NodeTaskInner::Future { + future, + handler, + events_buffer: Vec::new(), + }, + events_tx: self.events_tx.clone(), + in_events_rx: rx.fuse(), + id: task_id, + }); + + self.to_spawn.push(task); + + // We notify the polling task so that `to_spawn` gets flushed. + if let Some(task) = self.to_notify.take() { + task.notify(); + } + + task_id + } + + /// Sends an event to all the tasks, including the pending ones. + pub fn broadcast_event(&mut self, event: &TInEvent) + where TInEvent: Clone, + { + for sender in self.tasks.values() { + // Note: it is possible that sending an event fails if the background task has already + // finished, but the local state hasn't reflected that yet because it hasn't been + // polled. This is not an error situation. + let _ = sender.unbounded_send(event.clone()); + } + } + + /// Grants access to an object that allows controlling a task of the collection. + /// + /// Returns `None` if the task id is invalid. + #[inline] + pub fn task(&mut self, id: TaskId) -> Option<Task<TInEvent>> { + match self.tasks.entry(id.clone()) { + Entry::Occupied(inner) => Some(Task { inner }), + Entry::Vacant(_) => None, + } + } + + /// Returns a list of all the active tasks.
+ #[inline] + pub fn tasks<'a>(&'a self) -> impl Iterator<Item = TaskId> + 'a { + self.tasks.keys().cloned() + } +} + +/// Access to a task in the collection. +pub struct Task<'a, TInEvent: 'a> { + inner: OccupiedEntry<'a, TaskId, mpsc::UnboundedSender<TInEvent>>, +} + +impl<'a, TInEvent> Task<'a, TInEvent> { + /// Sends an event to the given node. + #[inline] + pub fn send_event(&mut self, event: TInEvent) { + // It is possible that the sender is closed if the background task has already finished + // but the local state hasn't been updated yet because we haven't been polled in the + // meanwhile. + let _ = self.inner.get_mut().unbounded_send(event); + } + + /// Returns the task id. + #[inline] + pub fn id(&self) -> TaskId { + *self.inner.key() + } + + /// Closes the task. + /// + /// No further event will be generated for this task. + pub fn close(self) { + self.inner.remove(); + } +} + +impl<TInEvent, TOutEvent> Stream for HandledNodesTasks<TInEvent, TOutEvent> { + type Item = HandledNodesEvent<TOutEvent>; + type Error = Void; // TODO: use ! once stable + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + for to_spawn in self.to_spawn.drain() { + tokio_executor::spawn(to_spawn); + } + + loop { + match self.events_rx.poll() { + Ok(Async::Ready(Some((message, task_id)))) => { + // If the task id is no longer in `self.tasks`, that means that the user called + // `close()` on this task earlier. Therefore no new event should be generated + // for this task. + if !self.tasks.contains_key(&task_id) { + continue; + }; + + match message { + InToExtMessage::NodeEvent(event) => { + break Ok(Async::Ready(Some(HandledNodesEvent::NodeEvent { + id: task_id, + event, + }))); + }, + InToExtMessage::NodeReached(peer_id) => { + break Ok(Async::Ready(Some(HandledNodesEvent::NodeReached { + id: task_id, + peer_id, + }))); + }, + InToExtMessage::TaskClosed(result) => { + let _ = self.tasks.remove(&task_id); + break Ok(Async::Ready(Some(HandledNodesEvent::TaskClosed { + id: task_id, result + }))); + }, + } + } + Ok(Async::NotReady) => { + self.to_notify = Some(task::current()); + break Ok(Async::NotReady); + } + Ok(Async::Ready(None)) => { + unreachable!("The sender is in self as well, therefore the receiver never \ + closes.") + }, + Err(()) => unreachable!("An unbounded receiver never errors"), + } + } + } +} + +/// Message to transmit from a task to the public API. +enum InToExtMessage<TOutEvent> { + /// A connection to a node has succeeded. + NodeReached(PeerId), + /// The task closed. + TaskClosed(Result<(), IoError>), + /// An event from the node. + NodeEvent(TOutEvent), +} + +/// Implementation of `Future` that handles a single node, and all the communications between +/// the various components of the `HandledNodesTasks`. +struct NodeTask<TFut, TMuxer, TAddrFut, THandler, TInEvent, TOutEvent> +where + TMuxer: StreamMuxer, + THandler: NodeHandler<Substream<TMuxer>>, +{ + /// Sender to transmit events to the outside. + events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent>, TaskId)>, + /// Receiving end for events sent from the main `HandledNodesTasks`. + in_events_rx: stream::Fuse<mpsc::UnboundedReceiver<TInEvent>>, + /// Inner state of the `NodeTask`. + inner: NodeTaskInner<TFut, TMuxer, TAddrFut, THandler, TInEvent>, + /// Identifier of the attempt. + id: TaskId, +} + +enum NodeTaskInner<TFut, TMuxer, TAddrFut, THandler, TInEvent> +where + TMuxer: StreamMuxer, + THandler: NodeHandler<Substream<TMuxer>>, +{ + /// Future to resolve to connect to the node. + Future { + /// The future that will attempt to reach the node. + future: TFut, + /// The handler that will be used to build the `HandledNode`. + handler: THandler, + /// While we are dialing the future, we need to buffer the events received on + /// `in_events_rx` so that they get delivered once dialing succeeds.
We can't simply leave + /// events in `in_events_rx` because we have to detect if it gets closed. + events_buffer: Vec<TInEvent>, + }, + + /// Fully functional node. + Node(HandledNode<TMuxer, TAddrFut, THandler>), + + /// A panic happened while polling. + Poisoned, +} + +impl<TFut, TMuxer, TAddrFut, THandler, TInEvent, TOutEvent> Future for + NodeTask<TFut, TMuxer, TAddrFut, THandler, TInEvent, TOutEvent> +where + TMuxer: StreamMuxer, + TFut: Future<Item = ((PeerId, TMuxer), TAddrFut), Error = IoError>, + TAddrFut: Future<Item = Multiaddr, Error = IoError>, + THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent>, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + loop { + match mem::replace(&mut self.inner, NodeTaskInner::Poisoned) { + // First possibility: we are still trying to reach a node. + NodeTaskInner::Future { mut future, handler, mut events_buffer } => { + // If self.in_events_rx is closed, we stop the task. + loop { + match self.in_events_rx.poll() { + Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + Ok(Async::Ready(Some(event))) => events_buffer.push(event), + Ok(Async::NotReady) => break, + Err(_) => unreachable!("An UnboundedReceiver never errors"), + } + } + + // Check whether dialing succeeded. + match future.poll() { + Ok(Async::Ready(((peer_id, muxer), addr_fut))) => { + let event = InToExtMessage::NodeReached(peer_id); + let mut node = HandledNode::new(muxer, addr_fut, handler); + for event in events_buffer { + node.inject_event(event); + } + if let Err(_) = self.events_tx.unbounded_send((event, self.id)) { + node.shutdown(); + } + self.inner = NodeTaskInner::Node(node); + } + Ok(Async::NotReady) => { + self.inner = NodeTaskInner::Future { future, handler, events_buffer }; + return Ok(Async::NotReady); + }, + Err(err) => { + // End the task + let event = InToExtMessage::TaskClosed(Err(err)); + let _ = self.events_tx.unbounded_send((event, self.id)); + return Ok(Async::Ready(())); + } + } + }, + + // Second possibility: we have a node. + NodeTaskInner::Node(mut node) => { + // Start by handling commands received from the outside of the task. + if !self.in_events_rx.is_done() { + loop { + match self.in_events_rx.poll() { + Ok(Async::NotReady) => break, + Ok(Async::Ready(Some(event))) => { + node.inject_event(event); + }, + Ok(Async::Ready(None)) => { + // Node closed by the external API ; start shutdown process. + node.shutdown(); + break; + } + Err(()) => unreachable!("An unbounded receiver never errors"), + } + } + } + + // Process the node. + loop { + match node.poll() { + Ok(Async::NotReady) => { + self.inner = NodeTaskInner::Node(node); + return Ok(Async::NotReady); + }, + Ok(Async::Ready(Some(event))) => { + let event = InToExtMessage::NodeEvent(event); + if let Err(_) = self.events_tx.unbounded_send((event, self.id)) { + node.shutdown(); + } + } + Ok(Async::Ready(None)) => { + let event = InToExtMessage::TaskClosed(Ok(())); + let _ = self.events_tx.unbounded_send((event, self.id)); + return Ok(Async::Ready(())); // End the task. + } + Err(err) => { + let event = InToExtMessage::TaskClosed(Err(err)); + let _ = self.events_tx.unbounded_send((event, self.id)); + return Ok(Async::Ready(())); // End the task. + } + } + } + }, + + // This happens if a previous poll has ended unexpectedly. The API of futures + // guarantees that we shouldn't be polled again. + NodeTaskInner::Poisoned => panic!("the node task panicked or errored earlier") + } + } + } +} diff --git a/core/src/nodes/mod.rs b/core/src/nodes/mod.rs index fd0d8015..bf206e94 100644 --- a/core/src/nodes/mod.rs +++ b/core/src/nodes/mod.rs @@ -18,6 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE.
+mod handled_node_tasks; + pub mod collection; pub mod handled_node; pub mod listeners;
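The design described in the implementor notes of `handled_node_tasks.rs` boils down to a frontend whose view of the background tasks is deliberately delayed: tasks and frontend only exchange messages over channels, and the frontend's maps catch up when `poll()` drains its receiver. Below is a minimal, self-contained sketch of that pattern using `std` threads and channels instead of tokio tasks and futures 0.1; the names (`ToFront`, `Frontend`) are hypothetical and not part of this patch.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Message sent from a background task to the frontend (cf. `InToExtMessage`).
enum ToFront {
    Reached(String), // the task connected to the peer with this id
}

struct Frontend {
    events_rx: mpsc::Receiver<(usize, ToFront)>,
    // The frontend's *delayed* view of the tasks; only updated inside `poll`.
    connected: HashMap<usize, String>,
}

impl Frontend {
    // Drain pending messages; local state is only updated here (cf. `poll()`).
    fn poll(&mut self) {
        while let Ok((task_id, msg)) = self.events_rx.try_recv() {
            match msg {
                ToFront::Reached(peer) => {
                    self.connected.insert(task_id, peer);
                }
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for task_id in 0..3usize {
        let tx = tx.clone();
        thread::spawn(move || {
            // The task reaches a peer; the frontend only learns about it
            // the next time it polls the channel.
            tx.send((task_id, ToFront::Reached(format!("peer-{}", task_id)))).unwrap();
        });
    }
    drop(tx);
    thread::sleep(Duration::from_millis(50));
    let mut front = Frontend { events_rx: rx, connected: HashMap::new() };
    front.poll();
    println!("connected after poll: {:?}", front.connected);
}
```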
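While a reach attempt is still resolving, `NodeTaskInner::Future` buffers anything arriving on `in_events_rx` in `events_buffer` and replays it with `inject_event` once the node is reached; this is the "Fix forgot to interrupt inner task" side of keeping commands from getting lost. A reduced sketch of that buffering, with hypothetical `Node`/`TaskState` types standing in for the patch's generic ones:

```rust
use std::mem;

struct Node {
    log: Vec<String>,
}

impl Node {
    fn inject_event(&mut self, ev: String) {
        self.log.push(ev);
    }
}

enum TaskState {
    // Still dialing: events received in the meantime are buffered...
    Dialing { events_buffer: Vec<String> },
    // ...and replayed in order once the node is reached.
    Running(Node),
}

impl TaskState {
    fn on_event(&mut self, ev: String) {
        match self {
            TaskState::Dialing { events_buffer } => events_buffer.push(ev),
            TaskState::Running(node) => node.inject_event(ev),
        }
    }

    // Called when the dial future resolves (cf. `Ok(Async::Ready(..))`).
    fn node_reached(&mut self) {
        if let TaskState::Dialing { events_buffer } = self {
            let buffered = mem::replace(events_buffer, Vec::new());
            let mut node = Node { log: Vec::new() };
            for ev in buffered {
                node.inject_event(ev); // replay in arrival order
            }
            *self = TaskState::Running(node);
        }
    }
}

fn main() {
    let mut task = TaskState::Dialing { events_buffer: Vec::new() };
    task.on_event("ping".to_string()); // buffered: node not reached yet
    task.node_reached();               // buffer is flushed into the node
    task.on_event("pong".to_string()); // delivered directly
    if let TaskState::Running(node) = &task {
        println!("{:?}", node.log); // ["ping", "pong"]
    }
}
```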
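Finally, `NodeTask::poll` takes its state out by value with `mem::replace`, leaving `NodeTaskInner::Poisoned` behind, so the `Future` and `Node` variants can be consumed and rebuilt without fighting the borrow checker; `Poisoned` is only ever seen again if a previous poll panicked part-way through. The same idiom in isolation, as a sketch with a hypothetical `Conn` state machine rather than the patch's code:

```rust
use std::mem;

enum State {
    Dialing { attempts: u32 },
    Connected(String),
    // Placeholder left behind while a step is in progress; only observable
    // if a previous call to `step` panicked half-way through.
    Poisoned,
}

struct Conn {
    state: State,
}

impl Conn {
    fn step(&mut self) {
        // Move the state out so its variants can be consumed by value.
        match mem::replace(&mut self.state, State::Poisoned) {
            State::Dialing { attempts } if attempts >= 2 => {
                self.state = State::Connected(format!("ok after {} attempts", attempts));
            }
            State::Dialing { attempts } => {
                self.state = State::Dialing { attempts: attempts + 1 };
            }
            done @ State::Connected(_) => self.state = done,
            State::Poisoned => panic!("a previous step panicked"),
        }
    }
}

fn main() {
    let mut conn = Conn { state: State::Dialing { attempts: 0 } };
    for _ in 0..3 {
        conn.step();
    }
    if let State::Connected(msg) = &conn.state {
        println!("{}", msg); // ok after 2 attempts
    }
}
```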