diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index 2ba0f6ae..9c801a95 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -35,22 +35,22 @@ use std::{error, fmt, hash::Hash, mem}; mod tests; /// Implementation of `Stream` that handles a collection of nodes. -pub struct CollectionStream { +pub struct CollectionStream { /// Object that handles the tasks. /// /// The user data contains the state of the task. If `Connected`, then a corresponding entry /// must be present in `nodes`. - inner: HandledNodesTasks, TPeerId>, + inner: HandledNodesTasks, TConnInfo>, /// List of nodes, with the task id that handles this node. The corresponding entry in `tasks` /// must always be in the `Connected` state. nodes: FnvHashMap, } -impl fmt::Debug for - CollectionStream +impl fmt::Debug for + CollectionStream where - TPeerId: fmt::Debug, + TConnInfo: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_tuple("CollectionStream").finish() @@ -59,25 +59,25 @@ where /// State of a task. #[derive(Debug, Clone, PartialEq, Eq)] -enum TaskState { +enum TaskState { /// Task is attempting to reach a peer. Pending, /// The task is connected to a peer. - Connected(TPeerId, TUserData), + Connected(TConnInfo, TUserData), } /// Event that can happen on the `CollectionStream`. -pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr, TUserData, TPeerId> { +pub enum CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> { /// A connection to a node has succeeded. You must use the provided event in order to accept /// the connection. - NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>), + NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>), /// A connection to a node has errored. /// /// Can only happen after a node has been successfully reached. NodeClosed { - /// Identifier of the node. - peer_id: TPeerId, + /// Information about the connection. + conn_info: TConnInfo, /// The error that happened. error: HandledNodeError, /// User data that was passed when accepting. @@ -97,18 +97,18 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr /// A node has produced an event. NodeEvent { /// The node that has generated the event. - peer: PeerMut<'a, TInEvent, TUserData, TPeerId>, + peer: PeerMut<'a, TInEvent, TUserData, TConnInfo, TPeerId>, /// The produced event. event: TOutEvent, }, } -impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> fmt::Debug for - CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> +impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> fmt::Debug for + CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> where TOutEvent: fmt::Debug, TReachErr: fmt::Debug, THandlerErr: fmt::Debug, - TPeerId: Eq + Hash + Clone + fmt::Debug, + TConnInfo: fmt::Debug, TUserData: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { @@ -118,9 +118,9 @@ where TOutEvent: fmt::Debug, .field(inner) .finish() }, - CollectionEvent::NodeClosed { ref peer_id, ref error, ref user_data } => { + CollectionEvent::NodeClosed { ref conn_info, ref error, ref user_data } => { f.debug_struct("CollectionEvent::NodeClosed") - .field("peer_id", peer_id) + .field("conn_info", conn_info) .field("user_data", user_data) .field("error", error) .finish() @@ -133,7 +133,7 @@ where TOutEvent: fmt::Debug, }, CollectionEvent::NodeEvent { ref peer, ref event } => { f.debug_struct("CollectionEvent::NodeEvent") - .field("peer_id", peer.id()) + .field("conn_info", peer.info()) .field("event", event) .finish() }, @@ -143,24 +143,30 @@ where TOutEvent: fmt::Debug, /// Event that happens when we reach a node. #[must_use = "The node reached event is used to accept the newly-opened connection"] -pub struct CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId = PeerId> { - /// Peer id we connected to. - peer_id: TPeerId, +pub struct CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo = PeerId, TPeerId = PeerId> { + /// Information about the connection, or `None` if it's been extracted. + conn_info: Option, /// The task id that reached the node. id: TaskId, /// The `CollectionStream` we are referencing. - parent: &'a mut CollectionStream, + parent: &'a mut CollectionStream, } -impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> - CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> -where - TPeerId: Eq + Hash + Clone, +impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> + CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> { - /// Returns the peer id of the node that has been reached. - #[inline] - pub fn peer_id(&self) -> &TPeerId { - &self.peer_id + /// Returns the information of the connection. + pub fn connection_info(&self) -> &TConnInfo { + self.conn_info.as_ref().expect("conn_info is always Some when the object is alive; QED") + } + + /// Returns the identity of the node we connected to. + pub fn peer_id(&self) -> &TPeerId + where + TConnInfo: ConnectionInfo, + TPeerId: Eq + Hash, + { + self.connection_info().peer_id() } /// Returns the reach attempt that reached the node. @@ -168,40 +174,54 @@ where pub fn reach_attempt_id(&self) -> ReachAttemptId { ReachAttemptId(self.id) } +} +impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> + CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> +where + TConnInfo: ConnectionInfo, + TPeerId: Eq + Hash, +{ /// Returns `true` if accepting this reached node would replace an existing connection to that /// node. #[inline] pub fn would_replace(&self) -> bool { - self.parent.nodes.contains_key(&self.peer_id) + self.parent.nodes.contains_key(self.connection_info().peer_id()) } /// Accepts the new node. - pub fn accept(self, user_data: TUserData) -> (CollectionNodeAccept, TPeerId) { + pub fn accept(mut self, user_data: TUserData) -> (CollectionNodeAccept, TConnInfo) + where + // TODO: these two clones shouldn't be necessary if we return references + TConnInfo: Clone, + TPeerId: Clone, + { + let self_conn_info = self.conn_info.take() + .expect("conn_info is always Some when the object is alive; QED"); + // Set the state of the task to `Connected`. - let former_task_id = self.parent.nodes.insert(self.peer_id.clone(), self.id); + let former_task_id = self.parent.nodes.insert(self_conn_info.peer_id().clone(), self.id); *self.parent.inner.task(self.id) .expect("A CollectionReachEvent is only ever created from a valid attempt; QED") - .user_data_mut() = TaskState::Connected(self.peer_id.clone(), user_data); + .user_data_mut() = TaskState::Connected(self_conn_info.clone(), user_data); // It is possible that we already have a task connected to the same peer. In this // case, we need to emit a `NodeReplaced` event. let tasks = &mut self.parent.inner; let ret_value = if let Some(former_task) = former_task_id.and_then(|i| tasks.task(i)) { debug_assert!(match *former_task.user_data() { - TaskState::Connected(ref p, _) if *p == self.peer_id => true, + TaskState::Connected(ref p, _) if p.peer_id() == self_conn_info.peer_id() => true, _ => false }); - let user_data = match former_task.close().into_user_data() { - TaskState::Connected(_, user_data) => user_data, + let (old_info, user_data) = match former_task.close().into_user_data() { + TaskState::Connected(old_info, user_data) => (old_info, user_data), _ => panic!("The former task was picked from `nodes`; all the nodes in `nodes` \ are always in the connected state") }; - // TODO: we unfortunately have to clone the peer id here - (CollectionNodeAccept::ReplacedExisting(user_data), self.peer_id.clone()) + (CollectionNodeAccept::ReplacedExisting(old_info, user_data), self_conn_info) + } else { - // TODO: we unfortunately have to clone the peer id here - (CollectionNodeAccept::NewEntry, self.peer_id.clone()) + (CollectionNodeAccept::NewEntry, self_conn_info) }; // Don't run the destructor. @@ -214,29 +234,29 @@ where /// /// Has the same effect as dropping the event without accepting it. #[inline] - pub fn deny(self) -> TPeerId { - // TODO: we unfortunately have to clone the id here, in order to be explicit - let peer_id = self.peer_id.clone(); + pub fn deny(mut self) -> TConnInfo { + let conn_info = self.conn_info.take() + .expect("conn_info is always Some when the object is alive; QED"); drop(self); // Just to be explicit - peer_id + conn_info } } -impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> fmt::Debug for - CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> +impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> fmt::Debug for + CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> where - TPeerId: Eq + Hash + Clone + fmt::Debug, + TConnInfo: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("CollectionReachEvent") - .field("peer_id", &self.peer_id) + .field("conn_info", &self.conn_info) .field("reach_attempt_id", &self.reach_attempt_id()) .finish() } } -impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> Drop for - CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> +impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> Drop for + CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> { fn drop(&mut self) { let task = self.parent.inner.task(self.id) @@ -251,9 +271,10 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeer /// Outcome of accepting a node. #[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum CollectionNodeAccept { - /// We replaced an existing node. Returns the user data that was assigned to this node. - ReplacedExisting(TUserData), +pub enum CollectionNodeAccept { + /// We replaced an existing node. Returns the information about the old connection and the + /// user data that was assigned to this node. + ReplacedExisting(TConnInfo, TUserData), /// We didn't replace anything existing. NewEntry, } @@ -262,10 +283,28 @@ pub enum CollectionNodeAccept { #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct ReachAttemptId(TaskId); -impl - CollectionStream +/// Information about a connection. +pub trait ConnectionInfo { + /// Identity of the node we are connected to. + type PeerId: Eq + Hash; + + /// Returns the identity of the node we are connected to on this connection. + fn peer_id(&self) -> &Self::PeerId; +} + +impl ConnectionInfo for PeerId { + type PeerId = PeerId; + + fn peer_id(&self) -> &PeerId { + self + } +} + +impl + CollectionStream where - TPeerId: Eq + Hash + Clone, + TConnInfo: ConnectionInfo, + TPeerId: Eq + Hash, { /// Creates a new empty collection. #[inline] @@ -283,8 +322,8 @@ where pub fn add_reach_attempt(&mut self, future: TFut, handler: THandler) -> ReachAttemptId where - TFut: Future + Send + 'static, - THandler: IntoNodeHandler + Send + 'static, + TFut: Future + Send + 'static, + THandler: IntoNodeHandler + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required? TReachErr: error::Error + Send + 'static, @@ -293,7 +332,7 @@ where TOutEvent: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required - TPeerId: Send + 'static, + TConnInfo: Send + 'static, { ReachAttemptId(self.inner.add_reach_attempt(future, TaskState::Pending, handler)) } @@ -301,7 +340,7 @@ where /// Interrupts a reach attempt. /// /// Returns `Ok` if something was interrupted, and `Err` if the ID is not or no longer valid. - pub fn interrupt(&mut self, id: ReachAttemptId) -> Result, InterruptError> { + pub fn interrupt(&mut self, id: ReachAttemptId) -> Result, InterruptError> { match self.inner.task(id.0) { None => Err(InterruptError::ReachAttemptNotFound), Some(task) => { @@ -330,7 +369,7 @@ where /// /// Returns `None` if we don't have a connection to this peer. #[inline] - pub fn peer_mut(&mut self, id: &TPeerId) -> Option> { + pub fn peer_mut(&mut self, id: &TPeerId) -> Option> { let task = match self.nodes.get(id) { Some(&task) => task, None => return None, @@ -366,7 +405,10 @@ where /// > **Note**: we use a regular `poll` method instead of implementing `Stream` in order to /// > remove the `Err` variant, but also because we want the `CollectionStream` to stay /// > borrowed if necessary. - pub fn poll(&mut self) -> Async> { + pub fn poll(&mut self) -> Async> + where + TConnInfo: Clone, // TODO: Clone shouldn't be necessary + { let item = match self.inner.poll() { Async::Ready(item) => item, Async::NotReady => return Async::NotReady, @@ -395,12 +437,12 @@ where panic!("The HandledNodesTasks is guaranteed to always return the handler \ when producing a TaskClosedEvent::Reach error"); }, - (TaskState::Connected(peer_id, user_data), TaskClosedEvent::Node(err), _handler) => { + (TaskState::Connected(conn_info, user_data), TaskClosedEvent::Node(err), _handler) => { debug_assert!(_handler.is_none()); - let _node_task_id = self.nodes.remove(&peer_id); + let _node_task_id = self.nodes.remove(conn_info.peer_id()); debug_assert_eq!(_node_task_id, Some(id)); Async::Ready(CollectionEvent::NodeClosed { - peer_id, + conn_info, error: err, user_data, }) @@ -411,18 +453,18 @@ where }, } }, - HandledNodesEvent::NodeReached { task, peer_id } => { + HandledNodesEvent::NodeReached { task, conn_info } => { let id = task.id(); drop(task); Async::Ready(CollectionEvent::NodeReached(CollectionReachEvent { parent: self, id, - peer_id, + conn_info: Some(conn_info), })) }, HandledNodesEvent::NodeEvent { task, event } => { - let peer_id = match task.user_data() { - TaskState::Connected(peer_id, _) => peer_id.clone(), + let conn_info = match task.user_data() { + TaskState::Connected(conn_info, _) => conn_info.clone(), _ => panic!("we can only receive NodeEvent events from a task after we \ received a corresponding NodeReached event from that same task; \ when we receive a NodeReached event, we ensure that the entry in \ @@ -432,7 +474,7 @@ where Async::Ready(CollectionEvent::NodeEvent { // TODO: normally we'd build a `PeerMut` manually here, but the borrow checker // doesn't like it - peer: self.peer_mut(&peer_id) + peer: self.peer_mut(&conn_info.peer_id()) .expect("we can only receive NodeEvent events from a task after we \ received a corresponding NodeReached event from that same task;\ when that happens, peer_mut will always return Some; QED"), @@ -470,14 +512,14 @@ impl fmt::Display for InterruptError { impl error::Error for InterruptError {} /// Reach attempt after it has been interrupted. -pub struct InterruptedReachAttempt { - inner: ClosedTask>, +pub struct InterruptedReachAttempt { + inner: ClosedTask>, } -impl fmt::Debug for InterruptedReachAttempt +impl fmt::Debug for InterruptedReachAttempt where TUserData: fmt::Debug, - TPeerId: fmt::Debug, + TConnInfo: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_tuple("InterruptedReachAttempt") @@ -487,23 +529,31 @@ where } /// Access to a peer in the collection. -pub struct PeerMut<'a, TInEvent, TUserData, TPeerId = PeerId> { - inner: HandledNodesTask<'a, TInEvent, TaskState>, +pub struct PeerMut<'a, TInEvent, TUserData, TConnInfo = PeerId, TPeerId = PeerId> { + inner: HandledNodesTask<'a, TInEvent, TaskState>, nodes: &'a mut FnvHashMap, } -impl<'a, TInEvent, TUserData, TPeerId> PeerMut<'a, TInEvent, TUserData, TPeerId> -where - TPeerId: Eq + Hash, -{ - /// Returns the identifier of the peer. - pub fn id(&self) -> &TPeerId { +impl<'a, TInEvent, TUserData, TConnInfo, TPeerId> PeerMut<'a, TInEvent, TUserData, TConnInfo, TPeerId> { + /// Returns the information of the connection with the peer. + pub fn info(&self) -> &TConnInfo { match self.inner.user_data() { - TaskState::Connected(peer_id, _) => peer_id, + TaskState::Connected(conn_info, _) => conn_info, _ => panic!("A PeerMut is only ever constructed from a peer in the connected \ state; QED") } } +} + +impl<'a, TInEvent, TUserData, TConnInfo, TPeerId> PeerMut<'a, TInEvent, TUserData, TConnInfo, TPeerId> +where + TConnInfo: ConnectionInfo, + TPeerId: Eq + Hash, +{ + /// Returns the identity of the peer. + pub fn id(&self) -> &TPeerId { + self.info().peer_id() + } /// Returns the user data that was stored in the collections when we accepted the connection. pub fn user_data(&self) -> &TUserData { @@ -534,8 +584,8 @@ where /// No further event will be generated for this node. pub fn close(self) -> TUserData { let task_id = self.inner.id(); - if let TaskState::Connected(peer_id, user_data) = self.inner.close().into_user_data() { - let old_task_id = self.nodes.remove(&peer_id); + if let TaskState::Connected(conn_info, user_data) = self.inner.close().into_user_data() { + let old_task_id = self.nodes.remove(conn_info.peer_id()); debug_assert_eq!(old_task_id, Some(task_id)); user_data } else { @@ -551,7 +601,7 @@ where /// The reach attempt will only be effectively cancelled once the peer (the object you're /// manipulating) has received some network activity. However no event will be ever be /// generated from this reach attempt, and this takes effect immediately. - pub fn take_over(&mut self, id: InterruptedReachAttempt) { + pub fn take_over(&mut self, id: InterruptedReachAttempt) { let _state = self.inner.take_over(id.inner); debug_assert!(if let TaskState::Pending = _state { true } else { false }); } diff --git a/core/src/nodes/handled_node_tasks.rs b/core/src/nodes/handled_node_tasks.rs index 367d5255..4079b351 100644 --- a/core/src/nodes/handled_node_tasks.rs +++ b/core/src/nodes/handled_node_tasks.rs @@ -56,7 +56,7 @@ mod tests; // conditions in the user's code. See similar comments in the documentation of `NodeStream`. /// Implementation of `Stream` that handles a collection of nodes. -pub struct HandledNodesTasks { +pub struct HandledNodesTasks { /// A map between active tasks to an unbounded sender, used to control the task. Closing the sender interrupts /// the task. It is possible that we receive messages from tasks that used to be in this list /// but no longer are, in which case we should ignore them. @@ -73,13 +73,13 @@ pub struct HandledNodesTasks + Send>>, /// Sender to emit events to the outside. Meant to be cloned and sent to tasks. - events_tx: mpsc::UnboundedSender<(InToExtMessage, TaskId)>, + events_tx: mpsc::UnboundedSender<(InToExtMessage, TaskId)>, /// Receiver side for the events. - events_rx: mpsc::UnboundedReceiver<(InToExtMessage, TaskId)>, + events_rx: mpsc::UnboundedReceiver<(InToExtMessage, TaskId)>, } -impl fmt::Debug for - HandledNodesTasks +impl fmt::Debug for + HandledNodesTasks where TUserData: fmt::Debug { @@ -126,30 +126,31 @@ where } /// Prototype for a `NodeHandler`. -pub trait IntoNodeHandler { +pub trait IntoNodeHandler { /// The node handler. type Handler: NodeHandler; /// Builds the node handler. /// - /// The `TPeerId` is the id of the node the handler is going to handle. - fn into_handler(self, remote_peer_id: &TPeerId) -> Self::Handler; + /// The `TConnInfo` is the information about the connection that the handler is going to handle. + /// This is generated by the `Transport` and typically implements the `ConnectionInfo` trait. + fn into_handler(self, remote_conn_info: &TConnInfo) -> Self::Handler; } -impl IntoNodeHandler for T +impl IntoNodeHandler for T where T: NodeHandler { type Handler = Self; #[inline] - fn into_handler(self, _: &TPeerId) -> Self { + fn into_handler(self, _: &TConnInfo) -> Self { self } } /// Event that can happen on the `HandledNodesTasks`. #[derive(Debug)] -pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId = PeerId> { +pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo = PeerId> { /// A task has been closed. /// /// This happens once the node handler closes or an error happens. @@ -169,7 +170,7 @@ pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THa /// The task that succeeded. task: Task<'a, TInEvent, TUserData>, /// Identifier of the node. - peer_id: TPeerId, + conn_info: TConnInfo, }, /// A task has produced an event. @@ -185,8 +186,8 @@ pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THa #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct TaskId(usize); -impl - HandledNodesTasks +impl + HandledNodesTasks { /// Creates a new empty collection. #[inline] @@ -209,8 +210,8 @@ impl(&mut self, future: TFut, user_data: TUserData, handler: TIntoHandler) -> TaskId where - TFut: Future + Send + 'static, - TIntoHandler: IntoNodeHandler + Send + 'static, + TFut: Future + Send + 'static, + TIntoHandler: IntoNodeHandler + Send + 'static, TIntoHandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, TReachErr: error::Error + Send + 'static, THandlerErr: error::Error + Send + 'static, @@ -219,7 +220,7 @@ impl::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required? TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required - TPeerId: Send + 'static, + TConnInfo: Send + 'static, { let task_id = self.next_task_id; self.next_task_id.0 += 1; @@ -273,7 +274,7 @@ impl Async> { + pub fn poll(&mut self) -> Async> { let (message, task_id) = match self.poll_inner() { Async::Ready(r) => r, Async::NotReady => return Async::NotReady, @@ -289,13 +290,13 @@ impl { + InToExtMessage::NodeReached(conn_info) => { HandledNodesEvent::NodeReached { task: match self.tasks.entry(task_id) { Entry::Occupied(inner) => Task { inner }, Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED") }, - peer_id + conn_info } }, InToExtMessage::TaskClosed(result, handler) => { @@ -318,7 +319,7 @@ impl Async<(InToExtMessage, TaskId)> { + fn poll_inner(&mut self) -> Async<(InToExtMessage, TaskId)> { for to_spawn in self.to_spawn.drain() { // We try to use the default executor, but fall back to polling the task manually if // no executor is available. This makes it possible to use the core in environments @@ -490,9 +491,9 @@ enum ExtToInMessage { /// Message to transmit from a task to the public API. #[derive(Debug)] -enum InToExtMessage { +enum InToExtMessage { /// A connection to a node has succeeded. - NodeReached(TPeerId), + NodeReached(TConnInfo), /// The task closed. TaskClosed(TaskClosedEvent, Option), /// An event from the node. @@ -501,28 +502,28 @@ enum InToExtMessage { /// Implementation of `Future` that handles a single node, and all the communications between /// the various components of the `HandledNodesTasks`. -struct NodeTask +struct NodeTask where TMuxer: StreamMuxer, - TIntoHandler: IntoNodeHandler, + TIntoHandler: IntoNodeHandler, TIntoHandler::Handler: NodeHandler>, { /// Sender to transmit events to the outside. - events_tx: mpsc::UnboundedSender<(InToExtMessage::Error, TPeerId>, TaskId)>, + events_tx: mpsc::UnboundedSender<(InToExtMessage::Error, TConnInfo>, TaskId)>, /// Receiving end for events sent from the main `HandledNodesTasks`. in_events_rx: stream::Fuse>>, /// Inner state of the `NodeTask`. - inner: NodeTaskInner, + inner: NodeTaskInner, /// Identifier of the attempt. id: TaskId, /// Channels to keep alive for as long as we don't have an acknowledgment from the remote. taken_over: SmallVec<[mpsc::UnboundedSender>; 1]>, } -enum NodeTaskInner +enum NodeTaskInner where TMuxer: StreamMuxer, - TIntoHandler: IntoNodeHandler, + TIntoHandler: IntoNodeHandler, TIntoHandler::Handler: NodeHandler>, { /// Future to resolve to connect to the node. @@ -547,12 +548,12 @@ where Poisoned, } -impl Future for - NodeTask +impl Future for + NodeTask where TMuxer: StreamMuxer, - TFut: Future, - TIntoHandler: IntoNodeHandler, + TFut: Future, + TIntoHandler: IntoNodeHandler, TIntoHandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent>, { type Item = (); @@ -579,9 +580,9 @@ where } // Check whether dialing succeeded. match future.poll() { - Ok(Async::Ready((peer_id, muxer))) => { - let mut node = HandledNode::new(muxer, handler.into_handler(&peer_id)); - let event = InToExtMessage::NodeReached(peer_id); + Ok(Async::Ready((conn_info, muxer))) => { + let mut node = HandledNode::new(muxer, handler.into_handler(&conn_info)); + let event = InToExtMessage::NodeReached(conn_info); for event in events_buffer { node.inject_event(event); } diff --git a/core/src/nodes/mod.rs b/core/src/nodes/mod.rs index a5d74d33..9950c8b6 100644 --- a/core/src/nodes/mod.rs +++ b/core/src/nodes/mod.rs @@ -33,6 +33,7 @@ pub mod listeners; pub mod node; pub mod raw_swarm; +pub use self::collection::ConnectionInfo; pub use self::node::Substream; pub use self::handled_node::{NodeHandlerEvent, NodeHandlerEndpoint}; pub use self::raw_swarm::{ConnectedPoint, Peer, RawSwarm, RawSwarmEvent}; diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index 5f1c22a9..f7b2e493 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -27,6 +27,7 @@ use crate::{ CollectionNodeAccept, CollectionReachEvent, CollectionStream, + ConnectionInfo, ReachAttemptId }, handled_node::{ @@ -53,7 +54,7 @@ use std::{ mod tests; /// Implementation of `Stream` that handles the nodes. -pub struct RawSwarm +pub struct RawSwarm where TTrans: Transport, { @@ -61,7 +62,7 @@ where listeners: ListenersStream, /// The nodes currently active. - active_nodes: CollectionStream, THandlerErr, (), TPeerId>, + active_nodes: CollectionStream, THandlerErr, (), TConnInfo, TPeerId>, /// The reach attempts of the swarm. /// This needs to be a separate struct in order to handle multiple mutable borrows issues. @@ -71,10 +72,11 @@ where incoming_limit: Option, } -impl fmt::Debug for - RawSwarm +impl fmt::Debug for + RawSwarm where - TTrans: Transport + fmt::Debug, + TTrans: fmt::Debug + Transport, + TConnInfo: fmt::Debug, TPeerId: fmt::Debug + Eq + Hash, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { @@ -130,7 +132,7 @@ struct OutReachAttempt { } /// Event that can happen on the `RawSwarm`. -pub enum RawSwarmEvent<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a = PeerId> +pub enum RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo = PeerId, TPeerId = PeerId> where TTrans: Transport, { @@ -145,7 +147,7 @@ where }, /// A new connection arrived on a listener. - IncomingConnection(IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>), + IncomingConnection(IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>), /// A new connection was arriving on a listener, but an error happened when negotiating it. /// @@ -162,8 +164,8 @@ where /// A new connection to a peer has been opened. Connected { - /// Id of the peer. - peer_id: TPeerId, + /// Information about the connection, including the peer ID. + conn_info: TConnInfo, /// If `Listener`, then we received the connection. If `Dial`, then it's a connection that /// we opened. endpoint: ConnectedPoint, @@ -171,8 +173,12 @@ where /// A connection to a peer has been replaced with a new one. Replaced { - /// Id of the peer. - peer_id: TPeerId, + /// Information about the new connection. The `TPeerId` is the same as the one as the one + /// in `old_info`. + new_info: TConnInfo, + /// Information about the old connection. The `TPeerId` is the same as the one as the one + /// in `new_info`. + old_info: TConnInfo, /// Endpoint we were connected to. closed_endpoint: ConnectedPoint, /// If `Listener`, then we received the connection. If `Dial`, then it's a connection that @@ -182,8 +188,8 @@ where /// The handler of a node has produced an error. NodeClosed { - /// Identifier of the node. - peer_id: TPeerId, + /// Information about the connection that has been closed. + conn_info: TConnInfo, /// Endpoint we were connected to. endpoint: ConnectedPoint, /// The error that happened. @@ -202,7 +208,7 @@ where multiaddr: Multiaddr, /// The error that happened. - error: RawSwarmReachError, + error: RawSwarmReachError, }, /// Failed to reach a peer that we were trying to dial. @@ -219,20 +225,21 @@ where /// A node produced a custom event. NodeEvent { - /// Id of the node that produced the event. - peer_id: TPeerId, + /// Connection that produced the event. + conn_info: TConnInfo, /// Event that was produced by the node. event: TOutEvent, }, } -impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> fmt::Debug for - RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> +impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> fmt::Debug for + RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where TOutEvent: fmt::Debug, TTrans: Transport, TTrans::Error: fmt::Debug, THandlerErr: fmt::Debug, + TConnInfo: fmt::Debug, TPeerId: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { @@ -256,22 +263,23 @@ where .field("error", error) .finish() } - RawSwarmEvent::Connected { ref peer_id, ref endpoint } => { + RawSwarmEvent::Connected { ref conn_info, ref endpoint } => { f.debug_struct("Connected") - .field("peer_id", peer_id) + .field("conn_info", conn_info) .field("endpoint", endpoint) .finish() } - RawSwarmEvent::Replaced { ref peer_id, ref closed_endpoint, ref endpoint } => { + RawSwarmEvent::Replaced { ref new_info, ref old_info, ref closed_endpoint, ref endpoint } => { f.debug_struct("Replaced") - .field("peer_id", peer_id) + .field("new_info", new_info) + .field("old_info", old_info) .field("closed_endpoint", closed_endpoint) .field("endpoint", endpoint) .finish() } - RawSwarmEvent::NodeClosed { ref peer_id, ref endpoint, ref error } => { + RawSwarmEvent::NodeClosed { ref conn_info, ref endpoint, ref error } => { f.debug_struct("NodeClosed") - .field("peer_id", peer_id) + .field("conn_info", conn_info) .field("endpoint", endpoint) .field("error", error) .finish() @@ -290,9 +298,9 @@ where .field("error", error) .finish() } - RawSwarmEvent::NodeEvent { ref peer_id, ref event } => { + RawSwarmEvent::NodeEvent { ref conn_info, ref event } => { f.debug_struct("NodeEvent") - .field("peer_id", peer_id) + .field("conn_info", conn_info) .field("event", event) .finish() } @@ -302,23 +310,23 @@ where /// Internal error type that contains all the possible errors that can happen in a reach attempt. #[derive(Debug)] -enum InternalReachErr { +enum InternalReachErr { /// Error in the transport layer. Transport(TransportError), /// We successfully reached the peer, but there was a mismatch between the expected id and the /// actual id of the peer. PeerIdMismatch { - /// The peer id that the node reports. - obtained: TPeerId, + /// The information about the bad connection. + obtained: TConnInfo, }, /// The negotiated `PeerId` is the same as the one of the local node. FoundLocalPeerId, } -impl fmt::Display for InternalReachErr +impl fmt::Display for InternalReachErr where TTransErr: fmt::Display, - TPeerId: fmt::Debug, + TConnInfo: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -333,10 +341,10 @@ where } } -impl error::Error for InternalReachErr +impl error::Error for InternalReachErr where TTransErr: error::Error + 'static, - TPeerId: fmt::Debug, + TConnInfo: fmt::Debug, { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { @@ -363,22 +371,22 @@ pub enum PeerState { /// Error that can happen when trying to reach a node. #[derive(Debug)] -pub enum RawSwarmReachError { +pub enum RawSwarmReachError { /// Error in the transport layer. Transport(TransportError), /// We successfully reached the peer, but there was a mismatch between the expected id and the /// actual id of the peer. PeerIdMismatch { - /// The peer id that the node reports. - obtained: TPeerId, + /// The information about the other connection. + obtained: TConnInfo, } } -impl fmt::Display for RawSwarmReachError +impl fmt::Display for RawSwarmReachError where TTransErr: fmt::Display, - TPeerId: fmt::Debug, + TConnInfo: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -390,10 +398,10 @@ where } } -impl error::Error for RawSwarmReachError +impl error::Error for RawSwarmReachError where TTransErr: error::Error + 'static, - TPeerId: fmt::Debug, + TConnInfo: fmt::Debug, { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { @@ -478,7 +486,7 @@ where TTransErr: error::Error + 'static } /// A new connection arrived on a listener. -pub struct IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> +pub struct IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where TTrans: Transport { /// The produced upgrade. @@ -490,18 +498,18 @@ where TTrans: Transport /// Address used to send back data to the remote. send_back_addr: Multiaddr, /// Reference to the `active_nodes` field of the swarm. - active_nodes: &'a mut CollectionStream, THandlerErr, (), TPeerId>, + active_nodes: &'a mut CollectionStream, THandlerErr, (), TConnInfo, TPeerId>, /// Reference to the `other_reach_attempts` field of the swarm. other_reach_attempts: &'a mut Vec<(ReachAttemptId, ConnectedPoint)>, } -impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TPeerId> - IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> +impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TConnInfo, TPeerId> + IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where - TTrans: Transport, + TTrans: Transport, TTrans::Error: Send + 'static, TTrans::ListenerUpgrade: Send + 'static, - THandler: IntoNodeHandler + Send + 'static, + THandler: IntoNodeHandler + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, @@ -510,7 +518,8 @@ where TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, - TPeerId: fmt::Debug + Eq + Hash + Clone + Send + 'static, + TConnInfo: fmt::Debug + ConnectionInfo + Send + 'static, + TPeerId: Eq + Hash + Clone + Send + 'static, { /// Starts processing the incoming connection and sets the handler to use for it. #[inline] @@ -528,7 +537,7 @@ where let upgrade = self.upgrade .map_err(|err| InternalReachErr::Transport(TransportError::Other(err))) .and_then(move |(peer_id, muxer)| { - if peer_id == local_peer_id { + if *peer_id.peer_id() == local_peer_id { Err(InternalReachErr::FoundLocalPeerId) } else { Ok((peer_id, muxer)) @@ -542,8 +551,8 @@ where } } -impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> - IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> +impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> + IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where TTrans: Transport { /// Returns the `IncomingInfo` corresponding to this incoming connection. @@ -655,16 +664,17 @@ impl<'a> IncomingInfo<'a> { } } -impl - RawSwarm +impl + RawSwarm where TTrans: Transport + Clone, TMuxer: StreamMuxer, - THandler: IntoNodeHandler + Send + 'static, + THandler: IntoNodeHandler + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, - TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static, + TConnInfo: fmt::Debug + ConnectionInfo + Send + 'static, + TPeerId: Eq + Hash + Clone, { /// Creates a new node events stream. #[inline] @@ -757,7 +767,7 @@ where /// The second parameter is the handler to use if we manage to reach a node. pub fn dial(&mut self, addr: Multiaddr, handler: THandler) -> Result<(), TransportError> where - TTrans: Transport, + TTrans: Transport, TTrans::Error: Send + 'static, TTrans::Dial: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, @@ -765,12 +775,14 @@ where TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, + TConnInfo: Send + 'static, + TPeerId: Send + 'static, { let local_peer_id = self.reach_attempts.local_peer_id.clone(); let future = self.transport().clone().dial(addr.clone())? .map_err(|err| InternalReachErr::Transport(TransportError::Other(err))) .and_then(move |(peer_id, muxer)| { - if peer_id == local_peer_id { + if *peer_id.peer_id() == local_peer_id { Err(InternalReachErr::FoundLocalPeerId) } else { Ok((peer_id, muxer)) @@ -843,7 +855,7 @@ where self.reach_attempts .out_reach_attempts .keys() - .filter(move |p| !self.active_nodes.has_connection(&p)) + .filter(move |p| !self.active_nodes.has_connection(p)) } /// Returns the list of addresses we're currently dialing without knowing the `PeerId` of. @@ -861,7 +873,7 @@ where /// Grants access to a struct that represents a peer. #[inline] - pub fn peer(&mut self, peer_id: TPeerId) -> Peer<'_, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> { + pub fn peer(&mut self, peer_id: TPeerId) -> Peer<'_, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> { if peer_id == self.reach_attempts.local_peer_id { return Peer::LocalNode; } @@ -904,7 +916,7 @@ where /// given peer. fn start_dial_out(&mut self, peer_id: TPeerId, handler: THandler, first: Multiaddr, rest: Vec) where - TTrans: Transport, + TTrans: Transport, TTrans::Dial: Send + 'static, TTrans::Error: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, @@ -912,17 +924,19 @@ where TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, + TConnInfo: Send + 'static, + TPeerId: Send + 'static, { let reach_id = match self.transport().clone().dial(first.clone()) { Ok(fut) => { let expected_peer_id = peer_id.clone(); let fut = fut .map_err(|err| InternalReachErr::Transport(TransportError::Other(err))) - .and_then(move |(actual_peer_id, muxer)| { - if actual_peer_id == expected_peer_id { - Ok((actual_peer_id, muxer)) + .and_then(move |(actual_conn_info, muxer)| { + if *actual_conn_info.peer_id() == expected_peer_id { + Ok((actual_conn_info, muxer)) } else { - Err(InternalReachErr::PeerIdMismatch { obtained: actual_peer_id }) + Err(InternalReachErr::PeerIdMismatch { obtained: actual_conn_info }) } }); self.active_nodes.add_reach_attempt(fut, handler) @@ -946,9 +960,9 @@ where } /// Provides an API similar to `Stream`, except that it cannot error. - pub fn poll(&mut self) -> Async> + pub fn poll(&mut self) -> Async> where - TTrans: Transport, + TTrans: Transport, TTrans::Error: Send + 'static, TTrans::Dial: Send + 'static, TTrans::ListenerUpgrade: Send + 'static, @@ -957,10 +971,12 @@ where TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, - THandler: IntoNodeHandler + Send + 'static, + THandler: IntoNodeHandler + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, + TConnInfo: Clone, + TPeerId: AsRef<[u8]> + Send + 'static, { // Start by polling the listeners for events, but only // if numer of incoming connection does not exceed the limit. @@ -1013,11 +1029,11 @@ where out_event = e; } Async::Ready(CollectionEvent::NodeClosed { - peer_id, + conn_info, error, .. }) => { - let endpoint = self.reach_attempts.connected_points.remove(&peer_id) + let endpoint = self.reach_attempts.connected_points.remove(conn_info.peer_id()) .expect("We insert into connected_points whenever a connection is \ opened and remove only when a connection is closed; the \ underlying API is guaranteed to always deliver a connection \ @@ -1025,14 +1041,14 @@ where messages; QED"); action = Default::default(); out_event = RawSwarmEvent::NodeClosed { - peer_id, + conn_info, endpoint, error, }; } Async::Ready(CollectionEvent::NodeEvent { peer, event }) => { action = Default::default(); - out_event = RawSwarmEvent::NodeEvent { peer_id: peer.id().clone(), event }; + out_event = RawSwarmEvent::NodeEvent { conn_info: peer.info().clone(), event }; } } @@ -1082,18 +1098,19 @@ impl Default for ActionItem { /// /// > **Note**: The event **must** have been produced by the collection of nodes, otherwise /// > panics will likely happen. -fn handle_node_reached<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>( +fn handle_node_reached<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>( reach_attempts: &mut ReachAttempts, - event: CollectionReachEvent<'_, TInEvent, TOutEvent, THandler, InternalReachErr, THandlerErr, (), TPeerId>, -) -> (ActionItem, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>) + event: CollectionReachEvent<'_, TInEvent, TOutEvent, THandler, InternalReachErr, THandlerErr, (), TConnInfo, TPeerId>, +) -> (ActionItem, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>) where - TTrans: Transport + Clone, + TTrans: Transport + Clone, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send, TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, - TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static, + TConnInfo: ConnectionInfo + Clone + Send + 'static, + TPeerId: Eq + Hash + AsRef<[u8]> + Clone, { // We first start looking in the incoming attempts. While this makes the code less optimal, // it also makes the logic easier. @@ -1144,20 +1161,24 @@ where } }; - let (outcome, peer_id) = event.accept(()); - if let CollectionNodeAccept::ReplacedExisting(()) = outcome { + let (outcome, conn_info) = event.accept(()); + if let CollectionNodeAccept::ReplacedExisting(old_info, ()) = outcome { let closed_endpoint = closed_endpoint .expect("We insert into connected_points whenever a connection is opened and \ remove only when a connection is closed; the underlying API is \ guaranteed to always deliver a connection closed message after it has \ been opened, and no two closed messages; QED"); return (action, RawSwarmEvent::Replaced { - peer_id, + new_info: conn_info, + old_info, endpoint: opened_endpoint, closed_endpoint, }); } else { - return (action, RawSwarmEvent::Connected { peer_id, endpoint: opened_endpoint }); + return (action, RawSwarmEvent::Connected { + conn_info, + endpoint: opened_endpoint + }); } } @@ -1182,20 +1203,25 @@ where let closed_endpoint = reach_attempts.connected_points .insert(event.peer_id().clone(), opened_endpoint.clone()); - let (outcome, peer_id) = event.accept(()); - if let CollectionNodeAccept::ReplacedExisting(()) = outcome { + let (outcome, conn_info) = event.accept(()); + if let CollectionNodeAccept::ReplacedExisting(old_info, ()) = outcome { let closed_endpoint = closed_endpoint .expect("We insert into connected_points whenever a connection is opened and \ remove only when a connection is closed; the underlying API is guaranteed \ to always deliver a connection closed message after it has been opened, \ and no two closed messages; QED"); return (Default::default(), RawSwarmEvent::Replaced { - peer_id, + new_info: conn_info, + old_info, endpoint: opened_endpoint, closed_endpoint, }); + } else { - return (Default::default(), RawSwarmEvent::Connected { peer_id, endpoint: opened_endpoint }); + return (Default::default(), RawSwarmEvent::Connected { + conn_info, + endpoint: opened_endpoint + }); } } @@ -1226,14 +1252,15 @@ where /// /// > **Note**: The event **must** have been produced by the collection of nodes, otherwise /// > panics will likely happen. -fn handle_reach_error<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>( +fn handle_reach_error<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>( reach_attempts: &mut ReachAttempts, reach_id: ReachAttemptId, - error: InternalReachErr, + error: InternalReachErr, handler: THandler, -) -> (ActionItem, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>) +) -> (ActionItem, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>) where TTrans: Transport, + TConnInfo: ConnectionInfo + Send + 'static, TPeerId: Eq + Hash + Clone, { // Search for the attempt in `out_reach_attempts`. @@ -1343,31 +1370,32 @@ where } /// State of a peer in the system. -pub enum Peer<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a> +pub enum Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where TTrans: Transport, { /// We are connected to this peer. - Connected(PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>), + Connected(PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>), /// We are currently attempting to connect to this peer. - PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>), + PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>), /// We are not connected to this peer at all. /// /// > **Note**: It is however possible that a pending incoming connection is being negotiated /// > and will connect to this peer, but we don't know it yet. - NotConnected(PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>), + NotConnected(PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>), /// The requested peer is the local node. LocalNode, } -impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> fmt::Debug for - Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> +impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> fmt::Debug for + Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where TTrans: Transport, - TPeerId: Eq + Hash + fmt::Debug, + TConnInfo: fmt::Debug + ConnectionInfo, + TPeerId: fmt::Debug + Eq + Hash, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { match *self { @@ -1396,10 +1424,10 @@ where } // TODO: add other similar methods that wrap to the ones of `PeerNotConnected` -impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> - Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> +impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> + Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where - TTrans: Transport + Clone, + TTrans: Transport + Clone, TTrans::Error: Send + 'static, TTrans::Dial: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, @@ -1407,15 +1435,16 @@ where TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, - THandler: IntoNodeHandler + Send + 'static, + THandler: IntoNodeHandler + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, - TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static, + TConnInfo: fmt::Debug + ConnectionInfo + Send + 'static, + TPeerId: Eq + Hash + Clone + Send + 'static, { /// If we are connected, returns the `PeerConnected`. #[inline] - pub fn into_connected(self) -> Option> { + pub fn into_connected(self) -> Option> { match self { Peer::Connected(peer) => Some(peer), _ => None, @@ -1424,7 +1453,7 @@ where /// If a connection is pending, returns the `PeerPendingConnect`. #[inline] - pub fn into_pending_connect(self) -> Option> { + pub fn into_pending_connect(self) -> Option> { match self { Peer::PendingConnect(peer) => Some(peer), _ => None, @@ -1433,7 +1462,7 @@ where /// If we are not connected, returns the `PeerNotConnected`. #[inline] - pub fn into_not_connected(self) -> Option> { + pub fn into_not_connected(self) -> Option> { match self { Peer::NotConnected(peer) => Some(peer), _ => None, @@ -1448,7 +1477,7 @@ where /// Returns an error if we are `LocalNode`. #[inline] pub fn or_connect(self, addr: Multiaddr, handler: THandler) - -> Result, Self> + -> Result, Self> { self.or_connect_with(move |_| addr, handler) } @@ -1462,7 +1491,7 @@ where /// Returns an error if we are `LocalNode`. #[inline] pub fn or_connect_with(self, addr: TFn, handler: THandler) - -> Result, Self> + -> Result, Self> where TFn: FnOnce(&TPeerId) -> Multiaddr, { @@ -1479,22 +1508,23 @@ where } /// Peer we are potentially going to connect to. -pub enum PeerPotentialConnect<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a> +pub enum PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where TTrans: Transport { /// We are connected to this peer. - Connected(PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>), + Connected(PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>), /// We are currently attempting to connect to this peer. - PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>), + PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>), } -impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> - PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> +impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> + PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where TTrans: Transport, - TPeerId: Eq + Hash + Clone, + TConnInfo: ConnectionInfo, + TPeerId: Eq + Hash, { /// Closes the connection or the connection attempt. // TODO: consider returning a `PeerNotConnected` @@ -1508,7 +1538,7 @@ where /// If we are connected, returns the `PeerConnected`. #[inline] - pub fn into_connected(self) -> Option> { + pub fn into_connected(self) -> Option> { match self { PeerPotentialConnect::Connected(peer) => Some(peer), _ => None, @@ -1517,7 +1547,7 @@ where /// If a connection is pending, returns the `PeerPendingConnect`. #[inline] - pub fn into_pending_connect(self) -> Option> { + pub fn into_pending_connect(self) -> Option> { match self { PeerPotentialConnect::PendingConnect(peer) => Some(peer), _ => None, @@ -1526,11 +1556,11 @@ where } /// Access to a peer we are connected to. -pub struct PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> +pub struct PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where TTrans: Transport, { /// Reference to the `active_nodes` of the parent. - active_nodes: &'a mut CollectionStream, THandlerErr, (), TPeerId>, + active_nodes: &'a mut CollectionStream, THandlerErr, (), TConnInfo, TPeerId>, /// Reference to the `connected_points` field of the parent. connected_points: &'a mut FnvHashMap, /// Reference to the `out_reach_attempts` field of the parent. @@ -1538,10 +1568,11 @@ where TTrans: Transport, peer_id: TPeerId, } -impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> +impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where TTrans: Transport, - TPeerId: Eq + Hash + Clone, + TConnInfo: ConnectionInfo, + TPeerId: Eq + Hash, { /// Closes the connection to this node. /// @@ -1582,19 +1613,20 @@ where /// Access to a peer we are attempting to connect to. #[derive(Debug)] -pub struct PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> +pub struct PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where TTrans: Transport { attempt: OccupiedEntry<'a, TPeerId, OutReachAttempt>, - active_nodes: &'a mut CollectionStream, THandlerErr, (), TPeerId>, + active_nodes: &'a mut CollectionStream, THandlerErr, (), TConnInfo, TPeerId>, } -impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> - PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> +impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> + PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where TTrans: Transport, - TPeerId: Eq + Hash + Clone, + TConnInfo: ConnectionInfo, + TPeerId: Eq + Hash, { /// Interrupt this connection attempt. // TODO: consider returning a PeerNotConnected; however that is really pain in terms of @@ -1646,16 +1678,16 @@ where } /// Access to a peer we're not connected to. -pub struct PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> +pub struct PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where TTrans: Transport, { peer_id: TPeerId, - nodes: &'a mut RawSwarm, + nodes: &'a mut RawSwarm, } -impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> fmt::Debug for - PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> +impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> fmt::Debug for + PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where TTrans: Transport, TPeerId: fmt::Debug, @@ -1667,16 +1699,16 @@ where } } -impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TPeerId> - PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> +impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TConnInfo, TPeerId> + PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where - TTrans: Transport + Clone, + TTrans: Transport + Clone, TTrans::Error: Send + 'static, TTrans::Dial: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send, TMuxer::Substream: Send, - THandler: IntoNodeHandler + Send + 'static, + THandler: IntoNodeHandler + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, @@ -1689,9 +1721,10 @@ where /// the whole connection is immediately closed. #[inline] pub fn connect(self, addr: Multiaddr, handler: THandler) - -> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> + -> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where - TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static, + TConnInfo: fmt::Debug + ConnectionInfo + Send + 'static, + TPeerId: Eq + Hash + Clone + Send + 'static, { self.connect_inner(handler, addr, Vec::new()) } @@ -1706,10 +1739,11 @@ where /// the whole connection is immediately closed. #[inline] pub fn connect_iter(self, addrs: TIter, handler: THandler) - -> Result, Self> + -> Result, Self> where TIter: IntoIterator, - TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static, + TConnInfo: fmt::Debug + ConnectionInfo + Send + 'static, + TPeerId: Eq + Hash + Clone + Send + 'static, { let mut addrs = addrs.into_iter(); let first = match addrs.next() { @@ -1722,9 +1756,10 @@ where /// Inner implementation of `connect`. fn connect_inner(self, handler: THandler, first: Multiaddr, rest: Vec) - -> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> + -> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where - TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static, + TConnInfo: fmt::Debug + ConnectionInfo + Send + 'static, + TPeerId: Eq + Hash + Clone + Send + 'static, { self.nodes.start_dial_out(self.peer_id.clone(), handler, first, rest); PeerPendingConnect { diff --git a/core/src/nodes/raw_swarm/tests.rs b/core/src/nodes/raw_swarm/tests.rs index d62415c6..440b8e66 100644 --- a/core/src/nodes/raw_swarm/tests.rs +++ b/core/src/nodes/raw_swarm/tests.rs @@ -107,7 +107,7 @@ fn successful_dial_reaches_a_node() { let mut swarm = swarm_fut.lock(); let poll_res = swarm.poll(); match poll_res { - Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))), + Async::Ready(RawSwarmEvent::Connected { conn_info, .. }) => Ok(Async::Ready(Some(conn_info))), _ => Ok(Async::Ready(None)) } })).expect("tokio works"); @@ -172,7 +172,7 @@ fn broadcasted_events_reach_active_nodes() { let mut swarm = swarm_fut.lock(); let poll_res = swarm.poll(); match poll_res { - Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))), + Async::Ready(RawSwarmEvent::Connected { conn_info, .. }) => Ok(Async::Ready(Some(conn_info))), _ => Ok(Async::Ready(None)) } })).expect("tokio works"); @@ -185,7 +185,7 @@ fn broadcasted_events_reach_active_nodes() { let mut swarm = swarm_fut.lock(); match swarm.poll() { Async::Ready(event) => { - assert_matches!(event, RawSwarmEvent::NodeEvent { peer_id: _, event: inner_event } => { + assert_matches!(event, RawSwarmEvent::NodeEvent { conn_info: _, event: inner_event } => { // The event we sent reached the node and triggered sending the out event we told it to return assert_matches!(inner_event, OutEvent::Custom("from handler 1")); }); @@ -236,7 +236,7 @@ fn querying_for_connected_peer() { let mut swarm = swarm_fut.lock(); let poll_res = swarm.poll(); match poll_res { - Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))), + Async::Ready(RawSwarmEvent::Connected { conn_info, .. }) => Ok(Async::Ready(Some(conn_info))), _ => Ok(Async::Ready(None)) } })).expect("tokio works"); @@ -385,8 +385,8 @@ fn yields_node_error_when_there_is_an_error_after_successful_connect() { let expected_peer_id = peer_id.clone(); rt.block_on(future::poll_fn(move || -> Poll<_, ()> { let mut swarm = swarm_fut.lock(); - assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeClosed { peer_id, .. }) => { - assert_eq!(peer_id, expected_peer_id); + assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeClosed { conn_info, .. }) => { + assert_eq!(conn_info, expected_peer_id); }); Ok(Async::Ready(())) })).expect("tokio works"); diff --git a/core/src/swarm/swarm.rs b/core/src/swarm/swarm.rs index a14187de..38618e69 100644 --- a/core/src/swarm/swarm.rs +++ b/core/src/swarm/swarm.rs @@ -22,6 +22,7 @@ use crate::{ Transport, Multiaddr, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo, ProtocolName, muxing::StreamMuxer, nodes::{ + collection::ConnectionInfo, handled_node::NodeHandler, node::Substream, raw_swarm::{self, RawSwarm, RawSwarmEvent} @@ -45,6 +46,8 @@ where TTransport: Transport, <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent, NodeHandlerWrapperBuilder, NodeHandlerWrapperError<<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error>, + PeerId, + PeerId, >, /// Handles which nodes to connect to and how to handle the events sent back by the protocol @@ -240,17 +243,17 @@ where TBehaviour: NetworkBehaviour, match self.raw_swarm.poll() { Async::NotReady => raw_swarm_not_ready = true, - Async::Ready(RawSwarmEvent::NodeEvent { peer_id, event }) => { - self.behaviour.inject_node_event(peer_id, event); + Async::Ready(RawSwarmEvent::NodeEvent { conn_info, event }) => { + self.behaviour.inject_node_event(conn_info.peer_id().clone(), event); }, - Async::Ready(RawSwarmEvent::Connected { peer_id, endpoint }) => { - self.behaviour.inject_connected(peer_id, endpoint); + Async::Ready(RawSwarmEvent::Connected { conn_info, endpoint }) => { + self.behaviour.inject_connected(conn_info.peer_id().clone(), endpoint); }, - Async::Ready(RawSwarmEvent::NodeClosed { peer_id, endpoint, .. }) => { - self.behaviour.inject_disconnected(&peer_id, endpoint); + Async::Ready(RawSwarmEvent::NodeClosed { conn_info, endpoint, .. }) => { + self.behaviour.inject_disconnected(conn_info.peer_id(), endpoint); }, - Async::Ready(RawSwarmEvent::Replaced { peer_id, closed_endpoint, endpoint }) => { - self.behaviour.inject_replaced(peer_id, closed_endpoint, endpoint); + Async::Ready(RawSwarmEvent::Replaced { new_info, closed_endpoint, endpoint, .. }) => { + self.behaviour.inject_replaced(new_info.peer_id().clone(), closed_endpoint, endpoint); }, Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => { let handler = self.behaviour.new_handler(); diff --git a/core/tests/raw_swarm_simult.rs b/core/tests/raw_swarm_simult.rs index 297c3a06..1fa37eb7 100644 --- a/core/tests/raw_swarm_simult.rs +++ b/core/tests/raw_swarm_simult.rs @@ -184,13 +184,13 @@ fn raw_swarm_simultaneous_connect() { assert_eq!(swarm1_step, 2); swarm1_step = 3; }, - Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => { - assert_eq!(peer_id, *swarm2.local_peer_id()); + Async::Ready(RawSwarmEvent::Connected { conn_info, .. }) => { + assert_eq!(conn_info, *swarm2.local_peer_id()); assert_eq!(swarm1_step, 1); swarm1_step = 2; }, - Async::Ready(RawSwarmEvent::Replaced { peer_id, .. }) => { - assert_eq!(peer_id, *swarm2.local_peer_id()); + Async::Ready(RawSwarmEvent::Replaced { new_info, .. }) => { + assert_eq!(new_info, *swarm2.local_peer_id()); assert_eq!(swarm1_step, 2); swarm1_step = 3; }, @@ -208,13 +208,13 @@ fn raw_swarm_simultaneous_connect() { assert_eq!(swarm2_step, 2); swarm2_step = 3; }, - Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => { - assert_eq!(peer_id, *swarm1.local_peer_id()); + Async::Ready(RawSwarmEvent::Connected { conn_info, .. }) => { + assert_eq!(conn_info, *swarm1.local_peer_id()); assert_eq!(swarm2_step, 1); swarm2_step = 2; }, - Async::Ready(RawSwarmEvent::Replaced { peer_id, .. }) => { - assert_eq!(peer_id, *swarm1.local_peer_id()); + Async::Ready(RawSwarmEvent::Replaced { new_info, .. }) => { + assert_eq!(new_info, *swarm1.local_peer_id()); assert_eq!(swarm2_step, 2); swarm2_step = 3; },