From 089e34967100b167c0ca0252d910e12e6a0a96a5 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 10 May 2019 11:05:22 +0200 Subject: [PATCH] Pass the ConnectedPoint to into_handler() (#1085) --- core/src/nodes/raw_swarm.rs | 80 +++++++++++++--------- core/src/protocols_handler/mod.rs | 5 +- core/src/protocols_handler/node_handler.rs | 8 ++- core/src/protocols_handler/select.rs | 7 +- core/src/swarm/toggle.rs | 4 +- 5 files changed, 63 insertions(+), 41 deletions(-) diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index f650c2c5..77ba3518 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -61,7 +61,7 @@ where listeners: ListenersStream, /// The nodes currently active. - active_nodes: CollectionStream, THandlerErr, (), TConnInfo, TPeerId>, + active_nodes: CollectionStream, THandlerErr, (), (TConnInfo, ConnectedPoint), TPeerId>, /// The reach attempts of the swarm. /// This needs to be a separate struct in order to handle multiple mutable borrows issues. @@ -88,6 +88,17 @@ where } } +impl ConnectionInfo for (TConnInfo, ConnectedPoint) +where + TConnInfo: ConnectionInfo +{ + type PeerId = TConnInfo::PeerId; + + fn peer_id(&self) -> &Self::PeerId { + self.0.peer_id() + } +} + struct ReachAttempts { /// Peer ID of the node we control. local_peer_id: TPeerId, @@ -520,7 +531,7 @@ where TTrans: Transport /// Address used to send back data to the remote. send_back_addr: Multiaddr, /// Reference to the `active_nodes` field of the swarm. - active_nodes: &'a mut CollectionStream, THandlerErr, (), TConnInfo, TPeerId>, + active_nodes: &'a mut CollectionStream, THandlerErr, (), (TConnInfo, ConnectedPoint), TPeerId>, /// Reference to the `other_reach_attempts` field of the swarm. other_reach_attempts: &'a mut Vec<(ReachAttemptId, ConnectedPoint)>, } @@ -531,7 +542,7 @@ where TTrans: Transport, TTrans::Error: Send + 'static, TTrans::ListenerUpgrade: Send + 'static, - THandler: IntoNodeHandler + Send + 'static, + THandler: IntoNodeHandler<(TConnInfo, ConnectedPoint)> + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, @@ -558,11 +569,14 @@ where let local_peer_id = self.local_peer_id; let upgrade = self.upgrade .map_err(|err| InternalReachErr::Transport(TransportError::Other(err))) - .and_then(move |(peer_id, muxer)| { - if *peer_id.peer_id() == local_peer_id { - Err(InternalReachErr::FoundLocalPeerId) - } else { - Ok((peer_id, muxer)) + .and_then({ + let connected_point = connected_point.clone(); + move |(peer_id, muxer)| { + if *peer_id.peer_id() == local_peer_id { + Err(InternalReachErr::FoundLocalPeerId) + } else { + Ok(((peer_id, connected_point), muxer)) + } } }); let id = self.active_nodes.add_reach_attempt(upgrade, handler); @@ -691,7 +705,7 @@ impl + Send + 'static, + THandler: IntoNodeHandler<(TConnInfo, ConnectedPoint)> + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, @@ -798,17 +812,20 @@ where TPeerId: Send + 'static, { let local_peer_id = self.reach_attempts.local_peer_id.clone(); - let future = self.transport().clone().dial(addr.clone())? + let connected_point = ConnectedPoint::Dialer { address: addr.clone() }; + let future = self.transport().clone().dial(addr)? .map_err(|err| InternalReachErr::Transport(TransportError::Other(err))) - .and_then(move |(peer_id, muxer)| { - if *peer_id.peer_id() == local_peer_id { - Err(InternalReachErr::FoundLocalPeerId) - } else { - Ok((peer_id, muxer)) + .and_then({ + let connected_point = connected_point.clone(); + move |(peer_id, muxer)| { + if *peer_id.peer_id() == local_peer_id { + Err(InternalReachErr::FoundLocalPeerId) + } else { + Ok(((peer_id, connected_point), muxer)) + } } }); - let connected_point = ConnectedPoint::Dialer { address: addr }; let reach_id = self.active_nodes.add_reach_attempt(future, handler); self.reach_attempts.other_reach_attempts.push((reach_id, connected_point)); Ok(()) @@ -946,11 +963,12 @@ where let reach_id = match self.transport().clone().dial(first.clone()) { Ok(fut) => { let expected_peer_id = peer_id.clone(); + let connected_point = ConnectedPoint::Dialer { address: first.clone() }; let fut = fut .map_err(|err| InternalReachErr::Transport(TransportError::Other(err))) .and_then(move |(actual_conn_info, muxer)| { if *actual_conn_info.peer_id() == expected_peer_id { - Ok((actual_conn_info, muxer)) + Ok(((actual_conn_info, connected_point), muxer)) } else { Err(InternalReachErr::PeerIdMismatch { obtained: actual_conn_info }) } @@ -987,7 +1005,7 @@ where TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, - THandler: IntoNodeHandler + Send + 'static, + THandler: IntoNodeHandler<(TConnInfo, ConnectedPoint)> + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, @@ -1053,14 +1071,14 @@ where messages; QED"); action = Default::default(); out_event = RawSwarmEvent::NodeClosed { - conn_info, + conn_info: conn_info.0, endpoint, error, }; } Async::Ready(CollectionEvent::NodeEvent { peer, event }) => { action = Default::default(); - out_event = RawSwarmEvent::NodeEvent { conn_info: peer.info().clone(), event }; + out_event = RawSwarmEvent::NodeEvent { conn_info: peer.info().0.clone(), event }; } } @@ -1112,7 +1130,7 @@ impl Default for ActionItem { /// > panics will likely happen. fn handle_node_reached<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>( reach_attempts: &mut ReachAttempts, - event: CollectionReachEvent<'_, TInEvent, TOutEvent, THandler, InternalReachErr, THandlerErr, (), TConnInfo, TPeerId>, + event: CollectionReachEvent<'_, TInEvent, TOutEvent, THandler, InternalReachErr, THandlerErr, (), (TConnInfo, ConnectedPoint), TPeerId>, ) -> (ActionItem, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>) where TTrans: Transport + Clone, @@ -1181,14 +1199,14 @@ where guaranteed to always deliver a connection closed message after it has \ been opened, and no two closed messages; QED"); return (action, RawSwarmEvent::Replaced { - new_info: conn_info, - old_info, + new_info: conn_info.0, + old_info: old_info.0, endpoint: opened_endpoint, closed_endpoint, }); } else { return (action, RawSwarmEvent::Connected { - conn_info, + conn_info: conn_info.0, endpoint: opened_endpoint }); } @@ -1223,15 +1241,15 @@ where to always deliver a connection closed message after it has been opened, \ and no two closed messages; QED"); return (Default::default(), RawSwarmEvent::Replaced { - new_info: conn_info, - old_info, + new_info: conn_info.0, + old_info: old_info.0, endpoint: opened_endpoint, closed_endpoint, }); } else { return (Default::default(), RawSwarmEvent::Connected { - conn_info, + conn_info: conn_info.0, endpoint: opened_endpoint }); } @@ -1447,7 +1465,7 @@ where TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, - THandler: IntoNodeHandler + Send + 'static, + THandler: IntoNodeHandler<(TConnInfo, ConnectedPoint)> + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, @@ -1572,7 +1590,7 @@ pub struct PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, where TTrans: Transport, { /// Reference to the `active_nodes` of the parent. - active_nodes: &'a mut CollectionStream, THandlerErr, (), TConnInfo, TPeerId>, + active_nodes: &'a mut CollectionStream, THandlerErr, (), (TConnInfo, ConnectedPoint), TPeerId>, /// Reference to the `connected_points` field of the parent. connected_points: &'a mut FnvHashMap, /// Reference to the `out_reach_attempts` field of the parent. @@ -1630,7 +1648,7 @@ where TTrans: Transport { attempt: OccupiedEntry<'a, TPeerId, OutReachAttempt>, - active_nodes: &'a mut CollectionStream, THandlerErr, (), TConnInfo, TPeerId>, + active_nodes: &'a mut CollectionStream, THandlerErr, (), (TConnInfo, ConnectedPoint), TPeerId>, } impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> @@ -1720,7 +1738,7 @@ where TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send, TMuxer::Substream: Send, - THandler: IntoNodeHandler + Send + 'static, + THandler: IntoNodeHandler<(TConnInfo, ConnectedPoint)> + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, diff --git a/core/src/protocols_handler/mod.rs b/core/src/protocols_handler/mod.rs index 6d47bc04..0f3d0710 100644 --- a/core/src/protocols_handler/mod.rs +++ b/core/src/protocols_handler/mod.rs @@ -37,6 +37,7 @@ //! > connection with a remote. In order to handle a protocol that requires knowledge of //! > the network as a whole, see the `NetworkBehaviour` trait. +use crate::nodes::raw_swarm::ConnectedPoint; use crate::PeerId; use crate::upgrade::{ InboundUpgrade, @@ -427,7 +428,7 @@ pub trait IntoProtocolsHandler { /// Builds the protocols handler. /// /// The `PeerId` is the id of the node the handler is going to handle. - fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler; + fn into_handler(self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler; /// Return the handler's inbound protocol. fn inbound_protocol(&self) -> ::InboundProtocol; @@ -456,7 +457,7 @@ where T: ProtocolsHandler { type Handler = Self; - fn into_handler(self, _: &PeerId) -> Self { + fn into_handler(self, _: &PeerId, _: &ConnectedPoint) -> Self { self } diff --git a/core/src/protocols_handler/node_handler.rs b/core/src/protocols_handler/node_handler.rs index 0688e007..65427f67 100644 --- a/core/src/protocols_handler/node_handler.rs +++ b/core/src/protocols_handler/node_handler.rs @@ -22,6 +22,7 @@ use crate::{ PeerId, nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent}, nodes::handled_node_tasks::IntoNodeHandler, + nodes::raw_swarm::ConnectedPoint, protocols_handler::{KeepAlive, ProtocolsHandler, IntoProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, upgrade::{ self, @@ -69,7 +70,8 @@ where } } -impl IntoNodeHandler for NodeHandlerWrapperBuilder +impl IntoNodeHandler<(PeerId, ConnectedPoint)> + for NodeHandlerWrapperBuilder where TIntoProtoHandler: IntoProtocolsHandler, TProtoHandler: ProtocolsHandler, @@ -78,9 +80,9 @@ where { type Handler = NodeHandlerWrapper; - fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler { + fn into_handler(self, remote_info: &(PeerId, ConnectedPoint)) -> Self::Handler { NodeHandlerWrapper { - handler: self.handler.into_handler(remote_peer_id), + handler: self.handler.into_handler(&remote_info.0, &remote_info.1), negotiating_in: Vec::new(), negotiating_out: Vec::new(), queued_dial_upgrades: Vec::new(), diff --git a/core/src/protocols_handler/select.rs b/core/src/protocols_handler/select.rs index b902034b..b4735645 100644 --- a/core/src/protocols_handler/select.rs +++ b/core/src/protocols_handler/select.rs @@ -30,6 +30,7 @@ use crate::{ ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, }, + nodes::raw_swarm::ConnectedPoint, upgrade::{ InboundUpgrade, OutboundUpgrade, @@ -76,10 +77,10 @@ where { type Handler = ProtocolsHandlerSelect; - fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler { + fn into_handler(self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler { ProtocolsHandlerSelect { - proto1: self.proto1.into_handler(remote_peer_id), - proto2: self.proto2.into_handler(remote_peer_id), + proto1: self.proto1.into_handler(remote_peer_id, connected_point), + proto2: self.proto2.into_handler(remote_peer_id, connected_point), } } diff --git a/core/src/swarm/toggle.rs b/core/src/swarm/toggle.rs index be905597..819149f0 100644 --- a/core/src/swarm/toggle.rs +++ b/core/src/swarm/toggle.rs @@ -156,9 +156,9 @@ where { type Handler = ToggleProtoHandler; - fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler { + fn into_handler(self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler { ToggleProtoHandler { - inner: self.inner.map(|h| h.into_handler(remote_peer_id)) + inner: self.inner.map(|h| h.into_handler(remote_peer_id, connected_point)) } }