diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index 45300882..90384314 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -25,15 +25,13 @@ use nodes::node::Substream; use nodes::handled_node_tasks::{HandledNodesEvent, HandledNodesTasks}; use nodes::handled_node_tasks::{Task as HandledNodesTask, TaskId}; use nodes::handled_node::NodeHandler; -use std::collections::hash_map::Entry; +use std::{collections::hash_map::Entry, fmt, mem}; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use void::Void; use {Multiaddr, PeerId}; // TODO: make generic over PeerId /// Implementation of `Stream` that handles a collection of nodes. -// TODO: implement Debug pub struct CollectionStream { /// Object that handles the tasks. inner: HandledNodesTasks, @@ -45,6 +43,23 @@ pub struct CollectionStream { tasks: FnvHashMap, } +impl fmt::Debug for CollectionStream { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + let mut list = f.debug_list(); + for (id, task) in &self.tasks { + match *task { + TaskState::Pending => { + list.entry(&format!("Pending({:?})", ReachAttemptId(*id))) + }, + TaskState::Connected(ref peer_id) => { + list.entry(&format!("Connected({:?})", peer_id)) + } + }; + } + list.finish() + } +} + /// State of a task. #[derive(Debug, Clone, PartialEq, Eq)] enum TaskState { @@ -55,26 +70,10 @@ enum TaskState { } /// Event that can happen on the `CollectionStream`. -// TODO: implement Debug -pub enum CollectionEvent { - /// A connection to a node has succeeded. - NodeReached { - /// Identifier of the node. - peer_id: PeerId, - /// Identifier of the reach attempt that succeeded. - id: ReachAttemptId, - }, - - /// A connection to a node has succeeded and replaces a former connection. - /// - /// The opened substreams of the former node will keep working (unless the remote decides to - /// close them). - NodeReplaced { - /// Identifier of the node. - peer_id: PeerId, - /// Identifier of the reach attempt that succeeded. - id: ReachAttemptId, - }, +pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a> { + /// A connection to a node has succeeded. You must use the provided event in order to accept + /// the connection. + NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent>), /// A connection to a node has been closed. /// @@ -110,6 +109,148 @@ pub enum CollectionEvent { }, } +impl<'a, TInEvent, TOutEvent> fmt::Debug for CollectionEvent<'a, TInEvent, TOutEvent> +where TOutEvent: fmt::Debug +{ + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + match *self { + CollectionEvent::NodeReached(ref inner) => { + f.debug_tuple("CollectionEvent::NodeReached") + .field(inner) + .finish() + }, + CollectionEvent::NodeClosed { ref peer_id } => { + f.debug_struct("CollectionEvent::NodeClosed") + .field("peer_id", peer_id) + .finish() + }, + CollectionEvent::NodeError { ref peer_id, ref error } => { + f.debug_struct("CollectionEvent::NodeError") + .field("peer_id", peer_id) + .field("error", error) + .finish() + }, + CollectionEvent::ReachError { ref id, ref error } => { + f.debug_struct("CollectionEvent::ReachError") + .field("id", id) + .field("error", error) + .finish() + }, + CollectionEvent::NodeEvent { ref peer_id, ref event } => { + f.debug_struct("CollectionEvent::NodeEvent") + .field("peer_id", peer_id) + .field("event", event) + .finish() + }, + } + } +} + +/// Event that happens when we reach a node. +#[must_use = "The node reached event is used to accept the newly-opened connection"] +pub struct CollectionReachEvent<'a, TInEvent: 'a, TOutEvent: 'a> { + /// Peer id we connected to. + peer_id: PeerId, + /// The task id that reached the node. + id: TaskId, + /// The `CollectionStream` we are referencing. + parent: &'a mut CollectionStream, +} + +impl<'a, TInEvent, TOutEvent> CollectionReachEvent<'a, TInEvent, TOutEvent> { + /// Returns the peer id the node that has been reached. + #[inline] + pub fn peer_id(&self) -> &PeerId { + &self.peer_id + } + + /// Returns the reach attempt that reached the node. + #[inline] + pub fn reach_attempt_id(&self) -> ReachAttemptId { + ReachAttemptId(self.id) + } + + /// Returns `true` if accepting this reached node would replace an existing connection to that + /// node. + #[inline] + pub fn would_replace(&self) -> bool { + self.parent.nodes.contains_key(&self.peer_id) + } + + /// Accepts the new node. + pub fn accept(self) -> (CollectionNodeAccept, PeerId) { + // Set the state of the task to `Connected`. + let former_task_id = self.parent.nodes.insert(self.peer_id.clone(), self.id); + let _former_state = self.parent.tasks.insert(self.id, TaskState::Connected(self.peer_id.clone())); + debug_assert_eq!(_former_state, Some(TaskState::Pending)); + + // It is possible that we already have a task connected to the same peer. In this + // case, we need to emit a `NodeReplaced` event. + let ret_value = if let Some(former_task_id) = former_task_id { + self.parent.inner.task(former_task_id) + .expect("whenever we receive a TaskClosed event or close a node, we remove the \ + corresponding entry from self.nodes ; therefore all elements in \ + self.nodes are valid tasks in the HandledNodesTasks ; qed") + .close(); + let _former_other_state = self.parent.tasks.remove(&former_task_id); + debug_assert_eq!(_former_other_state, Some(TaskState::Connected(self.peer_id.clone()))); + + // TODO: we unfortunately have to clone the peer id here + (CollectionNodeAccept::ReplacedExisting, self.peer_id.clone()) + } else { + // TODO: we unfortunately have to clone the peer id here + (CollectionNodeAccept::NewEntry, self.peer_id.clone()) + }; + + // Don't run the destructor. + mem::forget(self); + + ret_value + } + + /// Denies the node. + /// + /// Has the same effect as dropping the event without accepting it. + #[inline] + pub fn deny(self) -> PeerId { + // TODO: we unfortunately have to clone the id here, in order to be explicit + let peer_id = self.peer_id.clone(); + drop(self); // Just to be explicit + peer_id + } +} + +impl<'a, TInEvent, TOutEvent> fmt::Debug for CollectionReachEvent<'a, TInEvent, TOutEvent> { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.debug_struct("CollectionReachEvent") + .field("peer_id", &self.peer_id) + .field("reach_attempt_id", &self.reach_attempt_id()) + .finish() + } +} + +impl<'a, TInEvent, TOutEvent> Drop for CollectionReachEvent<'a, TInEvent, TOutEvent> { + fn drop(&mut self) { + let task_state = self.parent.tasks.remove(&self.id); + debug_assert!(if let Some(TaskState::Pending) = task_state { true } else { false }); + self.parent.inner.task(self.id) + .expect("we create the CollectionReachEvent with a valid task id ; the \ + CollectionReachEvent mutably borrows the collection, therefore nothing \ + can delete this task during the lifetime of the CollectionReachEvent ; \ + therefore the task is still valid when we delete it ; qed") + .close(); + } +} + +/// Outcome of accepting a node. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum CollectionNodeAccept { + /// We replaced an existing node. + ReplacedExisting, + /// We didn't replace anything existing. + NewEntry, +} + /// Identifier for a future that attempts to reach a node. #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct ReachAttemptId(TaskId); @@ -215,6 +356,81 @@ impl CollectionStream { pub fn connections(&self) -> impl Iterator { self.nodes.keys() } + + /// Provides an API similar to `Stream`, except that it cannot error. + /// + /// > **Note**: we use a regular `poll` method instead of implementing `Stream` in order to + /// > remove the `Err` variant, but also because we want the `CollectionStream` to stay + /// > borrowed if necessary. + pub fn poll(&mut self) -> Async>> { + let item = match self.inner.poll() { + Async::Ready(item) => item, + Async::NotReady => return Async::NotReady, + }; + + match item { + Some(HandledNodesEvent::TaskClosed { id, result }) => { + match (self.tasks.remove(&id), result) { + (Some(TaskState::Pending), Err(err)) => { + Async::Ready(Some(CollectionEvent::ReachError { + id: ReachAttemptId(id), + error: err, + })) + }, + (Some(TaskState::Pending), Ok(())) => { + // TODO: this variant shouldn't happen ; prove this + Async::Ready(Some(CollectionEvent::ReachError { + id: ReachAttemptId(id), + error: IoError::new(IoErrorKind::Other, "couldn't reach the node"), + })) + }, + (Some(TaskState::Connected(peer_id)), Ok(())) => { + let _node_task_id = self.nodes.remove(&peer_id); + debug_assert_eq!(_node_task_id, Some(id)); + Async::Ready(Some(CollectionEvent::NodeClosed { + peer_id, + })) + }, + (Some(TaskState::Connected(peer_id)), Err(err)) => { + let _node_task_id = self.nodes.remove(&peer_id); + debug_assert_eq!(_node_task_id, Some(id)); + Async::Ready(Some(CollectionEvent::NodeError { + peer_id, + error: err, + })) + }, + (None, _) => { + panic!("self.tasks is always kept in sync with the tasks in self.inner ; \ + when we add a task in self.inner we add a corresponding entry in \ + self.tasks, and remove the entry only when the task is closed ; \ + qed") + }, + } + }, + Some(HandledNodesEvent::NodeReached { id, peer_id }) => { + Async::Ready(Some(CollectionEvent::NodeReached(CollectionReachEvent { + parent: self, + id, + peer_id, + }))) + }, + Some(HandledNodesEvent::NodeEvent { id, event }) => { + let peer_id = match self.tasks.get(&id) { + Some(TaskState::Connected(peer_id)) => peer_id.clone(), + _ => panic!("we can only receive NodeEvent events from a task after we \ + received a corresponding NodeReached event from that same task ; \ + when we receive a NodeReached event, we ensure that the entry in \ + self.tasks is switched to the Connected state ; qed"), + }; + + Async::Ready(Some(CollectionEvent::NodeEvent { + peer_id, + event, + })) + } + None => Async::Ready(None), + } + } } /// Access to a peer in the collection. @@ -247,98 +463,3 @@ impl<'a, TInEvent> PeerMut<'a, TInEvent> { self.inner.close(); } } - -impl Stream for CollectionStream { - type Item = CollectionEvent; - type Error = Void; // TODO: use ! once stable - - fn poll(&mut self) -> Poll, Self::Error> { - let item = try_ready!(self.inner.poll()); - - match item { - Some(HandledNodesEvent::TaskClosed { id, result }) => { - match (self.tasks.remove(&id), result) { - (Some(TaskState::Pending), Err(err)) => { - Ok(Async::Ready(Some(CollectionEvent::ReachError { - id: ReachAttemptId(id), - error: err, - }))) - }, - (Some(TaskState::Pending), Ok(())) => { - // TODO: this variant shouldn't happen ; prove this - Ok(Async::Ready(Some(CollectionEvent::ReachError { - id: ReachAttemptId(id), - error: IoError::new(IoErrorKind::Other, "couldn't reach the node"), - }))) - }, - (Some(TaskState::Connected(peer_id)), Ok(())) => { - let _node_task_id = self.nodes.remove(&peer_id); - debug_assert_eq!(_node_task_id, Some(id)); - Ok(Async::Ready(Some(CollectionEvent::NodeClosed { - peer_id, - }))) - }, - (Some(TaskState::Connected(peer_id)), Err(err)) => { - let _node_task_id = self.nodes.remove(&peer_id); - debug_assert_eq!(_node_task_id, Some(id)); - Ok(Async::Ready(Some(CollectionEvent::NodeError { - peer_id, - error: err, - }))) - }, - (None, _) => { - panic!("self.tasks is always kept in sync with the tasks in self.inner ; \ - when we add a task in self.inner we add a corresponding entry in \ - self.tasks, and remove the entry only when the task is closed ; \ - qed") - }, - } - }, - Some(HandledNodesEvent::NodeReached { id, peer_id }) => { - // Set the state of the task to `Connected`. - let former_task_id = self.nodes.insert(peer_id.clone(), id); - let _former_state = self.tasks.insert(id, TaskState::Connected(peer_id.clone())); - debug_assert_eq!(_former_state, Some(TaskState::Pending)); - - // It is possible that we already have a task connected to the same peer. In this - // case, we need to emit a `NodeReplaced` event. - if let Some(former_task_id) = former_task_id { - self.inner.task(former_task_id) - .expect("whenever we receive a TaskClosed event or close a node, we \ - remove the corresponding entry from self.nodes ; therefore all \ - elements in self.nodes are valid tasks in the \ - HandledNodesTasks ; qed") - .close(); - let _former_other_state = self.tasks.remove(&former_task_id); - debug_assert_eq!(_former_other_state, Some(TaskState::Connected(peer_id.clone()))); - - Ok(Async::Ready(Some(CollectionEvent::NodeReplaced { - peer_id, - id: ReachAttemptId(id), - }))) - - } else { - Ok(Async::Ready(Some(CollectionEvent::NodeReached { - peer_id, - id: ReachAttemptId(id), - }))) - } - }, - Some(HandledNodesEvent::NodeEvent { id, event }) => { - let peer_id = match self.tasks.get(&id) { - Some(TaskState::Connected(peer_id)) => peer_id.clone(), - _ => panic!("we can only receive NodeEvent events from a task after we \ - received a corresponding NodeReached event from that same task ; \ - when we receive a NodeReached event, we ensure that the entry in \ - self.tasks is switched to the Connected state ; qed"), - }; - - Ok(Async::Ready(Some(CollectionEvent::NodeEvent { - peer_id, - event, - }))) - } - None => Ok(Async::Ready(None)), - } - } -} diff --git a/core/src/nodes/handled_node.rs b/core/src/nodes/handled_node.rs index 1636e4e2..3b51c55f 100644 --- a/core/src/nodes/handled_node.rs +++ b/core/src/nodes/handled_node.rs @@ -28,6 +28,9 @@ use Multiaddr; /// /// > Note: When implementing the various methods, don't forget that you have to register the /// > task that was the latest to poll and notify it. +// TODO: right now it is possible for a node handler to be built, then shut down right after if we +// realize we dialed the wrong peer for example ; this could be surprising and should either +// be documented or changed (favouring the "documented" right now) pub trait NodeHandler { /// Custom event that can be received from the outside. type InEvent; diff --git a/core/src/nodes/handled_node_tasks.rs b/core/src/nodes/handled_node_tasks.rs index 104e12f2..55457205 100644 --- a/core/src/nodes/handled_node_tasks.rs +++ b/core/src/nodes/handled_node_tasks.rs @@ -26,7 +26,7 @@ use nodes::handled_node::{HandledNode, NodeHandler}; use smallvec::SmallVec; use std::collections::hash_map::{Entry, OccupiedEntry}; use std::io::Error as IoError; -use std::mem; +use std::{fmt, mem}; use tokio_executor; use void::Void; use {Multiaddr, PeerId}; @@ -69,6 +69,14 @@ pub struct HandledNodesTasks { events_rx: mpsc::UnboundedReceiver<(InToExtMessage, TaskId)>, } +impl fmt::Debug for HandledNodesTasks { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.debug_list() + .entries(self.tasks.keys().cloned()) + .finish() + } +} + /// Event that can happen on the `HandledNodesTasks`. #[derive(Debug)] pub enum HandledNodesEvent { @@ -183,6 +191,55 @@ impl HandledNodesTasks { pub fn tasks<'a>(&'a self) -> impl Iterator + 'a { self.tasks.keys().cloned() } + + /// Provides an API similar to `Stream`, except that it cannot error. + pub fn poll(&mut self) -> Async>> { + for to_spawn in self.to_spawn.drain() { + tokio_executor::spawn(to_spawn); + } + + loop { + match self.events_rx.poll() { + Ok(Async::Ready(Some((message, task_id)))) => { + // If the task id is no longer in `self.tasks`, that means that the user called + // `close()` on this task earlier. Therefore no new event should be generated + // for this task. + if !self.tasks.contains_key(&task_id) { + continue; + }; + + match message { + InToExtMessage::NodeEvent(event) => { + break Async::Ready(Some(HandledNodesEvent::NodeEvent { + id: task_id, + event, + })); + }, + InToExtMessage::NodeReached(peer_id) => { + break Async::Ready(Some(HandledNodesEvent::NodeReached { + id: task_id, + peer_id, + })); + }, + InToExtMessage::TaskClosed(result) => { + let _ = self.tasks.remove(&task_id); + break Async::Ready(Some(HandledNodesEvent::TaskClosed { + id: task_id, result + })); + }, + } + } + Ok(Async::NotReady) => { + break Async::NotReady; + } + Ok(Async::Ready(None)) => { + unreachable!("The sender is in self as well, therefore the receiver never \ + closes.") + }, + Err(()) => unreachable!("An unbounded receiver never errors"), + } + } + } } /// Access to a task in the collection. @@ -214,60 +271,26 @@ impl<'a, TInEvent> Task<'a, TInEvent> { } } +impl<'a, TInEvent> fmt::Debug for Task<'a, TInEvent> { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.debug_tuple("Task") + .field(&self.id()) + .finish() + } +} + impl Stream for HandledNodesTasks { type Item = HandledNodesEvent; type Error = Void; // TODO: use ! once stable + #[inline] fn poll(&mut self) -> Poll, Self::Error> { - for to_spawn in self.to_spawn.drain() { - tokio_executor::spawn(to_spawn); - } - - loop { - match self.events_rx.poll() { - Ok(Async::Ready(Some((message, task_id)))) => { - // If the task id is no longer in `self.tasks`, that means that the user called - // `close()` on this task earlier. Therefore no new event should be generated - // for this task. - if !self.tasks.contains_key(&task_id) { - continue; - }; - - match message { - InToExtMessage::NodeEvent(event) => { - break Ok(Async::Ready(Some(HandledNodesEvent::NodeEvent { - id: task_id, - event, - }))); - }, - InToExtMessage::NodeReached(peer_id) => { - break Ok(Async::Ready(Some(HandledNodesEvent::NodeReached { - id: task_id, - peer_id, - }))); - }, - InToExtMessage::TaskClosed(result) => { - let _ = self.tasks.remove(&task_id); - break Ok(Async::Ready(Some(HandledNodesEvent::TaskClosed { - id: task_id, result - }))); - }, - } - } - Ok(Async::NotReady) => { - break Ok(Async::NotReady); - } - Ok(Async::Ready(None)) => { - unreachable!("The sender is in self as well, therefore the receiver never \ - closes.") - }, - Err(()) => unreachable!("An unbounded receiver never errors"), - } - } + Ok(self.poll()) } } /// Message to transmit from a task to the public API. +#[derive(Debug)] enum InToExtMessage { /// A connection to a node has succeeded. NodeReached(PeerId), diff --git a/core/src/nodes/listeners.rs b/core/src/nodes/listeners.rs index e177edcf..8e1011d1 100644 --- a/core/src/nodes/listeners.rs +++ b/core/src/nodes/listeners.rs @@ -126,16 +126,9 @@ where pub fn listeners(&self) -> impl Iterator { self.listeners.iter().map(|l| &l.address) } -} -impl Stream for ListenersStream -where - TTrans: Transport, -{ - type Item = ListenersEvent; - type Error = Void; // TODO: use ! once stable - - fn poll(&mut self) -> Poll, Self::Error> { + /// Provides an API similar to `Stream`, except that it cannot error. + pub fn poll(&mut self) -> Async>> { // We remove each element from `listeners` one by one and add them back. for n in (0..self.listeners.len()).rev() { let mut listener = self.listeners.swap_remove(n); @@ -146,30 +139,43 @@ where Ok(Async::Ready(Some(upgrade))) => { let listen_addr = listener.address.clone(); self.listeners.push(listener); - return Ok(Async::Ready(Some(ListenersEvent::Incoming { + return Async::Ready(Some(ListenersEvent::Incoming { upgrade, listen_addr, - }))); + })); } Ok(Async::Ready(None)) => { - return Ok(Async::Ready(Some(ListenersEvent::Closed { + return Async::Ready(Some(ListenersEvent::Closed { listen_addr: listener.address, listener: listener.listener, result: Ok(()), - }))); + })); } Err(err) => { - return Ok(Async::Ready(Some(ListenersEvent::Closed { + return Async::Ready(Some(ListenersEvent::Closed { listen_addr: listener.address, listener: listener.listener, result: Err(err), - }))); + })); } } } // We register the current task to be waken up if a new listener is added. - Ok(Async::NotReady) + Async::NotReady + } +} + +impl Stream for ListenersStream +where + TTrans: Transport, +{ + type Item = ListenersEvent; + type Error = Void; // TODO: use ! once stable + + #[inline] + fn poll(&mut self) -> Poll, Self::Error> { + Ok(self.poll()) } } diff --git a/core/src/nodes/swarm.rs b/core/src/nodes/swarm.rs index e35710bb..97e96250 100644 --- a/core/src/nodes/swarm.rs +++ b/core/src/nodes/swarm.rs @@ -22,7 +22,7 @@ use fnv::FnvHashMap; use futures::{prelude::*, future}; use muxing::StreamMuxer; use nodes::collection::{ - CollectionEvent, CollectionStream, PeerMut as CollecPeerMut, ReachAttemptId, + CollectionEvent, CollectionNodeAccept, CollectionReachEvent, CollectionStream, PeerMut as CollecPeerMut, ReachAttemptId, }; use nodes::handled_node::NodeHandler; use nodes::listeners::{ListenersEvent, ListenersStream}; @@ -43,6 +43,15 @@ where /// The nodes currently active. active_nodes: CollectionStream, + /// The reach attempts of the swarm. + /// This needs to be a separate struct in order to handle multiple mutable borrows issues. + reach_attempts: ReachAttempts, + + /// Object that builds new handlers. + handler_build: THandlerBuild, +} + +struct ReachAttempts { /// Attempts to reach a peer. out_reach_attempts: FnvHashMap, @@ -52,9 +61,6 @@ where /// For each peer ID we're connected to, contains the multiaddress we're connected to. connected_multiaddresses: FnvHashMap, - - /// Object that builds new handlers. - handler_build: THandlerBuild, } /// Attempt to reach a peer. @@ -271,9 +277,11 @@ where Swarm { listeners: ListenersStream::new(transport), active_nodes: CollectionStream::new(), - out_reach_attempts: Default::default(), - other_reach_attempts: Vec::new(), - connected_multiaddresses: Default::default(), + reach_attempts: ReachAttempts { + out_reach_attempts: Default::default(), + other_reach_attempts: Vec::new(), + connected_multiaddresses: Default::default(), + }, handler_build: Default::default, } } @@ -285,9 +293,11 @@ where Swarm { listeners: ListenersStream::new(transport), active_nodes: CollectionStream::new(), - out_reach_attempts: Default::default(), - other_reach_attempts: Vec::new(), - connected_multiaddresses: Default::default(), + reach_attempts: ReachAttempts { + out_reach_attempts: Default::default(), + other_reach_attempts: Vec::new(), + connected_multiaddresses: Default::default(), + }, handler_build, } } @@ -347,7 +357,7 @@ where }; let reach_id = self.active_nodes.add_reach_attempt(future, self.handler_build.new_handler()); - self.other_reach_attempts + self.reach_attempts.other_reach_attempts .push((reach_id, ConnectedPoint::Dialer { address: addr })); Ok(()) } @@ -361,7 +371,7 @@ where // a lot of API changes #[inline] pub fn num_incoming_negotiated(&self) -> usize { - self.other_reach_attempts + self.reach_attempts.other_reach_attempts .iter() .filter(|&(_, endpoint)| endpoint.is_listener()) .count() @@ -382,21 +392,21 @@ where // the borrow checker yells at us. if self.active_nodes.peer_mut(&peer_id).is_some() { - debug_assert!(!self.out_reach_attempts.contains_key(&peer_id)); + debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id)); return Peer::Connected(PeerConnected { peer: self .active_nodes .peer_mut(&peer_id) .expect("we checked for Some just above"), peer_id, - connected_multiaddresses: &mut self.connected_multiaddresses, + connected_multiaddresses: &mut self.reach_attempts.connected_multiaddresses, }); } - if self.out_reach_attempts.get_mut(&peer_id).is_some() { - debug_assert!(!self.connected_multiaddresses.contains_key(&peer_id)); + if self.reach_attempts.out_reach_attempts.get_mut(&peer_id).is_some() { + debug_assert!(!self.reach_attempts.connected_multiaddresses.contains_key(&peer_id)); return Peer::PendingConnect(PeerPendingConnect { - attempt: match self.out_reach_attempts.entry(peer_id.clone()) { + attempt: match self.reach_attempts.out_reach_attempts.entry(peer_id.clone()) { Entry::Occupied(e) => e, Entry::Vacant(_) => panic!("we checked for Some just above"), }, @@ -404,25 +414,19 @@ where }); } - debug_assert!(!self.connected_multiaddresses.contains_key(&peer_id)); + debug_assert!(!self.reach_attempts.connected_multiaddresses.contains_key(&peer_id)); Peer::NotConnected(PeerNotConnected { nodes: self, peer_id, }) } - /// Handles a node reached event from the collection. + /// Starts dialing out a multiaddress. `rest` is the list of multiaddresses to attempt if + /// `first` fails. /// - /// Returns an event to return from the stream. - /// - /// > **Note**: The event **must** have been produced by the collection of nodes, otherwise - /// > panics will likely happen. - fn handle_node_reached( - &mut self, - peer_id: PeerId, - reach_id: ReachAttemptId, - replaced: bool, - ) -> SwarmEvent + /// It is a logic error to call this method if we already have an outgoing attempt to the + /// given peer. + fn start_dial_out(&mut self, peer_id: PeerId, first: Multiaddr, rest: Vec) where TTrans: Transport + Clone, TTrans::Dial: Send + 'static, @@ -433,217 +437,354 @@ where TInEvent: Send + 'static, TOutEvent: Send + 'static, { - // We first start looking in the incoming attempts. While this makes the code less optimal, - // it also makes the logic easier. - if let Some(in_pos) = self - .other_reach_attempts - .iter() - .position(|i| i.0 == reach_id) - { - let (_, endpoint) = self.other_reach_attempts.swap_remove(in_pos); - - // Clear the known multiaddress for this peer. - let closed_multiaddr = self.connected_multiaddresses.remove(&peer_id); - // Cancel any outgoing attempt to this peer. - if let Some(attempt) = self.out_reach_attempts.remove(&peer_id) { - debug_assert_ne!(attempt.id, reach_id); - self.active_nodes - .interrupt(attempt.id) - .expect("We insert in out_reach_attempts only when we call \ - active_nodes.add_reach_attempt, and we remove only when we call \ - interrupt or when a reach attempt succeeds or errors. Therefore the \ - out_reach_attempts should always be in sync with the actual attempts"); - } - - if replaced { - return SwarmEvent::Replaced { - peer_id, - endpoint, - closed_multiaddr, - }; - } else { - return SwarmEvent::Connected { peer_id, endpoint }; - } - } - - // Otherwise, try for outgoing attempts. - let is_outgoing_and_ok = if let Some(attempt) = self.out_reach_attempts.get(&peer_id) { - attempt.id == reach_id - } else { - false + let reach_id = match self.transport().clone().dial(first.clone()) { + Ok(fut) => self.active_nodes.add_reach_attempt(fut, self.handler_build.new_handler()), + Err((_, addr)) => { + let msg = format!("unsupported multiaddr {}", addr); + let fut = future::err(IoError::new(IoErrorKind::Other, msg)); + self.active_nodes.add_reach_attempt::<_, _, future::FutureResult, _>(fut, self.handler_build.new_handler()) + }, }; - // We only remove the attempt from `out_reach_attempts` if it both matches the reach id - // and the expected peer id. - if is_outgoing_and_ok { - let attempt = self.out_reach_attempts.remove(&peer_id) - .expect("is_outgoing_and_ok is true only if self.out_reach_attempts.get(&peer_id) \ - returned Some"); + let former = self.reach_attempts.out_reach_attempts.insert( + peer_id, + OutReachAttempt { + id: reach_id, + cur_attempted: first, + next_attempts: rest, + }, + ); - let closed_multiaddr = self.connected_multiaddresses - .insert(peer_id.clone(), attempt.cur_attempted.clone()); - let endpoint = ConnectedPoint::Dialer { - address: attempt.cur_attempted, - }; - - if replaced { - return SwarmEvent::Replaced { - peer_id, - endpoint, - closed_multiaddr, - }; - } else { - return SwarmEvent::Connected { peer_id, endpoint }; - } - } - - // If in neither, check outgoing reach attempts again as we may have a public - // key mismatch. - let expected_peer_id = self - .out_reach_attempts - .iter() - .find(|(_, a)| a.id == reach_id) - .map(|(p, _)| p.clone()); - if let Some(expected_peer_id) = expected_peer_id { - let attempt = self.out_reach_attempts.remove(&expected_peer_id) - .expect("expected_peer_id is a key that is grabbed from out_reach_attempts"); - - let num_remain = attempt.next_attempts.len(); - let failed_addr = attempt.cur_attempted.clone(); - - // Since the `peer_id` (the unexpected peer id) is now successfully connected, we have - // to drop it from active_nodes. - // TODO: at the moment, a peer id mismatch can drop a legitimate connection, which is - // why we have to purge `connected_multiaddresses`. - // See https://github.com/libp2p/rust-libp2p/issues/502 - self.connected_multiaddresses.remove(&peer_id); - self.active_nodes.peer_mut(&peer_id) - .expect("When we receive a NodeReached or NodeReplaced event from active_nodes, \ - it is guaranteed that the PeerId is valid and therefore that \ - active_nodes.peer_mut succeeds with this ID. handle_node_reached is \ - called only to handle these events.") - .close(); - - if !attempt.next_attempts.is_empty() { - let mut attempt = attempt; - attempt.cur_attempted = attempt.next_attempts.remove(0); - attempt.id = match self.transport().clone().dial(attempt.cur_attempted.clone()) { - Ok(fut) => self.active_nodes.add_reach_attempt(fut, self.handler_build.new_handler()), - Err((_, addr)) => { - let msg = format!("unsupported multiaddr {}", addr); - let fut = future::err(IoError::new(IoErrorKind::Other, msg)); - self.active_nodes.add_reach_attempt::<_, _, future::FutureResult, _>(fut, self.handler_build.new_handler()) - }, - }; - - self.out_reach_attempts.insert(expected_peer_id.clone(), attempt); - } - - return SwarmEvent::PublicKeyMismatch { - remain_addrs_attempt: num_remain, - expected_peer_id, - actual_peer_id: peer_id, - multiaddr: failed_addr, - }; - } - - // We didn't find any entry in neither the outgoing connections not ingoing connections. - panic!("The API of collection guarantees that the id sent back in NodeReached and \ - NodeReplaced events (which is where we call handle_node_reached) is one that was \ - passed to add_reach_attempt. Whenever we call add_reach_attempt, we also insert \ - at the same time an entry either in out_reach_attempts or in \ - other_reach_attempts. It is therefore guaranteed that we find back this ID in \ - either of these two sets"); + debug_assert!(former.is_none()); } - /// Handles a reach error event from the collection. - /// - /// Optionally returns an event to return from the stream. - /// - /// > **Note**: The event **must** have been produced by the collection of nodes, otherwise - /// > panics will likely happen. - fn handle_reach_error( - &mut self, - reach_id: ReachAttemptId, - error: IoError, - ) -> Option> + /// Provides an API similar to `Stream`, except that it cannot error. + pub fn poll(&mut self) -> Async>> where TTrans: Transport + Clone, TTrans::Dial: Send + 'static, - TTrans::MultiaddrFuture: Send + 'static, + TTrans::MultiaddrFuture: Future + Send + 'static, + TTrans::ListenerUpgrade: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send, TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, + THandlerBuild: HandlerFactory, + THandler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static, + THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary { - // Search for the attempt in `out_reach_attempts`. - // TODO: could be more optimal than iterating over everything - let out_reach_peer_id = self - .out_reach_attempts - .iter() - .find(|(_, a)| a.id == reach_id) - .map(|(p, _)| p.clone()); - if let Some(peer_id) = out_reach_peer_id { - let mut attempt = self.out_reach_attempts.remove(&peer_id) - .expect("out_reach_peer_id is a key that is grabbed from out_reach_attempts"); - - let num_remain = attempt.next_attempts.len(); - let failed_addr = attempt.cur_attempted.clone(); - - if !attempt.next_attempts.is_empty() { - let mut attempt = attempt; - attempt.cur_attempted = attempt.next_attempts.remove(0); - attempt.id = match self.transport().clone().dial(attempt.cur_attempted.clone()) { - Ok(fut) => self.active_nodes.add_reach_attempt(fut, self.handler_build.new_handler()), - Err((_, addr)) => { - let msg = format!("unsupported multiaddr {}", addr); - let fut = future::err(IoError::new(IoErrorKind::Other, msg)); - self.active_nodes.add_reach_attempt::<_, _, future::FutureResult, _>(fut, self.handler_build.new_handler()) + // Start by polling the listeners for events. + match self.listeners.poll() { + Async::NotReady => (), + Async::Ready(Some(ListenersEvent::Incoming { + upgrade, + listen_addr, + })) => { + let id = self.active_nodes.add_reach_attempt(upgrade, self.handler_build.new_handler()); + self.reach_attempts.other_reach_attempts.push(( + id, + ConnectedPoint::Listener { + listen_addr: listen_addr.clone(), }, - }; - - self.out_reach_attempts.insert(peer_id.clone(), attempt); + )); + return Async::Ready(Some(SwarmEvent::IncomingConnection { + listen_addr, + })); } - - return Some(SwarmEvent::DialError { - remain_addrs_attempt: num_remain, - peer_id, - multiaddr: failed_addr, - error, - }); + Async::Ready(Some(ListenersEvent::Closed { + listen_addr, + listener, + result, + })) => { + return Async::Ready(Some(SwarmEvent::ListenerClosed { + listen_addr, + listener, + result, + })); + } + Async::Ready(None) => unreachable!("The listeners stream never finishes"), } - // If this is not an outgoing reach attempt, check the incoming reach attempts. - if let Some(in_pos) = self - .other_reach_attempts - .iter() - .position(|i| i.0 == reach_id) - { - let (_, endpoint) = self.other_reach_attempts.swap_remove(in_pos); - match endpoint { - ConnectedPoint::Dialer { address } => { - return Some(SwarmEvent::UnknownPeerDialError { - multiaddr: address, + // Poll the existing nodes. + loop { + let (action, out_event); + match self.active_nodes.poll() { + Async::NotReady => break, + Async::Ready(Some(CollectionEvent::NodeReached(reach_event))) => { + let (a, e) = handle_node_reached(&mut self.reach_attempts, reach_event); + action = a; + out_event = e; + } + Async::Ready(Some(CollectionEvent::ReachError { id, error })) => { + let (a, e) = handle_reach_error(&mut self.reach_attempts, id, error); + action = a; + out_event = e; + } + Async::Ready(Some(CollectionEvent::NodeError { + peer_id, + error, + })) => { + let address = self.reach_attempts.connected_multiaddresses.remove(&peer_id); + debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id)); + action = Default::default(); + out_event = SwarmEvent::NodeError { + peer_id, + address, error, - }); + }; } - ConnectedPoint::Listener { listen_addr } => { - return Some(SwarmEvent::IncomingConnectionError { listen_addr, error }); + Async::Ready(Some(CollectionEvent::NodeClosed { peer_id })) => { + let address = self.reach_attempts.connected_multiaddresses.remove(&peer_id); + debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id)); + action = Default::default(); + out_event = SwarmEvent::NodeClosed { peer_id, address }; } + Async::Ready(Some(CollectionEvent::NodeEvent { peer_id, event })) => { + action = Default::default(); + out_event = SwarmEvent::NodeEvent { peer_id, event }; + } + Async::Ready(None) => unreachable!("CollectionStream never ends"), + }; + + if let Some((peer_id, first, rest)) = action.start_dial_out { + self.start_dial_out(peer_id, first, rest); } + + if let Some(interrupt) = action.interrupt { + // TODO: improve proof or remove ; this is too complicated right now + self.active_nodes + .interrupt(interrupt) + .expect("interrupt is guaranteed to be gathered from `out_reach_attempts` ; + we insert in out_reach_attempts only when we call \ + active_nodes.add_reach_attempt, and we remove only when we call \ + interrupt or when a reach attempt succeeds or errors ; therefore the \ + out_reach_attempts should always be in sync with the actual \ + attempts ; qed"); + } + + return Async::Ready(Some(out_event)); } - // The id was neither in the outbound list nor the inbound list. - panic!("The API of collection guarantees that the id sent back in ReachError events \ - (which is where we call handle_reach_error) is one that was passed to \ - add_reach_attempt. Whenever we call add_reach_attempt, we also insert \ - at the same time an entry either in out_reach_attempts or in \ - other_reach_attempts. It is therefore guaranteed that we find back this ID in \ - either of these two sets"); + Async::NotReady } } +/// Internal struct indicating an action to perform of the swarm. +#[derive(Debug, Default)] +#[must_use] +struct ActionItem { + start_dial_out: Option<(PeerId, Multiaddr, Vec)>, + interrupt: Option, +} + +/// Handles a node reached event from the collection. +/// +/// Returns an event to return from the stream. +/// +/// > **Note**: The event **must** have been produced by the collection of nodes, otherwise +/// > panics will likely happen. +fn handle_node_reached( + reach_attempts: &mut ReachAttempts, + event: CollectionReachEvent +) -> (ActionItem, SwarmEvent) +where + TTrans: Transport + Clone, + TTrans::Dial: Send + 'static, + TTrans::MultiaddrFuture: Send + 'static, + TMuxer: StreamMuxer + Send + Sync + 'static, + TMuxer::OutboundSubstream: Send, + TMuxer::Substream: Send, + TInEvent: Send + 'static, + TOutEvent: Send + 'static, +{ + // We first start looking in the incoming attempts. While this makes the code less optimal, + // it also makes the logic easier. + if let Some(in_pos) = reach_attempts + .other_reach_attempts + .iter() + .position(|i| i.0 == event.reach_attempt_id()) + { + let (_, endpoint) = reach_attempts.other_reach_attempts.swap_remove(in_pos); + + // Clear the known multiaddress for this peer. + let closed_multiaddr = reach_attempts.connected_multiaddresses.remove(&event.peer_id()); + // Cancel any outgoing attempt to this peer. + let action = if let Some(attempt) = reach_attempts.out_reach_attempts.remove(&event.peer_id()) { + debug_assert_ne!(attempt.id, event.reach_attempt_id()); + ActionItem { + interrupt: Some(attempt.id), + .. Default::default() + } + } else { + ActionItem::default() + }; + + let (outcome, peer_id) = event.accept(); + if outcome == CollectionNodeAccept::ReplacedExisting { + return (action, SwarmEvent::Replaced { + peer_id, + endpoint, + closed_multiaddr, + }); + } else { + return (action, SwarmEvent::Connected { peer_id, endpoint }); + } + } + + // Otherwise, try for outgoing attempts. + let is_outgoing_and_ok = if let Some(attempt) = reach_attempts.out_reach_attempts.get(event.peer_id()) { + attempt.id == event.reach_attempt_id() + } else { + false + }; + + // We only remove the attempt from `out_reach_attempts` if it both matches the reach id + // and the expected peer id. + if is_outgoing_and_ok { + let attempt = reach_attempts.out_reach_attempts.remove(event.peer_id()) + .expect("is_outgoing_and_ok is true only if reach_attempts.out_reach_attempts.get(event.peer_id()) \ + returned Some"); + + let closed_multiaddr = reach_attempts.connected_multiaddresses + .insert(event.peer_id().clone(), attempt.cur_attempted.clone()); + let endpoint = ConnectedPoint::Dialer { + address: attempt.cur_attempted, + }; + + let (outcome, peer_id) = event.accept(); + if outcome == CollectionNodeAccept::ReplacedExisting { + return (Default::default(), SwarmEvent::Replaced { + peer_id, + endpoint, + closed_multiaddr, + }); + } else { + return (Default::default(), SwarmEvent::Connected { peer_id, endpoint }); + } + } + + // If in neither, check outgoing reach attempts again as we may have a public + // key mismatch. + let expected_peer_id = reach_attempts + .out_reach_attempts + .iter() + .find(|(_, a)| a.id == event.reach_attempt_id()) + .map(|(p, _)| p.clone()); + if let Some(expected_peer_id) = expected_peer_id { + debug_assert_ne!(&expected_peer_id, event.peer_id()); + let attempt = reach_attempts.out_reach_attempts.remove(&expected_peer_id) + .expect("expected_peer_id is a key that is grabbed from out_reach_attempts"); + + let num_remain = attempt.next_attempts.len(); + let failed_addr = attempt.cur_attempted.clone(); + + let peer_id = event.deny(); + + let action = if !attempt.next_attempts.is_empty() { + let mut attempt = attempt; + let next = attempt.next_attempts.remove(0); + ActionItem { + start_dial_out: Some((expected_peer_id.clone(), next, attempt.next_attempts)), + .. Default::default() + } + } else { + Default::default() + }; + + return (action, SwarmEvent::PublicKeyMismatch { + remain_addrs_attempt: num_remain, + expected_peer_id, + actual_peer_id: peer_id, + multiaddr: failed_addr, + }); + } + + // We didn't find any entry in neither the outgoing connections not ingoing connections. + // TODO: improve proof or remove ; this is too complicated right now + panic!("The API of collection guarantees that the id sent back in NodeReached (which is where \ + we call handle_node_reached) is one that was passed to add_reach_attempt. Whenever we \ + call add_reach_attempt, we also insert at the same time an entry either in \ + out_reach_attempts or in other_reach_attempts. It is therefore guaranteed that we \ + find back this ID in either of these two sets"); +} + +/// Handles a reach error event from the collection. +/// +/// Optionally returns an event to return from the stream. +/// +/// > **Note**: The event **must** have been produced by the collection of nodes, otherwise +/// > panics will likely happen. +fn handle_reach_error( + reach_attempts: &mut ReachAttempts, + reach_id: ReachAttemptId, + error: IoError, +) -> (ActionItem, SwarmEvent) +where TTrans: Transport +{ + // Search for the attempt in `out_reach_attempts`. + // TODO: could be more optimal than iterating over everything + let out_reach_peer_id = reach_attempts + .out_reach_attempts + .iter() + .find(|(_, a)| a.id == reach_id) + .map(|(p, _)| p.clone()); + if let Some(peer_id) = out_reach_peer_id { + let mut attempt = reach_attempts.out_reach_attempts.remove(&peer_id) + .expect("out_reach_peer_id is a key that is grabbed from out_reach_attempts"); + + let num_remain = attempt.next_attempts.len(); + let failed_addr = attempt.cur_attempted.clone(); + + let action = if !attempt.next_attempts.is_empty() { + let mut attempt = attempt; + let next_attempt = attempt.next_attempts.remove(0); + ActionItem { + start_dial_out: Some((peer_id.clone(), next_attempt, attempt.next_attempts)), + .. Default::default() + } + } else { + Default::default() + }; + + return (action, SwarmEvent::DialError { + remain_addrs_attempt: num_remain, + peer_id, + multiaddr: failed_addr, + error, + }); + } + + // If this is not an outgoing reach attempt, check the incoming reach attempts. + if let Some(in_pos) = reach_attempts + .other_reach_attempts + .iter() + .position(|i| i.0 == reach_id) + { + let (_, endpoint) = reach_attempts.other_reach_attempts.swap_remove(in_pos); + match endpoint { + ConnectedPoint::Dialer { address } => { + return (Default::default(), SwarmEvent::UnknownPeerDialError { + multiaddr: address, + error, + }); + } + ConnectedPoint::Listener { listen_addr } => { + return (Default::default(), SwarmEvent::IncomingConnectionError { listen_addr, error }); + } + } + } + + // The id was neither in the outbound list nor the inbound list. + // TODO: improve proof or remove ; this is too complicated right now + panic!("The API of collection guarantees that the id sent back in ReachError events \ + (which is where we call handle_reach_error) is one that was passed to \ + add_reach_attempt. Whenever we call add_reach_attempt, we also insert \ + at the same time an entry either in out_reach_attempts or in \ + other_reach_attempts. It is therefore guaranteed that we find back this ID in \ + either of these two sets"); +} + /// State of a peer in the system. pub enum Peer<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandlerBuild: 'a> where @@ -838,6 +979,7 @@ impl<'a, TInEvent, TOutEvent> PeerPendingConnect<'a, TInEvent, TOutEvent> { pub fn interrupt(self) { let attempt = self.attempt.remove(); if let Err(_) = self.active_nodes.interrupt(attempt.id) { + // TODO: improve proof or remove ; this is too complicated right now panic!("We retreived this attempt.id from out_reach_attempts. We insert in \ out_reach_attempts only at the same time as we call add_reach_attempt. \ Whenever we receive a NodeReached, NodeReplaced or ReachError event, which \ @@ -947,25 +1089,10 @@ where TInEvent: Send + 'static, TOutEvent: Send + 'static, { - let future = match self.nodes.transport().clone().dial(first.clone()) { - Ok(fut) => fut, - Err(_) => return Err(self), - }; - - let reach_id = self.nodes.active_nodes.add_reach_attempt(future, self.nodes.handler_build.new_handler()); - - let former = self.nodes.out_reach_attempts.insert( - self.peer_id.clone(), - OutReachAttempt { - id: reach_id, - cur_attempted: first, - next_attempts: rest, - }, - ); - debug_assert!(former.is_none()); + self.nodes.start_dial_out(self.peer_id.clone(), first, rest); Ok(PeerPendingConnect { - attempt: match self.nodes.out_reach_attempts.entry(self.peer_id) { + attempt: match self.nodes.reach_attempts.out_reach_attempts.entry(self.peer_id) { Entry::Occupied(e) => e, Entry::Vacant(_) => { panic!("We called out_reach_attempts.insert with this peer id just above") @@ -995,85 +1122,8 @@ where type Item = SwarmEvent; type Error = Void; // TODO: use `!` once stable + #[inline] fn poll(&mut self) -> Poll, Self::Error> { - // Start by polling the listeners for events. - match self.listeners.poll() { - Ok(Async::NotReady) => (), - Ok(Async::Ready(Some(ListenersEvent::Incoming { - upgrade, - listen_addr, - }))) => { - let id = self.active_nodes.add_reach_attempt(upgrade, self.handler_build.new_handler()); - self.other_reach_attempts.push(( - id, - ConnectedPoint::Listener { - listen_addr: listen_addr.clone(), - }, - )); - return Ok(Async::Ready(Some(SwarmEvent::IncomingConnection { - listen_addr, - }))); - } - Ok(Async::Ready(Some(ListenersEvent::Closed { - listen_addr, - listener, - result, - }))) => { - return Ok(Async::Ready(Some(SwarmEvent::ListenerClosed { - listen_addr, - listener, - result, - }))); - } - Ok(Async::Ready(None)) => unreachable!("The listeners stream never finishes"), - Err(_) => unreachable!("The listeners stream never errors"), // TODO: remove variant - } - - // Poll the existing nodes. - loop { - match self.active_nodes.poll() { - Ok(Async::NotReady) => break, - Ok(Async::Ready(Some(CollectionEvent::NodeReached { peer_id, id }))) => { - let event = self.handle_node_reached(peer_id, id, false); - return Ok(Async::Ready(Some(event))); - } - Ok(Async::Ready(Some(CollectionEvent::NodeReplaced { - peer_id, - id, - }))) => { - let event = self.handle_node_reached(peer_id, id, true); - return Ok(Async::Ready(Some(event))); - } - Ok(Async::Ready(Some(CollectionEvent::ReachError { id, error }))) => { - if let Some(event) = self.handle_reach_error(id, error) { - return Ok(Async::Ready(Some(event))); - } - } - Ok(Async::Ready(Some(CollectionEvent::NodeError { - peer_id, - error, - }))) => { - let address = self.connected_multiaddresses.remove(&peer_id); - debug_assert!(!self.out_reach_attempts.contains_key(&peer_id)); - return Ok(Async::Ready(Some(SwarmEvent::NodeError { - peer_id, - address, - error, - }))); - } - Ok(Async::Ready(Some(CollectionEvent::NodeClosed { peer_id }))) => { - let address = self.connected_multiaddresses.remove(&peer_id); - debug_assert!(!self.out_reach_attempts.contains_key(&peer_id)); - return Ok(Async::Ready(Some(SwarmEvent::NodeClosed { peer_id, address }))); - } - Ok(Async::Ready(Some(CollectionEvent::NodeEvent { peer_id, event }))) => { - return Ok(Async::Ready(Some(SwarmEvent::NodeEvent { peer_id, event }))); - } - Ok(Async::Ready(None)) => unreachable!("CollectionStream never ends"), - Err(_) => unreachable!("CollectionStream never errors"), - } - } - - Ok(Async::NotReady) + Ok(self.poll()) } }