diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index c6a987c1..869d755a 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -61,7 +61,7 @@ where listeners: ListenersStream, /// The nodes currently active. - active_nodes: CollectionStream, THandlerErr>, + active_nodes: CollectionStream, THandlerErr>, /// The reach attempts of the swarm. /// This needs to be a separate struct in order to handle multiple mutable borrows issues. @@ -190,7 +190,7 @@ where multiaddr: Multiaddr, /// The error that happened. - error: TransportError, + error: UnknownPeerDialErr, /// The handler that was passed to `dial()`. handler: THandler, @@ -283,12 +283,55 @@ where } } +/// Internal error type that contains all the possible errors that can happen in a reach attempt. +#[derive(Debug)] +enum InternalReachErr { + /// Error in the transport layer. + Transport(TransportError), + /// We successfully reached the peer, but there was a mismatch between the expected id and the + /// actual id of the peer. + PeerIdMismatch { + /// The peer id that the node reports. + obtained: PeerId, + }, + /// The negotiated `PeerId` is the same as the one of the local node. + FoundLocalPeerId, +} + +impl fmt::Display for InternalReachErr +where TTransErr: fmt::Display +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + InternalReachErr::Transport(err) => write!(f, "{}", err), + InternalReachErr::PeerIdMismatch { obtained } => { + write!(f, "Peer ID mismatch, obtained: {}", obtained.to_base58()) + }, + InternalReachErr::FoundLocalPeerId => { + write!(f, "Remote has the same PeerId as us") + } + } + } +} + +impl error::Error for InternalReachErr +where TTransErr: error::Error + 'static +{ + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match self { + InternalReachErr::Transport(err) => Some(err), + InternalReachErr::PeerIdMismatch { .. } => None, + InternalReachErr::FoundLocalPeerId => None, + } + } +} + /// Error that can happen when trying to reach a node. #[derive(Debug)] pub enum RawSwarmReachError { /// Error in the transport layer. - // TODO: is a TransportError correct here? Transport(TransportError), + /// We successfully reached the peer, but there was a mismatch between the expected id and the /// actual id of the peer. PeerIdMismatch { @@ -321,6 +364,39 @@ where TTransErr: error::Error + 'static } } +/// Error that can happen when dialing a node with an unknown peer ID. +#[derive(Debug)] +pub enum UnknownPeerDialErr { + /// Error in the transport layer. + Transport(TransportError), + /// The negotiated `PeerId` is the same as the local node. + FoundLocalPeerId, +} + +impl fmt::Display for UnknownPeerDialErr +where TTransErr: fmt::Display +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + UnknownPeerDialErr::Transport(err) => write!(f, "{}", err), + UnknownPeerDialErr::FoundLocalPeerId => { + write!(f, "Unknown peer has same PeerId as us") + }, + } + } +} + +impl error::Error for UnknownPeerDialErr +where TTransErr: error::Error + 'static +{ + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match self { + UnknownPeerDialErr::Transport(err) => Some(err), + UnknownPeerDialErr::FoundLocalPeerId => None, + } + } +} + /// Error that can happen on an incoming connection. #[derive(Debug)] pub enum IncomingError { @@ -330,6 +406,8 @@ pub enum IncomingError { /// Denied the incoming connection because we're already connected to this peer as a dialer /// and we have a higher priority than the remote. DeniedLowerPriority, + /// The negotiated `PeerId` is the same as the local node. + FoundLocalPeerId, } impl fmt::Display for IncomingError @@ -341,6 +419,9 @@ where TTransErr: fmt::Display IncomingError::DeniedLowerPriority => { write!(f, "Denied because of lower priority") }, + IncomingError::FoundLocalPeerId => { + write!(f, "Incoming connection has same PeerId as us") + }, } } } @@ -352,6 +433,7 @@ where TTransErr: error::Error + 'static match self { IncomingError::Transport(err) => Some(err), IncomingError::DeniedLowerPriority => None, + IncomingError::FoundLocalPeerId => None, } } } @@ -362,12 +444,14 @@ where TTrans: Transport { /// The produced upgrade. upgrade: TTrans::ListenerUpgrade, + /// PeerId of the local node. + local_peer_id: PeerId, /// Address of the listener which received the connection. listen_addr: Multiaddr, /// Address used to send back data to the remote. send_back_addr: Multiaddr, /// Reference to the `active_nodes` field of the swarm. - active_nodes: &'a mut CollectionStream, THandlerErr>, + active_nodes: &'a mut CollectionStream, THandlerErr>, /// Reference to the `other_reach_attempts` field of the swarm. other_reach_attempts: &'a mut Vec<(ReachAttemptId, ConnectedPoint)>, } @@ -400,7 +484,17 @@ where { let connected_point = self.to_connected_point(); let handler = builder(self.info()); - let id = self.active_nodes.add_reach_attempt(self.upgrade.map_err(|err| RawSwarmReachError::Transport(TransportError::Other(err))), handler); + let local_peer_id = self.local_peer_id; + let upgrade = self.upgrade + .map_err(|err| InternalReachErr::Transport(TransportError::Other(err))) + .and_then(move |(peer_id, muxer)| { + if peer_id == local_peer_id { + Err(InternalReachErr::FoundLocalPeerId) + } else { + Ok((peer_id, muxer)) + } + }); + let id = self.active_nodes.add_reach_attempt(upgrade, handler); self.other_reach_attempts.push(( id, connected_point, @@ -605,10 +699,19 @@ where TInEvent: Send + 'static, TOutEvent: Send + 'static, { - let future = self.transport().clone().dial(addr.clone())?; + let local_peer_id = self.reach_attempts.local_peer_id.clone(); + let future = self.transport().clone().dial(addr.clone())? + .map_err(|err| InternalReachErr::Transport(TransportError::Other(err))) + .and_then(move |(peer_id, muxer)| { + if peer_id == local_peer_id { + Err(InternalReachErr::FoundLocalPeerId) + } else { + Ok((peer_id, muxer)) + } + }); let connected_point = ConnectedPoint::Dialer { address: addr }; - let reach_id = self.active_nodes.add_reach_attempt(future.map_err(|err| RawSwarmReachError::Transport(TransportError::Other(err))), handler); + let reach_id = self.active_nodes.add_reach_attempt(future, handler); self.reach_attempts.other_reach_attempts.push((reach_id, connected_point)); Ok(()) } @@ -715,18 +818,18 @@ where Ok(fut) => { let expected_peer_id = peer_id.clone(); let fut = fut - .map_err(|err| RawSwarmReachError::Transport(TransportError::Other(err))) + .map_err(|err| InternalReachErr::Transport(TransportError::Other(err))) .and_then(move |(actual_peer_id, muxer)| { if actual_peer_id == expected_peer_id { Ok((actual_peer_id, muxer)) } else { - Err(RawSwarmReachError::PeerIdMismatch { obtained: actual_peer_id }) + Err(InternalReachErr::PeerIdMismatch { obtained: actual_peer_id }) } }); self.active_nodes.add_reach_attempt(fut, handler) }, Err(err) => { - let fut = future::err(RawSwarmReachError::Transport(err)); + let fut = future::err(InternalReachErr::Transport(err)); self.active_nodes.add_reach_attempt(fut, handler) }, }; @@ -766,6 +869,7 @@ where Async::Ready(ListenersEvent::Incoming { upgrade, listen_addr, send_back_addr }) => { let event = IncomingConnectionEvent { upgrade, + local_peer_id: self.reach_attempts.local_peer_id.clone(), listen_addr, send_back_addr, active_nodes: &mut self.active_nodes, @@ -876,7 +980,7 @@ impl Default for ActionItem { /// > panics will likely happen. fn handle_node_reached<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr>( reach_attempts: &mut ReachAttempts, - event: CollectionReachEvent, THandlerErr> + event: CollectionReachEvent, THandlerErr> ) -> (ActionItem, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>) where TTrans: Transport + Clone, @@ -1005,7 +1109,7 @@ fn has_dial_prio(local: &PeerId, other: &PeerId) -> bool { fn handle_reach_error<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>( reach_attempts: &mut ReachAttempts, reach_id: ReachAttemptId, - error: RawSwarmReachError, + error: InternalReachErr, handler: THandler, ) -> (ActionItem, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>) where TTrans: Transport @@ -1035,6 +1139,17 @@ where TTrans: Transport Default::default() }; + let error = match error { + InternalReachErr::Transport(err) => RawSwarmReachError::Transport(err), + InternalReachErr::PeerIdMismatch { obtained } => { + RawSwarmReachError::PeerIdMismatch { obtained } + }, + InternalReachErr::FoundLocalPeerId => { + unreachable!("We only generate FoundLocalPeerId within dial() or accept(); neither \ + of these methods add an entry to out_reach_attempts; QED") + }, + }; + return (action, RawSwarmEvent::DialError { remain_addrs_attempt: num_remain, peer_id, @@ -1050,12 +1165,16 @@ where TTrans: Transport .position(|i| i.0 == reach_id) { let (_, endpoint) = reach_attempts.other_reach_attempts.swap_remove(in_pos); - let error = match error { - RawSwarmReachError::Transport(err) => err, - RawSwarmReachError::PeerIdMismatch { .. } => unreachable!(), // TODO: prove - }; match endpoint { ConnectedPoint::Dialer { address } => { + let error = match error { + InternalReachErr::Transport(err) => UnknownPeerDialErr::Transport(err), + InternalReachErr::FoundLocalPeerId => UnknownPeerDialErr::FoundLocalPeerId, + InternalReachErr::PeerIdMismatch { .. } => { + unreachable!("We only generate PeerIdMismatch within start_dial_out(), + which doesn't add any entry in other_reach_attempts; QED") + }, + }; return (Default::default(), RawSwarmEvent::UnknownPeerDialError { multiaddr: address, error, @@ -1063,8 +1182,19 @@ where TTrans: Transport }); } ConnectedPoint::Listener { listen_addr, send_back_addr } => { - let error = IncomingError::Transport(error); - return (Default::default(), RawSwarmEvent::IncomingConnectionError { listen_addr, send_back_addr, error }); + let error = match error { + InternalReachErr::Transport(err) => IncomingError::Transport(err), + InternalReachErr::FoundLocalPeerId => IncomingError::FoundLocalPeerId, + InternalReachErr::PeerIdMismatch { .. } => { + unreachable!("We only generate PeerIdMismatch within start_dial_out(), + which doesn't add any entry in other_reach_attempts; QED") + }, + }; + return (Default::default(), RawSwarmEvent::IncomingConnectionError { + listen_addr, + send_back_addr, + error + }); } } } @@ -1301,7 +1431,7 @@ where TTrans: Transport { attempt: OccupiedEntry<'a, PeerId, OutReachAttempt>, - active_nodes: &'a mut CollectionStream, THandlerErr>, + active_nodes: &'a mut CollectionStream, THandlerErr>, } impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>