diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index 2c0dc823..d0a6d54f 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -35,20 +35,20 @@ use std::{error, fmt, hash::Hash, mem}; mod tests; /// Implementation of `Stream` that handles a collection of nodes. -pub struct CollectionStream { +pub struct CollectionStream { /// Object that handles the tasks. /// /// The user data contains the state of the task. If `Connected`, then a corresponding entry /// must be present in `nodes`. - inner: HandledNodesTasks, TPeerId>, + inner: HandledNodesTasks, TPeerId>, /// List of nodes, with the task id that handles this node. The corresponding entry in `tasks` /// must always be in the `Connected` state. nodes: FnvHashMap, } -impl fmt::Debug for - CollectionStream +impl fmt::Debug for + CollectionStream where TPeerId: fmt::Debug, { @@ -59,18 +59,18 @@ where /// State of a task. #[derive(Debug, Clone, PartialEq, Eq)] -enum TaskState { +enum TaskState { /// Task is attempting to reach a peer. Pending, /// The task is connected to a peer. - Connected(TPeerId), + Connected(TPeerId, TUserData), } /// Event that can happen on the `CollectionStream`. -pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr, TPeerId> { +pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr, TUserData, TPeerId> { /// A connection to a node has succeeded. You must use the provided event in order to accept /// the connection. - NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>), + NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>), /// A connection to a node has been closed. /// @@ -79,6 +79,8 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr NodeClosed { /// Identifier of the node. peer_id: TPeerId, + /// User data that was passed when accepting. + user_data: TUserData, }, /// A connection to a node has errored. @@ -89,6 +91,8 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr peer_id: TPeerId, /// The error that happened. error: HandledNodeError, + /// User data that was passed when accepting. + user_data: TUserData, }, /// An error happened on the future that was trying to reach a node. @@ -103,19 +107,20 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr /// A node has produced an event. NodeEvent { - /// Identifier of the node. - peer_id: TPeerId, + /// The node that has generated the event. + peer: PeerMut<'a, TInEvent, TUserData, TPeerId>, /// The produced event. event: TOutEvent, }, } -impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for - CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> +impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> fmt::Debug for + CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> where TOutEvent: fmt::Debug, TReachErr: fmt::Debug, THandlerErr: fmt::Debug, TPeerId: Eq + Hash + Clone + fmt::Debug, + TUserData: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { match *self { @@ -124,14 +129,16 @@ where TOutEvent: fmt::Debug, .field(inner) .finish() }, - CollectionEvent::NodeClosed { ref peer_id } => { + CollectionEvent::NodeClosed { ref peer_id, ref user_data } => { f.debug_struct("CollectionEvent::NodeClosed") .field("peer_id", peer_id) + .field("user_data", user_data) .finish() }, - CollectionEvent::NodeError { ref peer_id, ref error } => { + CollectionEvent::NodeError { ref peer_id, ref error, ref user_data } => { f.debug_struct("CollectionEvent::NodeError") .field("peer_id", peer_id) + .field("user_data", user_data) .field("error", error) .finish() }, @@ -141,9 +148,9 @@ where TOutEvent: fmt::Debug, .field("error", error) .finish() }, - CollectionEvent::NodeEvent { ref peer_id, ref event } => { + CollectionEvent::NodeEvent { ref peer, ref event } => { f.debug_struct("CollectionEvent::NodeEvent") - .field("peer_id", peer_id) + .field("peer_id", peer.id()) .field("event", event) .finish() }, @@ -153,17 +160,17 @@ where TOutEvent: fmt::Debug, /// Event that happens when we reach a node. #[must_use = "The node reached event is used to accept the newly-opened connection"] -pub struct CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId = PeerId> { +pub struct CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId = PeerId> { /// Peer id we connected to. peer_id: TPeerId, /// The task id that reached the node. id: TaskId, /// The `CollectionStream` we are referencing. - parent: &'a mut CollectionStream, + parent: &'a mut CollectionStream, } -impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> - CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> +impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> + CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> where TPeerId: Eq + Hash + Clone, { @@ -187,21 +194,28 @@ where } /// Accepts the new node. - pub fn accept(self) -> (CollectionNodeAccept, TPeerId) { + pub fn accept(self, user_data: TUserData) -> (CollectionNodeAccept, TPeerId) { // Set the state of the task to `Connected`. let former_task_id = self.parent.nodes.insert(self.peer_id.clone(), self.id); *self.parent.inner.task(self.id) .expect("A CollectionReachEvent is only ever created from a valid attempt; QED") - .user_data_mut() = TaskState::Connected(self.peer_id.clone()); + .user_data_mut() = TaskState::Connected(self.peer_id.clone(), user_data); // It is possible that we already have a task connected to the same peer. In this // case, we need to emit a `NodeReplaced` event. let tasks = &mut self.parent.inner; let ret_value = if let Some(former_task) = former_task_id.and_then(|i| tasks.task(i)) { - debug_assert!(*former_task.user_data() == TaskState::Connected(self.peer_id.clone())); - former_task.close(); + debug_assert!(match *former_task.user_data() { + TaskState::Connected(ref p, _) if *p == self.peer_id => true, + _ => false + }); + let user_data = match former_task.close() { + TaskState::Connected(_, user_data) => user_data, + _ => panic!("The former task was picked from `nodes`; all the nodes in `nodes` \ + are always in the connected state") + }; // TODO: we unfortunately have to clone the peer id here - (CollectionNodeAccept::ReplacedExisting, self.peer_id.clone()) + (CollectionNodeAccept::ReplacedExisting(user_data), self.peer_id.clone()) } else { // TODO: we unfortunately have to clone the peer id here (CollectionNodeAccept::NewEntry, self.peer_id.clone()) @@ -225,8 +239,8 @@ where } } -impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for - CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> +impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> fmt::Debug for + CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> where TPeerId: Eq + Hash + Clone + fmt::Debug, { @@ -238,8 +252,8 @@ where } } -impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> Drop for - CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> +impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> Drop for + CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> { fn drop(&mut self) { let task = self.parent.inner.task(self.id) @@ -254,9 +268,9 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> Drop fo /// Outcome of accepting a node. #[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum CollectionNodeAccept { - /// We replaced an existing node. - ReplacedExisting, +pub enum CollectionNodeAccept { + /// We replaced an existing node. Returns the user data that was assigned to this node. + ReplacedExisting(TUserData), /// We didn't replace anything existing. NewEntry, } @@ -265,8 +279,8 @@ pub enum CollectionNodeAccept { #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct ReachAttemptId(TaskId); -impl - CollectionStream +impl + CollectionStream where TPeerId: Eq + Hash + Clone, { @@ -309,7 +323,7 @@ where None => Err(InterruptError::ReachAttemptNotFound), Some(task) => { match task.user_data() { - TaskState::Connected(_) => return Err(InterruptError::AlreadyReached), + TaskState::Connected(_, _) => return Err(InterruptError::AlreadyReached), TaskState::Pending => (), }; @@ -332,7 +346,7 @@ where /// /// Returns `None` if we don't have a connection to this peer. #[inline] - pub fn peer_mut(&mut self, id: &TPeerId) -> Option> { + pub fn peer_mut(&mut self, id: &TPeerId) -> Option> { let task = match self.nodes.get(id) { Some(&task) => task, None => return None, @@ -368,7 +382,7 @@ where /// > **Note**: we use a regular `poll` method instead of implementing `Stream` in order to /// > remove the `Err` variant, but also because we want the `CollectionStream` to stay /// > borrowed if necessary. - pub fn poll(&mut self) -> Async> { + pub fn poll(&mut self) -> Async> { let item = match self.inner.poll() { Async::Ready(item) => item, Async::NotReady => return Async::NotReady, @@ -399,24 +413,26 @@ where panic!("The HandledNodesTasks is guaranteed to always return the handler \ when producing a TaskClosedEvent::Reach error"); }, - (TaskState::Connected(peer_id), Ok(()), _handler) => { + (TaskState::Connected(peer_id, user_data), Ok(()), _handler) => { debug_assert!(_handler.is_none()); let _node_task_id = self.nodes.remove(&peer_id); debug_assert_eq!(_node_task_id, Some(id)); Async::Ready(CollectionEvent::NodeClosed { peer_id, + user_data, }) }, - (TaskState::Connected(peer_id), Err(TaskClosedEvent::Node(err)), _handler) => { + (TaskState::Connected(peer_id, user_data), Err(TaskClosedEvent::Node(err)), _handler) => { debug_assert!(_handler.is_none()); let _node_task_id = self.nodes.remove(&peer_id); debug_assert_eq!(_node_task_id, Some(id)); Async::Ready(CollectionEvent::NodeError { peer_id, error: err, + user_data, }) }, - (TaskState::Connected(_), Err(TaskClosedEvent::Reach(_)), _) => { + (TaskState::Connected(_, _), Err(TaskClosedEvent::Reach(_)), _) => { panic!("A TaskClosedEvent::Reach can only happen before we are connected \ to a node; therefore the TaskState won't be Connected; QED"); }, @@ -433,15 +449,20 @@ where }, HandledNodesEvent::NodeEvent { task, event } => { let peer_id = match task.user_data() { - TaskState::Connected(peer_id) => peer_id.clone(), + TaskState::Connected(peer_id, _) => peer_id.clone(), _ => panic!("we can only receive NodeEvent events from a task after we \ received a corresponding NodeReached event from that same task; \ when we receive a NodeReached event, we ensure that the entry in \ self.tasks is switched to the Connected state; QED"), }; - + drop(task); Async::Ready(CollectionEvent::NodeEvent { - peer_id, + // TODO: normally we'd build a `PeerMut` manually here, but the borrow checker + // doesn't like it + peer: self.peer_mut(&peer_id) + .expect("we can only receive NodeEvent events from a task after we \ + received a corresponding NodeReached event from that same task;\ + when that happens, peer_mut will always return Some; QED"), event, }) } @@ -476,33 +497,60 @@ impl fmt::Display for InterruptError { impl error::Error for InterruptError {} /// Access to a peer in the collection. -pub struct PeerMut<'a, TInEvent, TPeerId = PeerId> { - inner: HandledNodesTask<'a, TInEvent, TaskState>, +pub struct PeerMut<'a, TInEvent, TUserData, TPeerId = PeerId> { + inner: HandledNodesTask<'a, TInEvent, TaskState>, nodes: &'a mut FnvHashMap, } -impl<'a, TInEvent, TPeerId> PeerMut<'a, TInEvent, TPeerId> +impl<'a, TInEvent, TUserData, TPeerId> PeerMut<'a, TInEvent, TUserData, TPeerId> where TPeerId: Eq + Hash, { + /// Returns the identifier of the peer. + pub fn id(&self) -> &TPeerId { + match self.inner.user_data() { + TaskState::Connected(peer_id, _) => peer_id, + _ => panic!("A PeerMut is only ever constructed from a peer in the connected \ + state; QED") + } + } + + /// Returns the user data that was stored in the collections when we accepted the connection. + pub fn user_data(&self) -> &TUserData { + match self.inner.user_data() { + TaskState::Connected(_, user_data) => user_data, + _ => panic!("A PeerMut is only ever constructed from a peer in the connected \ + state; QED") + } + } + + /// Returns the user data that was stored in the collections when we accepted the connection. + pub fn user_data_mut(&mut self) -> &mut TUserData { + match self.inner.user_data_mut() { + TaskState::Connected(_, user_data) => user_data, + _ => panic!("A PeerMut is only ever constructed from a peer in the connected \ + state; QED") + } + } + /// Sends an event to the given node. #[inline] pub fn send_event(&mut self, event: TInEvent) { self.inner.send_event(event) } - /// Closes the connections to this node. + /// Closes the connections to this node. Returns the user data. /// /// No further event will be generated for this node. - pub fn close(self) { - if let TaskState::Connected(peer_id) = self.inner.user_data() { + pub fn close(self) -> TUserData { + let task_id = self.inner.id(); + if let TaskState::Connected(peer_id, user_data) = self.inner.close() { let old_task_id = self.nodes.remove(&peer_id); - debug_assert_eq!(old_task_id, Some(self.inner.id())); + debug_assert_eq!(old_task_id, Some(task_id)); + user_data } else { panic!("a PeerMut can only be created if an entry is present in nodes; an entry in \ nodes always matched a Connected entry in the tasks; QED"); - }; - - self.inner.close(); + } } } diff --git a/core/src/nodes/collection/tests.rs b/core/src/nodes/collection/tests.rs index 779d36fb..58125047 100644 --- a/core/src/nodes/collection/tests.rs +++ b/core/src/nodes/collection/tests.rs @@ -31,7 +31,7 @@ use crate::nodes::NodeHandlerEvent; use std::{io, sync::Arc}; use parking_lot::Mutex; -type TestCollectionStream = CollectionStream; +type TestCollectionStream = CollectionStream; #[test] fn has_connection_is_false_before_a_connection_has_been_made() { @@ -107,7 +107,7 @@ fn accepting_a_node_yields_new_entry() { } 2 => { assert_matches!(event, Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { - let (accept_ev, accepted_peer_id) = reach_ev.accept(); + let (accept_ev, accepted_peer_id) = reach_ev.accept(()); assert_eq!(accepted_peer_id, peer_id); assert_matches!(accept_ev, CollectionNodeAccept::NewEntry); }); @@ -159,7 +159,7 @@ fn events_in_a_node_reaches_the_collection_stream() { let mut cs = cs_fut.lock(); cs.broadcast_event(&InEvent::NextState); assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { - reach_ev.accept(); + reach_ev.accept(()); }); Ok(Async::Ready(())) })).expect("tokio works"); @@ -168,7 +168,7 @@ fn events_in_a_node_reaches_the_collection_stream() { rt.block_on(future::poll_fn(move || -> Poll<_, ()> { let mut cs = cs_fut.lock(); cs.broadcast_event(&InEvent::NextState); - assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => { + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer: _, event}) => { assert_matches!(event, OutEvent::Custom("init")); }); Ok(Async::Ready(())) @@ -179,7 +179,7 @@ fn events_in_a_node_reaches_the_collection_stream() { rt.block_on(future::poll_fn(move || -> Poll<_, ()> { let mut cs = cs_fut.lock(); cs.broadcast_event(&InEvent::NextState); - assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => { + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer: _, event}) => { assert_matches!(event, OutEvent::Custom("from handler 1")); }); Ok(Async::Ready(())) @@ -189,7 +189,7 @@ fn events_in_a_node_reaches_the_collection_stream() { rt.block_on(future::poll_fn(move || -> Poll<_, ()> { let mut cs = cs_fut.lock(); cs.broadcast_event(&InEvent::NextState); - assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => { + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer: _, event}) => { assert_matches!(event, OutEvent::Custom("from handler 2")); }); Ok(Async::Ready(())) @@ -253,7 +253,7 @@ fn task_closed_with_error_when_task_is_connected_yields_node_error() { let mut cs = cs_fut.lock(); // NodeReached, accept the connection so the task transitions from Pending to Connected assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { - reach_ev.accept(); + reach_ev.accept(()); }); Ok(Async::Ready(())) })).expect("tokio works"); @@ -299,7 +299,7 @@ fn task_closed_ok_when_task_is_connected_yields_node_closed() { let mut cs = cs_fut.lock(); // NodeReached, accept the connection so the task transitions from Pending to Connected assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { - reach_ev.accept(); + reach_ev.accept(()); }); Ok(Async::Ready(())) })).expect("tokio works"); @@ -312,7 +312,7 @@ fn task_closed_ok_when_task_is_connected_yields_node_closed() { rt.block_on(future::poll_fn(move || -> Poll<_, ()> { let mut cs = cs_fut.lock(); // Node is closed normally: TaskClosed, Ok(()) - assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeClosed{ peer_id: peer_id_in_event }) => { + assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeClosed{ peer_id: peer_id_in_event, .. }) => { assert_eq!(peer_id_in_event, peer_id); }); Ok(Async::Ready(())) @@ -363,7 +363,7 @@ fn interrupting_an_established_connection_is_err() { let mut cs = cs_fut.lock(); // NodeReached, accept the connection so the task transitions from Pending to Connected assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { - reach_ev.accept(); + reach_ev.accept(()); }); Ok(Async::Ready(())) })).expect("tokio works"); diff --git a/core/src/nodes/handled_node_tasks.rs b/core/src/nodes/handled_node_tasks.rs index e4b8f072..700724ed 100644 --- a/core/src/nodes/handled_node_tasks.rs +++ b/core/src/nodes/handled_node_tasks.rs @@ -369,11 +369,11 @@ impl<'a, TInEvent, TUserData> Task<'a, TInEvent, TUserData> { *self.inner.key() } - /// Closes the task. + /// Closes the task. Returns the user data. /// /// No further event will be generated for this task. - pub fn close(self) { - self.inner.remove(); + pub fn close(self) -> TUserData { + self.inner.remove().1 } } diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index 192f5eeb..faec6030 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -61,7 +61,7 @@ where listeners: ListenersStream, /// The nodes currently active. - active_nodes: CollectionStream, THandlerErr, TPeerId>, + active_nodes: CollectionStream, THandlerErr, (), TPeerId>, /// The reach attempts of the swarm. /// This needs to be a separate struct in order to handle multiple mutable borrows issues. @@ -507,7 +507,7 @@ where TTrans: Transport /// Address used to send back data to the remote. send_back_addr: Multiaddr, /// Reference to the `active_nodes` field of the swarm. - active_nodes: &'a mut CollectionStream, THandlerErr, TPeerId>, + active_nodes: &'a mut CollectionStream, THandlerErr, (), TPeerId>, /// Reference to the `other_reach_attempts` field of the swarm. other_reach_attempts: &'a mut Vec<(ReachAttemptId, ConnectedPoint)>, } @@ -998,6 +998,7 @@ where Async::Ready(CollectionEvent::NodeError { peer_id, error, + .. }) => { let endpoint = self.reach_attempts.connected_points.remove(&peer_id) .expect("We insert into connected_points whenever a connection is \ @@ -1012,7 +1013,7 @@ where error, }; } - Async::Ready(CollectionEvent::NodeClosed { peer_id }) => { + Async::Ready(CollectionEvent::NodeClosed { peer_id, .. }) => { let endpoint = self.reach_attempts.connected_points.remove(&peer_id) .expect("We insert into connected_points whenever a connection is \ opened and remove only when a connection is closed; the \ @@ -1022,9 +1023,9 @@ where action = Default::default(); out_event = RawSwarmEvent::NodeClosed { peer_id, endpoint }; } - Async::Ready(CollectionEvent::NodeEvent { peer_id, event }) => { + Async::Ready(CollectionEvent::NodeEvent { peer, event }) => { action = Default::default(); - out_event = RawSwarmEvent::NodeEvent { peer_id, event }; + out_event = RawSwarmEvent::NodeEvent { peer_id: peer.id().clone(), event }; } } @@ -1073,7 +1074,7 @@ impl Default for ActionItem { /// > panics will likely happen. fn handle_node_reached<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>( reach_attempts: &mut ReachAttempts, - event: CollectionReachEvent<'_, TInEvent, TOutEvent, THandler, InternalReachErr, THandlerErr, TPeerId>, + event: CollectionReachEvent<'_, TInEvent, TOutEvent, THandler, InternalReachErr, THandlerErr, (), TPeerId>, ) -> (ActionItem, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>) where TTrans: Transport + Clone, @@ -1133,8 +1134,8 @@ where } }; - let (outcome, peer_id) = event.accept(); - if outcome == CollectionNodeAccept::ReplacedExisting { + let (outcome, peer_id) = event.accept(()); + if let CollectionNodeAccept::ReplacedExisting(()) = outcome { let closed_endpoint = closed_endpoint .expect("We insert into connected_points whenever a connection is opened and \ remove only when a connection is closed; the underlying API is \ @@ -1171,8 +1172,8 @@ where let closed_endpoint = reach_attempts.connected_points .insert(event.peer_id().clone(), opened_endpoint.clone()); - let (outcome, peer_id) = event.accept(); - if outcome == CollectionNodeAccept::ReplacedExisting { + let (outcome, peer_id) = event.accept(()); + if let CollectionNodeAccept::ReplacedExisting(()) = outcome { let closed_endpoint = closed_endpoint .expect("We insert into connected_points whenever a connection is opened and \ remove only when a connection is closed; the underlying API is guaranteed \ @@ -1519,7 +1520,7 @@ pub struct PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, where TTrans: Transport, { /// Reference to the `active_nodes` of the parent. - active_nodes: &'a mut CollectionStream, THandlerErr, TPeerId>, + active_nodes: &'a mut CollectionStream, THandlerErr, (), TPeerId>, /// Reference to the `connected_points` field of the parent. connected_points: &'a mut FnvHashMap, /// Reference to the `out_reach_attempts` field of the parent. @@ -1576,7 +1577,7 @@ where TTrans: Transport { attempt: OccupiedEntry<'a, TPeerId, OutReachAttempt>, - active_nodes: &'a mut CollectionStream, THandlerErr, TPeerId>, + active_nodes: &'a mut CollectionStream, THandlerErr, (), TPeerId>, } impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>