Add IntoNodeHandler and IntoProtocolsHandler traits (#848)

* Add IntoNodeHandler

* Add IntoProtocolsHandler
This commit is contained in:
Pierre Krieger
2019-01-14 14:22:25 +01:00
committed by GitHub
parent 60db872c31
commit bf52e9bd19
8 changed files with 267 additions and 112 deletions

View File

@ -24,7 +24,7 @@ use crate::{
nodes::{
node::Substream,
handled_node_tasks::{HandledNodesEvent, HandledNodesTasks, TaskClosedEvent},
handled_node_tasks::{Task as HandledNodesTask, TaskId},
handled_node_tasks::{IntoNodeHandler, Task as HandledNodesTask, TaskId},
handled_node::{HandledNodeError, NodeHandler}
}
};
@ -285,12 +285,13 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TIn
-> ReachAttemptId
where
TFut: Future<Item = (PeerId, TMuxer), Error = TReachErr> + Send + 'static,
THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
THandler: IntoNodeHandler + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
TReachErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
{
@ -459,7 +460,7 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TIn
}
}
/// Reach attempt interrupt errors.
/// Reach attempt interrupt errors.
#[derive(Debug)]
pub enum InterruptError {
/// An invalid reach attempt has been used to try to interrupt. The task
@ -475,7 +476,7 @@ pub enum InterruptError {
impl fmt::Display for InterruptError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
InterruptError::ReachAttemptNotFound =>
InterruptError::ReachAttemptNotFound =>
write!(f, "The reach attempt could not be found."),
InterruptError::AlreadyReached =>
write!(f, "The reach attempt has already completed or reached the node."),

View File

@ -59,7 +59,7 @@ mod tests;
// conditions in the user's code. See similar comments in the documentation of `NodeStream`.
/// Implementation of `Stream` that handles a collection of nodes.
pub struct HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
/// A map between active tasks to an unbounded sender, used to control the task. Closing the sender interrupts
/// the task. It is possible that we receive messages from tasks that used to be in this list
/// but no longer are, in which case we should ignore them.
@ -73,12 +73,12 @@ pub struct HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerE
to_spawn: SmallVec<[Box<Future<Item = (), Error = ()> + Send>; 8]>,
/// Sender to emit events to the outside. Meant to be cloned and sent to tasks.
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, THandler, TReachErr, THandlerErr>, TaskId)>,
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr>, TaskId)>,
/// Receiver side for the events.
events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent, THandler, TReachErr, THandlerErr>, TaskId)>,
events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr>, TaskId)>,
}
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> fmt::Debug for HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_list()
.entries(self.tasks.keys().cloned())
@ -121,9 +121,31 @@ where
}
}
/// Prototype for a `NodeHandler`.
pub trait IntoNodeHandler {
/// The node handler.
type Handler: NodeHandler;
/// Builds the node handler.
///
/// The `PeerId` is the id of the node the handler is going to handle.
fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler;
}
impl<T> IntoNodeHandler for T
where T: NodeHandler
{
type Handler = Self;
#[inline]
fn into_handler(self, _: &PeerId) -> Self {
self
}
}
/// Event that can happen on the `HandledNodesTasks`.
#[derive(Debug)]
pub enum HandledNodesEvent<TOutEvent, THandler, TReachErr, THandlerErr> {
pub enum HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
/// A task has been closed.
///
/// This happens once the node handler closes or an error happens.
@ -135,7 +157,7 @@ pub enum HandledNodesEvent<TOutEvent, THandler, TReachErr, THandlerErr> {
result: Result<(), TaskClosedEvent<TReachErr, THandlerErr>>,
/// If the task closed before reaching the node, this contains the handler that was passed
/// to `add_reach_attempt`.
handler: Option<THandler>,
handler: Option<TIntoHandler>,
},
/// A task has successfully connected to a node.
@ -159,7 +181,7 @@ pub enum HandledNodesEvent<TOutEvent, THandler, TReachErr, THandlerErr> {
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TaskId(usize);
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
/// Creates a new empty collection.
#[inline]
pub fn new() -> Self {
@ -178,15 +200,16 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> HandledNodesTasks<TI
///
/// This method spawns a task dedicated to resolving this future and processing the node's
/// events.
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: THandler) -> TaskId
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: TIntoHandler) -> TaskId
where
TFut: Future<Item = (PeerId, TMuxer), Error = TReachErr> + Send + 'static,
THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
TIntoHandler: IntoNodeHandler + Send + 'static,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
TReachErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
<TIntoHandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
{
@ -241,7 +264,7 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> HandledNodesTasks<TI
}
/// Provides an API similar to `Stream`, except that it cannot produce an error.
pub fn poll(&mut self) -> Async<HandledNodesEvent<TOutEvent, THandler, TReachErr, THandlerErr>> {
pub fn poll(&mut self) -> Async<HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr>> {
for to_spawn in self.to_spawn.drain() {
tokio_executor::spawn(to_spawn);
}
@ -327,8 +350,8 @@ impl<'a, TInEvent> fmt::Debug for Task<'a, TInEvent> {
}
}
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> Stream for HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
type Item = HandledNodesEvent<TOutEvent, THandler, TReachErr, THandlerErr>;
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> Stream for HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
type Item = HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr>;
type Error = Void; // TODO: use ! once stable
#[inline]
@ -339,43 +362,45 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> Stream for HandledNo
/// Message to transmit from a task to the public API.
#[derive(Debug)]
enum InToExtMessage<TOutEvent, THandler, TReachErr, THandlerErr> {
enum InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
/// A connection to a node has succeeded.
NodeReached(PeerId),
/// The task closed.
TaskClosed(Result<(), TaskClosedEvent<TReachErr, THandlerErr>>, Option<THandler>),
TaskClosed(Result<(), TaskClosedEvent<TReachErr, THandlerErr>>, Option<TIntoHandler>),
/// An event from the node.
NodeEvent(TOutEvent),
}
/// Implementation of `Future` that handles a single node, and all the communications between
/// the various components of the `HandledNodesTasks`.
struct NodeTask<TFut, TMuxer, THandler, TInEvent, TOutEvent, TReachErr>
struct NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr>
where
TMuxer: StreamMuxer,
THandler: NodeHandler<Substream = Substream<TMuxer>>,
TIntoHandler: IntoNodeHandler,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
{
/// Sender to transmit events to the outside.
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, THandler, TReachErr, THandler::Error>, TaskId)>,
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, <TIntoHandler::Handler as NodeHandler>::Error>, TaskId)>,
/// Receiving end for events sent from the main `HandledNodesTasks`.
in_events_rx: stream::Fuse<mpsc::UnboundedReceiver<TInEvent>>,
/// Inner state of the `NodeTask`.
inner: NodeTaskInner<TFut, TMuxer, THandler, TInEvent>,
inner: NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent>,
/// Identifier of the attempt.
id: TaskId,
}
enum NodeTaskInner<TFut, TMuxer, THandler, TInEvent>
enum NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent>
where
TMuxer: StreamMuxer,
THandler: NodeHandler<Substream = Substream<TMuxer>>,
TIntoHandler: IntoNodeHandler,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
{
/// Future to resolve to connect to the node.
Future {
/// The future that will attempt to reach the node.
future: TFut,
/// The handler that will be used to build the `HandledNode`.
handler: THandler,
handler: TIntoHandler,
/// While we are dialing the future, we need to buffer the events received on
/// `in_events_rx` so that they get delivered once dialing succeeds. We can't simply leave
/// events in `in_events_rx` because we have to detect if it gets closed.
@ -383,18 +408,19 @@ where
},
/// Fully functional node.
Node(HandledNode<TMuxer, THandler>),
Node(HandledNode<TMuxer, TIntoHandler::Handler>),
/// A panic happened while polling.
Poisoned,
}
impl<TFut, TMuxer, THandler, TInEvent, TOutEvent, TReachErr> Future for
NodeTask<TFut, TMuxer, THandler, TInEvent, TOutEvent, TReachErr>
impl<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr> Future for
NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr>
where
TMuxer: StreamMuxer,
TFut: Future<Item = (PeerId, TMuxer), Error = TReachErr>,
THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent>,
TIntoHandler: IntoNodeHandler,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent>,
{
type Item = ();
type Error = ();
@ -416,8 +442,8 @@ where
// Check whether dialing succeeded.
match future.poll() {
Ok(Async::Ready((peer_id, muxer))) => {
let mut node = HandledNode::new(muxer, handler.into_handler(&peer_id));
let event = InToExtMessage::NodeReached(peer_id);
let mut node = HandledNode::new(muxer, handler);
for event in events_buffer {
node.inject_event(event);
}

View File

@ -34,6 +34,7 @@ use crate::{
HandledNodeError,
NodeHandler
},
handled_node_tasks::IntoNodeHandler,
node::Substream
},
nodes::listeners::{ListenersEvent, ListenersStream},
@ -377,8 +378,9 @@ where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
THandler: IntoNodeHandler + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
THandlerErr: error::Error + Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
@ -523,8 +525,9 @@ impl<TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr>
where
TTrans: Transport + Clone,
TMuxer: StreamMuxer,
THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
THandler: IntoNodeHandler + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
THandlerErr: error::Error + Send + 'static,
{
/// Creates a new node events stream.
@ -752,8 +755,9 @@ where
TMuxer::Substream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
THandler: IntoNodeHandler + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
THandlerErr: error::Error + Send + 'static,
{
// Start by polling the listeners for events.
@ -1138,8 +1142,9 @@ where
TMuxer::Substream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
THandler::OutboundOpenInfo: Send + 'static,
THandler: IntoNodeHandler + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
THandlerErr: error::Error + Send + 'static,
{
/// If we are connected, returns the `PeerConnected`.
@ -1362,8 +1367,9 @@ where
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
THandler: IntoNodeHandler + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,