diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index a397ff9f..9145f320 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -63,6 +63,9 @@ where #[derive(Debug)] struct ReachAttempts { + /// Peer ID of the node we control. + local_peer_id: PeerId, + /// Attempts to reach a peer. out_reach_attempts: FnvHashMap, @@ -416,12 +419,13 @@ where { /// Creates a new node events stream. #[inline] - pub fn new(transport: TTrans) -> Self { + pub fn new(transport: TTrans, local_peer_id: PeerId) -> Self { // TODO: with_capacity? RawSwarm { listeners: ListenersStream::new(transport), active_nodes: CollectionStream::new(), reach_attempts: ReachAttempts { + local_peer_id, out_reach_attempts: Default::default(), other_reach_attempts: Vec::new(), connected_points: Default::default(), @@ -466,6 +470,14 @@ where .flat_map(move |server| self.transport().nat_traversal(server, observed_addr)) } + /// Returns the peer id of the local node. + /// + /// This is the same value as was passed to `new()`. + #[inline] + pub fn local_peer_id(&self) -> &PeerId { + &self.reach_attempts.local_peer_id + } + /// Dials a multiaddress without knowing the peer ID we're going to obtain. /// /// The second parameter is the handler to use if we manage to reach a node. @@ -746,6 +758,21 @@ where { let (_, opened_endpoint) = reach_attempts.other_reach_attempts.swap_remove(in_pos); + // If we already have an active connection to this peer, a priority system comes into play. + // If we have a lower peer ID than the incoming one, we drop an incoming connection. + if event.would_replace() && has_dial_prio(&reach_attempts.local_peer_id, event.peer_id()) { + if let Some(ConnectedPoint::Dialer { .. }) = reach_attempts.connected_points.get(event.peer_id()) { + if let ConnectedPoint::Listener { listen_addr, send_back_addr } = opened_endpoint { + return (Default::default(), RawSwarmEvent::IncomingConnectionError { + listen_addr, + send_back_addr, + error: IoError::new(IoErrorKind::PermissionDenied, + "refused incoming connection".to_string()), + }); + } + } + } + // Set the endpoint for this peer. let closed_endpoint = reach_attempts.connected_points.insert(event.peer_id().clone(), opened_endpoint.clone()); @@ -824,6 +851,15 @@ where find back this ID in either of these two sets"); } +/// Returns true if `local` has dialing priority over `other`. +/// +/// This means that if `local` and `other` both dial each other, the connection from `local` should +/// be kept and the one from `other` will be dropped. +#[inline] +fn has_dial_prio(local: &PeerId, other: &PeerId) -> bool { + local.as_bytes() < other.as_bytes() +} + /// Handles a reach error event from the collection. /// /// Optionally returns an event to return from the stream. @@ -1245,13 +1281,13 @@ mod tests { fn query_transport() { let transport = DummyTransport::new(); let transport2 = transport.clone(); - let raw_swarm = RawSwarm::<_, _, _, Handler>::new(transport); + let raw_swarm = RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random()); assert_eq!(raw_swarm.transport(), &transport2); } #[test] fn starts_listening() { - let mut raw_swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new()); + let mut raw_swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new(), PeerId::random()); let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); let addr2 = addr.clone(); assert!(raw_swarm.listen_on(addr).is_ok()); @@ -1264,7 +1300,7 @@ mod tests { fn nat_traversal_transforms_the_observed_address_according_to_the_transport_used() { // the DummyTransport nat_traversal increments the port number by one for Ip4 addresses let transport = DummyTransport::new(); - let mut raw_swarm = RawSwarm::<_, _, _, Handler>::new(transport); + let mut raw_swarm = RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random()); let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); // An unrelated outside address is returned as-is, no transform let outside_addr1 = "/memory".parse::().expect("bad multiaddr"); @@ -1292,7 +1328,7 @@ mod tests { #[test] fn successful_dial_reaches_a_node() { - let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new()); + let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new(), PeerId::random()); let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); let dial_res = swarm.dial(addr, Handler::default()); assert!(dial_res.is_ok()); @@ -1330,7 +1366,7 @@ mod tests { // Set up listener to see an incoming connection transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some((peer_id, muxer))))); - let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport); + let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random()); swarm.listen_on("/memory".parse().unwrap()).unwrap(); // no incoming yet @@ -1355,7 +1391,7 @@ mod tests { #[test] fn broadcasted_events_reach_active_nodes() { - let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new()); + let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new(), PeerId::random()); let mut muxer = DummyMuxer::new(); muxer.set_inbound_connection_state(DummyConnectionState::Pending); muxer.set_outbound_connection_state(DummyConnectionState::Opened); @@ -1402,7 +1438,7 @@ mod tests { #[test] fn querying_for_pending_peer() { - let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new()); + let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new(), PeerId::random()); let peer_id = PeerId::random(); let peer = swarm.peer(peer_id.clone()); assert_matches!(peer, Peer::NotConnected(PeerNotConnected{ .. })); @@ -1414,7 +1450,7 @@ mod tests { #[test] fn querying_for_unknown_peer() { - let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new()); + let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new(), PeerId::random()); let peer_id = PeerId::random(); let peer = swarm.peer(peer_id.clone()); assert_matches!(peer, Peer::NotConnected( PeerNotConnected { nodes: _, peer_id: node_peer_id }) => { @@ -1424,7 +1460,7 @@ mod tests { #[test] fn querying_for_connected_peer() { - let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new()); + let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new(), PeerId::random()); // Dial a node let addr = "/ip4/127.0.0.1/tcp/1234".parse().expect("bad multiaddr"); @@ -1458,7 +1494,7 @@ mod tests { // Set up listener to be closed transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(None))); - let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport); + let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random()); swarm.listen_on("/memory".parse().unwrap()).unwrap(); let mut rt = Runtime::new().unwrap(); @@ -1477,7 +1513,7 @@ mod tests { fn unknown_peer_that_is_unreachable_yields_unknown_peer_dial_error() { let mut transport = DummyTransport::new(); transport.make_dial_fail(); - let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport); + let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random()); let addr = "/memory".parse::().expect("bad multiaddr"); let handler = Handler::default(); let dial_result = swarm.dial(addr, handler); @@ -1508,7 +1544,7 @@ mod tests { let peer_id = PeerId::random(); transport.set_next_peer_id(&peer_id); transport.make_dial_fail(); - let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport))); + let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random()))); { let swarm1 = swarm.clone(); @@ -1548,7 +1584,7 @@ mod tests { let mut transport = DummyTransport::new(); let peer_id = PeerId::random(); transport.set_next_peer_id(&peer_id); - let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport))); + let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random()))); { // Set up an outgoing connection with a PeerId we know @@ -1602,7 +1638,7 @@ mod tests { let mut transport = DummyTransport::new(); let peer_id = PeerId::random(); transport.set_next_peer_id(&peer_id); - let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport))); + let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random()))); { // Set up an outgoing connection with a PeerId we know @@ -1650,4 +1686,13 @@ mod tests { Ok(Async::Ready(())) })).expect("tokio works"); } + + #[test] + fn local_prio_equivalence_relation() { + for _ in 0..1000 { + let a = PeerId::random(); + let b = PeerId::random(); + assert_ne!(has_dial_prio(&a, &b), has_dial_prio(&b, &a)); + } + } } diff --git a/core/src/swarm.rs b/core/src/swarm.rs index 1c071847..4a763d6b 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -58,9 +58,6 @@ where TTransport: Transport, /// Public key of the local node. local_public_key: PublicKey, - /// Peer ID of the local node. - local_peer_id: PeerId, - /// List of protocols that the behaviour says it supports. supported_protocols: SmallVec<[Vec; 16]>, @@ -129,16 +126,14 @@ where TBehaviour: NetworkBehaviour, .map(|(name, _)| name.to_vec()) .collect(); - let raw_swarm = RawSwarm::new(transport); - let local_peer_id = local_public_key.clone().into_peer_id(); + let raw_swarm = RawSwarm::new(transport, local_peer_id.clone()); Swarm { raw_swarm, behaviour, topology, local_public_key, - local_peer_id, supported_protocols, listened_addrs: SmallVec::new(), external_addresses: SmallVec::new(), @@ -195,7 +190,7 @@ where TBehaviour: NetworkBehaviour, /// Returns the peer ID of the swarm passed as parameter. #[inline] pub fn local_peer_id(me: &Self) -> &PeerId { - &me.local_peer_id + &me.raw_swarm.local_peer_id() } /// Returns the topology of the swarm. @@ -278,7 +273,7 @@ where TBehaviour: NetworkBehaviour, listened_addrs: &self.listened_addrs, external_addresses: &self.external_addresses, local_public_key: &self.local_public_key, - local_peer_id: &self.local_peer_id, + local_peer_id: &self.raw_swarm.local_peer_id(), }; self.behaviour.poll(&mut parameters) };