Add a dial priority system (#753)

This commit is contained in:
Pierre Krieger 2018-12-07 14:43:51 +01:00 committed by GitHub
parent dd5fb17a2b
commit 8d8fc75a4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 63 additions and 23 deletions

View File

@ -63,6 +63,9 @@ where
#[derive(Debug)]
struct ReachAttempts {
/// Peer ID of the node we control.
local_peer_id: PeerId,
/// Attempts to reach a peer.
out_reach_attempts: FnvHashMap<PeerId, OutReachAttempt>,
@ -416,12 +419,13 @@ where
{
/// Creates a new node events stream.
#[inline]
pub fn new(transport: TTrans) -> Self {
pub fn new(transport: TTrans, local_peer_id: PeerId) -> Self {
// TODO: with_capacity?
RawSwarm {
listeners: ListenersStream::new(transport),
active_nodes: CollectionStream::new(),
reach_attempts: ReachAttempts {
local_peer_id,
out_reach_attempts: Default::default(),
other_reach_attempts: Vec::new(),
connected_points: Default::default(),
@ -466,6 +470,14 @@ where
.flat_map(move |server| self.transport().nat_traversal(server, observed_addr))
}
/// Returns the peer id of the local node.
///
/// This is the same value as was passed to `new()`.
#[inline]
pub fn local_peer_id(&self) -> &PeerId {
&self.reach_attempts.local_peer_id
}
/// Dials a multiaddress without knowing the peer ID we're going to obtain.
///
/// The second parameter is the handler to use if we manage to reach a node.
@ -746,6 +758,21 @@ where
{
let (_, opened_endpoint) = reach_attempts.other_reach_attempts.swap_remove(in_pos);
// If we already have an active connection to this peer, a priority system comes into play.
// If we have a lower peer ID than the incoming one, we drop an incoming connection.
if event.would_replace() && has_dial_prio(&reach_attempts.local_peer_id, event.peer_id()) {
if let Some(ConnectedPoint::Dialer { .. }) = reach_attempts.connected_points.get(event.peer_id()) {
if let ConnectedPoint::Listener { listen_addr, send_back_addr } = opened_endpoint {
return (Default::default(), RawSwarmEvent::IncomingConnectionError {
listen_addr,
send_back_addr,
error: IoError::new(IoErrorKind::PermissionDenied,
"refused incoming connection".to_string()),
});
}
}
}
// Set the endpoint for this peer.
let closed_endpoint = reach_attempts.connected_points.insert(event.peer_id().clone(), opened_endpoint.clone());
@ -824,6 +851,15 @@ where
find back this ID in either of these two sets");
}
/// Returns true if `local` has dialing priority over `other`.
///
/// This means that if `local` and `other` both dial each other, the connection from `local` should
/// be kept and the one from `other` will be dropped.
#[inline]
fn has_dial_prio(local: &PeerId, other: &PeerId) -> bool {
local.as_bytes() < other.as_bytes()
}
/// Handles a reach error event from the collection.
///
/// Optionally returns an event to return from the stream.
@ -1245,13 +1281,13 @@ mod tests {
fn query_transport() {
let transport = DummyTransport::new();
let transport2 = transport.clone();
let raw_swarm = RawSwarm::<_, _, _, Handler>::new(transport);
let raw_swarm = RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random());
assert_eq!(raw_swarm.transport(), &transport2);
}
#[test]
fn starts_listening() {
let mut raw_swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new());
let mut raw_swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new(), PeerId::random());
let addr = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let addr2 = addr.clone();
assert!(raw_swarm.listen_on(addr).is_ok());
@ -1264,7 +1300,7 @@ mod tests {
fn nat_traversal_transforms_the_observed_address_according_to_the_transport_used() {
// the DummyTransport nat_traversal increments the port number by one for Ip4 addresses
let transport = DummyTransport::new();
let mut raw_swarm = RawSwarm::<_, _, _, Handler>::new(transport);
let mut raw_swarm = RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random());
let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
// An unrelated outside address is returned as-is, no transform
let outside_addr1 = "/memory".parse::<Multiaddr>().expect("bad multiaddr");
@ -1292,7 +1328,7 @@ mod tests {
#[test]
fn successful_dial_reaches_a_node() {
let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new());
let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new(), PeerId::random());
let addr = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let dial_res = swarm.dial(addr, Handler::default());
assert!(dial_res.is_ok());
@ -1330,7 +1366,7 @@ mod tests {
// Set up listener to see an incoming connection
transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some((peer_id, muxer)))));
let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport);
let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random());
swarm.listen_on("/memory".parse().unwrap()).unwrap();
// no incoming yet
@ -1355,7 +1391,7 @@ mod tests {
#[test]
fn broadcasted_events_reach_active_nodes() {
let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new());
let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new(), PeerId::random());
let mut muxer = DummyMuxer::new();
muxer.set_inbound_connection_state(DummyConnectionState::Pending);
muxer.set_outbound_connection_state(DummyConnectionState::Opened);
@ -1402,7 +1438,7 @@ mod tests {
#[test]
fn querying_for_pending_peer() {
let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new());
let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new(), PeerId::random());
let peer_id = PeerId::random();
let peer = swarm.peer(peer_id.clone());
assert_matches!(peer, Peer::NotConnected(PeerNotConnected{ .. }));
@ -1414,7 +1450,7 @@ mod tests {
#[test]
fn querying_for_unknown_peer() {
let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new());
let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new(), PeerId::random());
let peer_id = PeerId::random();
let peer = swarm.peer(peer_id.clone());
assert_matches!(peer, Peer::NotConnected( PeerNotConnected { nodes: _, peer_id: node_peer_id }) => {
@ -1424,7 +1460,7 @@ mod tests {
#[test]
fn querying_for_connected_peer() {
let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new());
let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new(), PeerId::random());
// Dial a node
let addr = "/ip4/127.0.0.1/tcp/1234".parse().expect("bad multiaddr");
@ -1458,7 +1494,7 @@ mod tests {
// Set up listener to be closed
transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(None)));
let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport);
let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random());
swarm.listen_on("/memory".parse().unwrap()).unwrap();
let mut rt = Runtime::new().unwrap();
@ -1477,7 +1513,7 @@ mod tests {
fn unknown_peer_that_is_unreachable_yields_unknown_peer_dial_error() {
let mut transport = DummyTransport::new();
transport.make_dial_fail();
let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport);
let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random());
let addr = "/memory".parse::<Multiaddr>().expect("bad multiaddr");
let handler = Handler::default();
let dial_result = swarm.dial(addr, handler);
@ -1508,7 +1544,7 @@ mod tests {
let peer_id = PeerId::random();
transport.set_next_peer_id(&peer_id);
transport.make_dial_fail();
let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport)));
let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random())));
{
let swarm1 = swarm.clone();
@ -1548,7 +1584,7 @@ mod tests {
let mut transport = DummyTransport::new();
let peer_id = PeerId::random();
transport.set_next_peer_id(&peer_id);
let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport)));
let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random())));
{
// Set up an outgoing connection with a PeerId we know
@ -1602,7 +1638,7 @@ mod tests {
let mut transport = DummyTransport::new();
let peer_id = PeerId::random();
transport.set_next_peer_id(&peer_id);
let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport)));
let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport, PeerId::random())));
{
// Set up an outgoing connection with a PeerId we know
@ -1650,4 +1686,13 @@ mod tests {
Ok(Async::Ready(()))
})).expect("tokio works");
}
#[test]
fn local_prio_equivalence_relation() {
for _ in 0..1000 {
let a = PeerId::random();
let b = PeerId::random();
assert_ne!(has_dial_prio(&a, &b), has_dial_prio(&b, &a));
}
}
}

View File

@ -58,9 +58,6 @@ where TTransport: Transport,
/// Public key of the local node.
local_public_key: PublicKey,
/// Peer ID of the local node.
local_peer_id: PeerId,
/// List of protocols that the behaviour says it supports.
supported_protocols: SmallVec<[Vec<u8>; 16]>,
@ -129,16 +126,14 @@ where TBehaviour: NetworkBehaviour<TTopology>,
.map(|(name, _)| name.to_vec())
.collect();
let raw_swarm = RawSwarm::new(transport);
let local_peer_id = local_public_key.clone().into_peer_id();
let raw_swarm = RawSwarm::new(transport, local_peer_id.clone());
Swarm {
raw_swarm,
behaviour,
topology,
local_public_key,
local_peer_id,
supported_protocols,
listened_addrs: SmallVec::new(),
external_addresses: SmallVec::new(),
@ -195,7 +190,7 @@ where TBehaviour: NetworkBehaviour<TTopology>,
/// Returns the peer ID of the swarm passed as parameter.
#[inline]
pub fn local_peer_id(me: &Self) -> &PeerId {
&me.local_peer_id
&me.raw_swarm.local_peer_id()
}
/// Returns the topology of the swarm.
@ -278,7 +273,7 @@ where TBehaviour: NetworkBehaviour<TTopology>,
listened_addrs: &self.listened_addrs,
external_addresses: &self.external_addresses,
local_public_key: &self.local_public_key,
local_peer_id: &self.local_peer_id,
local_peer_id: &self.raw_swarm.local_peer_id(),
};
self.behaviour.poll(&mut parameters)
};