// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{
PeerId,
muxing::StreamMuxer,
nodes::{
handled_node::{HandledNode, HandledNodeError, NodeHandler},
node::Substream
}
};
use fnv::FnvHashMap;
use futures::{prelude::*, stream, sync::mpsc};
use smallvec::SmallVec;
use std::{
collections::hash_map::{Entry, OccupiedEntry},
error,
fmt,
mem
};
use tokio_executor;
use void::Void;
mod tests;
// TODO: make generic over PeerId
// Implementor notes
// =================
//
// This collection of nodes spawns a task for each individual node to process. This means that
// events happen in the background at the same time as the `HandledNodesTasks` is being polled.
//
// In order to make the API non-racy and avoid issues, we completely separate the state in the
// `HandledNodesTasks` from the states that the task nodes can access. They are only allowed to
// exchange messages. The state in the `HandledNodesTasks` is therefore delayed compared to the
// tasks, and is updated only when `poll()` is called.
//
// The only thing that we must be careful about is substreams, as they are "detached" from the
// state of the `HandledNodesTasks` and allowed to process in parallel. This is why there is no
// "substream closed" event being reported, as it could potentially cause confusion and race
// conditions in the user's code. See similar comments in the documentation of `NodeStream`.
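
// Illustrative sketch, not part of the original module: a std-only model of the design
// described in the implementor notes above. Each worker thread stands in for a node task and
// talks to the owner only through channels; the owner's map of live tasks changes only when
// `next_event` drains the event channel, so it may lag behind the workers. Every name here
// (`sketch_deferred_state`, `Owner`, `Msg`) is hypothetical, and threads with
// `std::sync::mpsc` are swapped in for futures tasks and `futures::sync::mpsc`.
#[allow(dead_code)]
mod sketch_deferred_state {
    use std::collections::HashMap;
    use std::sync::mpsc::{channel, Receiver, Sender};
    use std::thread;

    /// Message sent from a worker to the owner.
    #[derive(Debug, PartialEq)]
    pub enum Msg {
        Event(u32),
        Closed,
    }

    pub struct Owner {
        /// Senders used to control the workers, keyed by task id.
        pub tasks: HashMap<usize, Sender<u32>>,
        next_id: usize,
        events_tx: Sender<(Msg, usize)>,
        events_rx: Receiver<(Msg, usize)>,
    }

    impl Owner {
        pub fn new() -> Self {
            let (events_tx, events_rx) = channel();
            Owner { tasks: HashMap::new(), next_id: 0, events_tx, events_rx }
        }

        /// Spawns a worker that doubles every input and stops when it receives `0`.
        pub fn spawn(&mut self) -> usize {
            let id = self.next_id;
            self.next_id += 1;
            let (tx, rx) = channel::<u32>();
            let events_tx = self.events_tx.clone();
            thread::spawn(move || {
                for n in rx {
                    if n == 0 {
                        break;
                    }
                    let _ = events_tx.send((Msg::Event(n * 2), id));
                }
                let _ = events_tx.send((Msg::Closed, id));
            });
            self.tasks.insert(id, tx);
            id
        }

        /// Blocks until a tracked task reports something. Messages from ids that are no
        /// longer in `tasks` are skipped, and a `Closed` message removes the id.
        pub fn next_event(&mut self) -> Option<(Msg, usize)> {
            loop {
                let (msg, id) = self.events_rx.recv().ok()?;
                if !self.tasks.contains_key(&id) {
                    continue;
                }
                if msg == Msg::Closed {
                    self.tasks.remove(&id);
                }
                return Some((msg, id));
            }
        }
    }
}
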
/// Implementation of `Stream` that handles a collection of nodes.
pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
/// A map from the id of each active task to the unbounded sender used to control that task.
/// Closing the sender interrupts the task. It is possible that we receive messages from tasks
/// that used to be in this list but no longer are, in which case we should ignore them.
tasks: FnvHashMap<TaskId, mpsc::UnboundedSender<TInEvent>>,
/// Identifier for the next task to spawn.
next_task_id: TaskId,
/// List of node tasks to spawn.
// TODO: stronger typing?
to_spawn: SmallVec<[Box<dyn Future<Item = (), Error = ()> + Send>; 8]>,
/// Sender to emit events to the outside. Meant to be cloned and sent to tasks.
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr>, TaskId)>,
/// Receiver side for the events.
events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr>, TaskId)>,
}
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> fmt::Debug for HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_list()
.entries(self.tasks.keys().cloned())
.finish()
}
}
/// Error that can happen in a task.
#[derive(Debug)]
pub enum TaskClosedEvent<TReachErr, THandlerErr> {
/// An error happened while we were trying to reach the node.
Reach(TReachErr),
/// An error happened after the node has been reached.
Node(HandledNodeError<THandlerErr>),
}
impl<TReachErr, THandlerErr> fmt::Display for TaskClosedEvent<TReachErr, THandlerErr>
where
TReachErr: fmt::Display,
THandlerErr: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
TaskClosedEvent::Reach(err) => write!(f, "{}", err),
TaskClosedEvent::Node(err) => write!(f, "{}", err),
}
}
}
impl<TReachErr, THandlerErr> error::Error for TaskClosedEvent<TReachErr, THandlerErr>
where
TReachErr: error::Error + 'static,
THandlerErr: error::Error + 'static
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
TaskClosedEvent::Reach(err) => Some(err),
TaskClosedEvent::Node(err) => Some(err),
}
}
}
/// Prototype for a `NodeHandler`.
pub trait IntoNodeHandler {
/// The node handler.
type Handler: NodeHandler;
/// Builds the node handler.
///
/// The `PeerId` is the id of the node the handler is going to handle.
fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler;
}
impl<T> IntoNodeHandler for T
where T: NodeHandler
{
type Handler = Self;
#[inline]
fn into_handler(self, _: &PeerId) -> Self {
self
}
}
/// Event that can happen on the `HandledNodesTasks`.
#[derive(Debug)]
pub enum HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
/// A task has been closed.
///
/// This happens once the node handler closes or an error happens.
// TODO: send back undelivered events?
TaskClosed {
/// Identifier of the task that closed.
id: TaskId,
/// What happened.
result: Result<(), TaskClosedEvent<TReachErr, THandlerErr>>,
/// If the task closed before reaching the node, this contains the handler that was passed
/// to `add_reach_attempt`.
handler: Option<TIntoHandler>,
},
/// A task has successfully connected to a node.
NodeReached {
/// Identifier of the task that succeeded.
id: TaskId,
/// Identifier of the node.
peer_id: PeerId,
},
/// A task has produced an event.
NodeEvent {
/// Identifier of the task that produced the event.
id: TaskId,
/// The produced event.
event: TOutEvent,
},
}
/// Identifier for a future that attempts to reach a node.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TaskId(usize);
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
/// Creates a new empty collection.
#[inline]
pub fn new() -> Self {
let (events_tx, events_rx) = mpsc::unbounded();
HandledNodesTasks {
tasks: Default::default(),
next_task_id: TaskId(0),
to_spawn: SmallVec::new(),
events_tx,
events_rx,
}
}
/// Adds to the collection a future that tries to reach a node.
///
/// This method spawns a task dedicated to resolving this future and processing the node's
/// events.
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: TIntoHandler) -> TaskId
where
TFut: Future<Item = (PeerId, TMuxer), Error = TReachErr> + Send + 'static,
TIntoHandler: IntoNodeHandler + Send + 'static,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
TReachErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
<TIntoHandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
{
let task_id = self.next_task_id;
self.next_task_id.0 += 1;
let (tx, rx) = mpsc::unbounded();
self.tasks.insert(task_id, tx);
let task = Box::new(NodeTask {
inner: NodeTaskInner::Future {
future,
handler,
events_buffer: Vec::new(),
},
events_tx: self.events_tx.clone(),
in_events_rx: rx.fuse(),
id: task_id,
});
self.to_spawn.push(task);
task_id
}
/// Sends an event to all the tasks, including the pending ones.
pub fn broadcast_event(&mut self, event: &TInEvent)
where TInEvent: Clone,
{
for sender in self.tasks.values() {
// Note: it is possible that sending an event fails if the background task has already
// finished, but the local state hasn't reflected that yet because it hasn't been
// polled. This is not an error situation.
let _ = sender.unbounded_send(event.clone());
}
}
/// Grants access to an object that allows controlling a task of the collection.
///
/// Returns `None` if the task id is invalid.
#[inline]
pub fn task(&mut self, id: TaskId) -> Option<Task<TInEvent>> {
match self.tasks.entry(id) {
Entry::Occupied(inner) => Some(Task { inner }),
Entry::Vacant(_) => None,
}
}
/// Returns a list of all the active tasks.
#[inline]
pub fn tasks<'a>(&'a self) -> impl Iterator<Item = TaskId> + 'a {
self.tasks.keys().cloned()
}
/// Provides an API similar to `Stream`, except that it cannot produce an error.
pub fn poll(&mut self) -> Async<HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr>> {
for to_spawn in self.to_spawn.drain() {
tokio_executor::spawn(to_spawn);
}
loop {
match self.events_rx.poll() {
Ok(Async::Ready(Some((message, task_id)))) => {
// If the task id is no longer in `self.tasks`, that means that the user called
// `close()` on this task earlier. Therefore no new event should be generated
// for this task.
if !self.tasks.contains_key(&task_id) {
continue;
};
match message {
InToExtMessage::NodeEvent(event) => {
break Async::Ready(HandledNodesEvent::NodeEvent {
id: task_id,
event,
});
},
InToExtMessage::NodeReached(peer_id) => {
break Async::Ready(HandledNodesEvent::NodeReached {
id: task_id,
peer_id,
});
},
InToExtMessage::TaskClosed(result, handler) => {
let _ = self.tasks.remove(&task_id);
break Async::Ready(HandledNodesEvent::TaskClosed {
id: task_id, result, handler
});
},
}
}
Ok(Async::NotReady) => {
break Async::NotReady;
}
Ok(Async::Ready(None)) => {
unreachable!("The sender is in self as well, therefore the receiver never \
closes.")
},
Err(()) => unreachable!("An unbounded receiver never errors"),
}
}
}
}
/// Access to a task in the collection.
pub struct Task<'a, TInEvent: 'a> {
inner: OccupiedEntry<'a, TaskId, mpsc::UnboundedSender<TInEvent>>,
}
impl<'a, TInEvent> Task<'a, TInEvent> {
/// Sends an event to the given node.
// TODO: report back on delivery
#[inline]
pub fn send_event(&mut self, event: TInEvent) {
// It is possible that the sender is closed if the background task has already finished
// but the local state hasn't been updated yet because we haven't been polled in the
// meantime.
let _ = self.inner.get_mut().unbounded_send(event);
}
/// Returns the task id.
#[inline]
pub fn id(&self) -> TaskId {
*self.inner.key()
}
/// Closes the task.
///
/// No further event will be generated for this task.
pub fn close(self) {
self.inner.remove();
}
}
impl<'a, TInEvent> fmt::Debug for Task<'a, TInEvent> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_tuple("Task")
.field(&self.id())
.finish()
}
}
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> Stream for HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
type Item = HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr>;
type Error = Void; // TODO: use ! once stable
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Ok(self.poll().map(Option::Some))
}
}
/// Message to transmit from a task to the public API.
#[derive(Debug)]
enum InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
/// A connection to a node has succeeded.
NodeReached(PeerId),
/// The task closed.
TaskClosed(Result<(), TaskClosedEvent<TReachErr, THandlerErr>>, Option<TIntoHandler>),
/// An event from the node.
NodeEvent(TOutEvent),
}
/// Implementation of `Future` that handles a single node, and all the communications between
/// the various components of the `HandledNodesTasks`.
struct NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr>
where
TMuxer: StreamMuxer,
TIntoHandler: IntoNodeHandler,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
{
/// Sender to transmit events to the outside.
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, <TIntoHandler::Handler as NodeHandler>::Error>, TaskId)>,
/// Receiving end for events sent from the main `HandledNodesTasks`.
in_events_rx: stream::Fuse<mpsc::UnboundedReceiver<TInEvent>>,
/// Inner state of the `NodeTask`.
inner: NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent>,
/// Identifier of the attempt.
id: TaskId,
}
enum NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent>
where
TMuxer: StreamMuxer,
TIntoHandler: IntoNodeHandler,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
{
/// Future that resolves once the node has been reached.
Future {
/// The future that will attempt to reach the node.
future: TFut,
/// The handler that will be used to build the `HandledNode`.
handler: TIntoHandler,
/// While the dialing future is pending, we need to buffer the events received on
/// `in_events_rx` so that they get delivered once dialing succeeds. We can't simply leave
/// events in `in_events_rx` because we have to detect if it gets closed.
events_buffer: Vec<TInEvent>,
},
/// Fully functional node.
Node(HandledNode<TMuxer, TIntoHandler::Handler>),
/// A panic happened while polling.
Poisoned,
}
impl<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr> Future for
NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr>
where
TMuxer: StreamMuxer,
TFut: Future<Item = (PeerId, TMuxer), Error = TReachErr>,
TIntoHandler: IntoNodeHandler,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent>,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
loop {
match mem::replace(&mut self.inner, NodeTaskInner::Poisoned) {
// First possibility: we are still trying to reach a node.
NodeTaskInner::Future { mut future, handler, mut events_buffer } => {
// If self.in_events_rx is closed, we stop the task.
loop {
match self.in_events_rx.poll() {
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some(event))) => events_buffer.push(event),
Ok(Async::NotReady) => break,
Err(_) => unreachable!("An UnboundedReceiver never errors"),
}
}
// Check whether dialing succeeded.
match future.poll() {
Ok(Async::Ready((peer_id, muxer))) => {
let mut node = HandledNode::new(muxer, handler.into_handler(&peer_id));
let event = InToExtMessage::NodeReached(peer_id);
for event in events_buffer {
node.inject_event(event);
}
if self.events_tx.unbounded_send((event, self.id)).is_err() {
node.shutdown();
}
self.inner = NodeTaskInner::Node(node);
}
Ok(Async::NotReady) => {
self.inner = NodeTaskInner::Future { future, handler, events_buffer };
return Ok(Async::NotReady);
},
Err(err) => {
// End the task
let event = InToExtMessage::TaskClosed(Err(TaskClosedEvent::Reach(err)), Some(handler));
let _ = self.events_tx.unbounded_send((event, self.id));
return Ok(Async::Ready(()));
}
}
},
// Second possibility: we have a node.
NodeTaskInner::Node(mut node) => {
// Start by handling commands received from the outside of the task.
if !self.in_events_rx.is_done() {
loop {
match self.in_events_rx.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(event))) => {
node.inject_event(event)
},
Ok(Async::Ready(None)) => {
// Node closed by the external API; start shutdown process.
node.shutdown();
break;
}
Err(()) => unreachable!("An unbounded receiver never errors"),
}
}
}
// Process the node.
loop {
match node.poll() {
Ok(Async::NotReady) => {
self.inner = NodeTaskInner::Node(node);
return Ok(Async::NotReady);
},
Ok(Async::Ready(Some(event))) => {
let event = InToExtMessage::NodeEvent(event);
if self.events_tx.unbounded_send((event, self.id)).is_err() {
node.shutdown();
}
}
Ok(Async::Ready(None)) => {
let event = InToExtMessage::TaskClosed(Ok(()), None);
let _ = self.events_tx.unbounded_send((event, self.id));
return Ok(Async::Ready(())); // End the task.
}
Err(err) => {
let event = InToExtMessage::TaskClosed(Err(TaskClosedEvent::Node(err)), None);
let _ = self.events_tx.unbounded_send((event, self.id));
return Ok(Async::Ready(())); // End the task.
}
}
}
},
// This happens if a previous poll has ended unexpectedly. The API of futures
// guarantees that we shouldn't be polled again.
NodeTaskInner::Poisoned => panic!("the node task panicked or errored earlier")
}
}
}
}
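
// Illustrative sketch, not part of the original module: the `mem::replace` idiom that
// `NodeTask::poll` uses above, reduced to a std-only state machine. The state is moved out by
// value, leaving a `Poisoned` placeholder behind, and every path that wants to continue must
// write a state back before returning. The names `sketch_poisoned_state`, `Countdown` and
// `step` are hypothetical.
#[allow(dead_code)]
mod sketch_poisoned_state {
    use std::mem;

    #[derive(Debug, PartialEq)]
    pub enum Countdown {
        /// Still counting; `log` records every value seen so far.
        Waiting { remaining: u32, log: Vec<u32> },
        /// Finished; holds the complete log.
        Done(Vec<u32>),
        /// Placeholder left behind while a step owns the real state.
        Poisoned,
    }

    /// Advances the machine by one step and returns `true` once it has finished.
    pub fn step(state: &mut Countdown) -> bool {
        match mem::replace(state, Countdown::Poisoned) {
            Countdown::Waiting { remaining, mut log } => {
                log.push(remaining);
                if remaining == 0 {
                    *state = Countdown::Done(log);
                    true
                } else {
                    *state = Countdown::Waiting { remaining: remaining - 1, log };
                    false
                }
            }
            Countdown::Done(log) => {
                *state = Countdown::Done(log);
                true
            }
            // Only reachable if an earlier step returned without restoring a state.
            Countdown::Poisoned => panic!("state machine was left poisoned by an earlier step"),
        }
    }
}
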