diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index e5009d69..12e8d1ca 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -24,7 +24,7 @@ use crate::{ nodes::{ node::Substream, handled_node_tasks::{HandledNodesEvent, HandledNodesTasks, TaskClosedEvent}, - handled_node_tasks::{Task as HandledNodesTask, TaskId}, + handled_node_tasks::{IntoNodeHandler, Task as HandledNodesTask, TaskId}, handled_node::{HandledNodeError, NodeHandler} } }; @@ -285,12 +285,13 @@ impl CollectionStream ReachAttemptId where TFut: Future + Send + 'static, - THandler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, + THandler: IntoNodeHandler + Send + 'static, + THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, + ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required? TReachErr: error::Error + Send + 'static, THandlerErr: error::Error + Send + 'static, TInEvent: Send + 'static, TOutEvent: Send + 'static, - THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required? TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required { @@ -459,7 +460,7 @@ impl CollectionStream fmt::Result { match *self { - InterruptError::ReachAttemptNotFound => + InterruptError::ReachAttemptNotFound => write!(f, "The reach attempt could not be found."), InterruptError::AlreadyReached => write!(f, "The reach attempt has already completed or reached the node."), diff --git a/core/src/nodes/handled_node_tasks.rs b/core/src/nodes/handled_node_tasks.rs index 42c9bdfd..32c713da 100644 --- a/core/src/nodes/handled_node_tasks.rs +++ b/core/src/nodes/handled_node_tasks.rs @@ -59,7 +59,7 @@ mod tests; // conditions in the user's code. See similar comments in the documentation of `NodeStream`. /// Implementation of `Stream` that handles a collection of nodes. -pub struct HandledNodesTasks { +pub struct HandledNodesTasks { /// A map between active tasks to an unbounded sender, used to control the task. Closing the sender interrupts /// the task. It is possible that we receive messages from tasks that used to be in this list /// but no longer are, in which case we should ignore them. @@ -73,12 +73,12 @@ pub struct HandledNodesTasks + Send>; 8]>, /// Sender to emit events to the outside. Meant to be cloned and sent to tasks. - events_tx: mpsc::UnboundedSender<(InToExtMessage, TaskId)>, + events_tx: mpsc::UnboundedSender<(InToExtMessage, TaskId)>, /// Receiver side for the events. - events_rx: mpsc::UnboundedReceiver<(InToExtMessage, TaskId)>, + events_rx: mpsc::UnboundedReceiver<(InToExtMessage, TaskId)>, } -impl fmt::Debug for HandledNodesTasks { +impl fmt::Debug for HandledNodesTasks { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { f.debug_list() .entries(self.tasks.keys().cloned()) @@ -121,9 +121,31 @@ where } } +/// Prototype for a `NodeHandler`. +pub trait IntoNodeHandler { + /// The node handler. + type Handler: NodeHandler; + + /// Builds the node handler. + /// + /// The `PeerId` is the id of the node the handler is going to handle. + fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler; +} + +impl IntoNodeHandler for T +where T: NodeHandler +{ + type Handler = Self; + + #[inline] + fn into_handler(self, _: &PeerId) -> Self { + self + } +} + /// Event that can happen on the `HandledNodesTasks`. #[derive(Debug)] -pub enum HandledNodesEvent { +pub enum HandledNodesEvent { /// A task has been closed. /// /// This happens once the node handler closes or an error happens. @@ -135,7 +157,7 @@ pub enum HandledNodesEvent { result: Result<(), TaskClosedEvent>, /// If the task closed before reaching the node, this contains the handler that was passed /// to `add_reach_attempt`. - handler: Option, + handler: Option, }, /// A task has successfully connected to a node. @@ -159,7 +181,7 @@ pub enum HandledNodesEvent { #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct TaskId(usize); -impl HandledNodesTasks { +impl HandledNodesTasks { /// Creates a new empty collection. #[inline] pub fn new() -> Self { @@ -178,15 +200,16 @@ impl HandledNodesTasks(&mut self, future: TFut, handler: THandler) -> TaskId + pub fn add_reach_attempt(&mut self, future: TFut, handler: TIntoHandler) -> TaskId where TFut: Future + Send + 'static, - THandler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, + TIntoHandler: IntoNodeHandler + Send + 'static, + TIntoHandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, TReachErr: error::Error + Send + 'static, THandlerErr: error::Error + Send + 'static, TInEvent: Send + 'static, TOutEvent: Send + 'static, - THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required? + ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required? TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required { @@ -241,7 +264,7 @@ impl HandledNodesTasks Async> { + pub fn poll(&mut self) -> Async> { for to_spawn in self.to_spawn.drain() { tokio_executor::spawn(to_spawn); } @@ -327,8 +350,8 @@ impl<'a, TInEvent> fmt::Debug for Task<'a, TInEvent> { } } -impl Stream for HandledNodesTasks { - type Item = HandledNodesEvent; +impl Stream for HandledNodesTasks { + type Item = HandledNodesEvent; type Error = Void; // TODO: use ! once stable #[inline] @@ -339,43 +362,45 @@ impl Stream for HandledNo /// Message to transmit from a task to the public API. #[derive(Debug)] -enum InToExtMessage { +enum InToExtMessage { /// A connection to a node has succeeded. NodeReached(PeerId), /// The task closed. - TaskClosed(Result<(), TaskClosedEvent>, Option), + TaskClosed(Result<(), TaskClosedEvent>, Option), /// An event from the node. NodeEvent(TOutEvent), } /// Implementation of `Future` that handles a single node, and all the communications between /// the various components of the `HandledNodesTasks`. -struct NodeTask +struct NodeTask where TMuxer: StreamMuxer, - THandler: NodeHandler>, + TIntoHandler: IntoNodeHandler, + TIntoHandler::Handler: NodeHandler>, { /// Sender to transmit events to the outside. - events_tx: mpsc::UnboundedSender<(InToExtMessage, TaskId)>, + events_tx: mpsc::UnboundedSender<(InToExtMessage::Error>, TaskId)>, /// Receiving end for events sent from the main `HandledNodesTasks`. in_events_rx: stream::Fuse>, /// Inner state of the `NodeTask`. - inner: NodeTaskInner, + inner: NodeTaskInner, /// Identifier of the attempt. id: TaskId, } -enum NodeTaskInner +enum NodeTaskInner where TMuxer: StreamMuxer, - THandler: NodeHandler>, + TIntoHandler: IntoNodeHandler, + TIntoHandler::Handler: NodeHandler>, { /// Future to resolve to connect to the node. Future { /// The future that will attempt to reach the node. future: TFut, /// The handler that will be used to build the `HandledNode`. - handler: THandler, + handler: TIntoHandler, /// While we are dialing the future, we need to buffer the events received on /// `in_events_rx` so that they get delivered once dialing succeeds. We can't simply leave /// events in `in_events_rx` because we have to detect if it gets closed. @@ -383,18 +408,19 @@ where }, /// Fully functional node. - Node(HandledNode), + Node(HandledNode), /// A panic happened while polling. Poisoned, } -impl Future for - NodeTask +impl Future for + NodeTask where TMuxer: StreamMuxer, TFut: Future, - THandler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent>, + TIntoHandler: IntoNodeHandler, + TIntoHandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent>, { type Item = (); type Error = (); @@ -416,8 +442,8 @@ where // Check whether dialing succeeded. match future.poll() { Ok(Async::Ready((peer_id, muxer))) => { + let mut node = HandledNode::new(muxer, handler.into_handler(&peer_id)); let event = InToExtMessage::NodeReached(peer_id); - let mut node = HandledNode::new(muxer, handler); for event in events_buffer { node.inject_event(event); } diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index 25055a20..c6a987c1 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -34,6 +34,7 @@ use crate::{ HandledNodeError, NodeHandler }, + handled_node_tasks::IntoNodeHandler, node::Substream }, nodes::listeners::{ListenersEvent, ListenersStream}, @@ -377,8 +378,9 @@ where TTrans: Transport, TTrans::Error: Send + 'static, TTrans::ListenerUpgrade: Send + 'static, - THandler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, - THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary + THandler: IntoNodeHandler + Send + 'static, + THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, + ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send, @@ -523,8 +525,9 @@ impl where TTrans: Transport + Clone, TMuxer: StreamMuxer, - THandler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, - THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary + THandler: IntoNodeHandler + Send + 'static, + THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, + ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, { /// Creates a new node events stream. @@ -752,8 +755,9 @@ where TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, - THandler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, - THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary + THandler: IntoNodeHandler + Send + 'static, + THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, + ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, { // Start by polling the listeners for events. @@ -1138,8 +1142,9 @@ where TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, - THandler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, - THandler::OutboundOpenInfo: Send + 'static, + THandler: IntoNodeHandler + Send + 'static, + THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, + ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, { /// If we are connected, returns the `PeerConnected`. @@ -1362,8 +1367,9 @@ where TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send, TMuxer::Substream: Send, - THandler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, - THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary + THandler: IntoNodeHandler + Send + 'static, + THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, + ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, TInEvent: Send + 'static, TOutEvent: Send + 'static, diff --git a/core/src/protocols_handler/mod.rs b/core/src/protocols_handler/mod.rs index 0741b2c3..956e8a0a 100644 --- a/core/src/protocols_handler/mod.rs +++ b/core/src/protocols_handler/mod.rs @@ -33,6 +33,7 @@ //! > connection with a remote. In order to handle a protocol that requires knowledge of //! > the network as a whole, see the `NetworkBehaviour` trait. +use crate::PeerId; use crate::upgrade::{ InboundUpgrade, OutboundUpgrade, @@ -46,7 +47,7 @@ pub use self::dummy::DummyProtocolsHandler; pub use self::map_in::MapInEvent; pub use self::map_out::MapOutEvent; pub use self::node_handler::{NodeHandlerWrapper, NodeHandlerWrapperBuilder}; -pub use self::select::ProtocolsHandlerSelect; +pub use self::select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect}; mod dummy; mod map_in; @@ -207,17 +208,19 @@ pub trait ProtocolsHandler { where Self: Sized, { - NodeHandlerWrapperBuilder::new(self, Duration::from_secs(10), Duration::from_secs(10), Duration::from_secs(5)) + IntoProtocolsHandler::into_node_handler_builder(self) } /// Builds an implementation of `NodeHandler` that handles this protocol exclusively. /// /// > **Note**: This is a shortcut for `self.into_node_handler_builder().build()`. #[inline] + #[deprecated(note = "Use into_node_handler_builder instead")] fn into_node_handler(self) -> NodeHandlerWrapper where Self: Sized, { + #![allow(deprecated)] self.into_node_handler_builder().build() } } @@ -353,3 +356,45 @@ where } } } + +/// Prototype for a `ProtocolsHandler`. +pub trait IntoProtocolsHandler { + /// The protocols handler. + type Handler: ProtocolsHandler; + + /// Builds the protocols handler. + /// + /// The `PeerId` is the id of the node the handler is going to handle. + fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler; + + /// Builds an implementation of `IntoProtocolsHandler` that handles both this protocol and the + /// other one together. + #[inline] + fn select(self, other: TProto2) -> IntoProtocolsHandlerSelect + where + Self: Sized, + { + IntoProtocolsHandlerSelect::new(self, other) + } + + /// Creates a builder that will allow creating a `NodeHandler` that handles this protocol + /// exclusively. + #[inline] + fn into_node_handler_builder(self) -> NodeHandlerWrapperBuilder + where + Self: Sized, + { + NodeHandlerWrapperBuilder::new(self, Duration::from_secs(10), Duration::from_secs(10), Duration::from_secs(5)) + } +} + +impl IntoProtocolsHandler for T +where T: ProtocolsHandler +{ + type Handler = Self; + + #[inline] + fn into_handler(self, _: &PeerId) -> Self { + self + } +} diff --git a/core/src/protocols_handler/node_handler.rs b/core/src/protocols_handler/node_handler.rs index 6a3d2833..fbf84133 100644 --- a/core/src/protocols_handler/node_handler.rs +++ b/core/src/protocols_handler/node_handler.rs @@ -19,8 +19,10 @@ // DEALINGS IN THE SOFTWARE. use crate::{ + PeerId, nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent}, - protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, + nodes::handled_node_tasks::IntoNodeHandler, + protocols_handler::{ProtocolsHandler, IntoProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, upgrade::{ self, OutboundUpgrade, @@ -33,12 +35,9 @@ use std::time::{Duration, Instant}; use tokio_timer::{Delay, Timeout}; /// Prototype for a `NodeHandlerWrapper`. -pub struct NodeHandlerWrapperBuilder -where - TProtoHandler: ProtocolsHandler, -{ +pub struct NodeHandlerWrapperBuilder { /// The underlying handler. - handler: TProtoHandler, + handler: TIntoProtoHandler, /// Timeout for incoming substreams negotiation. in_timeout: Duration, /// Timeout for outgoing substreams negotiation. @@ -47,13 +46,13 @@ where useless_timeout: Duration, } -impl NodeHandlerWrapperBuilder +impl NodeHandlerWrapperBuilder where - TProtoHandler: ProtocolsHandler + TIntoProtoHandler: IntoProtocolsHandler { /// Builds a `NodeHandlerWrapperBuilder`. #[inline] - pub(crate) fn new(handler: TProtoHandler, in_timeout: Duration, out_timeout: Duration, useless_timeout: Duration) -> Self { + pub(crate) fn new(handler: TIntoProtoHandler, in_timeout: Duration, out_timeout: Duration, useless_timeout: Duration) -> Self { NodeHandlerWrapperBuilder { handler, in_timeout, @@ -85,8 +84,11 @@ where } /// Builds the `NodeHandlerWrapper`. + #[deprecated(note = "Pass the NodeHandlerWrapperBuilder directly")] #[inline] - pub fn build(self) -> NodeHandlerWrapper { + pub fn build(self) -> NodeHandlerWrapper + where TIntoProtoHandler: ProtocolsHandler + { NodeHandlerWrapper { handler: self.handler, negotiating_in: Vec::new(), @@ -101,6 +103,30 @@ where } } +impl IntoNodeHandler for NodeHandlerWrapperBuilder +where + TIntoProtoHandler: IntoProtocolsHandler, + TProtoHandler: ProtocolsHandler, + // TODO: meh for Debug + ::Substream>>::Error: std::fmt::Debug +{ + type Handler = NodeHandlerWrapper; + + fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler { + NodeHandlerWrapper { + handler: self.handler.into_handler(remote_peer_id), + negotiating_in: Vec::new(), + negotiating_out: Vec::new(), + in_timeout: self.in_timeout, + out_timeout: self.out_timeout, + queued_dial_upgrades: Vec::new(), + unique_dial_upgrade_id: 0, + connection_shutdown: None, + useless_timeout: self.useless_timeout, + } + } +} + /// Wraps around an implementation of `ProtocolsHandler`, and implements `NodeHandler`. // TODO: add a caching system for protocols that are supported or not pub struct NodeHandlerWrapper @@ -138,6 +164,7 @@ where impl NodeHandler for NodeHandlerWrapper where TProtoHandler: ProtocolsHandler, + // TODO: meh for Debug ::Substream>>::Error: std::fmt::Debug { type InEvent = TProtoHandler::InEvent; diff --git a/core/src/protocols_handler/select.rs b/core/src/protocols_handler/select.rs index 41031f10..4bb277f8 100644 --- a/core/src/protocols_handler/select.rs +++ b/core/src/protocols_handler/select.rs @@ -19,9 +19,15 @@ // DEALINGS IN THE SOFTWARE. use crate::{ + PeerId, either::EitherError, either::EitherOutput, - protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, + protocols_handler::{ + IntoProtocolsHandler, + ProtocolsHandler, + ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr, + }, upgrade::{ InboundUpgrade, OutboundUpgrade, @@ -33,6 +39,46 @@ use crate::{ use futures::prelude::*; use tokio_io::{AsyncRead, AsyncWrite}; +/// Implementation of `IntoProtocolsHandler` that combines two protocols into one. +#[derive(Debug, Clone)] +pub struct IntoProtocolsHandlerSelect { + proto1: TProto1, + proto2: TProto2, +} + +impl IntoProtocolsHandlerSelect { + /// Builds a `IntoProtocolsHandlerSelect`. + #[inline] + pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self { + IntoProtocolsHandlerSelect { + proto1, + proto2, + } + } +} + +impl IntoProtocolsHandler for IntoProtocolsHandlerSelect +where + TProto1: IntoProtocolsHandler, + TProto2: IntoProtocolsHandler, + TProto1::Handler: ProtocolsHandler, + TProto2::Handler: ProtocolsHandler, + TSubstream: AsyncRead + AsyncWrite, + ::InboundProtocol: InboundUpgrade, + ::InboundProtocol: InboundUpgrade, + ::OutboundProtocol: OutboundUpgrade, + ::OutboundProtocol: OutboundUpgrade +{ + type Handler = ProtocolsHandlerSelect; + + fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler { + ProtocolsHandlerSelect { + proto1: self.proto1.into_handler(remote_peer_id), + proto2: self.proto2.into_handler(remote_peer_id), + } + } +} + /// Implementation of `ProtocolsHandler` that combines two protocols into one. #[derive(Debug, Clone)] pub struct ProtocolsHandlerSelect { diff --git a/core/src/swarm.rs b/core/src/swarm.rs index 0f09f50b..e073fdb2 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -26,7 +26,7 @@ //! # Initializing a Swarm //! //! Creating a `Swarm` requires three things: -//! +//! //! - An implementation of the `Transport` trait. This is the type that will be used in order to //! reach nodes on the network based on their address. See the `transport` module for more //! information. @@ -49,7 +49,7 @@ use crate::{ node::Substream, raw_swarm::{RawSwarm, RawSwarmEvent} }, - protocols_handler::{NodeHandlerWrapper, ProtocolsHandler}, + protocols_handler::{NodeHandlerWrapperBuilder, NodeHandlerWrapper, IntoProtocolsHandler, ProtocolsHandler}, topology::Topology, transport::TransportError, topology::DisconnectReason, @@ -67,10 +67,10 @@ where TTransport: Transport, { raw_swarm: RawSwarm< TTransport, - <>::ProtocolsHandler as ProtocolsHandler>::InEvent, - <>::ProtocolsHandler as ProtocolsHandler>::OutEvent, - NodeHandlerWrapper, - <>::ProtocolsHandler as ProtocolsHandler>::Error, + <<>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, + <<>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent, + NodeHandlerWrapperBuilder, + <<>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error, >, /// Handles which nodes to connect to and how to handle the events sent back by the protocol @@ -120,24 +120,25 @@ where TBehaviour: NetworkBehaviour, TTransport::Listener: Send + 'static, TTransport::ListenerUpgrade: Send + 'static, TTransport::Dial: Send + 'static, - TBehaviour::ProtocolsHandler: ProtocolsHandler> + Send + 'static, - ::InEvent: Send + 'static, - ::OutEvent: Send + 'static, - ::Error: Send + 'static, - ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary - ::InboundProtocol: InboundUpgrade> + Send + 'static, - <::InboundProtocol as UpgradeInfo>::Info: Send + 'static, - <::InboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, - <<::InboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, - <::InboundProtocol as InboundUpgrade>>::Error: fmt::Debug + Send + 'static, - <::InboundProtocol as InboundUpgrade>>::Future: Send + 'static, - ::OutboundProtocol: OutboundUpgrade> + Send + 'static, - <::OutboundProtocol as UpgradeInfo>::Info: Send + 'static, - <::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, - <<::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, - <::OutboundProtocol as OutboundUpgrade>>::Future: Send + 'static, - <::OutboundProtocol as OutboundUpgrade>>::Error: fmt::Debug + Send + 'static, - as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary + TBehaviour::ProtocolsHandler: Send + 'static, + ::Handler: ProtocolsHandler> + Send + 'static, + <::Handler as ProtocolsHandler>::InEvent: Send + 'static, + <::Handler as ProtocolsHandler>::OutEvent: Send + 'static, + <::Handler as ProtocolsHandler>::Error: Send + 'static, + <::Handler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary + <::Handler as ProtocolsHandler>::InboundProtocol: InboundUpgrade> + Send + 'static, + <<::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::Info: Send + 'static, + <<::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, + <<<::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, + <<::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>::Error: fmt::Debug + Send + 'static, + <<::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>::Future: Send + 'static, + <::Handler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade> + Send + 'static, + <<::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::Info: Send + 'static, + <<::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, + <<<::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, + <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>::Future: Send + 'static, + <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>::Error: fmt::Debug + Send + 'static, + ::Handler> as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary TTopology: Topology, { /// Builds a new `Swarm`. @@ -145,6 +146,7 @@ where TBehaviour: NetworkBehaviour, pub fn new(transport: TTransport, mut behaviour: TBehaviour, topology: TTopology) -> Self { let supported_protocols = behaviour .new_handler() + .into_handler(topology.local_peer_id()) .listen_protocol() .protocol_info() .into_iter() @@ -187,7 +189,7 @@ where TBehaviour: NetworkBehaviour, #[inline] pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), TransportError> { let handler = me.behaviour.new_handler(); - me.raw_swarm.dial(addr, handler.into_node_handler()) + me.raw_swarm.dial(addr, handler.into_node_handler_builder()) } /// Tries to reach the given peer using the elements in the topology. @@ -197,7 +199,7 @@ where TBehaviour: NetworkBehaviour, #[inline] pub fn dial(me: &mut Self, peer_id: PeerId) { let addrs = me.topology.addresses_of_peer(&peer_id); - let handler = me.behaviour.new_handler().into_node_handler(); + let handler = me.behaviour.new_handler().into_node_handler_builder(); if let Some(peer) = me.raw_swarm.peer(peer_id).as_not_connected() { let _ = peer.connect_iter(addrs, handler); } @@ -238,24 +240,25 @@ where TBehaviour: NetworkBehaviour, TTransport::Listener: Send + 'static, TTransport::ListenerUpgrade: Send + 'static, TTransport::Dial: Send + 'static, - TBehaviour::ProtocolsHandler: ProtocolsHandler> + Send + 'static, - ::InEvent: Send + 'static, - ::OutEvent: Send + 'static, - ::Error: Send + 'static, - ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary - ::InboundProtocol: InboundUpgrade> + Send + 'static, - <::InboundProtocol as InboundUpgrade>>::Future: Send + 'static, - <::InboundProtocol as InboundUpgrade>>::Error: fmt::Debug + Send + 'static, - <::InboundProtocol as UpgradeInfo>::Info: Send + 'static, - <::InboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, - <<::InboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, - ::OutboundProtocol: OutboundUpgrade> + Send + 'static, - <::OutboundProtocol as OutboundUpgrade>>::Future: Send + 'static, - <::OutboundProtocol as OutboundUpgrade>>::Error: fmt::Debug + Send + 'static, - <::OutboundProtocol as UpgradeInfo>::Info: Send + 'static, - <::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, - <<::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, - as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary + TBehaviour::ProtocolsHandler: Send + 'static, + ::Handler: ProtocolsHandler> + Send + 'static, + <::Handler as ProtocolsHandler>::InEvent: Send + 'static, + <::Handler as ProtocolsHandler>::OutEvent: Send + 'static, + <::Handler as ProtocolsHandler>::Error: Send + 'static, + <::Handler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary + <::Handler as ProtocolsHandler>::InboundProtocol: InboundUpgrade> + Send + 'static, + <<::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>::Future: Send + 'static, + <<::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>::Error: fmt::Debug + Send + 'static, + <<::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::Info: Send + 'static, + <<::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, + <<<::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, + <::Handler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade> + Send + 'static, + <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>::Future: Send + 'static, + <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>::Error: fmt::Debug + Send + 'static, + <<::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::Info: Send + 'static, + <<::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, + <<<::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, + ::Handler> as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary TTopology: Topology, { type Item = TBehaviour::OutEvent; @@ -291,7 +294,7 @@ where TBehaviour: NetworkBehaviour, }, Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => { let handler = self.behaviour.new_handler(); - incoming.accept(handler.into_node_handler()); + incoming.accept(handler.into_node_handler_builder()); }, Async::Ready(RawSwarmEvent::ListenerClosed { .. }) => {}, Async::Ready(RawSwarmEvent::IncomingConnectionError { .. }) => {}, @@ -345,7 +348,7 @@ where TBehaviour: NetworkBehaviour, /// one that handles all the behaviours at once. pub trait NetworkBehaviour { /// Handler for all the protocols the network supports. - type ProtocolsHandler: ProtocolsHandler; + type ProtocolsHandler: IntoProtocolsHandler; /// Event generated by the swarm. type OutEvent; @@ -367,13 +370,13 @@ pub trait NetworkBehaviour { fn inject_node_event( &mut self, peer_id: PeerId, - event: ::OutEvent + event: <::Handler as ProtocolsHandler>::OutEvent ); /// Polls for things that swarm should do. /// /// This API mimics the API of the `Stream` trait. - fn poll(&mut self, topology: &mut PollParameters) -> Async::InEvent, Self::OutEvent>>; + fn poll(&mut self, topology: &mut PollParameters) -> Async::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>; } /// Used when deriving `NetworkBehaviour`. When deriving `NetworkBehaviour`, must be implemented diff --git a/misc/core-derive/src/lib.rs b/misc/core-derive/src/lib.rs index 1e420540..ad696f7b 100644 --- a/misc/core-derive/src/lib.rs +++ b/misc/core-derive/src/lib.rs @@ -53,8 +53,9 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let net_behv_event_proc = quote!{::libp2p::core::swarm::NetworkBehaviourEventProcess}; let either_ident = quote!{::libp2p::core::either::EitherOutput}; let network_behaviour_action = quote!{::libp2p::core::swarm::NetworkBehaviourAction}; + let into_protocols_handler = quote!{::libp2p::core::protocols_handler::IntoProtocolsHandler}; let protocols_handler = quote!{::libp2p::core::protocols_handler::ProtocolsHandler}; - let proto_select_ident = quote!{::libp2p::core::protocols_handler::ProtocolsHandlerSelect}; + let into_proto_select_ident = quote!{::libp2p::core::protocols_handler::IntoProtocolsHandlerSelect}; let peer_id = quote!{::libp2p::core::PeerId}; let connected_point = quote!{::libp2p::core::swarm::ConnectedPoint}; @@ -99,10 +100,10 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { vec![ quote!{#ty: #trait_to_impl<#topology_generic>}, quote!{Self: #net_behv_event_proc<<#ty as #trait_to_impl<#topology_generic>>::OutEvent>}, - quote!{<#ty as #trait_to_impl<#topology_generic>>::ProtocolsHandler: #protocols_handler}, + quote!{<<#ty as #trait_to_impl<#topology_generic>>::ProtocolsHandler as #into_protocols_handler>::Handler: #protocols_handler}, // Note: this bound is required because of https://github.com/rust-lang/rust/issues/55697 - quote!{<<#ty as #trait_to_impl<#topology_generic>>::ProtocolsHandler as #protocols_handler>::InboundProtocol: ::libp2p::core::InboundUpgrade<#substream_generic>}, - quote!{<<#ty as #trait_to_impl<#topology_generic>>::ProtocolsHandler as #protocols_handler>::OutboundProtocol: ::libp2p::core::OutboundUpgrade<#substream_generic>}, + quote!{<<<#ty as #trait_to_impl<#topology_generic>>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::InboundProtocol: ::libp2p::core::InboundUpgrade<#substream_generic>}, + quote!{<<<#ty as #trait_to_impl<#topology_generic>>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::OutboundProtocol: ::libp2p::core::OutboundUpgrade<#substream_generic>}, ] }) .collect::>(); @@ -213,7 +214,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let ty = &field.ty; let field_info = quote!{ <#ty as #trait_to_impl<#topology_generic>>::ProtocolsHandler }; match ph_ty { - Some(ev) => ph_ty = Some(quote!{ #proto_select_ident<#ev, #field_info> }), + Some(ev) => ph_ty = Some(quote!{ #into_proto_select_ident<#ev, #field_info> }), ref mut ev @ None => *ev = Some(field_info), } } @@ -324,7 +325,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { #[inline] fn new_handler(&mut self) -> Self::ProtocolsHandler { - use #protocols_handler; + use #into_protocols_handler; #new_handler } @@ -342,17 +343,17 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { fn inject_node_event( &mut self, peer_id: #peer_id, - event: ::OutEvent + event: <::Handler as #protocols_handler>::OutEvent ) { match event { #(#inject_node_event_stmts),* } } - fn poll(&mut self, poll_params: &mut #poll_parameters) -> ::libp2p::futures::Async<#network_behaviour_action<::InEvent, Self::OutEvent>> { + fn poll(&mut self, poll_params: &mut #poll_parameters) -> ::libp2p::futures::Async<#network_behaviour_action<<::Handler as #protocols_handler>::InEvent, Self::OutEvent>> { use libp2p::futures::prelude::*; #(#poll_stmts)* - let f: ::libp2p::futures::Async<#network_behaviour_action<::InEvent, Self::OutEvent>> = #poll_method; + let f: ::libp2p::futures::Async<#network_behaviour_action<<::Handler as #protocols_handler>::InEvent, Self::OutEvent>> = #poll_method; f } }