Allow injecting an existing connection (#1157)

This commit is contained in:
Pierre Krieger 2019-06-03 17:27:43 +02:00 committed by GitHub
parent 710ce90c6e
commit fbc6ea5c5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 112 additions and 6 deletions

View File

@ -325,13 +325,13 @@ where
TFut: Future<Item = (TConnInfo, TMuxer), Error = TReachErr> + Send + 'static,
THandler: IntoNodeHandler<TConnInfo> + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,
TReachErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TConnInfo: Send + 'static,
{
ReachAttemptId(self.inner.add_reach_attempt(future, TaskState::Pending, handler))
@ -365,6 +365,42 @@ where
self.inner.broadcast_event(event)
}
/// Adds an existing connection to a node to the collection.
///
/// Returns whether we have replaced an existing connection, or not.
pub fn add_connection<TMuxer>(&mut self, conn_info: TConnInfo, user_data: TUserData, muxer: TMuxer, handler: THandler::Handler)
-> CollectionNodeAccept<TConnInfo, TUserData>
where
THandler: IntoNodeHandler<TConnInfo> + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,
TReachErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TConnInfo: Clone + Send + 'static,
TPeerId: Clone,
{
// Calling `HandledNodesTasks::add_connection` is the same as calling
// `HandledNodesTasks::add_reach_attempt`, except that we don't get any `NodeReached` event.
// We therefore implement this method the same way as calling `add_reach_attempt` followed
// with simulating a received `NodeReached` event and accepting it.
let task_id = self.inner.add_connection(
TaskState::Pending,
muxer,
handler
);
CollectionReachEvent {
conn_info: Some(conn_info),
id: task_id,
parent: self,
}.accept(user_data).0
}
/// Grants access to an object that allows controlling a peer of the collection.
///
/// Returns `None` if we don't have a connection to this peer.

View File

@ -217,9 +217,9 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConn
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
<TIntoHandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
<TIntoHandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TConnInfo: Send + 'static,
{
let task_id = self.next_task_id;
@ -244,6 +244,43 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConn
task_id
}
/// Adds an existing connection to a node to the collection.
///
/// This method spawns a task dedicated to processing the node's events.
///
/// No `NodeReached` event will be emitted for this task, since the node has already been
/// reached.
pub fn add_connection<TMuxer, THandler>(&mut self, user_data: TUserData, muxer: TMuxer, handler: THandler) -> TaskId
where
TIntoHandler: IntoNodeHandler<TConnInfo, Handler = THandler> + Send + 'static,
THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
TReachErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
<TIntoHandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TConnInfo: Send + 'static,
{
let task_id = self.next_task_id;
self.next_task_id.0 += 1;
let (tx, rx) = mpsc::unbounded();
self.tasks.insert(task_id, (tx, user_data));
let task: NodeTask<futures::future::Empty<_, _>, _, _, _, _, _, _> = NodeTask {
taken_over: SmallVec::new(),
inner: NodeTaskInner::Node(HandledNode::new(muxer, handler)),
events_tx: self.events_tx.clone(),
in_events_rx: rx.fuse(),
id: task_id,
};
self.to_spawn.push(Box::new(task));
task_id
}
/// Sends an event to all the tasks, including the pending ones.
pub fn broadcast_event(&mut self, event: &TInEvent)
where TInEvent: Clone,

View File

@ -1784,6 +1784,39 @@ where
Ok(self.connect_inner(handler, first, rest))
}
/// Moves the given node to a connected state using the given connection info and muxer.
///
/// No `Connected` event is generated for this action.
///
/// # Panic
///
/// Panics if `conn_info.peer_id()` is not the current peer.
///
pub fn inject_connection(self, conn_info: TConnInfo, connected_point: ConnectedPoint, muxer: TMuxer, handler: THandler::Handler)
-> PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
where
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Clone + Send + 'static,
TPeerId: Eq + Hash + Clone,
{
if conn_info.peer_id() != &self.peer_id {
panic!("Mismatch between conn_info PeerId and request PeerId");
}
match self.nodes.active_nodes.add_connection((conn_info, connected_point), (), muxer, handler) {
CollectionNodeAccept::NewEntry => {},
CollectionNodeAccept::ReplacedExisting { .. } =>
unreachable!("We can only build a PeerNotConnected if we don't have this peer in \
the collection yet"),
}
PeerConnected {
active_nodes: &mut self.nodes.active_nodes,
connected_points: &mut self.nodes.reach_attempts.connected_points,
out_reach_attempts: &mut self.nodes.reach_attempts.out_reach_attempts,
peer_id: self.peer_id,
}
}
/// Inner implementation of `connect`.
fn connect_inner(self, handler: THandler, first: Multiaddr, rest: Vec<Multiaddr>)
-> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>