src/query/peers/closest: Consider K_VALUE peers at initialization (#1536)

* protocols/kad/query/peers/closest: Consider K_VALUE nodes at init

By considering `K_VALUE` peers at `ClosestPeersIter` initialization, the initial peer
set length is independent of `num_results` and thus of the `replication_factor`.
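
A minimal, self-contained sketch of the idea (toy types only: a `u64` stands in for a peer key and XOR for the kbucket distance; this is not the crate's actual `ClosestPeersIter` code):

```rust
// Toy illustration: the initial peer set is the K_VALUE closest known peers,
// regardless of how many results (num_results / replication_factor) are requested.
const K_VALUE: usize = 20; // libp2p-kad's default k

fn initial_peer_set(mut known: Vec<u64>, target: u64, _num_results: usize) -> Vec<u64> {
    known.sort_by_key(|peer| peer ^ target); // XOR stands in for the real distance metric
    known.into_iter().take(K_VALUE).collect()
}

fn main() {
    let known: Vec<u64> = (0..100).collect();
    // The initial set length no longer depends on num_results.
    assert_eq!(initial_peer_set(known.clone(), 42, 3).len(), K_VALUE);
    assert_eq!(initial_peer_set(known, 42, 50).len(), K_VALUE);
}
```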

* protocols/kad/src/behaviour/test: Enable building single nodes

Introduces the `build_node` function to build a single, unconnected node. Along
the way, replace the notion of a `port_base` with returning the actual
`Multiaddr` of the node.
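
A hypothetical usage sketch inside `protocols/kad/src/behaviour/test.rs`, assuming the `build_node` helper and `TestSwarm` alias introduced in the diff below (not a standalone program):

```rust
// Hypothetical helper on top of the new `build_node` (identifiers from the diff below).
fn two_connected_nodes() -> (TestSwarm, TestSwarm) {
    // Each call returns the node's actual listen `Multiaddr` instead of an
    // address derived from a shared `port_base`.
    let (addr_a, swarm_a) = build_node();
    let (_addr_b, mut swarm_b) = build_node();

    // Tell the second node about the first using its real address.
    swarm_b.add_address(Swarm::local_peer_id(&swarm_a), addr_a);
    (swarm_a, swarm_b)
}
```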

* protocols/kad/behaviour/test: Fix bootstrap test initialization

When looking for the closest nodes to a key, Kademlia considers
`ALPHA_VALUE` nodes to query at initialization. If `num_groups` is larger
than `ALPHA_VALUE`, the remaining locally known nodes will not be
considered. Given that no node other than node 1 is aware of them,
they would be lost entirely. To prevent this, restrict `num_groups`
to be equal to or smaller than `ALPHA_VALUE`.
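
Since `ALPHA_VALUE` defaults to 3 in libp2p-kad, the bound used by the fixed test (see the `bootstrap` hunk below) can be checked in isolation; a small self-contained sketch:

```rust
// rng.gen_range(1, upper) has an exclusive upper bound, so with
// upper = (num_total % ALPHA_VALUE) + 2 the largest possible num_group is
// (num_total % ALPHA_VALUE) + 1, which is always <= ALPHA_VALUE.
const ALPHA_VALUE: usize = 3; // assumption: the crate's default

fn main() {
    for num_total in 2..20usize {
        let largest_num_group = (num_total % ALPHA_VALUE) + 2 - 1;
        assert!(largest_num_group >= 1 && largest_num_group <= ALPHA_VALUE);
    }
}
```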

* protocols/kad/behaviour/test: Fix put_record and get_provider

In the past, when trying to find the closest nodes to a key, Kademlia
would consider `num_results` nodes to query out of all the nodes it was
aware of.

Both the `put_record` and the `get_provider` tests initialized their
swarms in the same way. The tests took the replication factor to use as
an input. The number of results to get was equal to the replication
factor. The number of swarms to start was twice the replication factor.
Nodes were grouped into two groups of replication-factor nodes each. The
first node was aware of all of the nodes in the first group. The last
node of the first group was aware of all the nodes in the second group.

By coincidence (I assume) these numbers played together very well. At
initialization the first node would consider `num_results` peers (see the
first paragraph). It would then contact each of them. As the first node
is aware of the last node of the first group, which in turn is aware of
all nodes in the second group, the first node would eventually discover
all nodes.

Recently the number of nodes Kademlia considers at initialization when
looking for the nodes closest to a key was changed to only `ALPHA_VALUE`
nodes.

With this in mind, playing through the test setup above again would
result in (1) `replication_factor - ALPHA_VALUE` nodes being entirely lost,
as the first node would never consider them, and (2) the first node
probably never contacting the last node of the first group and thus never
discovering any nodes of the second group.

To keep the multi-hop discovery in place while not basing one's test
setup on the lucky assumption of Kademlia considering a replication
factor's worth of nodes at initialization, this patch alters the two tests:

Build a fully connected set of nodes and one additional node (the first
node). Connect the first node to a single node of the fully connected
set (simulating a boot node). Continue as done previously.
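
In code, the new setup looks roughly as follows (mirroring the `put_record`/`add_provider` hunks in the diff below; identifiers as in the diff, not compiled standalone):

```rust
// num_total - 1 nodes form a fully connected set; one extra "first" node only
// knows a single member of that set, which acts as its boot node.
let mut fully_connected_swarms =
    build_fully_connected_nodes_with_config(num_total - 1, config.clone());

let mut single_swarm = build_node_with_config(config);
single_swarm.1.add_address(
    Swarm::local_peer_id(&fully_connected_swarms[0].1),
    fully_connected_swarms[0].0.clone(),
);

let mut swarms = vec![single_swarm];
swarms.append(&mut fully_connected_swarms);
```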

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>
Max Inden 2020-04-17 19:57:35 +02:00 committed by GitHub
parent 87b5efb3e8
commit 9f981a4bb6
2 changed files with 212 additions and 78 deletions

protocols/kad/src/behaviour/test.rs

@@ -22,7 +22,7 @@
use super::*;
use crate::K_VALUE;
use crate::{ALPHA_VALUE, K_VALUE};
use crate::kbucket::Distance;
use crate::record::store::MemoryStore;
use futures::{
@@ -35,7 +35,7 @@ use libp2p_core::{
Transport,
identity,
transport::MemoryTransport,
multiaddr::{Protocol, multiaddr},
multiaddr::{Protocol, Multiaddr, multiaddr},
muxing::StreamMuxerBox,
upgrade
};
@@ -49,61 +49,83 @@ use multihash::{wrap, Code, Multihash};
type TestSwarm = Swarm<Kademlia<MemoryStore>>;
fn build_node() -> (Multiaddr, TestSwarm) {
build_node_with_config(Default::default())
}
fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(SecioConfig::new(local_key))
.multiplex(yamux::Config::default())
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
.map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); })
.boxed();
let local_id = local_public_key.clone().into_peer_id();
let store = MemoryStore::new(local_id.clone());
let behaviour = Kademlia::with_config(local_id.clone(), store, cfg.clone());
let mut swarm = Swarm::new(transport, behaviour, local_id);
let address: Multiaddr = Protocol::Memory(random::<u64>()).into();
Swarm::listen_on(&mut swarm, address.clone()).unwrap();
(address, swarm)
}
/// Builds swarms, each listening on a port. Does *not* connect the nodes together.
fn build_nodes(num: usize) -> (u64, Vec<TestSwarm>) {
fn build_nodes(num: usize) -> Vec<(Multiaddr, TestSwarm)> {
build_nodes_with_config(num, Default::default())
}
/// Builds swarms, each listening on a port. Does *not* connect the nodes together.
fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec<TestSwarm>) {
let port_base = 1 + random::<u64>() % (u64::MAX - num as u64);
let mut result: Vec<Swarm<_, _>> = Vec::with_capacity(num);
for _ in 0 .. num {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(SecioConfig::new(local_key))
.multiplex(yamux::Config::default())
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
.map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); })
.boxed();
let local_id = local_public_key.clone().into_peer_id();
let store = MemoryStore::new(local_id.clone());
let behaviour = Kademlia::with_config(local_id.clone(), store, cfg.clone());
result.push(Swarm::new(transport, behaviour, local_id));
}
for (i, s) in result.iter_mut().enumerate() {
Swarm::listen_on(s, Protocol::Memory(port_base + i as u64).into()).unwrap();
}
(port_base, result)
fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> Vec<(Multiaddr, TestSwarm)> {
(0..num).map(|_| build_node_with_config(cfg.clone())).collect()
}
fn build_connected_nodes(total: usize, step: usize) -> (Vec<PeerId>, Vec<TestSwarm>) {
fn build_connected_nodes(total: usize, step: usize) -> Vec<(Multiaddr, TestSwarm)> {
build_connected_nodes_with_config(total, step, Default::default())
}
fn build_connected_nodes_with_config(total: usize, step: usize, cfg: KademliaConfig)
-> (Vec<PeerId>, Vec<TestSwarm>)
-> Vec<(Multiaddr, TestSwarm)>
{
let (port_base, mut swarms) = build_nodes_with_config(total, cfg);
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
let mut swarms = build_nodes_with_config(total, cfg);
let swarm_ids: Vec<_> = swarms.iter()
.map(|(addr, swarm)| (addr.clone(), Swarm::local_peer_id(swarm).clone()))
.collect();
let mut i = 0;
for (j, peer) in swarm_ids.iter().enumerate().skip(1) {
for (j, (addr, peer_id)) in swarm_ids.iter().enumerate().skip(1) {
if i < swarm_ids.len() {
swarms[i].add_address(&peer, Protocol::Memory(port_base + j as u64).into());
swarms[i].1.add_address(peer_id, addr.clone());
}
if j % step == 0 {
i += step;
}
}
(swarm_ids, swarms)
swarms
}
fn build_fully_connected_nodes_with_config(total: usize, cfg: KademliaConfig)
-> Vec<(Multiaddr, TestSwarm)>
{
let mut swarms = build_nodes_with_config(total, cfg);
let swarm_addr_and_peer_id: Vec<_> = swarms.iter()
.map(|(addr, swarm)| (addr.clone(), Swarm::local_peer_id(swarm).clone()))
.collect();
for (_addr, swarm) in swarms.iter_mut() {
for (addr, peer) in &swarm_addr_and_peer_id {
swarm.add_address(&peer, addr.clone());
}
}
swarms
}
fn random_multihash() -> Multihash {
@@ -114,8 +136,17 @@ fn random_multihash() {
fn bootstrap() {
fn run(rng: &mut impl Rng) {
let num_total = rng.gen_range(2, 20);
let num_group = rng.gen_range(1, num_total);
let (swarm_ids, mut swarms) = build_connected_nodes(num_total, num_group);
// When looking for the closest nodes to a key, Kademlia considers ALPHA_VALUE nodes to query
// at initialization. If `num_groups` is larger than ALPHA_VALUE, the remaining locally known
// nodes will not be considered. Given that no other node is aware of them, they would be
// lost entirely. To prevent this, restrict `num_groups` to be equal to or smaller than
// ALPHA_VALUE.
let num_group = rng.gen_range(1, (num_total % ALPHA_VALUE.get()) + 2);
let mut swarms = build_connected_nodes(num_total, num_group).into_iter()
.map(|(_a, s)| s)
.collect::<Vec<_>>();
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
swarms[0].bootstrap();
@@ -166,7 +197,10 @@ fn query_iter() {
fn run(rng: &mut impl Rng) {
let num_total = rng.gen_range(2, 20);
let (swarm_ids, mut swarms) = build_connected_nodes(num_total, 1);
let mut swarms = build_connected_nodes(num_total, 1).into_iter()
.map(|(_a, s)| s)
.collect::<Vec<_>>();
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
// Ask the first peer in the list to search a random peer. The search should
// propagate forwards through the list of peers.
@@ -218,7 +252,9 @@ fn unresponsive_not_returned_direct() {
// Build one node. It contains fake addresses to non-existing nodes. We ask it to find a
// random peer. We make sure that no fake address is returned.
let (_, mut swarms) = build_nodes(1);
let mut swarms = build_nodes(1).into_iter()
.map(|(_a, s)| s)
.collect::<Vec<_>>();
// Add fake addresses.
for _ in 0 .. 10 {
@@ -258,16 +294,20 @@ fn unresponsive_not_returned_indirect() {
// non-existing nodes. We ask node #2 to find a random peer. We make sure that no fake address
// is returned.
let (port_base, mut swarms) = build_nodes(2);
let mut swarms = build_nodes(2);
// Add fake addresses to first.
let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone();
for _ in 0 .. 10 {
swarms[0].add_address(&PeerId::random(), multiaddr![Udp(10u16)]);
swarms[0].1.add_address(&PeerId::random(), multiaddr![Udp(10u16)]);
}
// Connect second to first.
swarms[1].add_address(&first_peer_id, Protocol::Memory(port_base).into());
let first_peer_id = Swarm::local_peer_id(&swarms[0].1).clone();
let first_address = swarms[0].0.clone();
swarms[1].1.add_address(&first_peer_id, first_address);
// Drop the swarm addresses.
let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>();
// Ask second to search a random value.
let search_target = PeerId::random();
@@ -299,12 +339,19 @@ fn unresponsive_not_returned_indirect() {
#[test]
fn get_record_not_found() {
let (port_base, mut swarms) = build_nodes(3);
let mut swarms = build_nodes(3);
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
let swarm_ids: Vec<_> = swarms.iter()
.map(|(_addr, swarm)| Swarm::local_peer_id(swarm))
.cloned()
.collect();
swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into());
swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into());
let (second, third) = (swarms[1].0.clone(), swarms[2].0.clone());
swarms[0].1.add_address(&swarm_ids[1], second);
swarms[1].1.add_address(&swarm_ids[2], third);
// Drop the swarm addresses.
let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>();
let target_key = record::Key::from(random_multihash());
swarms[0].get_record(&target_key, Quorum::One);
@@ -338,16 +385,35 @@ fn get_record_not_found() {
)
}
/// A node joining a fully connected network via a single bootnode should be able to put a record to
/// the X closest nodes of the network where X is equal to the configured replication factor.
#[test]
fn put_record() {
fn prop(replication_factor: usize, records: Vec<Record>) {
let replication_factor = NonZeroUsize::new(replication_factor % (K_VALUE.get() / 2) + 1).unwrap();
let num_total = replication_factor.get() * 2;
let num_group = replication_factor.get();
let mut config = KademliaConfig::default();
config.set_replication_factor(replication_factor);
let (swarm_ids, mut swarms) = build_connected_nodes_with_config(num_total, num_group, config);
let mut swarms = {
let mut fully_connected_swarms = build_fully_connected_nodes_with_config(
num_total - 1,
config.clone(),
);
let mut single_swarm = build_node_with_config(config);
single_swarm.1.add_address(
Swarm::local_peer_id(&fully_connected_swarms[0].1),
fully_connected_swarms[0].0.clone(),
);
let mut swarms = vec![single_swarm];
swarms.append(&mut fully_connected_swarms);
// Drop the swarm addresses.
swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>()
};
let records = records.into_iter()
.take(num_total)
@@ -378,7 +444,7 @@ fn put_record() {
Poll::Ready(Some(KademliaEvent::PutRecordResult(res))) |
Poll::Ready(Some(KademliaEvent::RepublishRecordResult(res))) => {
match res {
Err(e) => panic!(e),
Err(e) => panic!("{:?}", e),
Ok(ok) => {
assert!(records.contains_key(&ok.key));
let record = swarm.store.get(&ok.key).unwrap();
@@ -408,10 +474,14 @@ fn put_record() {
assert_eq!(r.key, expected.key);
assert_eq!(r.value, expected.value);
assert_eq!(r.expires, expected.expires);
assert_eq!(r.publisher.as_ref(), Some(&swarm_ids[0]));
assert_eq!(r.publisher.as_ref(), Some(Swarm::local_peer_id(&swarms[0])));
let key = kbucket::Key::new(r.key.clone());
let mut expected = swarm_ids.clone().split_off(1);
let mut expected = swarms.iter()
.skip(1)
.map(Swarm::local_peer_id)
.cloned()
.collect::<Vec<_>>();
expected.sort_by(|id1, id2|
kbucket::Key::new(id1.clone()).distance(&key).cmp(
&kbucket::Key::new(id2.clone()).distance(&key)));
@@ -421,17 +491,32 @@ fn put_record() {
.take(replication_factor.get())
.collect::<HashSet<_>>();
let actual = swarms.iter().enumerate().skip(1)
.filter_map(|(i, s)|
if s.store.get(key.preimage()).is_some() {
Some(swarm_ids[i].clone())
let actual = swarms.iter()
.skip(1)
.filter_map(|swarm|
if swarm.store.get(key.preimage()).is_some() {
Some(Swarm::local_peer_id(swarm).clone())
} else {
None
})
.collect::<HashSet<_>>();
assert_eq!(actual.len(), replication_factor.get());
assert_eq!(actual, expected);
let actual_not_expected = actual.difference(&expected)
.collect::<Vec<&PeerId>>();
assert!(
actual_not_expected.is_empty(),
"Did not expect records to be stored on nodes {:?}.",
actual_not_expected,
);
let expected_not_actual = expected.difference(&actual)
.collect::<Vec<&PeerId>>();
assert!(expected_not_actual.is_empty(),
"Expected record to be stored on nodes {:?}.",
expected_not_actual,
);
}
if republished {
@@ -452,17 +537,21 @@ fn put_record() {
)
}
QuickCheck::new().tests(3).quickcheck(prop as fn(_,_))
QuickCheck::new().tests(3).quickcheck(prop as fn(_,_) -> _)
}
#[test]
fn get_value() {
let (port_base, mut swarms) = build_nodes(3);
let mut swarms = build_nodes(3);
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
// Let first peer know of second peer and second peer know of third peer.
for i in 0..2 {
let (peer_id, address) = (Swarm::local_peer_id(&swarms[i+1].1).clone(), swarms[i+1].0.clone());
swarms[i].1.add_address(&peer_id, address);
}
swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into());
swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into());
// Drop the swarm addresses.
let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>();
let record = Record::new(random_multihash(), vec![4,5,6]);
@@ -496,7 +585,9 @@ fn get_value() {
fn get_value_many() {
// TODO: Randomise
let num_nodes = 12;
let (_, mut swarms) = build_connected_nodes(num_nodes, num_nodes);
let mut swarms = build_connected_nodes(num_nodes, 3).into_iter()
.map(|(_addr, swarm)| swarm)
.collect::<Vec<_>>();
let num_results = 10;
let record = Record::new(random_multihash(), vec![4,5,6]);
@@ -530,17 +621,36 @@ fn get_value_many() {
)
}
/// A node joining a fully connected network via a single bootnode should be able to add itself as a
/// provider to the X closest nodes of the network where X is equal to the configured replication
/// factor.
#[test]
fn add_provider() {
fn prop(replication_factor: usize, keys: Vec<record::Key>) {
let replication_factor = NonZeroUsize::new(replication_factor % (K_VALUE.get() / 2) + 1).unwrap();
let num_total = replication_factor.get() * 2;
let num_group = replication_factor.get();
let mut config = KademliaConfig::default();
config.set_replication_factor(replication_factor);
let (swarm_ids, mut swarms) = build_connected_nodes_with_config(num_total, num_group, config);
let mut swarms = {
let mut fully_connected_swarms = build_fully_connected_nodes_with_config(
num_total - 1,
config.clone(),
);
let mut single_swarm = build_node_with_config(config);
single_swarm.1.add_address(
Swarm::local_peer_id(&fully_connected_swarms[0].1),
fully_connected_swarms[0].0.clone(),
);
let mut swarms = vec![single_swarm];
swarms.append(&mut fully_connected_swarms);
// Drop addresses before returning.
swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>()
};
let keys: HashSet<_> = keys.into_iter().take(num_total).collect();
@@ -594,10 +704,10 @@ fn add_provider() {
// each key was published to the `replication_factor` closest peers.
while let Some(key) = results.pop() {
// Collect the nodes that have a provider record for `key`.
let actual = swarms.iter().enumerate().skip(1)
.filter_map(|(i, s)|
if s.store.providers(&key).len() == 1 {
Some(swarm_ids[i].clone())
let actual = swarms.iter().skip(1)
.filter_map(|swarm|
if swarm.store.providers(&key).len() == 1 {
Some(Swarm::local_peer_id(&swarm).clone())
} else {
None
})
@@ -609,7 +719,11 @@ fn add_provider() {
return Poll::Pending
}
let mut expected = swarm_ids.clone().split_off(1);
let mut expected = swarms.iter()
.skip(1)
.map(Swarm::local_peer_id)
.cloned()
.collect::<Vec<_>>();
let kbucket_key = kbucket::Key::new(key);
expected.sort_by(|id1, id2|
kbucket::Key::new(id1.clone()).distance(&kbucket_key).cmp(
@@ -625,8 +739,8 @@ fn add_provider() {
// One round of publishing is complete.
assert!(results.is_empty());
for s in &swarms {
assert_eq!(s.queries.size(), 0);
for swarm in &swarms {
assert_eq!(swarm.queries.size(), 0);
}
if republished {
@@ -656,19 +770,19 @@ fn add_provider() {
/// arithmetic overflow, see https://github.com/libp2p/rust-libp2p/issues/1290.
#[test]
fn exceed_jobs_max_queries() {
let (_, mut swarms) = build_nodes(1);
let (_addr, mut swarm) = build_node();
let num = JOBS_MAX_QUERIES + 1;
for _ in 0 .. num {
swarms[0].bootstrap();
swarm.bootstrap();
}
assert_eq!(swarms[0].queries.size(), num);
assert_eq!(swarm.queries.size(), num);
block_on(
poll_fn(move |ctx| {
for _ in 0 .. num {
// There are no other nodes, so the queries finish instantly.
if let Poll::Ready(Some(e)) = swarms[0].poll_next_unpin(ctx) {
if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) {
if let KademliaEvent::BootstrapResult(r) = e {
assert!(r.is_ok(), "Unexpected error")
} else {

protocols/kad/src/query/peers/closest.rs

@@ -108,7 +108,7 @@ impl ClosestPeersIter {
let state = PeerState::NotContacted;
(distance, Peer { key, state })
})
.take(config.num_results));
.take(K_VALUE.into()));
// The iterator initially makes progress by iterating towards the target.
let state = State::Iterating { no_progress : 0 };
@@ -695,6 +695,26 @@ mod tests {
}
#[test]
fn without_success_try_up_to_k_peers() {
fn prop(mut iter: ClosestPeersIter) {
let now = Instant::now();
for _ in 0..(usize::min(iter.closest_peers.len(), K_VALUE.get())) {
match iter.next(now) {
PeersIterState::Waiting(Some(p)) => {
let peer = p.clone().into_owned();
iter.on_failure(&peer);
},
_ => panic!("Expected iterator to yield another peer to query."),
}
}
assert_eq!(PeersIterState::Finished, iter.next(now));
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_))
}
fn stalled_at_capacity() {
fn prop(mut iter: ClosestPeersIter) {
iter.state = State::Stalled;