Mirror of https://github.com/fluencelabs/rust-libp2p, synced 2025-06-28 01:01:34 +00:00
Kademlia: Somewhat complete the records implementation. (#1189)
* Somewhat complete the implementation of Kademlia records.

  This commit relates to [libp2p-146] and [libp2p-1089].

  * All records expire (by default, configurable).
  * Provider records are also stored in the RecordStore, and the RecordStore API extended.
  * Background jobs for periodic (re-)replication and (re-)publication of records. Regular (value-)records are subject to re-replication and re-publication as per standard Kademlia. Provider records are only subject to re-publication.
  * For standard Kademlia value lookups (quorum = 1), the record is cached at the closest peer to the key that did not return the value, as per standard Kademlia.
  * Expiration times of regular (value-)records are computed exponentially inversely proportional to the number of nodes between the local node and the closest node known to the key (beyond the k closest), as per standard Kademlia.

  The protobuf messages are extended with two fields, `ttl` and `publisher`, in order to implement the different semantics of re-replication (by any of the k closest peers to the key, not affecting expiry) and re-publication (by the original publisher, resetting the expiry). This is not done yet in other libp2p Kademlia implementations, see e.g. [libp2p-go-323]. The new protobuf fields have been given somewhat unique identifiers to prevent future collision. Similarly, periodic re-publication of provider records does not seem to be done yet in other implementations, see e.g. [libp2p-js-98].

  [libp2p-146]: https://github.com/libp2p/rust-libp2p/issues/146
  [libp2p-1089]: https://github.com/libp2p/rust-libp2p/issues/1089
  [libp2p-go-323]: https://github.com/libp2p/go-libp2p-kad-dht/issues/323
  [libp2p-js-98]: https://github.com/libp2p/js-libp2p-kad-dht/issues/98

* Tweak kad-ipfs example.
* Add missing files.
* Ensure new delays are polled immediately, to ensure task notification, since `NotReady` is returned right after.
* Fix ipfs-kad example and use wasm_timer.
* Small cleanup.
* Incorporate some feedback.
* Adjustments after rebase.
* Distinguish events further.

  In order for a user to easily distinguish the result of e.g. a `put_record` operation from the result of a later republication, different event constructors are used. Furthermore, re-replication and "caching" of records (at the closest peer to the key that did not return a value during a successful lookup) do not yield events for now, as they are less interesting.

* Speed up tests for CI.
* Small refinements and more documentation.
  * Guard a node against overriding records for which it considers itself to be the publisher.
  * Document the jobs module more extensively.
* More inline docs around removal of "unreachable" addresses.
* Remove wildcard re-exports.
* Use NonZeroUsize for the constants.
* Re-add method lost on merge.
* Add missing 'pub'.
* Further increase the timeout in the ipfs-kad example.
* Readd log dependency to libp2p-kad.
* Simplify RecordStore API slightly.
* Some more commentary.
* Change Addresses::remove to return Result<(),()>.

  Change the semantics of `Addresses::remove` so that the error case is unambiguous, instead of the success case. Use the `Result` for clearer semantics to that effect.

* Add some documentation to .
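To make the expiry rule above concrete, here is a minimal sketch of the described computation: a record's time-to-live shrinks exponentially with the number of nodes that sit between the local node and the closest node known to the key, beyond the k closest. The halving-per-node reading of "exponentially inversely proportional", the function name `expiration`, and the 36-hour base TTL used in the example are illustrative assumptions, not code taken from this commit.

```rust
use std::time::{Duration, Instant};

/// Illustrative sketch (not the crate's actual function): compute a record's
/// expiration from a base TTL, halving the TTL once for every node between
/// the local node and the closest node known to the key beyond the k closest.
fn expiration(now: Instant, record_ttl: Duration, nodes_beyond_k: u32) -> Instant {
    // ttl / 2^n, saturating at zero when the shift would exhaust the TTL.
    let secs = record_ttl.as_secs().checked_shr(nodes_beyond_k).unwrap_or(0);
    now + Duration::from_secs(secs)
}

fn main() {
    let now = Instant::now();
    let ttl = Duration::from_secs(36 * 60 * 60); // assumed 36h base TTL for the example
    // A node with 3 peers beyond the k closest stores the record for ttl / 8.
    let expires = expiration(now, ttl, 3);
    assert_eq!(expires - now, Duration::from_secs(36 * 60 * 60 / 8));
}
```

This is also what motivates the new `ttl` and `publisher` protobuf fields: a re-publication by the original publisher resets the expiry from the full TTL, while a re-replication by one of the k closest peers leaves the received expiry untouched.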
@@ -22,7 +22,9 @@

use super::*;

use crate::K_VALUE;
use crate::kbucket::Distance;
use crate::record::store::MemoryStore;
use futures::future;
use libp2p_core::{
PeerId,
@@ -37,18 +39,24 @@ use libp2p_core::{
use libp2p_secio::SecioConfig;
use libp2p_swarm::Swarm;
use libp2p_yamux as yamux;
use quickcheck::*;
use rand::{Rng, random, thread_rng};
use std::{collections::HashSet, iter::FromIterator, io, num::NonZeroU8, u64};
use std::{collections::{HashSet, HashMap}, io, num::NonZeroUsize, u64};
use tokio::runtime::current_thread;
use multihash::Hash::SHA2256;

type TestSwarm = Swarm<
Boxed<(PeerId, StreamMuxerBox), io::Error>,
Kademlia<Substream<StreamMuxerBox>>
Kademlia<Substream<StreamMuxerBox>, MemoryStore>
>;

/// Builds swarms, each listening on a port. Does *not* connect the nodes together.
fn build_nodes(num: usize) -> (u64, Vec<TestSwarm>) {
build_nodes_with_config(num, Default::default())
}

/// Builds swarms, each listening on a port. Does *not* connect the nodes together.
fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec<TestSwarm>) {
let port_base = 1 + random::<u64>() % (u64::MAX - num as u64);
let mut result: Vec<Swarm<_, _>> = Vec::with_capacity(num);

@@ -68,22 +76,27 @@ fn build_nodes(num: usize) -> (u64, Vec<TestSwarm>) {
.map_err(|e| panic!("Failed to create transport: {:?}", e))
.boxed();

let cfg = KademliaConfig::new(local_public_key.clone().into_peer_id());
let kad = Kademlia::new(cfg);
result.push(Swarm::new(transport, kad, local_public_key.into_peer_id()));
let local_id = local_public_key.clone().into_peer_id();
let store = MemoryStore::new(local_id.clone());
let behaviour = Kademlia::with_config(local_id.clone(), store, cfg.clone());
result.push(Swarm::new(transport, behaviour, local_id));
}

let mut i = 0;
for s in result.iter_mut() {
Swarm::listen_on(s, Protocol::Memory(port_base + i).into()).unwrap();
i += 1
for (i, s) in result.iter_mut().enumerate() {
Swarm::listen_on(s, Protocol::Memory(port_base + i as u64).into()).unwrap();
}

(port_base, result)
}

fn build_connected_nodes(total: usize, step: usize) -> (Vec<PeerId>, Vec<TestSwarm>) {
let (port_base, mut swarms) = build_nodes(total);
build_connected_nodes_with_config(total, step, Default::default())
}

fn build_connected_nodes_with_config(total: usize, step: usize, cfg: KademliaConfig)
-> (Vec<PeerId>, Vec<TestSwarm>)
{
let (port_base, mut swarms) = build_nodes_with_config(total, cfg);
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();

let mut i = 0;
@@ -101,7 +114,7 @@ fn build_connected_nodes(total: usize, step: usize) -> (Vec<PeerId>, Vec<TestSwa

#[test]
fn bootstrap() {
fn run<G: rand::Rng>(rng: &mut G) {
fn run(rng: &mut impl Rng) {
let num_total = rng.gen_range(2, 20);
let num_group = rng.gen_range(1, num_total);
let (swarm_ids, mut swarms) = build_connected_nodes(num_total, num_group);
@@ -150,7 +163,7 @@ fn query_iter() {
.collect()
}

fn run<G: Rng>(rng: &mut G) {
fn run(rng: &mut impl Rng) {
let num_total = rng.gen_range(2, 20);
let (swarm_ids, mut swarms) = build_connected_nodes(num_total, 1);

@@ -242,10 +255,7 @@ fn unresponsive_not_returned_indirect() {
// Add fake addresses to first.
let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone();
for _ in 0 .. 10 {
swarms[0].add_address(
&PeerId::random(),
multiaddr![Udp(10u16)]
);
swarms[0].add_address(&PeerId::random(), multiaddr![Udp(10u16)]);
}

// Connect second to first.
@@ -315,35 +325,52 @@ fn get_record_not_found() {
}

#[test]
fn put_value() {
fn run<G: rand::Rng>(rng: &mut G) {
let num_total = rng.gen_range(21, 40);
let num_group = rng.gen_range(1, usize::min(num_total, kbucket::K_VALUE));
let (swarm_ids, mut swarms) = build_connected_nodes(num_total, num_group);
fn put_record() {
fn prop(replication_factor: usize, records: Vec<Record>) {
let replication_factor = NonZeroUsize::new(replication_factor % (K_VALUE.get() / 2) + 1).unwrap();
let num_total = replication_factor.get() * 2;
let num_group = replication_factor.get();

let key = multihash::encode(SHA2256, &vec![1,2,3]).unwrap();
let bucket_key = kbucket::Key::from(key.clone());
let mut config = KademliaConfig::default();
config.set_replication_factor(replication_factor);
let (swarm_ids, mut swarms) = build_connected_nodes_with_config(num_total, num_group, config);

let mut sorted_peer_ids: Vec<_> = swarm_ids
.iter()
.map(|id| (id.clone(), kbucket::Key::from(id.clone()).distance(&bucket_key)))
.collect();
let records = records.into_iter()
.take(num_total)
.map(|mut r| {
// We don't want records to expire prematurely, as they would
// be removed from storage and no longer replicated, but we still
// want to check that an explicitly set expiration is preserved.
r.expires = r.expires.map(|t| t + Duration::from_secs(60));
(r.key.clone(), r)
})
.collect::<HashMap<_,_>>();

sorted_peer_ids.sort_by(|(_, d1), (_, d2)| d1.cmp(d2));
for r in records.values() {
swarms[0].put_record(r.clone(), Quorum::All);
}

let closest = HashSet::from_iter(sorted_peer_ids.into_iter().map(|(id, _)| id));

let record = Record { key: key.clone(), value: vec![4,5,6] };
swarms[0].put_record(record, Quorum::All);
// Each test run republishes all records once.
let mut republished = false;
// The accumulated results for one round of publishing.
let mut results = Vec::new();

current_thread::run(
future::poll_fn(move || {
let mut check_results = false;
future::poll_fn(move || loop {
// Poll all swarms until they are "NotReady".
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaEvent::PutRecordResult(Ok(_)))) => {
check_results = true;
Async::Ready(Some(KademliaEvent::PutRecordResult(res))) |
Async::Ready(Some(KademliaEvent::RepublishRecordResult(res))) => {
match res {
Err(e) => panic!(e),
Ok(ok) => {
assert!(records.contains_key(&ok.key));
let record = swarm.store.get(&ok.key).unwrap();
results.push(record.into_owned());
}
}
}
Async::Ready(_) => (),
Async::NotReady => break,
@@ -351,31 +378,64 @@ fn put_value() {
}
}

if check_results {
let mut have: HashSet<_> = Default::default();
// All swarms are NotReady and not enough results have been collected
// so far, thus wait to be polled again for further progress.
if results.len() != records.len() {
return Ok(Async::NotReady)
}

for (i, swarm) in swarms.iter().skip(1).enumerate() {
if swarm.records.get(&key).is_some() {
have.insert(swarm_ids[i].clone());
}
// Consume the results, checking that each record was replicated
// correctly to the closest peers to the key.
while let Some(r) = results.pop() {
let expected = records.get(&r.key).unwrap();

assert_eq!(r.key, expected.key);
assert_eq!(r.value, expected.value);
assert_eq!(r.expires, expected.expires);
assert_eq!(r.publisher.as_ref(), Some(&swarm_ids[0]));

let key = kbucket::Key::new(r.key.clone());
let mut expected = swarm_ids.clone().split_off(1);
expected.sort_by(|id1, id2|
kbucket::Key::new(id1).distance(&key).cmp(
&kbucket::Key::new(id2).distance(&key)));

let expected = expected
.into_iter()
.take(replication_factor.get())
.collect::<HashSet<_>>();

let actual = swarms.iter().enumerate().skip(1)
.filter_map(|(i, s)|
if s.store.get(key.preimage()).is_some() {
Some(swarm_ids[i].clone())
} else {
None
})
.collect::<HashSet<_>>();

assert_eq!(actual.len(), replication_factor.get());
assert_eq!(actual, expected);
}

if republished {
assert_eq!(swarms[0].store.records().count(), records.len());
for k in records.keys() {
swarms[0].store.remove(&k);
}

let intersection: HashSet<_> = have.intersection(&closest).collect();

assert_eq!(have.len(), kbucket::K_VALUE);
assert_eq!(intersection.len(), kbucket::K_VALUE);

assert_eq!(swarms[0].store.records().count(), 0);
// All records have been republished, thus the test is complete.
return Ok(Async::Ready(()));
}

Ok(Async::NotReady)
}))
// Tell the replication job to republish asap.
swarms[0].put_record_job.as_mut().unwrap().asap(true);
republished = true;
})
)
}

let mut rng = thread_rng();
for _ in 0 .. 10 {
run(&mut rng);
}
QuickCheck::new().tests(3).quickcheck(prop as fn(_,_))
}

#[test]
@@ -387,12 +447,9 @@ fn get_value() {
swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into());
swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into());

let record = Record {
key: multihash::encode(SHA2256, &vec![1,2,3]).unwrap(),
value: vec![4,5,6]
};
let record = Record::new(multihash::encode(SHA2256, &vec![1,2,3]).unwrap(), vec![4,5,6]);

swarms[1].records.put(record.clone()).unwrap();
swarms[1].store.put(record.clone()).unwrap();
swarms[0].get_record(&record.key, Quorum::One);

current_thread::run(
@@ -416,23 +473,19 @@ fn get_value() {
}

#[test]
fn get_value_multiple() {
// Check that if we have responses from multiple peers, a correct number of
// results is returned.
fn get_value_many() {
// TODO: Randomise
let num_nodes = 12;
let (_swarm_ids, mut swarms) = build_connected_nodes(num_nodes, num_nodes);
let (_, mut swarms) = build_connected_nodes(num_nodes, num_nodes);
let num_results = 10;

let record = Record {
key: multihash::encode(SHA2256, &vec![1,2,3]).unwrap(),
value: vec![4,5,6],
};
let record = Record::new(multihash::encode(SHA2256, &vec![1,2,3]).unwrap(), vec![4,5,6]);

for i in 0 .. num_nodes {
swarms[i].records.put(record.clone()).unwrap();
swarms[i].store.put(record.clone()).unwrap();
}

let quorum = Quorum::N(NonZeroU8::new(num_results as u8).unwrap());
let quorum = Quorum::N(NonZeroUsize::new(num_results).unwrap());
swarms[0].get_record(&record.key, quorum);

current_thread::run(
@@ -453,3 +506,123 @@ fn get_value_multiple() {
Ok(Async::NotReady)
}))
}

#[test]
fn add_provider() {
fn prop(replication_factor: usize, keys: Vec<kbucket::Key<Multihash>>) {
let replication_factor = NonZeroUsize::new(replication_factor % (K_VALUE.get() / 2) + 1).unwrap();
let num_total = replication_factor.get() * 2;
let num_group = replication_factor.get();

let mut config = KademliaConfig::default();
config.set_replication_factor(replication_factor);

let (swarm_ids, mut swarms) = build_connected_nodes_with_config(num_total, num_group, config);

let keys: HashSet<_> = keys.into_iter().take(num_total).collect();

// Each test run publishes all records twice.
let mut published = false;
let mut republished = false;
// The accumulated results for one round of publishing.
let mut results = Vec::new();

// Initiate the first round of publishing.
for k in &keys {
swarms[0].start_providing(k.preimage().clone());
}

current_thread::run(
future::poll_fn(move || loop {
// Poll all swarms until they are "NotReady".
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaEvent::StartProvidingResult(res))) |
Async::Ready(Some(KademliaEvent::RepublishProviderResult(res))) => {
match res {
Err(e) => panic!(e),
Ok(ok) => {
let key = kbucket::Key::new(ok.key.clone());
assert!(keys.contains(&key));
results.push(key);
}
}
}
Async::Ready(_) => (),
Async::NotReady => break,
}
}
}

if results.len() == keys.len() {
// All requests have been sent for one round of publishing.
published = true
}

if !published {
// Still waiting for all requests to be sent for one round
// of publishing.
return Ok(Async::NotReady)
}

// A round of publishing is complete. Consume the results, checking that
// each key was published to the `replication_factor` closest peers.
while let Some(key) = results.pop() {
// Collect the nodes that have a provider record for `key`.
let actual = swarms.iter().enumerate().skip(1)
.filter_map(|(i, s)|
if s.store.providers(key.preimage()).len() == 1 {
Some(swarm_ids[i].clone())
} else {
None
})
.collect::<HashSet<_>>();

if actual.len() != replication_factor.get() {
// Still waiting for some nodes to process the request.
results.push(key);
return Ok(Async::NotReady)
}

let mut expected = swarm_ids.clone().split_off(1);
expected.sort_by(|id1, id2|
kbucket::Key::new(id1).distance(&key).cmp(
&kbucket::Key::new(id2).distance(&key)));

let expected = expected
.into_iter()
.take(replication_factor.get())
.collect::<HashSet<_>>();

assert_eq!(actual, expected);
}

// One round of publishing is complete.
assert!(results.is_empty());
for s in &swarms {
assert_eq!(s.queries.size(), 0);
}

if republished {
assert_eq!(swarms[0].store.provided().count(), keys.len());
for k in &keys {
swarms[0].stop_providing(k.preimage());
}
assert_eq!(swarms[0].store.provided().count(), 0);
// All records have been republished, thus the test is complete.
return Ok(Async::Ready(()));
}

// Initiate the second round of publishing by telling the
// periodic provider job to run asap.
swarms[0].add_provider_job.as_mut().unwrap().asap();
published = false;
republished = true;
})
)
}

QuickCheck::new().tests(3).quickcheck(prop as fn(_,_))
}