Fix self-dialing in Kademlia. (#1097)

* Fix self-dialing in Kademlia.

Addresses https://github.com/libp2p/rust-libp2p/issues/341, which is the cause of
one of the observations made in https://github.com/libp2p/rust-libp2p/issues/1053.
However, the latter is not assumed to be fully addressed by these changes and
needs further investigation.

Currently, whenever a search for a key yields a response containing the initiating
peer as one of the closest peers known to the remote, the local node
attempts to dial itself. That attempt is ignored by the Swarm, but the
Kademlia behaviour then believes it still has an ongoing query to that peer,
which is doomed to time out. That timeout delays the successful completion of the
query. Hence, any query where a remote responds with the ID of the local node takes
at least as long as the `rpc_timeout` to complete, which likely affects almost
all queries in smaller clusters where every node knows about every other.

This problem is fixed here by ensuring that Kademlia never tries to dial the local node.
Furthermore, `Discovered` events are no longer emitted for the local node, and it is
no longer inserted into the `untrusted_addresses` of a query as a result of discovery,
as described in #341.
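
For illustration only, a minimal, self-contained sketch of the filtering idea with
stand-in types (the real change operates on `KadPeer` values inside the new
`discovered` helper shown in the diff below):

```rust
// Stand-in type for illustration; the actual code uses libp2p's `PeerId`
// and `KadPeer`.
#[derive(Debug, PartialEq)]
struct PeerId(u64);

/// Drops the local peer from the peers reported by a remote, so that no
/// `Discovered` event is emitted for the local node and no self-dial is
/// ever attempted.
fn filter_local(local_id: &PeerId, reported: Vec<PeerId>) -> Vec<PeerId> {
    reported.into_iter().filter(|p| p != local_id).collect()
}

fn main() {
    let local_id = PeerId(0);
    // A remote may legitimately report the querying node among the closest
    // peers it knows for a given key.
    let reported = vec![PeerId(1), PeerId(0), PeerId(2)];
    assert_eq!(filter_local(&local_id, reported), vec![PeerId(1), PeerId(2)]);
}
```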

This commit also includes a change to the condition for freezing / terminating
a Kademlia query upon receiving a response. Specifically, the condition is
tightened so that it only applies if, in addition to `parallelism` consecutive
responses that failed to yield a peer closer to the target, the last response
also either reported no new peer at all or the number of collected peers has
already reached the desired number of results. In effect, a Kademlia query now
tries harder to actually return the `k` closest peers.
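
As a rough sketch with simplified, illustrative names (the actual change lives in
the query state machine shown in the last diff hunk below), the tightened freeze
condition can be expressed as:

```rust
/// Sketch of the tightened freeze/termination check. Parameter names are
/// illustrative assumptions, not the actual field names in the code base.
fn should_freeze(
    no_closer_in_a_row: usize, // consecutive responses without a closer peer
    parallelism: usize,
    num_closest_before: usize, // peers known before this response
    num_closest_after: usize,  // peers known after this response
    num_results: usize,        // desired number of results, i.e. `k`
) -> bool {
    no_closer_in_a_row >= parallelism
        && (num_closest_after == num_closest_before // the response added nothing new
            || num_closest_after >= num_results)    // or enough peers are already known
}

fn main() {
    // Previously, `parallelism` "no closer peer" responses in a row sufficed to
    // freeze the query; now it keeps iterating because new peers were still
    // learned and fewer than `num_results` peers are known.
    assert!(!should_freeze(3, 3, 10, 12, 20));
    // Once enough peers have been collected, the query freezes as before.
    assert!(should_freeze(3, 3, 20, 22, 20));
}
```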

Tests have been refactored and expanded.

* Add another comment.
Roman Borschel
2019-05-02 21:43:29 +02:00
committed by GitHub
parent 77ce5a52dd
commit 808a7a5ef6
4 changed files with 147 additions and 207 deletions


@@ -250,7 +250,7 @@ impl<TSubstream> Kademlia<TSubstream> {
let parallelism = 3;
Kademlia {
kbuckets: KBucketsTable::new(KadHash::from(local_peer_id), Duration::from_secs(60)), // TODO: constant
kbuckets: KBucketsTable::new(local_peer_id.into(), Duration::from_secs(60)), // TODO: constant
queued_events: SmallVec::new(),
active_queries: Default::default(),
connected_peers: Default::default(),
@@ -349,6 +349,33 @@ impl<TSubstream> Kademlia<TSubstream> {
})
);
}
/// Processes discovered peers from a query.
fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
where
I: Iterator<Item=&'a KadPeer> + Clone
{
let local_id = self.kbuckets.my_id().peer_id().clone();
let others_iter = peers.filter(|p| p.node_id != local_id);
for peer in others_iter.clone() {
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(
KademliaOut::Discovered {
peer_id: peer.node_id.clone(),
addresses: peer.multiaddrs.clone(),
ty: peer.connection_ty,
}
));
}
if let Some(query) = self.active_queries.get_mut(query_id) {
for peer in others_iter.clone() {
query.target_mut().untrusted_addresses
.insert(peer.node_id.clone(), peer.multiaddrs.iter().cloned().collect());
}
query.inject_rpc_result(source, others_iter.cloned().map(|kp| kp.node_id))
}
}
}
impl<TSubstream> NetworkBehaviour for Kademlia<TSubstream>
@@ -551,22 +578,7 @@ where
closer_peers,
user_data,
} => {
// It is possible that we obtain a response for a query that has finished, which is
// why we may not find an entry in `self.active_queries`.
for peer in closer_peers.iter() {
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered {
peer_id: peer.node_id.clone(),
addresses: peer.multiaddrs.clone(),
ty: peer.connection_ty,
}));
}
if let Some(query) = self.active_queries.get_mut(&user_data) {
for peer in closer_peers.iter() {
query.target_mut().untrusted_addresses
.insert(peer.node_id.clone(), peer.multiaddrs.iter().cloned().collect());
}
query.inject_rpc_result(&source, closer_peers.into_iter().map(|kp| kp.node_id))
}
self.discovered(&user_data, &source, closer_peers.iter());
}
KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
let closer_peers = self.kbuckets
@@ -599,27 +611,16 @@ where
provider_peers,
user_data,
} => {
for peer in closer_peers.iter().chain(provider_peers.iter()) {
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered {
peer_id: peer.node_id.clone(),
addresses: peer.multiaddrs.clone(),
ty: peer.connection_ty,
}));
}
// It is possible that we obtain a response for a query that has finished, which is
// why we may not find an entry in `self.active_queries`.
let peers = closer_peers.iter().chain(provider_peers.iter());
self.discovered(&user_data, &source, peers);
if let Some(query) = self.active_queries.get_mut(&user_data) {
if let QueryInfoInner::GetProviders { pending_results, .. } = &mut query.target_mut().inner {
if let QueryInfoInner::GetProviders {
pending_results, ..
} = &mut query.target_mut().inner {
for peer in provider_peers {
pending_results.push(peer.node_id);
}
}
for peer in closer_peers.iter() {
query.target_mut().untrusted_addresses
.insert(peer.node_id.clone(), peer.multiaddrs.iter().cloned().collect());
}
query.inject_rpc_result(&source, closer_peers.into_iter().map(|kp| kp.node_id))
}
}
KademliaHandlerEvent::QueryError { user_data, .. } => {
@@ -702,7 +703,7 @@ where
peer_id: peer_id.clone(),
event: rpc,
});
} else {
} else if peer_id != self.kbuckets.my_id().peer_id() {
self.pending_rpcs.push((peer_id.clone(), rpc));
return Async::Ready(NetworkBehaviourAction::DialPeer {
peer_id: peer_id.clone(),


@@ -20,45 +20,52 @@
#![cfg(test)]
use crate::{Kademlia, KademliaOut};
use futures::prelude::*;
use crate::{Kademlia, KademliaOut, kbucket::KBucketsPeerId};
use futures::{future, prelude::*};
use libp2p_core::{
multiaddr::Protocol,
upgrade::{self, InboundUpgradeExt, OutboundUpgradeExt},
PeerId,
Swarm,
Transport
Transport,
identity,
transport::{MemoryTransport, boxed::Boxed},
nodes::Substream,
multiaddr::{Protocol, multiaddr},
muxing::StreamMuxerBox,
upgrade,
};
use libp2p_core::{nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox};
use std::io;
use libp2p_secio::SecioConfig;
use libp2p_yamux as yamux;
use rand::random;
use std::{io, u64};
use tokio::runtime::Runtime;
type TestSwarm = Swarm<
Boxed<(PeerId, StreamMuxerBox), io::Error>,
Kademlia<Substream<StreamMuxerBox>>
>;
/// Builds swarms, each listening on a port. Does *not* connect the nodes together.
/// This is to be used only for testing, and a panic will happen if something goes wrong.
fn build_nodes(port_base: u64, num: usize)
-> Vec<Swarm<Boxed<(PeerId, StreamMuxerBox), io::Error>, Kademlia<Substream<StreamMuxerBox>>>>
{
fn build_nodes(num: usize) -> (u64, Vec<TestSwarm>) {
let port_base = 1 + random::<u64>() % (u64::MAX - num as u64);
let mut result: Vec<Swarm<_, _>> = Vec::with_capacity(num);
for _ in 0 .. num {
// TODO: make creating the transport more elegant; literally half of the code of the test
// is about creating the transport
let local_key = libp2p_core::identity::Keypair::generate_ed25519();
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_core::transport::MemoryTransport::default()
.with_upgrade(libp2p_secio::SecioConfig::new(local_key))
let transport = MemoryTransport::default()
.with_upgrade(SecioConfig::new(local_key))
.and_then(move |out, endpoint| {
let peer_id = out.remote_key.into_peer_id();
let peer_id2 = peer_id.clone();
let upgrade = libp2p_yamux::Config::default()
.map_inbound(move |muxer| (peer_id, muxer))
.map_outbound(move |muxer| (peer_id2, muxer));
upgrade::apply(out.stream, upgrade, endpoint)
.map(|(id, muxer)| (id, StreamMuxerBox::new(muxer)))
let yamux = yamux::Config::default();
upgrade::apply(out.stream, yamux, endpoint)
.map(|muxer| (peer_id, StreamMuxerBox::new(muxer)))
})
.map_err(|e| panic!("Failed to create transport: {:?}", e))
.boxed();
let kad = Kademlia::without_init(local_public_key.clone().into_peer_id());
let kad = Kademlia::new(local_public_key.clone().into_peer_id());
result.push(Swarm::new(transport, kad, local_public_key.into_peer_id()));
}
@@ -68,34 +75,53 @@ fn build_nodes(port_base: u64, num: usize)
i += 1
}
result
(port_base, result)
}
#[test]
fn basic_find_node() {
// Build two nodes. Node #2 only knows about node #1. Node #2 is asked for a random peer ID.
// Node #2 must return the identity of node #1.
fn query_iter() {
fn run(n: usize) {
// Build `n` nodes. Node `n` knows about node `n-1`, node `n-1` knows about node `n-2`, etc.
// Node `n` is queried for a random peer and should return nodes `1..n-1` sorted by
// their distances to that peer.
let port_base = rand::random();
let mut swarms = build_nodes(port_base, 2);
let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone();
let (port_base, mut swarms) = build_nodes(n);
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
// Connect second to first.
swarms[1].add_not_connected_address(&first_peer_id, Protocol::Memory(port_base).into());
// Connect each swarm in the list to its predecessor in the list.
for (i, (swarm, peer)) in &mut swarms.iter_mut().skip(1).zip(swarm_ids.clone()).enumerate() {
swarm.add_not_connected_address(&peer, Protocol::Memory(port_base + i as u64).into())
}
// Ask the last peer in the list to search a random peer. The search should
// propagate backwards through the list of peers.
let search_target = PeerId::random();
swarms[1].find_node(search_target.clone());
swarms.last_mut().unwrap().find_node(search_target.clone());
tokio::runtime::Runtime::new()
.unwrap()
.block_on(futures::future::poll_fn(move || -> Result<_, io::Error> {
for swarm in &mut swarms {
// Set up expectations.
let expected_swarm_id = swarm_ids.last().unwrap().clone();
let expected_peer_ids: Vec<_> = swarm_ids
.iter().cloned().take(n - 1).collect();
let mut expected_distances: Vec<_> = expected_peer_ids
.iter().map(|p| p.distance_with(&search_target)).collect();
expected_distances.sort();
// Run test
Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
for (i, swarm) in swarms.iter_mut().enumerate() {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaOut::FindNodeResult { key, closer_peers })) => {
Async::Ready(Some(KademliaOut::FindNodeResult {
key, closer_peers
})) => {
assert_eq!(key, search_target);
assert_eq!(closer_peers.len(), 1);
assert_eq!(closer_peers[0], first_peer_id);
assert_eq!(swarm_ids[i], expected_swarm_id);
assert!(expected_peer_ids.iter().all(|p| closer_peers.contains(p)));
assert_eq!(expected_distances,
closer_peers.iter()
.map(|p| p.distance_with(&key))
.collect::<Vec<_>>());
return Ok(Async::Ready(()));
}
Async::Ready(_) => (),
@@ -103,105 +129,12 @@ fn basic_find_node() {
}
}
}
Ok(Async::NotReady)
}))
.unwrap();
}
#[test]
fn direct_query() {
// Build three nodes. Node #2 knows about node #1. Node #3 knows about node #2. Node #3 is
// asked about a random peer and should return nodes #1 and #2.
let port_base = rand::random::<u64>() % (std::u64::MAX - 1);
let mut swarms = build_nodes(port_base, 3);
let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone();
let second_peer_id = Swarm::local_peer_id(&swarms[1]).clone();
// Connect second to first.
swarms[1].add_not_connected_address(&first_peer_id, Protocol::Memory(port_base).into());
// Connect third to second.
swarms[2].add_not_connected_address(&second_peer_id, Protocol::Memory(port_base + 1).into());
// Ask third to search a random value.
let search_target = PeerId::random();
swarms[2].find_node(search_target.clone());
tokio::runtime::Runtime::new()
.unwrap()
.block_on(futures::future::poll_fn(move || -> Result<_, io::Error> {
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaOut::FindNodeResult { key, closer_peers })) => {
assert_eq!(key, search_target);
assert_eq!(closer_peers.len(), 2);
assert!((closer_peers[0] == first_peer_id) != (closer_peers[1] == first_peer_id));
assert!((closer_peers[0] == second_peer_id) != (closer_peers[1] == second_peer_id));
return Ok(Async::Ready(()));
}
Async::Ready(_) => (),
Async::NotReady => break,
}
}
}
Ok(Async::NotReady)
}))
.unwrap();
}
#[test]
fn indirect_query() {
// Build four nodes. Node #2 knows about node #1. Node #3 knows about node #2. Node #4 knows
// about node #3. Node #4 is asked about a random peer and should return nodes #1, #2 and #3.
let port_base = rand::random::<u64>() % (std::u64::MAX - 2);
let mut swarms = build_nodes(port_base, 4);
let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone();
let second_peer_id = Swarm::local_peer_id(&swarms[1]).clone();
let third_peer_id = Swarm::local_peer_id(&swarms[2]).clone();
// Connect second to first.
swarms[1].add_not_connected_address(&first_peer_id, Protocol::Memory(port_base).into());
// Connect third to second.
swarms[2].add_not_connected_address(&second_peer_id, Protocol::Memory(port_base + 1).into());
// Connect fourth to third.
swarms[3].add_not_connected_address(&third_peer_id, Protocol::Memory(port_base + 2).into());
// Ask fourth to search a random value.
let search_target = PeerId::random();
swarms[3].find_node(search_target.clone());
tokio::runtime::Runtime::new()
.unwrap()
.block_on(futures::future::poll_fn(move || -> Result<_, io::Error> {
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaOut::FindNodeResult { key, closer_peers })) => {
assert_eq!(key, search_target);
assert_eq!(closer_peers.len(), 3);
assert_eq!(closer_peers.iter().filter(|p| **p == first_peer_id).count(), 1);
assert_eq!(closer_peers.iter().filter(|p| **p == second_peer_id).count(), 1);
assert_eq!(closer_peers.iter().filter(|p| **p == third_peer_id).count(), 1);
return Ok(Async::Ready(()));
}
Async::Ready(_) => (),
Async::NotReady => break,
}
}
}
Ok(Async::NotReady)
}))
.unwrap();
for n in 2..=8 { run(n) }
}
#[test]
@@ -209,8 +142,7 @@ fn unresponsive_not_returned_direct() {
// Build one node. It contains fake addresses to non-existing nodes. We ask it to find a
// random peer. We make sure that no fake address is returned.
let port_base = rand::random();
let mut swarms = build_nodes(port_base, 1);
let (_, mut swarms) = build_nodes(1);
// Add fake addresses.
for _ in 0 .. 10 {
@@ -221,9 +153,8 @@ fn unresponsive_not_returned_direct() {
let search_target = PeerId::random();
swarms[0].find_node(search_target.clone());
tokio::runtime::Runtime::new()
.unwrap()
.block_on(futures::future::poll_fn(move || -> Result<_, io::Error> {
Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
@@ -246,18 +177,17 @@ fn unresponsive_not_returned_direct() {
#[test]
fn unresponsive_not_returned_indirect() {
// Build two nodes. Node #2 knows about node #1. Node #1 contains fake addresses to
// non-existing nodes. We ask node #1 to find a random peer. We make sure that no fake address
// non-existing nodes. We ask node #2 to find a random peer. We make sure that no fake address
// is returned.
let port_base = rand::random();
let mut swarms = build_nodes(port_base, 2);
let (port_base, mut swarms) = build_nodes(2);
// Add fake addresses to first.
let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone();
for _ in 0 .. 10 {
swarms[0].add_not_connected_address(
&PeerId::random(),
libp2p_core::multiaddr::multiaddr![Udp(10u16)]
multiaddr![Udp(10u16)]
);
}
@@ -268,9 +198,8 @@ fn unresponsive_not_returned_indirect() {
let search_target = PeerId::random();
swarms[1].find_node(search_target.clone());
tokio::runtime::Runtime::new()
.unwrap()
.block_on(futures::future::poll_fn(move || -> Result<_, io::Error> {
Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {


@@ -764,9 +764,11 @@ fn process_kad_response<TUserData>(
user_data,
}
}
KadResponseMsg::FindNode { closer_peers } => KademliaHandlerEvent::FindNodeRes {
KadResponseMsg::FindNode { closer_peers } => {
KademliaHandlerEvent::FindNodeRes {
closer_peers,
user_data,
}
},
KadResponseMsg::GetProviders {
closer_peers,


@@ -165,6 +165,8 @@ where
}
}
let num_closest = self.closest_peers.len();
// Add the entries in `closest_peers`.
if let QueryStage::Iterating {
ref mut no_closer_in_a_row,
@@ -190,10 +192,11 @@ where
});
// Make sure we don't insert duplicates.
let mut iter_start = self.closest_peers.iter().skip(insert_pos_start);
let duplicate = if let Some(insert_pos_size) = insert_pos_size {
self.closest_peers.iter().skip(insert_pos_start).take(insert_pos_size).any(|e| e.0 == elem_to_add)
iter_start.take(insert_pos_size).any(|e| e.0 == elem_to_add)
} else {
self.closest_peers.iter().skip(insert_pos_start).any(|e| e.0 == elem_to_add)
iter_start.any(|e| e.0 == elem_to_add)
};
if !duplicate {
@@ -204,7 +207,7 @@ where
self.closest_peers
.insert(insert_pos_start, (elem_to_add, QueryPeerState::NotContacted));
}
} else if self.closest_peers.len() < self.num_results {
} else if num_closest < self.num_results {
debug_assert!(self.closest_peers.iter().all(|e| e.0 != elem_to_add));
self.closest_peers
.push((elem_to_add, QueryPeerState::NotContacted));
@@ -215,16 +218,21 @@ where
// Check for duplicates in `closest_peers`.
debug_assert!(self.closest_peers.windows(2).all(|w| w[0].0 != w[1].0));
// Handle if `no_closer_in_a_row` is too high.
let freeze = if let QueryStage::Iterating { no_closer_in_a_row } = self.stage {
no_closer_in_a_row >= self.parallelism
} else {
false
};
if freeze {
let num_closest_new = self.closest_peers.len();
// Termination condition: If at least `self.parallelism` consecutive
// responses yield no peer closer to the target and either no new peers
// were discovered or the number of discovered peers reached the desired
// number of results, then the query is considered complete.
if let QueryStage::Iterating { no_closer_in_a_row } = self.stage {
if no_closer_in_a_row >= self.parallelism &&
(num_closest == num_closest_new ||
num_closest_new >= self.num_results)
{
self.stage = QueryStage::Frozen;
}
}
}
/// Returns the list of peers for which we are waiting for an answer.
pub fn waiting(&self) -> impl Iterator<Item = &TPeerId> {
@@ -351,8 +359,8 @@ where
}
}
// If we don't have any query in progress, return `Finished` as we don't have anything more
// we can do.
// If we don't have any query in progress, return `Finished` as we don't have
// anything more we can do.
if active_counter > 0 {
Async::NotReady
} else {