diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs
index c31707e3..c0578b50 100644
--- a/core/src/nodes/collection.rs
+++ b/core/src/nodes/collection.rs
@@ -21,7 +21,8 @@
 use fnv::FnvHashMap;
 use futures::{prelude::*, sync::mpsc, sync::oneshot, task};
 use muxing::StreamMuxer;
-use nodes::node::{NodeEvent, NodeStream, Substream};
+use nodes::node::Substream;
+use nodes::handled_node::{HandledNode, NodeHandler};
 use smallvec::SmallVec;
 use std::collections::hash_map::{Entry, OccupiedEntry};
 use std::io::Error as IoError;
@@ -49,12 +50,9 @@ use {Multiaddr, PeerId};
 
 /// Implementation of `Stream` that handles a collection of nodes.
 // TODO: implement Debug
-pub struct CollectionStream<TMuxer, TUserData>
-where
-    TMuxer: StreamMuxer,
-{
+pub struct CollectionStream<TInEvent, TOutEvent> {
     /// List of nodes, with a sender allowing to communicate messages.
-    nodes: FnvHashMap<PeerId, (ReachAttemptId, mpsc::UnboundedSender<ExtToInMessage>)>,
+    nodes: FnvHashMap<PeerId, (ReachAttemptId, mpsc::UnboundedSender<TInEvent>)>,
     /// Known state of a task. Tasks are identified by the reach attempt ID.
     tasks: FnvHashMap<ReachAttemptId, TaskKnownState>,
     /// Identifier for the next task to spawn.
@@ -67,18 +65,9 @@ where
     to_notify: Option<task::Task>,
 
     /// Sender to emit events to the outside. Meant to be cloned and sent to tasks.
-    events_tx: mpsc::UnboundedSender<(InToExtMessage<TMuxer>, ReachAttemptId)>,
+    events_tx: mpsc::UnboundedSender<(InToExtMessage<TInEvent, TOutEvent>, ReachAttemptId)>,
     /// Receiver side for the events.
-    events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TMuxer>, ReachAttemptId)>,
-
-    /// Instead of passing directly the user data when opening an outbound substream attempt, we
-    /// store it here and pass a `usize` to the node. This makes it possible to instantly close
-    /// some attempts if necessary.
-    // TODO: use something else than hashmap? we often need to iterate over everything, and a
-    //       SmallVec may be better
-    outbound_attempts: FnvHashMap<usize, (PeerId, TUserData)>,
-    /// Identifier for the next entry in `outbound_attempts`.
-    next_outbound_attempt: usize,
+    events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TInEvent, TOutEvent>, ReachAttemptId)>,
 }
 
 /// State of a task, as known by the frontend (the `ColletionStream`). Asynchronous compared to
@@ -106,10 +95,7 @@ impl TaskKnownState {
 
 /// Event that can happen on the `CollectionStream`.
 // TODO: implement Debug
-pub enum CollectionEvent<TMuxer, TUserData>
-where
-    TMuxer: StreamMuxer,
-{
+pub enum CollectionEvent<TOutEvent> {
     /// A connection to a node has succeeded.
     NodeReached {
         /// Identifier of the node.
         peer_id: PeerId,
         /// Identifier of the reach attempt that succeeded.
         id: ReachAttemptId,
     },
@@ -125,8 +111,6 @@ where
     NodeReplaced {
         /// Identifier of the node.
         peer_id: PeerId,
-        /// Outbound substream attempts that have been closed in the process.
-        closed_outbound_substreams: Vec<TUserData>,
         /// Identifier of the reach attempt that succeeded.
         id: ReachAttemptId,
     },
@@ -146,8 +130,6 @@ where
         peer_id: PeerId,
         /// The error that happened.
         error: IoError,
-        /// Pending outbound substreams that were cancelled.
-        closed_outbound_substreams: Vec<TUserData>,
     },
 
     /// An error happened on the future that was trying to reach a node.
@@ -158,46 +140,12 @@ where
         error: IoError,
     },
 
-    /// The multiaddress of the node has been resolved.
-    NodeMultiaddr {
+    /// A node has produced an event.
+    NodeEvent {
         /// Identifier of the node.
         peer_id: PeerId,
-        /// Address that has been resolved, or error that occured on the substream.
-        address: Result<Multiaddr, IoError>,
-    },
-
-    /// A new inbound substream arrived.
-    InboundSubstream {
-        /// Identifier of the node.
-        peer_id: PeerId,
-        /// The newly-opened substream.
-        substream: Substream<TMuxer>,
-    },
-
-    /// An outbound substream has successfully been opened.
-    OutboundSubstream {
-        /// Identifier of the node.
-        peer_id: PeerId,
-        /// Identifier of the substream. Same as what was returned by `open_substream`.
-        user_data: TUserData,
-        /// The newly-opened substream.
-        substream: Substream<TMuxer>,
-    },
-
-    /// The inbound side of a muxer has been gracefully closed. No more inbound substreams will
-    /// be produced.
-    InboundClosed {
-        /// Identifier of the node.
-        peer_id: PeerId,
-    },
-
-    /// An outbound substream couldn't be opened because the muxer is no longer capable of opening
-    /// more substreams.
-    OutboundClosed {
-        /// Identifier of the node.
-        peer_id: PeerId,
-        /// Identifier of the substream. Same as what was returned by `open_substream`.
-        user_data: TUserData,
+        /// The produced event.
+        event: TOutEvent,
     },
 }
 
@@ -205,10 +153,7 @@
 #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
 pub struct ReachAttemptId(usize);
 
-impl<TMuxer, TUserData> CollectionStream<TMuxer, TUserData>
-where
-    TMuxer: StreamMuxer,
-{
+impl<TInEvent, TOutEvent> CollectionStream<TInEvent, TOutEvent> {
     /// Creates a new empty collection.
     #[inline]
     pub fn new() -> Self {
@@ -222,8 +167,6 @@ where
             to_notify: None,
            events_tx,
            events_rx,
-            outbound_attempts: Default::default(),
-            next_outbound_attempt: 0,
        }
    }
 
     /// Adds to the collection a future that tries to reach a remote.
     ///
     /// This method spawns a task dedicated to resolving this future and processing the node's
     /// events.
-    pub fn add_reach_attempt<TFut, TMuxer, TAddrFut>(&mut self, future: TFut) -> ReachAttemptId
+    pub fn add_reach_attempt<TFut, TMuxer, TAddrFut, THandler>(&mut self, future: TFut, handler: THandler)
+        -> ReachAttemptId
     where
         TFut: Future<Item = (PeerId, TMuxer, TAddrFut), Error = IoError> + Send + 'static,
-        TMuxer: Send + Sync + 'static,
-        TMuxer::OutboundSubstream: Send,
-        TMuxer::Substream: Send,
         TAddrFut: Future<Item = Multiaddr, Error = IoError> + Send + 'static,
-        TUserData: Send + 'static,
+        THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
+        TInEvent: Send + 'static,
+        TOutEvent: Send + 'static,
+        THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
+        TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
+        TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
     {
         let reach_attempt_id = self.next_task_id;
         self.next_task_id.0 += 1;
@@ -255,6 +201,7 @@ where
             inner: NodeTaskInner::Future {
                 future,
                 interrupt: interrupt_rx,
+                handler: Some(handler),
             },
             events_tx: self.events_tx.clone(),
             id: reach_attempt_id,
@@ -294,20 +241,24 @@
         Ok(())
     }
 
+    /// Sends an event to all nodes.
+    pub fn broadcast_event(&mut self, event: &TInEvent)
+        where TInEvent: Clone,
+    {
+        for &(_, ref sender) in self.nodes.values() {
+            let _ = sender.unbounded_send(event.clone()); // TODO: unwrap
+        }
+    }
+
     /// Grants access to an object that allows controlling a node of the collection.
     ///
     /// Returns `None` if we don't have a connection to this peer.
     #[inline]
-    pub fn peer_mut(&mut self, id: &PeerId) -> Option<PeerMut<TUserData>>
-    where
-        TUserData: Send + 'static,
-    {
+    pub fn peer_mut(&mut self, id: &PeerId) -> Option<PeerMut<TInEvent>> {
         match self.nodes.entry(id.clone()) {
             Entry::Occupied(inner) => Some(PeerMut {
                 inner,
                 tasks: &mut self.tasks,
-                next_outbound_attempt: &mut self.next_outbound_attempt,
-                outbound_attempts: &mut self.outbound_attempts,
             }),
             Entry::Vacant(_) => None,
         }
     }
 
@@ -331,43 +282,25 @@ where
 }
 
 /// Access to a peer in the collection.
-pub struct PeerMut<'a, TUserData>
-where
-    TUserData: Send + 'static,
-{
-    next_outbound_attempt: &'a mut usize,
-    outbound_attempts: &'a mut FnvHashMap<usize, (PeerId, TUserData)>,
-    inner: OccupiedEntry<'a, PeerId, (ReachAttemptId, mpsc::UnboundedSender<ExtToInMessage>)>,
+pub struct PeerMut<'a, TInEvent: 'a> {
+    inner: OccupiedEntry<'a, PeerId, (ReachAttemptId, mpsc::UnboundedSender<TInEvent>)>,
     tasks: &'a mut FnvHashMap<ReachAttemptId, TaskKnownState>,
 }
 
-impl<'a, TUserData> PeerMut<'a, TUserData>
-where
-    TUserData: Send + 'static,
-{
-    /// Starts the process of opening a new outbound substream towards the peer.
-    pub fn open_substream(&mut self, user_data: TUserData) {
-        let id = *self.next_outbound_attempt;
-        *self.next_outbound_attempt += 1;
-
-        self.outbound_attempts
-            .insert(id, (self.inner.key().clone(), user_data));
-
-        let _ = self
-            .inner
-            .get_mut()
-            .1
-            .unbounded_send(ExtToInMessage::OpenSubstream(id));
+impl<'a, TInEvent> PeerMut<'a, TInEvent> {
+    /// Sends an event to the given node.
+    #[inline]
+    pub fn send_event(&mut self, event: TInEvent) {
+        // It is possible that the sender is closed if the task has already finished but we
+        // haven't been polled in the meanwhile.
+        let _ = self.inner.get_mut().1.unbounded_send(event);
     }
 
     /// Closes the connections to this node.
     ///
-    /// This cancels all the attempted outgoing substream attempts, and returns them.
-    ///
-    /// No event will be generated for this node.
-    pub fn close(self) -> Vec<TUserData> {
+    /// No further event will be generated for this node.
+    pub fn close(self) {
         let (peer_id, (task_id, _)) = self.inner.remove_entry();
-        let user_datas = extract_from_attempt(self.outbound_attempts, &peer_id);
         // Set the task to `Interrupted` so that we ignore further messages from this closed node.
         match self.tasks.insert(task_id, TaskKnownState::Interrupted) {
             Some(TaskKnownState::Connected(ref p)) if p == &peer_id => (),
@@ -382,36 +315,11 @@ where
                      only when we remove from self.nodes at the same time.")
             },
         }
-
-        user_datas
     }
 }
 
-/// Extract from the hashmap the entries matching `node`.
-fn extract_from_attempt<TUserData>(
-    outbound_attempts: &mut FnvHashMap<usize, (PeerId, TUserData)>,
-    node: &PeerId,
-) -> Vec<TUserData> {
-    let to_remove: Vec<usize> = outbound_attempts
-        .iter()
-        .filter(|(_, &(ref key, _))| key == node)
-        .map(|(&k, _)| k)
-        .collect();
-
-    let mut user_datas = Vec::with_capacity(to_remove.len());
-    for to_remove in to_remove {
-        let (_, user_data) = outbound_attempts.remove(&to_remove)
-            .expect("The elements in to_remove were found by iterating once over the hashmap and \
                     are therefore known to be valid and unique");
-        user_datas.push(user_data);
-    }
-    user_datas
-}
-
-impl<TMuxer, TUserData> Stream for CollectionStream<TMuxer, TUserData>
-where
-    TMuxer: StreamMuxer,
-{
-    type Item = CollectionEvent<TMuxer, TUserData>;
+impl<TInEvent, TOutEvent> Stream for CollectionStream<TInEvent, TOutEvent> {
+    type Item = CollectionEvent<TOutEvent>;
     type Error = Void; // TODO: use ! once stable
 
     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@@ -433,67 +341,10 @@ where
                     }
                 };
 
-                match event {
-                    NodeEvent::Multiaddr(address) => {
-                        Ok(Async::Ready(Some(CollectionEvent::NodeMultiaddr {
-                            peer_id,
-                            address,
-                        })))
-                    }
-                    NodeEvent::InboundSubstream { substream } => {
-                        Ok(Async::Ready(Some(CollectionEvent::InboundSubstream {
-                            peer_id,
-                            substream,
-                        })))
-                    }
-                    NodeEvent::OutboundSubstream {
-                        user_data,
-                        substream,
-                    } => {
-                        let (_peer_id, actual_data) = self
-                            .outbound_attempts
-                            .remove(&user_data)
-                            .expect("We insert a unique usize in outbound_attempts at the \
                                     same time as we ask the node to open a substream with \
                                     this usize. The API of the node is guaranteed to produce \
                                     the value we passed when the substream is actually \
                                     opened. \
                                    The only other places where we remove from \
                                    outbound_attempts are if the outbound failed, or if the \
                                    node's task errored or was closed. If the node's task \
                                    is closed by us, we set its state to `Interrupted` so \
                                    that event that it produces are not processed.");
-                        debug_assert_eq!(_peer_id, peer_id);
-                        Ok(Async::Ready(Some(CollectionEvent::OutboundSubstream {
-                            peer_id,
-                            user_data: actual_data,
-                            substream,
-                        })))
-                    }
-                    NodeEvent::InboundClosed => {
-                        Ok(Async::Ready(Some(CollectionEvent::InboundClosed {
-                            peer_id,
-                        })))
-                    }
-                    NodeEvent::OutboundClosed { user_data } => {
-                        let (_peer_id, actual_data) = self
-                            .outbound_attempts
-                            .remove(&user_data)
-                            .expect("We insert a unique usize in outbound_attempts at the \
                                     same time as we ask the node to open a substream with \
                                     this usize. The API of the node is guaranteed to produce \
                                     the value we passed when the substream is actually \
                                     opened. The only other places where we remove from \
                                     outbound_attempts are if the outbound succeeds, or if the \
                                     node's task errored or was closed. If the node's task \
                                     is closed by us, we set its state to `Interrupted` so \
                                     that event that it produces are not processed.");
-                        debug_assert_eq!(_peer_id, peer_id);
-                        Ok(Async::Ready(Some(CollectionEvent::OutboundClosed {
-                            peer_id,
-                            user_data: actual_data,
-                        })))
-                    }
-                }
+                Ok(Async::Ready(Some(CollectionEvent::NodeEvent {
+                    peer_id,
+                    event,
+                })))
             }
             Ok(Async::Ready(Some((InToExtMessage::NodeReached(peer_id, sender), task_id)))) => {
                 {
@@ -523,13 +374,11 @@ where
                 }
 
                 let replaced_node = self.nodes.insert(peer_id.clone(), (task_id, sender));
-                let user_datas = extract_from_attempt(&mut self.outbound_attempts, &peer_id);
                 if let Some(replaced_node) = replaced_node {
                     let old = self.tasks.insert(replaced_node.0, TaskKnownState::Interrupted);
                     debug_assert_eq!(old.map(|s| s.is_pending()), Some(false));
                     Ok(Async::Ready(Some(CollectionEvent::NodeReplaced {
                         peer_id,
-                        closed_outbound_substreams: user_datas,
                         id: task_id,
                     })))
                 } else {
@@ -553,9 +402,6 @@ where
 
                 let val = self.nodes.remove(&peer_id);
                 debug_assert!(val.is_some());
-                debug_assert!(
-                    extract_from_attempt(&mut self.outbound_attempts, &peer_id).is_empty()
-                );
                 Ok(Async::Ready(Some(CollectionEvent::NodeClosed { peer_id })))
             }
             Ok(Async::Ready(Some((InToExtMessage::NodeError(err), task_id)))) => {
@@ -572,11 +418,9 @@ where
 
                 let val = self.nodes.remove(&peer_id);
                 debug_assert!(val.is_some());
-                let user_datas = extract_from_attempt(&mut self.outbound_attempts, &peer_id);
                 Ok(Async::Ready(Some(CollectionEvent::NodeError {
                     peer_id,
                     error: err,
-                    closed_outbound_substreams: user_datas,
                 })))
             }
             Ok(Async::Ready(Some((InToExtMessage::ReachError(err), task_id)))) => {
@@ -610,45 +454,37 @@ where
     }
 }
 
-/// Message to transmit from the public API to a task.
-#[derive(Debug, Clone)]
-enum ExtToInMessage {
-    /// A new substream shall be opened.
-    OpenSubstream(usize),
-}
-
 /// Message to transmit from a task to the public API.
-enum InToExtMessage<TMuxer>
-where
-    TMuxer: StreamMuxer,
-{
+enum InToExtMessage<TInEvent, TOutEvent> {
     /// A connection to a node has succeeded.
     /// Closing the returned sender will end the task.
-    NodeReached(PeerId, mpsc::UnboundedSender<ExtToInMessage>),
+    NodeReached(PeerId, mpsc::UnboundedSender<TInEvent>),
     NodeClosed,
     NodeError(IoError),
     ReachError(IoError),
     /// An event from the node.
-    NodeEvent(NodeEvent<TMuxer, usize>),
+    NodeEvent(TOutEvent),
 }
 
 /// Implementation of `Future` that handles a single node, and all the communications between
 /// the various components of the `CollectionStream`.
-struct NodeTask<TFut, TMuxer, TAddrFut>
+struct NodeTask<TFut, TMuxer, TAddrFut, THandler, TInEvent, TOutEvent>
 where
     TMuxer: StreamMuxer,
+    THandler: NodeHandler<Substream<TMuxer>>,
 {
     /// Sender to transmit events to the outside.
-    events_tx: mpsc::UnboundedSender<(InToExtMessage<TMuxer>, ReachAttemptId)>,
+    events_tx: mpsc::UnboundedSender<(InToExtMessage<TInEvent, TOutEvent>, ReachAttemptId)>,
     /// Inner state of the `NodeTask`.
-    inner: NodeTaskInner<TFut, TMuxer, TAddrFut>,
+    inner: NodeTaskInner<TFut, TMuxer, TAddrFut, THandler, TInEvent>,
     /// Identifier of the attempt.
     id: ReachAttemptId,
 }
 
-enum NodeTaskInner<TFut, TMuxer, TAddrFut>
+enum NodeTaskInner<TFut, TMuxer, TAddrFut, THandler, TInEvent>
 where
     TMuxer: StreamMuxer,
+    THandler: NodeHandler<Substream<TMuxer>>,
 {
     /// Future to resolve to connect to the node.
     Future {
@@ -656,23 +492,26 @@ where
         future: TFut,
         /// Allows interrupting the attempt.
         interrupt: oneshot::Receiver<()>,
+        /// The handler that will be used to build the `HandledNode`.
+        handler: Option<THandler>,
     },
 
     /// Fully functional node.
     Node {
         /// The object that is actually processing things.
-        /// This is an `Option` because we need to be able to extract it.
-        node: NodeStream<TMuxer, TAddrFut, usize>,
-        /// Receiving end for events sent from the main `CollectionStream`.
-        in_events_rx: mpsc::UnboundedReceiver<ExtToInMessage>,
+        node: HandledNode<TMuxer, TAddrFut, THandler>,
+        /// Receiving end for events sent from the main `CollectionStream`. `None` if closed.
+        in_events_rx: Option<mpsc::UnboundedReceiver<TInEvent>>,
     },
 }
 
-impl<TFut, TMuxer, TAddrFut> Future for NodeTask<TFut, TMuxer, TAddrFut>
+impl<TFut, TMuxer, TAddrFut, THandler, TInEvent, TOutEvent> Future for
+    NodeTask<TFut, TMuxer, TAddrFut, THandler, TInEvent, TOutEvent>
 where
     TMuxer: StreamMuxer,
     TFut: Future<Item = (PeerId, TMuxer, TAddrFut), Error = IoError>,
     TAddrFut: Future<Item = Multiaddr, Error = IoError>,
+    THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent>,
 {
     type Item = ();
     type Error = ();
@@ -685,6 +524,7 @@ where
         let new_state = if let NodeTaskInner::Future {
             ref mut future,
             ref mut interrupt,
+            ref mut handler,
         } = self.inner
         {
             match interrupt.poll() {
@@ -698,9 +538,12 @@ where
                        let event = InToExtMessage::NodeReached(peer_id, sender);
                        let _ = self.events_tx.unbounded_send((event, self.id));
 
+                        let handler = handler.take()
+                            .expect("The handler is only extracted right before we switch state");
+
                        Some(NodeTaskInner::Node {
-                            node: NodeStream::new(muxer, addr_fut),
-                            in_events_rx: rx,
+                            node: HandledNode::new(muxer, addr_fut, handler),
+                            in_events_rx: Some(rx),
                        })
                    }
                    Ok(Async::NotReady) => {
@@ -728,25 +571,21 @@ where
         } = self.inner
         {
             // Start by handling commands received from the outside of the task.
-            loop {
-                match in_events_rx.poll() {
-                    Ok(Async::Ready(Some(ExtToInMessage::OpenSubstream(user_data)))) => match node
-                        .open_substream(user_data)
-                    {
-                        Ok(()) => (),
-                        Err(user_data) => {
-                            let event =
-                                InToExtMessage::NodeEvent(NodeEvent::OutboundClosed { user_data });
-                            let _ = self.events_tx.unbounded_send((event, self.id));
+            if let Some(mut local_rx) = in_events_rx.take() {
+                *in_events_rx = loop {
+                    match local_rx.poll() {
+                        Ok(Async::Ready(Some(event))) => {
+                            node.inject_event(event);
+                        },
+                        Ok(Async::Ready(None)) => {
+                            // Node closed by the external API; start the shutdown process.
+                            node.shutdown();
+                            break None;
                        }
-                    },
-                    Ok(Async::Ready(None)) => {
-                        // Node closed by the external API ; end the task
-                        return Ok(Async::Ready(()));
+                        Ok(Async::NotReady) => break Some(local_rx),
+                        Err(()) => unreachable!("An unbounded receiver never errors"),
                    }
-                    Ok(Async::NotReady) => break,
-                    Err(()) => unreachable!("An unbounded receiver never errors"),
-                }
+                };
            }
 
             // Process the node.
diff --git a/core/src/nodes/handled_node.rs b/core/src/nodes/handled_node.rs
new file mode 100644
index 00000000..ea1d675b
--- /dev/null
+++ b/core/src/nodes/handled_node.rs
@@ -0,0 +1,261 @@
+// Copyright 2018 Parity Technologies (UK) Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use muxing::StreamMuxer;
+use nodes::node::{NodeEvent, NodeStream, Substream};
+use futures::prelude::*;
+use std::io::Error as IoError;
+use Multiaddr;
+
+/// Handler for the substreams of a node.
+///
+/// > Note: When implementing the various methods, don't forget that you have to register the
+/// > task that was the latest to poll and notify it.
+pub trait NodeHandler<TSubstream> {
+    /// Custom event that can be received from the outside.
+    type InEvent;
+    /// Custom event that can be produced by the handler and that will be returned by the swarm.
+    type OutEvent;
+    /// Information about a substream. Can be sent to the handler through a `NodeHandlerEndpoint`,
+    /// and will be passed back in `inject_substream` or `inject_outbound_closed`.
+    type OutboundOpenInfo;
+
+    /// Sends a new substream to the handler.
+    ///
+    /// The handler is responsible for upgrading the substream to whatever protocol it wants.
+    fn inject_substream(&mut self, substream: TSubstream, endpoint: NodeHandlerEndpoint<Self::OutboundOpenInfo>);
+
+    /// Indicates to the handler that the inbound part of the muxer has been closed, and that
+    /// therefore no more inbound substreams will be produced.
+    fn inject_inbound_closed(&mut self);
+
+    /// Indicates to the handler that an outbound substream failed to open because the outbound
+    /// part of the muxer has been closed.
+    fn inject_outbound_closed(&mut self, user_data: Self::OutboundOpenInfo);
+
+    /// Indicates to the handler that the multiaddr future has resolved.
+    fn inject_multiaddr(&mut self, multiaddr: Result<Multiaddr, IoError>);
+
+    /// Injects an event coming from the outside into the handler.
+    fn inject_event(&mut self, event: Self::InEvent);
+
+    /// Indicates to the node that it should shut down. After that, it is expected that `poll()`
+    /// returns `Ready(None)` as soon as possible.
+    ///
+    /// This method allows an implementation to perform a graceful shutdown of the substreams, and
+    /// send back various events.
+    fn shutdown(&mut self);
+
+    /// Should behave like `Stream::poll()`. Should close if no more event can be produced and the
+    /// node should be closed.
+    fn poll(&mut self) -> Poll<Option<NodeHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>>, IoError>;
+}
+
+/// Endpoint for a received substream.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum NodeHandlerEndpoint<TOutboundOpenInfo> {
+    Dialer(TOutboundOpenInfo),
+    Listener,
+}
+
+/// Event produced by a handler.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum NodeHandlerEvent<TOutboundOpenInfo, TCustom> {
+    /// Requests a new outbound substream to be opened with the remote.
+    OutboundSubstreamRequest(TOutboundOpenInfo),
+
+    /// Other event.
+    Custom(TCustom),
+}
+
+impl<TOutboundOpenInfo, TCustom> NodeHandlerEvent<TOutboundOpenInfo, TCustom> {
+    /// If this is `OutboundSubstreamRequest`, maps the content to something else.
+    #[inline]
+    pub fn map_outbound_open_info<F, I>(self, map: F) -> NodeHandlerEvent<I, TCustom>
+        where F: FnOnce(TOutboundOpenInfo) -> I
+    {
+        match self {
+            NodeHandlerEvent::OutboundSubstreamRequest(val) => {
+                NodeHandlerEvent::OutboundSubstreamRequest(map(val))
+            },
+            NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(val),
+        }
+    }
+
+    /// If this is `Custom`, maps the content to something else.
+    #[inline]
+    pub fn map_custom<F, I>(self, map: F) -> NodeHandlerEvent<TOutboundOpenInfo, I>
+        where F: FnOnce(TCustom) -> I
+    {
+        match self {
+            NodeHandlerEvent::OutboundSubstreamRequest(val) => {
+                NodeHandlerEvent::OutboundSubstreamRequest(val)
+            },
+            NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(map(val)),
+        }
+    }
+}
+
+/// A node combined with an implementation of `NodeHandler`.
+// TODO: impl Debug
+pub struct HandledNode<TMuxer, TAddrFut, THandler>
+where
+    TMuxer: StreamMuxer,
+    THandler: NodeHandler<Substream<TMuxer>>,
+{
+    /// Node that handles the muxing. Can be `None` if the handled node is shutting down.
+    node: Option<NodeStream<TMuxer, TAddrFut, THandler::OutboundOpenInfo>>,
+    /// Handler that processes substreams.
+    handler: THandler,
+}
+
+impl<TMuxer, TAddrFut, THandler> HandledNode<TMuxer, TAddrFut, THandler>
+where
+    TMuxer: StreamMuxer,
+    THandler: NodeHandler<Substream<TMuxer>>,
+    TAddrFut: Future<Item = Multiaddr, Error = IoError>,
+{
+    /// Builds a new `HandledNode`.
+    #[inline]
+    pub fn new(muxer: TMuxer, multiaddr_future: TAddrFut, handler: THandler) -> Self {
+        HandledNode {
+            node: Some(NodeStream::new(muxer, multiaddr_future)),
+            handler,
+        }
+    }
+
+    /// Injects an event into the handler.
+    #[inline]
+    pub fn inject_event(&mut self, event: THandler::InEvent) {
+        self.handler.inject_event(event);
+    }
+
+    /// Returns true if the inbound channel of the muxer is closed.
+    ///
+    /// If `true` is returned, then no more inbound substreams will be received.
+    #[inline]
+    pub fn is_inbound_closed(&self) -> bool {
+        self.node.as_ref().map(|n| n.is_inbound_closed()).unwrap_or(true)
+    }
+
+    /// Returns true if the outbound channel of the muxer is closed.
+    ///
+    /// If `true` is returned, then no more outbound substreams will be opened.
+    #[inline]
+    pub fn is_outbound_closed(&self) -> bool {
+        self.node.as_ref().map(|n| n.is_outbound_closed()).unwrap_or(true)
+    }
+
+    /// Returns true if the handled node is in the process of shutting down.
+    #[inline]
+    pub fn is_shutting_down(&self) -> bool {
+        self.node.is_none()
+    }
+
+    /// Indicates to the handled node that it should shut down. After calling this method, the
+    /// `Stream` will end in the not-so-distant future.
+    ///
+    /// After this method returns, `is_shutting_down()` should return true.
+    pub fn shutdown(&mut self) {
+        if let Some(node) = self.node.take() {
+            for user_data in node.close() {
+                self.handler.inject_outbound_closed(user_data);
+            }
+        }
+
+        self.handler.shutdown();
+    }
+}
+
+impl<TMuxer, TAddrFut, THandler> Stream for HandledNode<TMuxer, TAddrFut, THandler>
+where
+    TMuxer: StreamMuxer,
+    THandler: NodeHandler<Substream<TMuxer>>,
+    TAddrFut: Future<Item = Multiaddr, Error = IoError>,
+{
+    type Item = THandler::OutEvent;
+    type Error = IoError;
+
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        // We extract the value from `self.node` and put it back in place if `NotReady`.
+        if let Some(mut node) = self.node.take() {
+            loop {
+                match node.poll() {
+                    Ok(Async::NotReady) => {
+                        self.node = Some(node);
+                        break;
+                    },
+                    Ok(Async::Ready(Some(NodeEvent::InboundSubstream { substream }))) => {
+                        self.handler.inject_substream(substream, NodeHandlerEndpoint::Listener);
+                    },
+                    Ok(Async::Ready(Some(NodeEvent::OutboundSubstream { user_data, substream }))) => {
+                        let endpoint = NodeHandlerEndpoint::Dialer(user_data);
+                        self.handler.inject_substream(substream, endpoint);
+                    },
+                    Ok(Async::Ready(None)) => {
+                        // Breaking from the loop without putting back the node.
+                        break;
+                    },
+                    Ok(Async::Ready(Some(NodeEvent::Multiaddr(result)))) => {
+                        self.handler.inject_multiaddr(result);
+                    },
+                    Ok(Async::Ready(Some(NodeEvent::OutboundClosed { user_data }))) => {
+                        self.handler.inject_outbound_closed(user_data);
+                    },
+                    Ok(Async::Ready(Some(NodeEvent::InboundClosed))) => {
+                        self.handler.inject_inbound_closed();
+                    },
+                    Err(err) => {
+                        // Breaking from the loop without putting back the node.
+                        return Err(err);
+                    },
+                }
+            }
+        }
+
+        loop {
+            match self.handler.poll() {
+                Ok(Async::NotReady) => break,
+                Ok(Async::Ready(Some(NodeHandlerEvent::OutboundSubstreamRequest(user_data)))) => {
+                    if let Some(node) = self.node.as_mut() {
+                        match node.open_substream(user_data) {
+                            Ok(()) => (),
+                            Err(user_data) => self.handler.inject_outbound_closed(user_data),
+                        }
+                    } else {
+                        self.handler.inject_outbound_closed(user_data);
+                    }
+                },
+                Ok(Async::Ready(Some(NodeHandlerEvent::Custom(event)))) => {
+                    return Ok(Async::Ready(Some(event)));
+                },
+                Ok(Async::Ready(None)) => {
+                    return Ok(Async::Ready(None));
+                },
+                Err(err) => {
+                    return Err(err);
+                },
+            }
+        }
+
+        Ok(Async::NotReady)
+    }
+}
diff --git a/core/src/nodes/mod.rs b/core/src/nodes/mod.rs
index ba6c3568..fd0d8015 100644
--- a/core/src/nodes/mod.rs
+++ b/core/src/nodes/mod.rs
@@ -19,6 +19,7 @@
 // DEALINGS IN THE SOFTWARE.
 
 pub mod collection;
+pub mod handled_node;
 pub mod listeners;
 pub mod node;
 pub mod swarm;
diff --git a/core/src/nodes/swarm.rs b/core/src/nodes/swarm.rs
index 3ed76926..bf9f49cd 100644
--- a/core/src/nodes/swarm.rs
+++ b/core/src/nodes/swarm.rs
@@ -20,10 +20,11 @@
 
 use fnv::FnvHashMap;
 use futures::{prelude::*, future};
-use muxing;
+use muxing::StreamMuxer;
 use nodes::collection::{
     CollectionEvent, CollectionStream, PeerMut as CollecPeerMut, ReachAttemptId,
 };
+use nodes::handled_node::NodeHandler;
 use nodes::listeners::{ListenersEvent, ListenersStream};
 use nodes::node::Substream;
 use std::collections::hash_map::{Entry, OccupiedEntry};
@@ -32,16 +33,15 @@ use void::Void;
 use {Endpoint, Multiaddr, PeerId, Transport};
 
 /// Implementation of `Stream` that handles the nodes.
-pub struct Swarm<TTrans, TMuxer, TUserData>
+pub struct Swarm<TTrans, TInEvent, TOutEvent, THandlerBuild>
 where
     TTrans: Transport,
-    TMuxer: muxing::StreamMuxer,
 {
     /// Listeners for incoming connections.
     listeners: ListenersStream<TTrans>,
 
     /// The nodes currently active.
-    active_nodes: CollectionStream<TMuxer, TUserData>,
+    active_nodes: CollectionStream<TInEvent, TOutEvent>,
 
     /// Attempts to reach a peer.
     out_reach_attempts: FnvHashMap<PeerId, OutReachAttempt>,
@@ -52,6 +52,9 @@ where
 
     /// For each peer ID we're connected to, contains the multiaddress we're connected to.
     connected_multiaddresses: FnvHashMap<PeerId, Multiaddr>,
+
+    /// Object that builds new handlers.
+    handler_build: THandlerBuild,
 }
 
 /// Attempt to reach a peer.
@@ -66,10 +69,9 @@ struct OutReachAttempt {
 }
 
 /// Event that can happen on the `Swarm`.
-pub enum SwarmEvent<TTrans, TMuxer, TUserData>
+pub enum SwarmEvent<TTrans, TOutEvent>
 where
     TTrans: Transport,
-    TMuxer: muxing::StreamMuxer,
 {
     /// One of the listeners gracefully closed.
     ListenerClosed {
@@ -108,8 +110,6 @@ where
     Replaced {
         /// Id of the peer.
         peer_id: PeerId,
-        /// Outbound substream attempts that have been closed in the process.
-        closed_outbound_substreams: Vec<TUserData>,
         /// Multiaddr we were connected to, or `None` if it was unknown.
         closed_multiaddr: Option<Multiaddr>,
         /// If `Listener`, then we received the connection. If `Dial`, then it's a connection that
@@ -136,8 +136,6 @@ where
         address: Option<Multiaddr>,
         /// The error that happened.
         error: IoError,
-        /// Pending outbound substreams that were cancelled.
-        closed_outbound_substreams: Vec<TUserData>,
     },
 
     /// Failed to reach a peer that we were trying to dial.
@@ -183,46 +181,12 @@ where
         remain_addrs_attempt: usize,
     },
 
-    /// A new inbound substream arrived.
-    InboundSubstream {
-        /// Id of the peer we received a substream from.
+    /// A node produced a custom event.
+    NodeEvent {
+        /// Id of the node that produced the event.
         peer_id: PeerId,
-        /// The newly-opened substream.
-        substream: Substream<TMuxer>,
-    },
-
-    /// An outbound substream has successfully been opened.
-    OutboundSubstream {
-        /// Id of the peer we received a substream from.
-        peer_id: PeerId,
-        /// User data that has been passed to the `open_substream` method.
-        user_data: TUserData,
-        /// The newly-opened substream.
-        substream: Substream<TMuxer>,
-    },
-
-    /// The inbound side of a muxer has been gracefully closed. No more inbound substreams will
-    /// be produced.
-    InboundClosed {
-        /// Id of the peer.
-        peer_id: PeerId,
-    },
-
-    /// An outbound substream couldn't be opened because the muxer is no longer capable of opening
-    /// more substreams.
-    OutboundClosed {
-        /// Id of the peer we were trying to open a substream with.
-        peer_id: PeerId,
-        /// User data that has been passed to the `open_substream` method.
-        user_data: TUserData,
-    },
-
-    /// The multiaddress of the node has been resolved.
-    NodeMultiaddr {
-        /// Identifier of the node.
-        peer_id: PeerId,
-        /// Address that has been resolved.
-        address: Result<Multiaddr, IoError>,
+        /// Event that was produced by the node.
+        event: TOutEvent,
     },
 }
 
@@ -271,14 +235,38 @@ impl ConnectedPoint {
     }
 }
 
+/// Trait for structures that can create new handlers.
+pub trait HandlerFactory {
+    /// The generated handler.
+    type Handler;
+
+    /// Creates a new handler.
+    fn new_handler(&self) -> Self::Handler;
+}
+
+impl<T, THandler> HandlerFactory for T where T: Fn() -> THandler {
+    type Handler = THandler;
+
+    #[inline]
+    fn new_handler(&self) -> THandler {
+        (*self)()
+    }
+}
+
-impl<TTrans, TMuxer, TUserData> Swarm<TTrans, TMuxer, TUserData>
+impl<TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerBuild>
+    Swarm<TTrans, TInEvent, TOutEvent, THandlerBuild>
 where
-    TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
-    TMuxer: muxing::StreamMuxer,
+    TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
+    TMuxer: StreamMuxer,
+    THandlerBuild: HandlerFactory<Handler = THandler>,
+    THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
+    THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
 {
     /// Creates a new node events stream.
     #[inline]
-    pub fn new(transport: TTrans) -> Self {
+    pub fn new(transport: TTrans) -> Swarm<TTrans, TInEvent, TOutEvent, fn() -> THandler>
+        where THandler: Default,
+    {
         // TODO: with_capacity?
         Swarm {
             listeners: ListenersStream::new(transport),
@@ -286,6 +274,21 @@ where
             out_reach_attempts: Default::default(),
             other_reach_attempts: Vec::new(),
             connected_multiaddresses: Default::default(),
+            handler_build: Default::default,
+        }
+    }
+
+    /// Same as `new`, but lets you specify a way to build a node handler.
+    #[inline]
+    pub fn with_handler_builder(transport: TTrans, handler_build: THandlerBuild) -> Self {
+        // TODO: with_capacity?
+        Swarm {
+            listeners: ListenersStream::new(transport),
+            active_nodes: CollectionStream::new(),
+            out_reach_attempts: Default::default(),
+            other_reach_attempts: Vec::new(),
+            connected_multiaddresses: Default::default(),
+            handler_build,
+        }
+    }
 
@@ -318,7 +321,10 @@ where
     pub fn nat_traversal<'a>(
         &'a self,
         observed_addr: &'a Multiaddr,
-    ) -> impl Iterator<Item = Multiaddr> + 'a {
+    ) -> impl Iterator<Item = Multiaddr> + 'a
+        where TMuxer: 'a,
+              THandler: 'a,
+    {
         self.listeners()
             .flat_map(move |server| self.transport().nat_traversal(server, observed_addr))
     }
@@ -329,17 +335,18 @@ where
         TTrans: Transport + Clone,
         TTrans::Dial: Send + 'static,
         TTrans::MultiaddrFuture: Send + 'static,
-        TMuxer: Send + Sync + 'static,
+        TMuxer: StreamMuxer + Send + Sync + 'static,
         TMuxer::OutboundSubstream: Send,
         TMuxer::Substream: Send,
-        TUserData: Send + 'static,
+        TInEvent: Send + 'static,
+        TOutEvent: Send + 'static,
     {
         let future = match self.transport().clone().dial(addr.clone()) {
             Ok(fut) => fut,
             Err((_, addr)) => return Err(addr),
         };
 
-        let reach_id = self.active_nodes.add_reach_attempt(future);
+        let reach_id = self.active_nodes.add_reach_attempt(future, self.handler_build.new_handler());
         self.other_reach_attempts
             .push((reach_id, ConnectedPoint::Dialer { address: addr }));
         Ok(())
@@ -360,12 +367,17 @@ where
             .count()
     }
 
+    /// Sends an event to all nodes.
+    #[inline]
+    pub fn broadcast_event(&mut self, event: &TInEvent)
+        where TInEvent: Clone,
+    {
+        self.active_nodes.broadcast_event(event)
+    }
+
     /// Grants access to a struct that represents a peer.
     #[inline]
-    pub fn peer(&mut self, peer_id: PeerId) -> Peer<TTrans, TMuxer, TUserData>
-    where
-        TUserData: Send + 'static,
-    {
+    pub fn peer(&mut self, peer_id: PeerId) -> Peer<TTrans, TInEvent, TOutEvent, THandlerBuild> {
         // TODO: we do `peer_mut(...).is_some()` followed with `peer_mut(...).unwrap()`, otherwise
         // the borrow checker yells at us.
@@ -409,16 +421,17 @@ where
         &mut self,
         peer_id: PeerId,
         reach_id: ReachAttemptId,
-        closed_outbound_substreams: Option<Vec<TUserData>>,
-    ) -> SwarmEvent<TTrans, TMuxer, TUserData>
+        replaced: bool,
+    ) -> SwarmEvent<TTrans, TOutEvent>
     where
         TTrans: Transport + Clone,
         TTrans::Dial: Send + 'static,
         TTrans::MultiaddrFuture: Send + 'static,
-        TMuxer: Send + Sync + 'static,
+        TMuxer: StreamMuxer + Send + Sync + 'static,
         TMuxer::OutboundSubstream: Send,
         TMuxer::Substream: Send,
-        TUserData: Send + 'static,
+        TInEvent: Send + 'static,
+        TOutEvent: Send + 'static,
     {
         // We first start looking in the incoming attempts. While this makes the code less optimal,
         // it also makes the logic easier.
@@ -442,12 +455,11 @@ where
                      out_reach_attempts should always be in sync with the actual attempts");
             }
 
-            if let Some(closed_outbound_substreams) = closed_outbound_substreams {
+            if replaced {
                 return SwarmEvent::Replaced {
                     peer_id,
                     endpoint,
                     closed_multiaddr,
-                    closed_outbound_substreams,
                 };
             } else {
                 return SwarmEvent::Connected { peer_id, endpoint };
@@ -474,12 +486,11 @@ where
                 address: attempt.cur_attempted,
             };
 
-            if let Some(closed_outbound_substreams) = closed_outbound_substreams {
+            if replaced {
                 return SwarmEvent::Replaced {
                     peer_id,
                     endpoint,
                     closed_multiaddr,
-                    closed_outbound_substreams,
                 };
             } else {
                 return SwarmEvent::Connected { peer_id, endpoint };
@@ -500,23 +511,22 @@ where
             let num_remain = attempt.next_attempts.len();
             let failed_addr = attempt.cur_attempted.clone();
 
-            let opened_attempts = self.active_nodes.peer_mut(&peer_id)
+            self.active_nodes.peer_mut(&peer_id)
                 .expect("When we receive a NodeReached or NodeReplaced event from active_nodes, \
                          it is guaranteed that the PeerId is valid and therefore that \
                         active_nodes.peer_mut succeeds with this ID. \
                        handle_node_reached is \
                        called only to handle these events.")
                .close();
-            debug_assert!(opened_attempts.is_empty());
 
             if !attempt.next_attempts.is_empty() {
                 let mut attempt = attempt;
                 attempt.cur_attempted = attempt.next_attempts.remove(0);
                 attempt.id = match self.transport().clone().dial(attempt.cur_attempted.clone()) {
-                    Ok(fut) => self.active_nodes.add_reach_attempt(fut),
+                    Ok(fut) => self.active_nodes.add_reach_attempt(fut, self.handler_build.new_handler()),
                     Err((_, addr)) => {
                         let msg = format!("unsupported multiaddr {}", addr);
                         let fut = future::err(IoError::new(IoErrorKind::Other, msg));
-                        self.active_nodes.add_reach_attempt::<_, future::FutureResult<Multiaddr, IoError>>(fut)
+                        self.active_nodes.add_reach_attempt::<_, _, future::FutureResult<Multiaddr, IoError>, _>(fut, self.handler_build.new_handler())
                     },
                 };
 
@@ -550,15 +560,16 @@ where
         &mut self,
         reach_id: ReachAttemptId,
         error: IoError,
-    ) -> Option<SwarmEvent<TTrans, TMuxer, TUserData>>
+    ) -> Option<SwarmEvent<TTrans, TOutEvent>>
     where
         TTrans: Transport + Clone,
         TTrans::Dial: Send + 'static,
         TTrans::MultiaddrFuture: Send + 'static,
-        TMuxer: Send + Sync + 'static,
+        TMuxer: StreamMuxer + Send + Sync + 'static,
         TMuxer::OutboundSubstream: Send,
         TMuxer::Substream: Send,
-        TUserData: Send + 'static,
+        TInEvent: Send + 'static,
+        TOutEvent: Send + 'static,
     {
         // Search for the attempt in `out_reach_attempts`.
         // TODO: could be more optimal than iterating over everything
@@ -578,11 +589,11 @@ where
             let mut attempt = attempt;
             attempt.cur_attempted = attempt.next_attempts.remove(0);
             attempt.id = match self.transport().clone().dial(attempt.cur_attempted.clone()) {
-                Ok(fut) => self.active_nodes.add_reach_attempt(fut),
+                Ok(fut) => self.active_nodes.add_reach_attempt(fut, self.handler_build.new_handler()),
                 Err((_, addr)) => {
                     let msg = format!("unsupported multiaddr {}", addr);
                     let fut = future::err(IoError::new(IoErrorKind::Other, msg));
-                    self.active_nodes.add_reach_attempt::<_, future::FutureResult<Multiaddr, IoError>>(fut)
+                    self.active_nodes.add_reach_attempt::<_, _, future::FutureResult<Multiaddr, IoError>, _>(fut, self.handler_build.new_handler())
                 },
             };
 
@@ -628,35 +639,36 @@ where
 }
 
 /// State of a peer in the system.
-pub enum Peer<'a, TTrans, TMuxer, TUserData>
+pub enum Peer<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandlerBuild: 'a>
 where
-    TTrans: Transport + 'a,
-    TMuxer: muxing::StreamMuxer + 'a,
-    TUserData: Send + 'static,
+    TTrans: Transport,
 {
     /// We are connected to this peer.
-    Connected(PeerConnected<'a, TUserData>),
+    Connected(PeerConnected<'a, TInEvent>),
 
     /// We are currently attempting to connect to this peer.
-    PendingConnect(PeerPendingConnect<'a, TMuxer, TUserData>),
+    PendingConnect(PeerPendingConnect<'a, TInEvent, TOutEvent>),
 
     /// We are not connected to this peer at all.
     ///
     /// > **Note**: It is however possible that a pending incoming connection is being negotiated
     /// > and will connect to this peer, but we don't know it yet.
-    NotConnected(PeerNotConnected<'a, TTrans, TMuxer, TUserData>),
+    NotConnected(PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandlerBuild>),
 }
 
 // TODO: add other similar methods that wrap to the ones of `PeerNotConnected`
-impl<'a, TTrans, TMuxer, TUserData> Peer<'a, TTrans, TMuxer, TUserData>
+impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerBuild>
+    Peer<'a, TTrans, TInEvent, TOutEvent, THandlerBuild>
 where
-    TTrans: Transport,
-    TMuxer: muxing::StreamMuxer,
-    TUserData: Send + 'static,
+    TTrans: Transport<Output = (PeerId, TMuxer)>,
+    TMuxer: StreamMuxer,
+    THandlerBuild: HandlerFactory<Handler = THandler>,
+    THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
+    THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
 {
     /// If we are connected, returns the `PeerConnected`.
     #[inline]
-    pub fn as_connected(self) -> Option<PeerConnected<'a, TUserData>> {
+    pub fn as_connected(self) -> Option<PeerConnected<'a, TInEvent>> {
         match self {
             Peer::Connected(peer) => Some(peer),
             _ => None,
@@ -665,7 +677,7 @@ where
 
     /// If a connection is pending, returns the `PeerPendingConnect`.
     #[inline]
-    pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TMuxer, TUserData>> {
+    pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TInEvent, TOutEvent>> {
         match self {
             Peer::PendingConnect(peer) => Some(peer),
             _ => None,
@@ -674,7 +686,7 @@ where
 
     /// If we are not connected, returns the `PeerNotConnected`.
     #[inline]
-    pub fn as_not_connected(self) -> Option<PeerNotConnected<'a, TTrans, TMuxer, TUserData>> {
+    pub fn as_not_connected(self) -> Option<PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandlerBuild>> {
         match self {
             Peer::NotConnected(peer) => Some(peer),
             _ => None,
@@ -686,14 +698,16 @@ where
     pub fn or_connect(
         self,
         addr: Multiaddr,
-    ) -> Result<PeerPotentialConnect<'a, TMuxer, TUserData>, Self>
+    ) -> Result<PeerPotentialConnect<'a, TInEvent, TOutEvent>, Self>
     where
         TTrans: Transport + Clone,
         TTrans::Dial: Send + 'static,
         TTrans::MultiaddrFuture: Send + 'static,
-        TMuxer: Send + Sync + 'static,
+        TMuxer: StreamMuxer + Send + Sync + 'static,
         TMuxer::OutboundSubstream: Send,
         TMuxer::Substream: Send,
+        TInEvent: Send + 'static,
+        TOutEvent: Send + 'static,
     {
         self.or_connect_with(move |_| addr)
     }
@@ -704,15 +718,17 @@ where
     pub fn or_connect_with<TFn>(
         self,
         addr: TFn,
-    ) -> Result<PeerPotentialConnect<'a, TMuxer, TUserData>, Self>
+    ) -> Result<PeerPotentialConnect<'a, TInEvent, TOutEvent>, Self>
     where
         TFn: FnOnce(&PeerId) -> Multiaddr,
         TTrans: Transport + Clone,
         TTrans::Dial: Send + 'static,
         TTrans::MultiaddrFuture: Send + 'static,
-        TMuxer: Send + Sync + 'static,
+        TMuxer: StreamMuxer + Send + Sync + 'static,
         TMuxer::OutboundSubstream: Send,
         TMuxer::Substream: Send,
+        TInEvent: Send + 'static,
+        TOutEvent: Send + 'static,
     {
         match self {
             Peer::Connected(peer) => Ok(PeerPotentialConnect::Connected(peer)),
@@ -729,42 +745,31 @@ where
 }
 
 /// Peer we are potentially going to connect to.
-pub enum PeerPotentialConnect<'a, TMuxer, TUserData>
-where
-    TUserData: Send + 'static,
-    TMuxer: muxing::StreamMuxer + 'a,
-{
+pub enum PeerPotentialConnect<'a, TInEvent: 'a, TOutEvent: 'a> {
     /// We are connected to this peer.
-    Connected(PeerConnected<'a, TUserData>),
+    Connected(PeerConnected<'a, TInEvent>),
 
     /// We are currently attempting to connect to this peer.
-    PendingConnect(PeerPendingConnect<'a, TMuxer, TUserData>),
+    PendingConnect(PeerPendingConnect<'a, TInEvent, TOutEvent>),
 }
 
-impl<'a, TMuxer, TUserData> PeerPotentialConnect<'a, TMuxer, TUserData>
-where
-    TUserData: Send + 'static,
-    TMuxer: muxing::StreamMuxer,
-{
+impl<'a, TInEvent, TOutEvent> PeerPotentialConnect<'a, TInEvent, TOutEvent> {
     /// Closes the connection or the connection attempt.
-    ///
-    /// If the connection was active, returns the list of outbound substream openings that were
-    /// closed in the process.
     // TODO: consider returning a `PeerNotConnected`
     #[inline]
-    pub fn close(self) -> Vec<TUserData> {
+    pub fn close(self) {
         match self {
             PeerPotentialConnect::Connected(peer) => peer.close(),
-            PeerPotentialConnect::PendingConnect(peer) => {
-                peer.interrupt();
-                Vec::new()
-            }
+            PeerPotentialConnect::PendingConnect(peer) => peer.interrupt(),
         }
     }
 
     /// If we are connected, returns the `PeerConnected`.
     #[inline]
-    pub fn as_connected(self) -> Option<PeerConnected<'a, TUserData>> {
+    pub fn as_connected(self) -> Option<PeerConnected<'a, TInEvent>> {
         match self {
             PeerPotentialConnect::Connected(peer) => Some(peer),
             _ => None,
@@ -773,7 +778,7 @@ where
 
     /// If a connection is pending, returns the `PeerPendingConnect`.
     #[inline]
-    pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TMuxer, TUserData>> {
+    pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TInEvent, TOutEvent>> {
         match self {
             PeerPotentialConnect::PendingConnect(peer) => Some(peer),
             _ => None,
@@ -782,27 +787,20 @@ where
 }
 
 /// Access to a peer we are connected to.
-pub struct PeerConnected<'a, TUserData>
-where
-    TUserData: Send + 'static,
-{
-    peer: CollecPeerMut<'a, TUserData>,
+pub struct PeerConnected<'a, TInEvent: 'a> {
+    peer: CollecPeerMut<'a, TInEvent>,
     /// Reference to the `connected_multiaddresses` field of the parent.
     connected_multiaddresses: &'a mut FnvHashMap<PeerId, Multiaddr>,
     peer_id: PeerId,
 }
 
-impl<'a, TUserData> PeerConnected<'a, TUserData>
-where
-    TUserData: Send + 'static,
-{
+impl<'a, TInEvent> PeerConnected<'a, TInEvent> {
     /// Closes the connection to this node.
     ///
-    /// This interrupts all the current substream opening attempts and returns them.
     /// No `NodeClosed` message will be generated for this node.
     // TODO: consider returning a `PeerNotConnected` ; however this makes all the borrows things
     //       much more annoying to deal with
-    pub fn close(self) -> Vec<TUserData> {
+    pub fn close(self) {
         self.connected_multiaddresses.remove(&self.peer_id);
         self.peer.close()
     }
@@ -813,28 +811,20 @@ where
         self.connected_multiaddresses.get(&self.peer_id)
     }
 
-    /// Starts the process of opening a new outbound substream towards the peer.
+    /// Sends an event to the node.
     #[inline]
-    pub fn open_substream(&mut self, user_data: TUserData) {
-        self.peer.open_substream(user_data)
+    pub fn send_event(&mut self, event: TInEvent) {
+        self.peer.send_event(event)
     }
 }
 
 /// Access to a peer we are attempting to connect to.
-pub struct PeerPendingConnect<'a, TMuxer, TUserData>
-where
-    TUserData: Send + 'static,
-    TMuxer: muxing::StreamMuxer + 'a,
-{
+pub struct PeerPendingConnect<'a, TInEvent: 'a, TOutEvent: 'a> {
     attempt: OccupiedEntry<'a, PeerId, OutReachAttempt>,
-    active_nodes: &'a mut CollectionStream<TMuxer, TUserData>,
+    active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent>,
 }
 
-impl<'a, TMuxer, TUserData> PeerPendingConnect<'a, TMuxer, TUserData>
-where
-    TUserData: Send + 'static,
-    TMuxer: muxing::StreamMuxer,
-{
+impl<'a, TInEvent, TOutEvent> PeerPendingConnect<'a, TInEvent, TOutEvent> {
     /// Interrupt this connection attempt.
     // TODO: consider returning a PeerNotConnected ; however that is really pain in terms of
     //       borrows
@@ -875,33 +865,35 @@ where
 }
 
 /// Access to a peer we're not connected to.
-pub struct PeerNotConnected<'a, TTrans, TMuxer, TUserData>
-where
-    TTrans: Transport + 'a,
-    TMuxer: muxing::StreamMuxer + 'a,
-    TUserData: Send + 'a,
-{
-    peer_id: PeerId,
-    nodes: &'a mut Swarm<TTrans, TMuxer, TUserData>,
-}
-
-impl<'a, TTrans, TMuxer, TUserData> PeerNotConnected<'a, TTrans, TMuxer, TUserData>
+pub struct PeerNotConnected<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandlerBuild: 'a>
 where
     TTrans: Transport,
-    TMuxer: muxing::StreamMuxer,
-    TUserData: Send,
+{
+    peer_id: PeerId,
+    nodes: &'a mut Swarm<TTrans, TInEvent, TOutEvent, THandlerBuild>,
+}
+
+impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerBuild>
+    PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandlerBuild>
+where
+    TTrans: Transport<Output = (PeerId, TMuxer)>,
+    TMuxer: StreamMuxer,
+    THandlerBuild: HandlerFactory<Handler = THandler>,
+    THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
+    THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
 {
     /// Attempts a new connection to this node using the given multiaddress.
     #[inline]
-    pub fn connect(self, addr: Multiaddr) -> Result<PeerPendingConnect<'a, TMuxer, TUserData>, Self>
+    pub fn connect(self, addr: Multiaddr) -> Result<PeerPendingConnect<'a, TInEvent, TOutEvent>, Self>
     where
         TTrans: Transport + Clone,
         TTrans::Dial: Send + 'static,
         TTrans::MultiaddrFuture: Send + 'static,
-        TMuxer: Send + Sync + 'static,
+        TMuxer: StreamMuxer + Send + Sync + 'static,
         TMuxer::OutboundSubstream: Send,
         TMuxer::Substream: Send,
-        TUserData: 'static,
+        TInEvent: Send + 'static,
+        TOutEvent: Send + 'static,
    {
         self.connect_inner(addr, Vec::new())
     }
@@ -915,16 +907,17 @@ where
     pub fn connect_iter<TIter>(
         self,
         addrs: TIter,
-    ) -> Result<PeerPendingConnect<'a, TMuxer, TUserData>, Self>
+    ) -> Result<PeerPendingConnect<'a, TInEvent, TOutEvent>, Self>
     where
         TIter: IntoIterator<Item = Multiaddr>,
         TTrans: Transport + Clone,
         TTrans::Dial: Send + 'static,
         TTrans::MultiaddrFuture: Send + 'static,
-        TMuxer: Send + Sync + 'static,
+        TMuxer: StreamMuxer + Send + Sync + 'static,
         TMuxer::OutboundSubstream: Send,
         TMuxer::Substream: Send,
-        TUserData: 'static,
+        TInEvent: Send + 'static,
+        TOutEvent: Send + 'static,
     {
         let mut addrs = addrs.into_iter();
         let first = addrs.next().unwrap(); // TODO: bad
@@ -937,22 +930,23 @@ where
         self,
         first: Multiaddr,
         rest: Vec<Multiaddr>,
-    ) -> Result<PeerPendingConnect<'a, TMuxer, TUserData>, Self>
+    ) -> Result<PeerPendingConnect<'a, TInEvent, TOutEvent>, Self>
     where
         TTrans: Transport + Clone,
         TTrans::Dial: Send + 'static,
         TTrans::MultiaddrFuture: Send + 'static,
-        TMuxer: Send + Sync + 'static,
+        TMuxer: StreamMuxer + Send + Sync + 'static,
         TMuxer::OutboundSubstream: Send,
         TMuxer::Substream: Send,
-        TUserData: 'static,
+        TInEvent: Send + 'static,
+        TOutEvent: Send + 'static,
     {
         let future = match self.nodes.transport().clone().dial(first.clone()) {
             Ok(fut) => fut,
             Err(_) => return Err(self),
         };
 
-        let reach_id = self.nodes.active_nodes.add_reach_attempt(future);
+        let reach_id = self.nodes.active_nodes.add_reach_attempt(future, self.nodes.handler_build.new_handler());
 
         let former = self.nodes.out_reach_attempts.insert(
             self.peer_id.clone(),
@@ -976,18 +970,23 @@ where
     }
 }
 
-impl<TTrans, TMuxer, TUserData> Stream for Swarm<TTrans, TMuxer, TUserData>
+impl<TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerBuild> Stream for
+    Swarm<TTrans, TInEvent, TOutEvent, THandlerBuild>
 where
     TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
     TTrans::Dial: Send + 'static,
     TTrans::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError> + Send + 'static,
     TTrans::ListenerUpgrade: Send + 'static,
-    TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
+    TMuxer: StreamMuxer + Send + Sync + 'static,
     TMuxer::OutboundSubstream: Send,
     TMuxer::Substream: Send,
-    TUserData: Send + 'static,
+    TInEvent: Send + 'static,
+    TOutEvent: Send + 'static,
+    THandlerBuild: HandlerFactory<Handler = THandler>,
+    THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
+    THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
 {
-    type Item = SwarmEvent<TTrans, TMuxer, TUserData>;
+    type Item = SwarmEvent<TTrans, TOutEvent>;
     type Error = Void; // TODO: use `!` once stable
 
     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@@ -998,7 +997,7 @@ where
                     upgrade,
                     listen_addr,
                 }))) => {
-                    let id = self.active_nodes.add_reach_attempt(upgrade);
+                    let id = self.active_nodes.add_reach_attempt(upgrade, self.handler_build.new_handler());
                     self.other_reach_attempts.push((
                         id,
                         ConnectedPoint::Listener {
@@ -1029,15 +1028,14 @@ where
             match self.active_nodes.poll() {
                 Ok(Async::NotReady) => break,
                 Ok(Async::Ready(Some(CollectionEvent::NodeReached { peer_id, id }))) => {
-                    let event = self.handle_node_reached(peer_id, id, None);
+                    let event = self.handle_node_reached(peer_id, id, false);
                     return Ok(Async::Ready(Some(event)));
                 }
                 Ok(Async::Ready(Some(CollectionEvent::NodeReplaced {
                     peer_id,
                     id,
-                    closed_outbound_substreams,
                 }))) => {
-                    let event = self.handle_node_reached(peer_id, id, Some(closed_outbound_substreams));
+                    let event = self.handle_node_reached(peer_id, id, true);
                     return Ok(Async::Ready(Some(event)));
                 }
                 Ok(Async::Ready(Some(CollectionEvent::ReachError { id, error }))) => {
@@ -1048,7 +1046,6 @@ where
                 Ok(Async::Ready(Some(CollectionEvent::NodeError {
                     peer_id,
                     error,
-                    closed_outbound_substreams,
                 }))) => {
                     let address = self.connected_multiaddresses.remove(&peer_id);
                     debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
@@ -1056,7 +1053,6 @@ where
                     return Ok(Async::Ready(Some(SwarmEvent::NodeError {
                         peer_id,
                         address,
                         error,
-                        closed_outbound_substreams,
                     })));
                 }
                 Ok(Async::Ready(Some(CollectionEvent::NodeClosed { peer_id }))) => {
                     let address = self.connected_multiaddresses.remove(&peer_id);
                     debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
                     return Ok(Async::Ready(Some(SwarmEvent::NodeClosed { peer_id, address })));
                 }
@@ -1064,49 +1060,8 @@ where
-                Ok(Async::Ready(Some(CollectionEvent::NodeMultiaddr { peer_id, address }))) => {
-                    debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
-                    if let Ok(ref addr) = address {
-                        self.connected_multiaddresses
-                            .insert(peer_id.clone(), addr.clone());
-                    }
-                    return Ok(Async::Ready(Some(SwarmEvent::NodeMultiaddr {
-                        peer_id,
-                        address,
-                    })));
-                }
-                Ok(Async::Ready(Some(CollectionEvent::InboundSubstream {
-                    peer_id,
-                    substream,
-                }))) => {
-                    debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
-                    return Ok(Async::Ready(Some(SwarmEvent::InboundSubstream {
-                        peer_id,
-                        substream,
-                    })));
-                }
-                Ok(Async::Ready(Some(CollectionEvent::OutboundSubstream {
-                    peer_id,
-                    user_data,
-                    substream,
-                }))) => {
-                    debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
-                    return Ok(Async::Ready(Some(SwarmEvent::OutboundSubstream {
-                        peer_id,
-                        substream,
-                        user_data,
-                    })));
-                }
-                Ok(Async::Ready(Some(CollectionEvent::InboundClosed { peer_id }))) => {
-                    debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
-                    return Ok(Async::Ready(Some(SwarmEvent::InboundClosed { peer_id })));
-                }
-                Ok(Async::Ready(Some(CollectionEvent::OutboundClosed { peer_id, user_data }))) => {
-                    debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
-                    return Ok(Async::Ready(Some(SwarmEvent::OutboundClosed {
-                        peer_id,
-                        user_data,
-                    })));
+                Ok(Async::Ready(Some(CollectionEvent::NodeEvent { peer_id, event }))) => {
+                    return Ok(Async::Ready(Some(SwarmEvent::NodeEvent { peer_id, event })));
                 }
                 Ok(Async::Ready(None)) => unreachable!("CollectionStream never ends"),
                 Err(_) => unreachable!("CollectionStream never errors"),
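
---

For reviewers: below is a minimal sketch of what implementing the new `NodeHandler` trait looks like. Everything named `Echo*`, `InEvent` and `OutEvent` is hypothetical and exists only for illustration; only `NodeHandler`, `NodeHandlerEndpoint` and `NodeHandlerEvent` come from this diff.

```rust
use futures::prelude::*;
use nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent};
use std::io::Error as IoError;
use Multiaddr;

/// Hypothetical event types; any types work as long as the swarm's generics agree.
#[derive(Debug, Clone)]
pub enum InEvent { SendPing }
#[derive(Debug, Clone)]
pub enum OutEvent { GotSubstream }

/// Toy handler: requests one outbound substream per `SendPing` it receives.
pub struct EchoHandler {
    /// Outbound substream requests not yet handed back through `poll()`.
    pending_dials: usize,
    /// Events queued for the swarm.
    queued_events: Vec<OutEvent>,
    /// True once `shutdown()` has been called.
    shutting_down: bool,
}

impl EchoHandler {
    pub fn new() -> EchoHandler {
        EchoHandler { pending_dials: 0, queued_events: Vec::new(), shutting_down: false }
    }
}

impl<TSubstream> NodeHandler<TSubstream> for EchoHandler {
    type InEvent = InEvent;
    type OutEvent = OutEvent;
    // Passed back through `NodeHandlerEndpoint::Dialer` when the substream opens.
    type OutboundOpenInfo = ();

    fn inject_substream(&mut self, _substream: TSubstream, _endpoint: NodeHandlerEndpoint<()>) {
        // A real handler would negotiate a protocol on the substream here.
        self.queued_events.push(OutEvent::GotSubstream);
    }

    fn inject_inbound_closed(&mut self) {}
    fn inject_outbound_closed(&mut self, _user_data: ()) {}
    fn inject_multiaddr(&mut self, _multiaddr: Result<Multiaddr, IoError>) {}

    fn inject_event(&mut self, event: InEvent) {
        let InEvent::SendPing = event;
        self.pending_dials += 1;
    }

    fn shutdown(&mut self) {
        self.shutting_down = true;
    }

    fn poll(&mut self) -> Poll<Option<NodeHandlerEvent<(), OutEvent>>, IoError> {
        if let Some(event) = self.queued_events.pop() {
            return Ok(Async::Ready(Some(NodeHandlerEvent::Custom(event))));
        }
        if self.pending_dials > 0 {
            self.pending_dials -= 1;
            return Ok(Async::Ready(Some(NodeHandlerEvent::OutboundSubstreamRequest(()))));
        }
        if self.shutting_down {
            // Ending the stream ends the `HandledNode`.
            return Ok(Async::Ready(None));
        }
        // NB: a real implementation must register the current task (e.g. with
        // `task::current()`) before returning `NotReady`, as the trait docs require.
        Ok(Async::NotReady)
    }
}
```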
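The factory side can then be wired in with a plain closure, thanks to the blanket `impl<T, THandler> HandlerFactory for T where T: Fn() -> THandler`. A hypothetical usage sketch, continuing the example above (`my_transport` and `peer_id` are placeholders):

```rust
// `my_transport` stands in for any transport whose `Output` is `(PeerId, muxer)`,
// and `peer_id` for a peer we are connected to. The closure is the handler factory.
let mut swarm = Swarm::with_handler_builder(my_transport, || EchoHandler::new());

// The substream-level `SwarmEvent` variants are gone; we talk to the handlers instead.
swarm.broadcast_event(&InEvent::SendPing); // requires `TInEvent: Clone`
if let Some(mut peer) = swarm.peer(peer_id).as_connected() {
    peer.send_event(InEvent::SendPing); // targets a single node
}

// Handler output surfaces again as `SwarmEvent::NodeEvent { peer_id, event }`
// when the swarm is polled.
```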