Mirror of https://github.com/fluencelabs/rust-libp2p
Synced 2025-05-29 02:31:20 +00:00

Split collection.rs in two (#507)

* Split collection.rs in two
* Fix: forgot to interrupt the inner task
* Another fix in collection.rs
* More qed

This commit is contained in:
parent 4fe92f81e8
commit ee9ff643a5
core/src/nodes/collection.rs
@@ -19,80 +19,41 @@
 // DEALINGS IN THE SOFTWARE.
 
 use fnv::FnvHashMap;
-use futures::{prelude::*, sync::mpsc, sync::oneshot, task};
+use futures::prelude::*;
 use muxing::StreamMuxer;
 use nodes::node::Substream;
-use nodes::handled_node::{HandledNode, NodeHandler};
-use smallvec::SmallVec;
-use std::collections::hash_map::{Entry, OccupiedEntry};
-use std::io::Error as IoError;
-use tokio_executor;
+use nodes::handled_node_tasks::{HandledNodesEvent, HandledNodesTasks};
+use nodes::handled_node_tasks::{Task as HandledNodesTask, TaskId};
+use nodes::handled_node::NodeHandler;
+use std::collections::hash_map::Entry;
+use std::io::{Error as IoError, ErrorKind as IoErrorKind};
 use void::Void;
 use {Multiaddr, PeerId};
 
 // TODO: make generic over PeerId
 
-// Implementor notes
-// =================
-//
-// This collection of nodes spawns a task for each individual node to process. This means that
-// events happen on the background at the same time as the `CollectionStream` is being polled.
-//
-// In order to make the API non-racy and avoid issues, we totally separate the state in the
-// `CollectionStream` and the states that the task nodes can access. They are only allowed to
-// exchange messages. The state in the `CollectionStream` is therefore delayed compared to the
-// tasks, and is updated only when `poll()` is called.
-//
-// The only thing that we must be careful about is substreams, as they are "detached" from the
-// state of the `CollectionStream` and allowed to process in parallel. This is why there is no
-// "substream closed" event being reported, as it could potentially create confusions and race
-// conditions in the user's code. See similar comments in the documentation of `NodeStream`.
-
 /// Implementation of `Stream` that handles a collection of nodes.
 // TODO: implement Debug
 pub struct CollectionStream<TInEvent, TOutEvent> {
-    /// List of nodes, with a sender allowing to communicate messages.
-    nodes: FnvHashMap<PeerId, (ReachAttemptId, mpsc::UnboundedSender<TInEvent>)>,
-    /// Known state of a task. Tasks are identified by the reach attempt ID.
-    tasks: FnvHashMap<ReachAttemptId, TaskKnownState>,
-    /// Identifier for the next task to spawn.
-    next_task_id: ReachAttemptId,
-
-    /// List of node tasks to spawn.
-    // TODO: stronger typing?
-    to_spawn: SmallVec<[Box<Future<Item = (), Error = ()> + Send>; 8]>,
-    /// Task to notify when an element is added to `to_spawn`.
-    to_notify: Option<task::Task>,
-
-    /// Sender to emit events to the outside. Meant to be cloned and sent to tasks.
-    events_tx: mpsc::UnboundedSender<(InToExtMessage<TInEvent, TOutEvent>, ReachAttemptId)>,
-    /// Receiver side for the events.
-    events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TInEvent, TOutEvent>, ReachAttemptId)>,
+    /// Object that handles the tasks.
+    inner: HandledNodesTasks<TInEvent, TOutEvent>,
+    /// List of nodes, with the task id that handles this node. The corresponding entry in `tasks`
+    /// must always be in the `Connected` state.
+    nodes: FnvHashMap<PeerId, TaskId>,
+    /// List of tasks and their state. If `Connected`, then a corresponding entry must be present
+    /// in `nodes`.
+    tasks: FnvHashMap<TaskId, TaskState>,
 }
 
-/// State of a task, as known by the frontend (the `ColletionStream`). Asynchronous compared to
-/// the actual state.
-enum TaskKnownState {
+/// State of a task.
+#[derive(Debug, Clone, PartialEq, Eq)]
+enum TaskState {
     /// Task is attempting to reach a peer.
-    Pending { interrupt: oneshot::Sender<()> },
-    /// The user interrupted this task.
-    Interrupted,
+    Pending,
     /// The task is connected to a peer.
     Connected(PeerId),
 }
 
-impl TaskKnownState {
-    /// Returns `true` for `Pending`.
-    #[inline]
-    fn is_pending(&self) -> bool {
-        match *self {
-            TaskKnownState::Pending { .. } => true,
-            TaskKnownState::Interrupted => false,
-            TaskKnownState::Connected(_) => false,
-        }
-    }
-}
-
 /// Event that can happen on the `CollectionStream`.
 // TODO: implement Debug
 pub enum CollectionEvent<TOutEvent> {
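The "Implementor notes" block deleted above moves, nearly verbatim, into the new handled_node_tasks.rs later in this diff. Its core idea: every node gets its own background task, the polled frontend only exchanges messages with those tasks, and the frontend's state is therefore slightly delayed until poll() runs. A minimal sketch of that pattern using only std threads and channels (all names here are illustrative, not part of the diff):

    use std::collections::HashMap;
    use std::sync::mpsc;
    use std::thread;

    // Illustrative stand-ins for the messages a per-node task reports.
    enum TaskEvent {
        Reached(u64),
        Closed(u64),
    }

    fn main() {
        let (events_tx, events_rx) = mpsc::channel();

        // One background "task" per node; each task owns its own state.
        for id in 0..3u64 {
            let tx = events_tx.clone();
            thread::spawn(move || {
                let _ = tx.send(TaskEvent::Reached(id));
                let _ = tx.send(TaskEvent::Closed(id));
            });
        }
        drop(events_tx);

        // The frontend's map changes only while the channel is drained, so it
        // lags behind the tasks themselves; no state is shared directly.
        let mut connected: HashMap<u64, ()> = HashMap::new();
        while let Ok(event) = events_rx.recv() {
            match event {
                TaskEvent::Reached(id) => { connected.insert(id, ()); }
                TaskEvent::Closed(id) => { connected.remove(&id); }
            }
        }
        assert!(connected.is_empty());
    }

The real code uses futures spawned on a tokio executor and futures::sync::mpsc channels instead of threads, but the race-avoidance idea is the same: state lives in exactly one place and only messages cross the boundary.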
@@ -151,22 +112,16 @@ pub enum CollectionEvent<TOutEvent> {
 
 /// Identifier for a future that attempts to reach a node.
 #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
-pub struct ReachAttemptId(usize);
+pub struct ReachAttemptId(TaskId);
 
 impl<TInEvent, TOutEvent> CollectionStream<TInEvent, TOutEvent> {
     /// Creates a new empty collection.
     #[inline]
     pub fn new() -> Self {
-        let (events_tx, events_rx) = mpsc::unbounded();
-
         CollectionStream {
+            inner: HandledNodesTasks::new(),
             nodes: Default::default(),
             tasks: Default::default(),
-            next_task_id: ReachAttemptId(0),
-            to_spawn: SmallVec::new(),
-            to_notify: None,
-            events_tx,
-            events_rx,
         }
     }
 
@@ -186,81 +141,62 @@ impl<TInEvent, TOutEvent> CollectionStream<TInEvent, TOutEvent> {
         TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
         TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
     {
-        let reach_attempt_id = self.next_task_id;
-        self.next_task_id.0 += 1;
-
-        let (interrupt_tx, interrupt_rx) = oneshot::channel();
-        self.tasks.insert(
-            reach_attempt_id,
-            TaskKnownState::Pending {
-                interrupt: interrupt_tx,
-            },
-        );
-
-        let task = Box::new(NodeTask {
-            inner: NodeTaskInner::Future {
-                future,
-                interrupt: interrupt_rx,
-                handler: Some(handler),
-            },
-            events_tx: self.events_tx.clone(),
-            id: reach_attempt_id,
-        });
-
-        self.to_spawn.push(task);
-
-        if let Some(task) = self.to_notify.take() {
-            task.notify();
-        }
-
-        reach_attempt_id
+        let id = self.inner.add_reach_attempt(future, handler);
+        self.tasks.insert(id, TaskState::Pending);
+        ReachAttemptId(id)
     }
 
     /// Interrupts a reach attempt.
     ///
     /// Returns `Ok` if something was interrupted, and `Err` if the ID is not or no longer valid.
     pub fn interrupt(&mut self, id: ReachAttemptId) -> Result<(), ()> {
-        match self.tasks.entry(id) {
-            Entry::Vacant(_) => return Err(()),
-            Entry::Occupied(mut entry) => {
+        match self.tasks.entry(id.0) {
+            Entry::Vacant(_) => Err(()),
+            Entry::Occupied(entry) => {
                 match entry.get() {
-                    &TaskKnownState::Connected(_) => return Err(()),
-                    &TaskKnownState::Interrupted => return Err(()),
-                    &TaskKnownState::Pending { .. } => (),
+                    &TaskState::Connected(_) => return Err(()),
+                    &TaskState::Pending => (),
                 };
 
-                match entry.insert(TaskKnownState::Interrupted) {
-                    TaskKnownState::Pending { interrupt } => {
-                        let _ = interrupt.send(());
-                    }
-                    TaskKnownState::Interrupted | TaskKnownState::Connected(_) => unreachable!(),
-                };
+                entry.remove();
+                self.inner.task(id.0)
+                    .expect("whenever we receive a TaskClosed event or interrupt a task, we \
+                             remove the corresponding entry from self.tasks ; therefore all \
+                             elements in self.tasks are valid tasks in the \
+                             HandledNodesTasks ; qed")
+                    .close();
+
+                Ok(())
             }
         }
-
-        Ok(())
     }
 
     /// Sends an event to all nodes.
+    #[inline]
     pub fn broadcast_event(&mut self, event: &TInEvent)
         where TInEvent: Clone,
     {
-        for &(_, ref sender) in self.nodes.values() {
-            let _ = sender.unbounded_send(event.clone()); // TODO: unwrap
-        }
+        // TODO: remove the ones we're not connected to?
+        self.inner.broadcast_event(event)
     }
 
-    /// Grants access to an object that allows controlling a node of the collection.
+    /// Grants access to an object that allows controlling a peer of the collection.
     ///
     /// Returns `None` if we don't have a connection to this peer.
     #[inline]
     pub fn peer_mut(&mut self, id: &PeerId) -> Option<PeerMut<TInEvent>> {
-        match self.nodes.entry(id.clone()) {
-            Entry::Occupied(inner) => Some(PeerMut {
+        let task = match self.nodes.get(id) {
+            Some(&task) => task,
+            None => return None,
+        };
+
+        match self.inner.task(task) {
+            Some(inner) => Some(PeerMut {
                 inner,
                 tasks: &mut self.tasks,
+                nodes: &mut self.nodes,
             }),
-            Entry::Vacant(_) => None,
+            None => None,
        }
     }
 
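With the task machinery moved into HandledNodesTasks, interrupting a reach attempt no longer needs the old oneshot channel: removing the task's entry drops its UnboundedSender, and the background task stops itself once it observes the closed channel. A sketch of that cancel-by-dropping-the-sender idiom with std channels (names are illustrative, not the crate's API):

    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;

    fn main() {
        let (cmd_tx, cmd_rx) = mpsc::channel::<()>();

        let handle = thread::spawn(move || loop {
            // A disconnected channel is the stop signal, just like a node task
            // noticing that its `in_events_rx` stream has finished.
            match cmd_rx.recv_timeout(Duration::from_millis(10)) {
                Ok(()) => { /* handle a command */ }
                Err(mpsc::RecvTimeoutError::Timeout) => { /* keep working */ }
                Err(mpsc::RecvTimeoutError::Disconnected) => break,
            }
        });

        // "Interrupting" the task is nothing more than dropping the sender.
        drop(cmd_tx);
        handle.join().unwrap();
    }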
@@ -283,38 +219,32 @@ impl<TInEvent, TOutEvent> CollectionStream<TInEvent, TOutEvent> {
 
 /// Access to a peer in the collection.
 pub struct PeerMut<'a, TInEvent: 'a> {
-    inner: OccupiedEntry<'a, PeerId, (ReachAttemptId, mpsc::UnboundedSender<TInEvent>)>,
-    tasks: &'a mut FnvHashMap<ReachAttemptId, TaskKnownState>,
+    inner: HandledNodesTask<'a, TInEvent>,
+    tasks: &'a mut FnvHashMap<TaskId, TaskState>,
+    nodes: &'a mut FnvHashMap<PeerId, TaskId>,
 }
 
 impl<'a, TInEvent> PeerMut<'a, TInEvent> {
     /// Sends an event to the given node.
     #[inline]
     pub fn send_event(&mut self, event: TInEvent) {
-        // It is possible that the sender is closed if the task has already finished but we
-        // haven't been polled in the meanwhile.
-        let _ = self.inner.get_mut().1.unbounded_send(event);
+        self.inner.send_event(event)
     }
 
     /// Closes the connections to this node.
     ///
     /// No further event will be generated for this node.
     pub fn close(self) {
-        let (peer_id, (task_id, _)) = self.inner.remove_entry();
-        // Set the task to `Interrupted` so that we ignore further messages from this closed node.
-        match self.tasks.insert(task_id, TaskKnownState::Interrupted) {
-            Some(TaskKnownState::Connected(ref p)) if p == &peer_id => (),
-            None
-            | Some(TaskKnownState::Connected(_))
-            | Some(TaskKnownState::Pending { .. })
-            | Some(TaskKnownState::Interrupted) => {
-                panic!("The task_id we have was retreived from self.nodes. We insert in this \
-                        hashmap when we reach a node, in which case we also insert Connected in \
-                        self.tasks with the corresponding peer ID. Once a task is Connected, it \
-                        can no longer switch back to Pending. We switch the state to Interrupted \
-                        only when we remove from self.nodes at the same time.")
-            },
-        }
+        let task_state = self.tasks.remove(&self.inner.id());
+        if let Some(TaskState::Connected(peer_id)) = task_state {
+            let old_task_id = self.nodes.remove(&peer_id);
+            debug_assert_eq!(old_task_id, Some(self.inner.id()));
+        } else {
+            panic!("a PeerMut can only be created if an entry is present in nodes ; an entry in \
+                    nodes always matched a Connected entry in tasks ; qed");
+        };
+
+        self.inner.close();
     }
 }
 
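PeerMut::close relies on the invariant spelled out in the struct docs: every PeerId in nodes maps to a task whose tasks entry is Connected with that same PeerId, which is what justifies the panic! in the else branch. A small illustrative check of such a two-map invariant (simplified types, not the crate's own code):

    use std::collections::HashMap;

    #[derive(Debug, Clone, PartialEq, Eq)]
    enum TaskState {
        Pending,
        Connected(String),
    }

    // Every peer must point at a task that is `Connected` back to that peer.
    fn invariant_holds(nodes: &HashMap<String, u64>, tasks: &HashMap<u64, TaskState>) -> bool {
        nodes.iter().all(|(peer, task_id)| {
            match tasks.get(task_id) {
                Some(TaskState::Connected(p)) => p == peer,
                _ => false,
            }
        })
    }

    fn main() {
        let mut nodes = HashMap::new();
        let mut tasks = HashMap::new();
        tasks.insert(7, TaskState::Connected("alice".to_string()));
        nodes.insert("alice".to_string(), 7);
        tasks.insert(9, TaskState::Pending); // pending tasks have no `nodes` entry
        assert!(invariant_holds(&nodes, &tasks));
    }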
@@ -323,295 +253,92 @@ impl<TInEvent, TOutEvent> Stream for CollectionStream<TInEvent, TOutEvent> {
     type Error = Void; // TODO: use ! once stable
 
     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
-        for to_spawn in self.to_spawn.drain() {
-            tokio_executor::spawn(to_spawn);
-        }
-
-        loop {
-            return match self.events_rx.poll() {
-                Ok(Async::Ready(Some((InToExtMessage::NodeEvent(event), task_id)))) => {
-                    let peer_id = match self.tasks.get(&task_id) {
-                        Some(TaskKnownState::Connected(ref peer_id)) => peer_id.clone(),
-                        Some(TaskKnownState::Interrupted) => continue, // Ignore messages from this task.
-                        None | Some(TaskKnownState::Pending { .. }) => {
-                            panic!("We insert Pending in self.tasks when a node is opened, and we \
-                                    set it to Connected when we receive a NodeReached event from \
-                                    this task. The way the task works, we are guaranteed to \
-                                    a NodeReached event before any NodeEvent.")
-                        }
-                    };
-
-                    Ok(Async::Ready(Some(CollectionEvent::NodeEvent {
-                        peer_id,
-                        event,
-                    })))
-                }
-                Ok(Async::Ready(Some((InToExtMessage::NodeReached(peer_id, sender), task_id)))) => {
-                    {
-                        let existing = match self.tasks.get_mut(&task_id) {
-                            Some(state) => state,
-                            None => panic!("We insert in self.tasks the corresponding task_id \
-                                            when we create a task, and we only remove from it \
-                                            if we receive the events NodeClosed, NodeError and \
-                                            ReachError, that are produced only when a task ends. \
-                                            After a task ends, we don't receive any more event \
-                                            from it.")
-                        };
-
-                        match existing {
-                            TaskKnownState::Pending { .. } => (),
-                            TaskKnownState::Interrupted => continue,
-                            TaskKnownState::Connected(_) => {
-                                panic!("The only code that sets a task to the Connected state \
-                                        is when we receive a NodeReached event. If we are already \
-                                        connected, that would mean we received NodeReached twice, \
-                                        which is not possible as a task changes state after \
-                                        sending this event.")
-                            },
-                        }
-
-                        *existing = TaskKnownState::Connected(peer_id.clone());
-                    }
-
-                    let replaced_node = self.nodes.insert(peer_id.clone(), (task_id, sender));
-                    if let Some(replaced_node) = replaced_node {
-                        let old = self.tasks.insert(replaced_node.0, TaskKnownState::Interrupted);
-                        debug_assert_eq!(old.map(|s| s.is_pending()), Some(false));
-                        Ok(Async::Ready(Some(CollectionEvent::NodeReplaced {
-                            peer_id,
-                            id: task_id,
-                        })))
-                    } else {
-                        Ok(Async::Ready(Some(CollectionEvent::NodeReached {
-                            peer_id,
-                            id: task_id,
-                        })))
-                    }
-                }
-                Ok(Async::Ready(Some((InToExtMessage::NodeClosed, task_id)))) => {
-                    let peer_id = match self.tasks.remove(&task_id) {
-                        Some(TaskKnownState::Connected(peer_id)) => peer_id.clone(),
-                        Some(TaskKnownState::Interrupted) => continue, // Ignore messages from this task.
-                        None | Some(TaskKnownState::Pending { .. }) => {
-                            panic!("We insert Pending in self.tasks when a node is opened, and we \
-                                    set it to Connected when we receive a NodeReached event from \
-                                    this task. The way the task works, we are guaranteed to \
-                                    a NodeReached event before a NodeClosed.")
-                        }
-                    };
-
-                    let val = self.nodes.remove(&peer_id);
-                    debug_assert!(val.is_some());
-                    Ok(Async::Ready(Some(CollectionEvent::NodeClosed { peer_id })))
-                }
-                Ok(Async::Ready(Some((InToExtMessage::NodeError(err), task_id)))) => {
-                    let peer_id = match self.tasks.remove(&task_id) {
-                        Some(TaskKnownState::Connected(peer_id)) => peer_id.clone(),
-                        Some(TaskKnownState::Interrupted) => continue, // Ignore messages from this task.
-                        None | Some(TaskKnownState::Pending { .. }) => {
-                            panic!("We insert Pending in self.tasks when a node is opened, and we \
-                                    set it to Connected when we receive a NodeReached event from \
-                                    this task. The way the task works, we are guaranteed to \
-                                    a NodeReached event before a NodeError.")
-                        }
-                    };
-
-                    let val = self.nodes.remove(&peer_id);
-                    debug_assert!(val.is_some());
-                    Ok(Async::Ready(Some(CollectionEvent::NodeError {
-                        peer_id,
-                        error: err,
-                    })))
-                }
-                Ok(Async::Ready(Some((InToExtMessage::ReachError(err), task_id)))) => {
-                    match self.tasks.remove(&task_id) {
-                        Some(TaskKnownState::Interrupted) => continue,
-                        Some(TaskKnownState::Pending { .. }) => (),
-                        None | Some(TaskKnownState::Connected(_)) => {
-                            panic!("We insert Pending in self.tasks when a node is opened, and we \
-                                    set it to Connected when we receive a NodeReached event from \
-                                    this task. The way the task works, we are guaranteed to \
-                                    a NodeReached event before a ReachError.")
-                        }
-                    };
-
-                    Ok(Async::Ready(Some(CollectionEvent::ReachError {
-                        id: task_id,
-                        error: err,
-                    })))
-                }
-                Ok(Async::NotReady) => {
-                    self.to_notify = Some(task::current());
-                    Ok(Async::NotReady)
-                }
-                Ok(Async::Ready(None)) => {
-                    unreachable!("The sender is in self as well, therefore the receiver never \
-                                  closes.")
-                },
-                Err(()) => unreachable!("An unbounded receiver never errors"),
-            };
-        }
-    }
-}
-
-/// Message to transmit from a task to the public API.
-enum InToExtMessage<TInEvent, TOutEvent> {
-    /// A connection to a node has succeeded.
-    /// Closing the returned sender will end the task.
-    NodeReached(PeerId, mpsc::UnboundedSender<TInEvent>),
-    NodeClosed,
-    NodeError(IoError),
-    ReachError(IoError),
-    /// An event from the node.
-    NodeEvent(TOutEvent),
-}
-
-/// Implementation of `Future` that handles a single node, and all the communications between
-/// the various components of the `CollectionStream`.
-struct NodeTask<TFut, TMuxer, TAddrFut, THandler, TInEvent, TOutEvent>
-where
-    TMuxer: StreamMuxer,
-    THandler: NodeHandler<Substream<TMuxer>>,
-{
-    /// Sender to transmit events to the outside.
-    events_tx: mpsc::UnboundedSender<(InToExtMessage<TInEvent, TOutEvent>, ReachAttemptId)>,
-    /// Inner state of the `NodeTask`.
-    inner: NodeTaskInner<TFut, TMuxer, TAddrFut, THandler, TInEvent>,
-    /// Identifier of the attempt.
-    id: ReachAttemptId,
-}
-
-enum NodeTaskInner<TFut, TMuxer, TAddrFut, THandler, TInEvent>
-where
-    TMuxer: StreamMuxer,
-    THandler: NodeHandler<Substream<TMuxer>>,
-{
-    /// Future to resolve to connect to the node.
-    Future {
-        /// The future that will attempt to reach the node.
-        future: TFut,
-        /// Allows interrupting the attempt.
-        interrupt: oneshot::Receiver<()>,
-        /// The handler that will be used to build the `HandledNode`.
-        handler: Option<THandler>,
-    },
-
-    /// Fully functional node.
-    Node {
-        /// The object that is actually processing things.
-        node: HandledNode<TMuxer, TAddrFut, THandler>,
-        /// Receiving end for events sent from the main `CollectionStream`. `None` if closed.
-        in_events_rx: Option<mpsc::UnboundedReceiver<TInEvent>>,
-    },
-}
-
-impl<TFut, TMuxer, TAddrFut, THandler, TInEvent, TOutEvent> Future for
-    NodeTask<TFut, TMuxer, TAddrFut, THandler, TInEvent, TOutEvent>
-where
-    TMuxer: StreamMuxer,
-    TFut: Future<Item = ((PeerId, TMuxer), TAddrFut), Error = IoError>,
-    TAddrFut: Future<Item = Multiaddr, Error = IoError>,
-    THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent>,
-{
-    type Item = ();
-    type Error = ();
-
-    fn poll(&mut self) -> Poll<(), ()> {
-        // Remember that this poll function is dedicated to a single node and is run
-        // asynchronously.
-
-        // First, handle if we are still trying to reach a node.
-        let new_state = if let NodeTaskInner::Future {
-            ref mut future,
-            ref mut interrupt,
-            ref mut handler,
-        } = self.inner
-        {
-            match interrupt.poll() {
-                Ok(Async::NotReady) => (),
-                Ok(Async::Ready(())) | Err(_) => return Ok(Async::Ready(())),
-            }
-
-            match future.poll() {
-                Ok(Async::Ready(((peer_id, muxer), addr_fut))) => {
-                    let (sender, rx) = mpsc::unbounded();
-                    let event = InToExtMessage::NodeReached(peer_id, sender);
-                    let _ = self.events_tx.unbounded_send((event, self.id));
-
-                    let handler = handler.take()
-                        .expect("The handler is only extracted right before we switch state");
-
-                    Some(NodeTaskInner::Node {
-                        node: HandledNode::new(muxer, addr_fut, handler),
-                        in_events_rx: Some(rx),
-                    })
-                }
-                Ok(Async::NotReady) => {
-                    return Ok(Async::NotReady);
-                }
-                Err(error) => {
-                    // End the task
-                    let event = InToExtMessage::ReachError(error);
-                    let _ = self.events_tx.unbounded_send((event, self.id));
-                    return Ok(Async::Ready(()));
-                }
-            }
-        } else {
-            None
-        };
-
-        if let Some(new_state) = new_state {
-            self.inner = new_state;
-        }
-
-        // Then handle if we're a node.
-        if let NodeTaskInner::Node {
-            ref mut node,
-            ref mut in_events_rx,
-        } = self.inner
-        {
-            // Start by handling commands received from the outside of the task.
-            if let Some(mut local_rx) = in_events_rx.take() {
-                *in_events_rx = loop {
-                    match local_rx.poll() {
-                        Ok(Async::Ready(Some(event))) => {
-                            node.inject_event(event);
-                        },
-                        Ok(Async::Ready(None)) => {
-                            // Node closed by the external API ; start shutdown process.
-                            node.shutdown();
-                            break None;
-                        }
-                        Ok(Async::NotReady) => break Some(local_rx),
-                        Err(()) => unreachable!("An unbounded receiver never errors"),
-                    }
-                };
-            }
-
-            // Process the node.
-            loop {
-                match node.poll() {
-                    Ok(Async::NotReady) => break,
-                    Ok(Async::Ready(Some(event))) => {
-                        let event = InToExtMessage::NodeEvent(event);
-                        let _ = self.events_tx.unbounded_send((event, self.id));
-                    }
-                    Ok(Async::Ready(None)) => {
-                        let event = InToExtMessage::NodeClosed;
-                        let _ = self.events_tx.unbounded_send((event, self.id));
-                        return Ok(Async::Ready(())); // End the task.
-                    }
-                    Err(err) => {
-                        let event = InToExtMessage::NodeError(err);
-                        let _ = self.events_tx.unbounded_send((event, self.id));
-                        return Ok(Async::Ready(())); // End the task.
-                    }
-                }
-            }
-        }
-
-        // Nothing's ready. The current task should have been registered by all of the inner
-        // handlers.
-        Ok(Async::NotReady)
-    }
-}
+        let item = try_ready!(self.inner.poll());
+
+        match item {
+            Some(HandledNodesEvent::TaskClosed { id, result }) => {
+                match (self.tasks.remove(&id), result) {
+                    (Some(TaskState::Pending), Err(err)) => {
+                        Ok(Async::Ready(Some(CollectionEvent::ReachError {
+                            id: ReachAttemptId(id),
+                            error: err,
+                        })))
+                    },
+                    (Some(TaskState::Pending), Ok(())) => {
+                        // TODO: this variant shouldn't happen ; prove this
+                        Ok(Async::Ready(Some(CollectionEvent::ReachError {
+                            id: ReachAttemptId(id),
+                            error: IoError::new(IoErrorKind::Other, "couldn't reach the node"),
+                        })))
+                    },
+                    (Some(TaskState::Connected(peer_id)), Ok(())) => {
+                        let _node_task_id = self.nodes.remove(&peer_id);
+                        debug_assert_eq!(_node_task_id, Some(id));
+                        Ok(Async::Ready(Some(CollectionEvent::NodeClosed {
+                            peer_id,
+                        })))
+                    },
+                    (Some(TaskState::Connected(peer_id)), Err(err)) => {
+                        let _node_task_id = self.nodes.remove(&peer_id);
+                        debug_assert_eq!(_node_task_id, Some(id));
+                        Ok(Async::Ready(Some(CollectionEvent::NodeError {
+                            peer_id,
+                            error: err,
+                        })))
+                    },
+                    (None, _) => {
+                        panic!("self.tasks is always kept in sync with the tasks in self.inner ; \
+                                when we add a task in self.inner we add a corresponding entry in \
+                                self.tasks, and remove the entry only when the task is closed ; \
+                                qed")
+                    },
+                }
+            },
+            Some(HandledNodesEvent::NodeReached { id, peer_id }) => {
+                // Set the state of the task to `Connected`.
+                let former_task_id = self.nodes.insert(peer_id.clone(), id);
+                let _former_state = self.tasks.insert(id, TaskState::Connected(peer_id.clone()));
+                debug_assert_eq!(_former_state, Some(TaskState::Pending));
+
+                // It is possible that we already have a task connected to the same peer. In this
+                // case, we need to emit a `NodeReplaced` event.
+                if let Some(former_task_id) = former_task_id {
+                    self.inner.task(former_task_id)
+                        .expect("whenever we receive a TaskClosed event or close a node, we \
+                                 remove the corresponding entry from self.nodes ; therefore all \
+                                 elements in self.nodes are valid tasks in the \
+                                 HandledNodesTasks ; qed")
+                        .close();
+                    let _former_other_state = self.tasks.remove(&former_task_id);
+                    debug_assert_eq!(_former_other_state, Some(TaskState::Connected(peer_id.clone())));
+
+                    Ok(Async::Ready(Some(CollectionEvent::NodeReplaced {
+                        peer_id,
+                        id: ReachAttemptId(id),
+                    })))
+                } else {
+                    Ok(Async::Ready(Some(CollectionEvent::NodeReached {
+                        peer_id,
+                        id: ReachAttemptId(id),
+                    })))
+                }
+            },
+            Some(HandledNodesEvent::NodeEvent { id, event }) => {
+                let peer_id = match self.tasks.get(&id) {
+                    Some(TaskState::Connected(peer_id)) => peer_id.clone(),
+                    _ => panic!("we can only receive NodeEvent events from a task after we \
+                                 received a corresponding NodeReached event from that same task ; \
+                                 when we receive a NodeReached event, we ensure that the entry in \
+                                 self.tasks is switched to the Connected state ; qed"),
+                };
+
+                Ok(Async::Ready(Some(CollectionEvent::NodeEvent {
+                    peer_id,
+                    event,
+                })))
+            }
+            None => Ok(Async::Ready(None)),
+        }
+    }
+}
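The NodeReached arm above leans on HashMap::insert returning the previous value: if another task was already registered for this peer, the former task id comes back, the old task gets closed, and NodeReplaced is emitted instead of NodeReached. The same idiom in isolation (illustrative types, not the crate's code):

    use std::collections::HashMap;

    fn main() {
        let mut nodes: HashMap<&str, u64> = HashMap::new();
        nodes.insert("alice", 1);

        // A second task reaches the same peer: `insert` hands back the task id
        // registered before, telling the caller to emit a "replaced" event and
        // to close the former task.
        match nodes.insert("alice", 2) {
            Some(former_task_id) => println!("replaced task {}", former_task_id),
            None => println!("fresh connection"),
        }
    }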
core/src/nodes/handled_node_tasks.rs (new file, 436 lines)
@@ -0,0 +1,436 @@
+// Copyright 2018 Parity Technologies (UK) Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use fnv::FnvHashMap;
+use futures::{prelude::*, stream, sync::mpsc, task};
+use muxing::StreamMuxer;
+use nodes::node::Substream;
+use nodes::handled_node::{HandledNode, NodeHandler};
+use smallvec::SmallVec;
+use std::collections::hash_map::{Entry, OccupiedEntry};
+use std::io::Error as IoError;
+use std::mem;
+use tokio_executor;
+use void::Void;
+use {Multiaddr, PeerId};
+
+// TODO: make generic over PeerId
+
+// Implementor notes
+// =================
+//
+// This collection of nodes spawns a task for each individual node to process. This means that
+// events happen on the background at the same time as the `HandledNodesTasks` is being polled.
+//
+// In order to make the API non-racy and avoid issues, we totally separate the state in the
+// `HandledNodesTasks` and the states that the task nodes can access. They are only allowed to
+// exchange messages. The state in the `HandledNodesTasks` is therefore delayed compared to the
+// tasks, and is updated only when `poll()` is called.
+//
+// The only thing that we must be careful about is substreams, as they are "detached" from the
+// state of the `HandledNodesTasks` and allowed to process in parallel. This is why there is no
+// "substream closed" event being reported, as it could potentially create confusions and race
+// conditions in the user's code. See similar comments in the documentation of `NodeStream`.
+
+/// Implementation of `Stream` that handles a collection of nodes.
+// TODO: implement Debug
+pub struct HandledNodesTasks<TInEvent, TOutEvent> {
+    /// For each active task, a sender allowing to transmit messages. Closing the sender interrupts
+    /// the task. It is possible that we receive messages from tasks that used to be in this list
+    /// but no longer are, in which case we should ignore them.
+    tasks: FnvHashMap<TaskId, mpsc::UnboundedSender<TInEvent>>,
+    /// Identifier for the next task to spawn.
+    next_task_id: TaskId,
+
+    /// List of node tasks to spawn.
+    // TODO: stronger typing?
+    to_spawn: SmallVec<[Box<Future<Item = (), Error = ()> + Send>; 8]>,
+    /// Task to notify when an element is added to `to_spawn`.
+    to_notify: Option<task::Task>,
+
+    /// Sender to emit events to the outside. Meant to be cloned and sent to tasks.
+    events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent>, TaskId)>,
+    /// Receiver side for the events.
+    events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent>, TaskId)>,
+}
+
+/// Event that can happen on the `HandledNodesTasks`.
+#[derive(Debug)]
+pub enum HandledNodesEvent<TOutEvent> {
+    /// A task has been closed.
+    ///
+    /// This happens once the node handler closes or an error happens.
+    TaskClosed {
+        /// Identifier of the task that closed.
+        id: TaskId,
+        /// What happened.
+        result: Result<(), IoError>,
+    },
+
+    /// A task has successfully connected to a node.
+    NodeReached {
+        /// Identifier of the task that succeeded.
+        id: TaskId,
+        /// Identifier of the node.
+        peer_id: PeerId,
+    },
+
+    /// A task has produced an event.
+    NodeEvent {
+        /// Identifier of the task that produced the event.
+        id: TaskId,
+        /// The produced event.
+        event: TOutEvent,
+    },
+}
+
+/// Identifier for a future that attempts to reach a node.
+#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
+pub struct TaskId(usize);
+
+impl<TInEvent, TOutEvent> HandledNodesTasks<TInEvent, TOutEvent> {
+    /// Creates a new empty collection.
+    #[inline]
+    pub fn new() -> Self {
+        let (events_tx, events_rx) = mpsc::unbounded();
+
+        HandledNodesTasks {
+            tasks: Default::default(),
+            next_task_id: TaskId(0),
+            to_spawn: SmallVec::new(),
+            to_notify: None,
+            events_tx,
+            events_rx,
+        }
+    }
+
+    /// Adds to the collection a future that tries to reach a node.
+    ///
+    /// This method spawns a task dedicated to resolving this future and processing the node's
+    /// events.
+    pub fn add_reach_attempt<TFut, TMuxer, TAddrFut, THandler>(&mut self, future: TFut, handler: THandler)
+        -> TaskId
+    where
+        TFut: Future<Item = ((PeerId, TMuxer), TAddrFut), Error = IoError> + Send + 'static,
+        TAddrFut: Future<Item = Multiaddr, Error = IoError> + Send + 'static,
+        THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
+        TInEvent: Send + 'static,
+        TOutEvent: Send + 'static,
+        THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
+        TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
+        TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
+    {
+        let task_id = self.next_task_id;
+        self.next_task_id.0 += 1;
+
+        let (tx, rx) = mpsc::unbounded();
+        self.tasks.insert(task_id, tx);
+
+        let task = Box::new(NodeTask {
+            inner: NodeTaskInner::Future {
+                future,
+                handler,
+                events_buffer: Vec::new(),
+            },
+            events_tx: self.events_tx.clone(),
+            in_events_rx: rx.fuse(),
+            id: task_id,
+        });
+
+        self.to_spawn.push(task);
+
+        // We notify the polling task so that `to_spawn` gets flushed.
+        if let Some(task) = self.to_notify.take() {
+            task.notify();
+        }
+
+        task_id
+    }
+
+    /// Sends an event to all the tasks, including the pending ones.
+    pub fn broadcast_event(&mut self, event: &TInEvent)
+        where TInEvent: Clone,
+    {
+        for sender in self.tasks.values() {
+            // Note: it is possible that sending an event fails if the background task has already
+            // finished, but the local state hasn't reflected that yet because it hasn't been
+            // polled. This is not an error situation.
+            let _ = sender.unbounded_send(event.clone());
+        }
+    }
+
+    /// Grants access to an object that allows controlling a task of the collection.
+    ///
+    /// Returns `None` if the task id is invalid.
+    #[inline]
+    pub fn task(&mut self, id: TaskId) -> Option<Task<TInEvent>> {
+        match self.tasks.entry(id.clone()) {
+            Entry::Occupied(inner) => Some(Task { inner }),
+            Entry::Vacant(_) => None,
+        }
+    }
+
+    /// Returns a list of all the active tasks.
+    #[inline]
+    pub fn tasks<'a>(&'a self) -> impl Iterator<Item = TaskId> + 'a {
+        self.tasks.keys().cloned()
+    }
+}
+
+/// Access to a task in the collection.
+pub struct Task<'a, TInEvent: 'a> {
+    inner: OccupiedEntry<'a, TaskId, mpsc::UnboundedSender<TInEvent>>,
+}
+
+impl<'a, TInEvent> Task<'a, TInEvent> {
+    /// Sends an event to the given node.
+    #[inline]
+    pub fn send_event(&mut self, event: TInEvent) {
+        // It is possible that the sender is closed if the background task has already finished
+        // but the local state hasn't been updated yet because we haven't been polled in the
+        // meanwhile.
+        let _ = self.inner.get_mut().unbounded_send(event);
+    }
+
+    /// Returns the task id.
+    #[inline]
+    pub fn id(&self) -> TaskId {
+        *self.inner.key()
+    }
+
+    /// Closes the task.
+    ///
+    /// No further event will be generated for this task.
+    pub fn close(self) {
+        self.inner.remove();
+    }
+}
+
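Task wraps an OccupiedEntry so the caller can either send an event through the stored sender or consume the handle to remove the entry (close), all without a second hash lookup. A miniature of that entry-based handle pattern (std-only, illustrative):

    use std::collections::hash_map::{Entry, HashMap, OccupiedEntry};

    struct Handle<'a>(OccupiedEntry<'a, u64, Vec<&'static str>>);

    impl<'a> Handle<'a> {
        fn send(&mut self, msg: &'static str) {
            self.0.get_mut().push(msg);
        }
        // Consuming `self` removes the entry, like `Task::close`.
        fn close(self) {
            self.0.remove();
        }
    }

    fn handle(map: &mut HashMap<u64, Vec<&'static str>>, id: u64) -> Option<Handle> {
        match map.entry(id) {
            Entry::Occupied(e) => Some(Handle(e)),
            Entry::Vacant(_) => None,
        }
    }

    fn main() {
        let mut map = HashMap::new();
        map.insert(1, Vec::new());
        if let Some(mut h) = handle(&mut map, 1) {
            h.send("hello");
            h.close();
        }
        assert!(map.is_empty());
    }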
+impl<TInEvent, TOutEvent> Stream for HandledNodesTasks<TInEvent, TOutEvent> {
+    type Item = HandledNodesEvent<TOutEvent>;
+    type Error = Void; // TODO: use ! once stable
+
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        for to_spawn in self.to_spawn.drain() {
+            tokio_executor::spawn(to_spawn);
+        }
+
+        loop {
+            match self.events_rx.poll() {
+                Ok(Async::Ready(Some((message, task_id)))) => {
+                    // If the task id is no longer in `self.tasks`, that means that the user called
+                    // `close()` on this task earlier. Therefore no new event should be generated
+                    // for this task.
+                    if !self.tasks.contains_key(&task_id) {
+                        continue;
+                    };
+
+                    match message {
+                        InToExtMessage::NodeEvent(event) => {
+                            break Ok(Async::Ready(Some(HandledNodesEvent::NodeEvent {
+                                id: task_id,
+                                event,
+                            })));
+                        },
+                        InToExtMessage::NodeReached(peer_id) => {
+                            break Ok(Async::Ready(Some(HandledNodesEvent::NodeReached {
+                                id: task_id,
+                                peer_id,
+                            })));
+                        },
+                        InToExtMessage::TaskClosed(result) => {
+                            let _ = self.tasks.remove(&task_id);
+                            break Ok(Async::Ready(Some(HandledNodesEvent::TaskClosed {
+                                id: task_id, result
+                            })));
+                        },
+                    }
+                }
+                Ok(Async::NotReady) => {
+                    self.to_notify = Some(task::current());
+                    break Ok(Async::NotReady);
+                }
+                Ok(Async::Ready(None)) => {
+                    unreachable!("The sender is in self as well, therefore the receiver never \
+                                  closes.")
+                },
+                Err(()) => unreachable!("An unbounded receiver never errors"),
+            }
+        }
+    }
+}
+
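The contains_key filter in the poll loop above is what makes close() race-free: events from a task the user already closed may still be queued in the channel, and they are skipped instead of surfaced. A small sketch of that stale-message filter (std channels, illustrative):

    use std::collections::HashMap;
    use std::sync::mpsc;

    fn main() {
        let (tx, rx) = mpsc::channel();
        let mut tasks: HashMap<u64, ()> = HashMap::new();
        tasks.insert(1, ());
        tasks.insert(2, ());

        tx.send((1u64, "hello")).unwrap();
        tx.send((2u64, "late")).unwrap();

        // The user closed task 2 before we polled; its message is still in flight.
        tasks.remove(&2);

        while let Ok((task_id, event)) = rx.try_recv() {
            if !tasks.contains_key(&task_id) {
                continue; // ignore messages from tasks closed by the user
            }
            println!("task {}: {}", task_id, event);
        }
    }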
+/// Message to transmit from a task to the public API.
+enum InToExtMessage<TOutEvent> {
+    /// A connection to a node has succeeded.
+    NodeReached(PeerId),
+    /// The task closed.
+    TaskClosed(Result<(), IoError>),
+    /// An event from the node.
+    NodeEvent(TOutEvent),
+}
+
+/// Implementation of `Future` that handles a single node, and all the communications between
+/// the various components of the `HandledNodesTasks`.
+struct NodeTask<TFut, TMuxer, TAddrFut, THandler, TInEvent, TOutEvent>
+where
+    TMuxer: StreamMuxer,
+    THandler: NodeHandler<Substream<TMuxer>>,
+{
+    /// Sender to transmit events to the outside.
+    events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent>, TaskId)>,
+    /// Receiving end for events sent from the main `HandledNodesTasks`.
+    in_events_rx: stream::Fuse<mpsc::UnboundedReceiver<TInEvent>>,
+    /// Inner state of the `NodeTask`.
+    inner: NodeTaskInner<TFut, TMuxer, TAddrFut, THandler, TInEvent>,
+    /// Identifier of the attempt.
+    id: TaskId,
+}
+
+enum NodeTaskInner<TFut, TMuxer, TAddrFut, THandler, TInEvent>
+where
+    TMuxer: StreamMuxer,
+    THandler: NodeHandler<Substream<TMuxer>>,
+{
+    /// Future to resolve to connect to the node.
+    Future {
+        /// The future that will attempt to reach the node.
+        future: TFut,
+        /// The handler that will be used to build the `HandledNode`.
+        handler: THandler,
+        /// While we are dialing the future, we need to buffer the events received on
+        /// `in_events_rx` so that they get delivered once dialing succeeds. We can't simply leave
+        /// events in `in_events_rx` because we have to detect if it gets closed.
+        events_buffer: Vec<TInEvent>,
+    },
+
+    /// Fully functional node.
+    Node(HandledNode<TMuxer, TAddrFut, THandler>),
+
+    /// A panic happened while polling.
+    Poisoned,
+}
+
+impl<TFut, TMuxer, TAddrFut, THandler, TInEvent, TOutEvent> Future for
+    NodeTask<TFut, TMuxer, TAddrFut, THandler, TInEvent, TOutEvent>
+where
+    TMuxer: StreamMuxer,
+    TFut: Future<Item = ((PeerId, TMuxer), TAddrFut), Error = IoError>,
+    TAddrFut: Future<Item = Multiaddr, Error = IoError>,
+    THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent>,
+{
+    type Item = ();
+    type Error = ();
+
+    fn poll(&mut self) -> Poll<(), ()> {
+        loop {
+            match mem::replace(&mut self.inner, NodeTaskInner::Poisoned) {
+                // First possibility: we are still trying to reach a node.
+                NodeTaskInner::Future { mut future, handler, mut events_buffer } => {
+                    // If self.in_events_rx is closed, we stop the task.
+                    loop {
+                        match self.in_events_rx.poll() {
+                            Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
+                            Ok(Async::Ready(Some(event))) => events_buffer.push(event),
+                            Ok(Async::NotReady) => break,
+                            Err(_) => unreachable!("An UnboundedReceiver never errors"),
+                        }
+                    }
+
+                    // Check whether dialing succeeded.
+                    match future.poll() {
+                        Ok(Async::Ready(((peer_id, muxer), addr_fut))) => {
+                            let event = InToExtMessage::NodeReached(peer_id);
+                            let mut node = HandledNode::new(muxer, addr_fut, handler);
+                            for event in events_buffer {
+                                node.inject_event(event);
+                            }
+                            if let Err(_) = self.events_tx.unbounded_send((event, self.id)) {
+                                node.shutdown();
+                            }
+                            self.inner = NodeTaskInner::Node(node);
+                        }
+                        Ok(Async::NotReady) => {
+                            self.inner = NodeTaskInner::Future { future, handler, events_buffer };
+                            return Ok(Async::NotReady);
+                        },
+                        Err(err) => {
+                            // End the task
+                            let event = InToExtMessage::TaskClosed(Err(err));
+                            let _ = self.events_tx.unbounded_send((event, self.id));
+                            return Ok(Async::Ready(()));
+                        }
+                    }
+                },
+
+                // Second possibility: we have a node.
+                NodeTaskInner::Node(mut node) => {
+                    // Start by handling commands received from the outside of the task.
+                    if !self.in_events_rx.is_done() {
+                        loop {
+                            match self.in_events_rx.poll() {
+                                Ok(Async::NotReady) => break,
+                                Ok(Async::Ready(Some(event))) => {
+                                    node.inject_event(event);
+                                },
+                                Ok(Async::Ready(None)) => {
+                                    // Node closed by the external API ; start shutdown process.
+                                    node.shutdown();
+                                    break;
+                                }
+                                Err(()) => unreachable!("An unbounded receiver never errors"),
+                            }
+                        }
+                    }
+
+                    // Process the node.
+                    loop {
+                        match node.poll() {
+                            Ok(Async::NotReady) => {
+                                self.inner = NodeTaskInner::Node(node);
+                                return Ok(Async::NotReady);
+                            },
+                            Ok(Async::Ready(Some(event))) => {
+                                let event = InToExtMessage::NodeEvent(event);
+                                if let Err(_) = self.events_tx.unbounded_send((event, self.id)) {
+                                    node.shutdown();
+                                }
+                            }
+                            Ok(Async::Ready(None)) => {
+                                let event = InToExtMessage::TaskClosed(Ok(()));
+                                let _ = self.events_tx.unbounded_send((event, self.id));
+                                return Ok(Async::Ready(())); // End the task.
+                            }
+                            Err(err) => {
+                                let event = InToExtMessage::TaskClosed(Err(err));
+                                let _ = self.events_tx.unbounded_send((event, self.id));
+                                return Ok(Async::Ready(())); // End the task.
+                            }
+                        }
+                    }
+                },
+
+                // This happens if a previous poll has ended unexpectedly. The API of futures
+                // guarantees that we shouldn't be polled again.
+                NodeTaskInner::Poisoned => panic!("the node task panicked or errored earlier")
+            }
+        }
+    }
+}
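NodeTaskInner::Poisoned exists because poll() moves the state out by value with mem::replace; if polling panics before the real state is written back, the next poll finds Poisoned and fails loudly instead of running on half-valid state. The pattern in isolation (illustrative sketch):

    use std::mem;

    enum State {
        Counting(u32),
        Poisoned,
    }

    struct Machine {
        state: State,
    }

    impl Machine {
        fn step(&mut self) -> u32 {
            // Take ownership of the state, leaving a placeholder behind. If we
            // panic before restoring it, `Poisoned` is what the next call sees.
            match mem::replace(&mut self.state, State::Poisoned) {
                State::Counting(n) => {
                    self.state = State::Counting(n + 1);
                    n
                }
                State::Poisoned => panic!("machine was poisoned by an earlier panic"),
            }
        }
    }

    fn main() {
        let mut m = Machine { state: State::Counting(0) };
        assert_eq!(m.step(), 0);
        assert_eq!(m.step(), 1);
    }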
core/src/nodes/mod.rs
@@ -18,6 +18,8 @@
 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 // DEALINGS IN THE SOFTWARE.
 
+mod handled_node_tasks;
+
 pub mod collection;
 pub mod handled_node;
 pub mod listeners;