diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index 9c801a95..29b395d8 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -325,13 +325,13 @@ where TFut: Future + Send + 'static, THandler: IntoNodeHandler + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, - ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required? + ::OutboundOpenInfo: Send + 'static, TReachErr: error::Error + Send + 'static, THandlerErr: error::Error + Send + 'static, TInEvent: Send + 'static, TOutEvent: Send + 'static, - TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required - TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required + TMuxer: StreamMuxer + Send + Sync + 'static, + TMuxer::OutboundSubstream: Send + 'static, TConnInfo: Send + 'static, { ReachAttemptId(self.inner.add_reach_attempt(future, TaskState::Pending, handler)) @@ -365,6 +365,42 @@ where self.inner.broadcast_event(event) } + /// Adds an existing connection to a node to the collection. + /// + /// Returns whether we have replaced an existing connection, or not. + pub fn add_connection(&mut self, conn_info: TConnInfo, user_data: TUserData, muxer: TMuxer, handler: THandler::Handler) + -> CollectionNodeAccept + where + THandler: IntoNodeHandler + Send + 'static, + THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, + ::OutboundOpenInfo: Send + 'static, + TReachErr: error::Error + Send + 'static, + THandlerErr: error::Error + Send + 'static, + TInEvent: Send + 'static, + TOutEvent: Send + 'static, + TMuxer: StreamMuxer + Send + Sync + 'static, + TMuxer::OutboundSubstream: Send + 'static, + TConnInfo: Clone + Send + 'static, + TPeerId: Clone, + { + // Calling `HandledNodesTasks::add_connection` is the same as calling + // `HandledNodesTasks::add_reach_attempt`, except that we don't get any `NodeReached` event. + // We therefore implement this method the same way as calling `add_reach_attempt` followed + // with simulating a received `NodeReached` event and accepting it. + + let task_id = self.inner.add_connection( + TaskState::Pending, + muxer, + handler + ); + + CollectionReachEvent { + conn_info: Some(conn_info), + id: task_id, + parent: self, + }.accept(user_data).0 + } + /// Grants access to an object that allows controlling a peer of the collection. /// /// Returns `None` if we don't have a connection to this peer. diff --git a/core/src/nodes/handled_node_tasks.rs b/core/src/nodes/handled_node_tasks.rs index fb8fe3ed..bf32b6b5 100644 --- a/core/src/nodes/handled_node_tasks.rs +++ b/core/src/nodes/handled_node_tasks.rs @@ -217,9 +217,9 @@ impl::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required? - TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required - TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required + ::OutboundOpenInfo: Send + 'static, + TMuxer: StreamMuxer + Send + Sync + 'static, + TMuxer::OutboundSubstream: Send + 'static, TConnInfo: Send + 'static, { let task_id = self.next_task_id; @@ -244,6 +244,43 @@ impl(&mut self, user_data: TUserData, muxer: TMuxer, handler: THandler) -> TaskId + where + TIntoHandler: IntoNodeHandler + Send + 'static, + THandler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, + TReachErr: error::Error + Send + 'static, + THandlerErr: error::Error + Send + 'static, + TInEvent: Send + 'static, + TOutEvent: Send + 'static, + ::OutboundOpenInfo: Send + 'static, + TMuxer: StreamMuxer + Send + Sync + 'static, + TMuxer::OutboundSubstream: Send + 'static, + TConnInfo: Send + 'static, + { + let task_id = self.next_task_id; + self.next_task_id.0 += 1; + + let (tx, rx) = mpsc::unbounded(); + self.tasks.insert(task_id, (tx, user_data)); + + let task: NodeTask, _, _, _, _, _, _> = NodeTask { + taken_over: SmallVec::new(), + inner: NodeTaskInner::Node(HandledNode::new(muxer, handler)), + events_tx: self.events_tx.clone(), + in_events_rx: rx.fuse(), + id: task_id, + }; + + self.to_spawn.push(Box::new(task)); + task_id + } + /// Sends an event to all the tasks, including the pending ones. pub fn broadcast_event(&mut self, event: &TInEvent) where TInEvent: Clone, diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index 77ba3518..23200291 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -1784,6 +1784,39 @@ where Ok(self.connect_inner(handler, first, rest)) } + /// Moves the given node to a connected state using the given connection info and muxer. + /// + /// No `Connected` event is generated for this action. + /// + /// # Panic + /// + /// Panics if `conn_info.peer_id()` is not the current peer. + /// + pub fn inject_connection(self, conn_info: TConnInfo, connected_point: ConnectedPoint, muxer: TMuxer, handler: THandler::Handler) + -> PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> + where + TConnInfo: fmt::Debug + ConnectionInfo + Clone + Send + 'static, + TPeerId: Eq + Hash + Clone, + { + if conn_info.peer_id() != &self.peer_id { + panic!("Mismatch between conn_info PeerId and request PeerId"); + } + + match self.nodes.active_nodes.add_connection((conn_info, connected_point), (), muxer, handler) { + CollectionNodeAccept::NewEntry => {}, + CollectionNodeAccept::ReplacedExisting { .. } => + unreachable!("We can only build a PeerNotConnected if we don't have this peer in \ + the collection yet"), + } + + PeerConnected { + active_nodes: &mut self.nodes.active_nodes, + connected_points: &mut self.nodes.reach_attempts.connected_points, + out_reach_attempts: &mut self.nodes.reach_attempts.out_reach_attempts, + peer_id: self.peer_id, + } + } + /// Inner implementation of `connect`. fn connect_inner(self, handler: THandler, first: Multiaddr, rest: Vec) -> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>