Kademlia: Address some TODOs - Refactoring - API updates. (#1174)

* Address some TODOs, refactor queries and public API.

The following left-over issues are addressed:

  * The key for FIND_NODE requests is generalised to any Multihash,
    instead of just peer IDs.
  * All queries get a (configurable) timeout.
  * Finishing queries as soon as enough results have been received is simplified
    to avoid code duplication.
  * No more panics in provider-API-related code paths. The provider API is,
    however, still untested and (I think) still incomplete (e.g. expiration
    of provider records).
  * Numerous smaller TODOs encountered in the code.

The following public API changes / additions are made:

  * Introduce a `KademliaConfig` with new configuration options for
    the replication factor and query timeouts.
  * Rename `find_node` to `get_closest_peers`.
  * Rename `get_value` to `get_record` and `put_value` to `put_record`,
    introducing a `Quorum` parameter for both functions, replacing the
    existing `num_results` parameter with clearer semantics.
  * Rename `add_providing` to `start_providing` and `remove_providing`
    to `stop_providing`.
  * Add a `bootstrap` function that implements an (almost) standard
    Kademlia bootstrapping procedure.
  * Rename `KademliaOut` to `KademliaEvent` with an updated list of
    constructors (some renaming). All events that report query results
    now report a `Result` to uniformly permit reporting of errors.
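
  For orientation, here is a condensed usage sketch of the renamed API. It
  mirrors the calls appearing in the example and test diffs below;
  `local_peer_id`, `target_peer_id` and `record_key` are assumed to be in
  scope, and the `Record` import path as well as the exact `start_providing`
  / `stop_providing` signatures are assumptions, as they do not appear in
  the diffs shown here.

    // Inside the swarm setup, replacing the old `Kademlia::new(peer_id)`:
    let mut cfg = KademliaConfig::new(local_peer_id.clone());
    cfg.set_query_timeout(Duration::from_secs(5 * 60));
    let mut kademlia: Kademlia<_> = Kademlia::new(cfg);

    // Former `find_node`; the result arrives as `KademliaEvent::GetClosestPeersResult`.
    kademlia.get_closest_peers(target_peer_id);

    // Former `get_value` / `put_value`, now taking a `Quorum`.
    kademlia.get_record(&record_key, Quorum::One);
    kademlia.put_record(Record { key: record_key.clone(), value: vec![4, 5, 6] }, Quorum::All);

    // Former `add_providing` / `remove_providing` (signatures assumed).
    kademlia.start_providing(record_key.clone());
    kademlia.stop_providing(&record_key);

    // New: (almost) standard Kademlia bootstrapping against known peers.
    kademlia.bootstrap();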

The following refactorings are made:

  * Introduce some constants.
  * Consolidate `query.rs` and `write.rs` behind a common query interface
    to reduce duplication and facilitate better code reuse, introducing
    the notion of a query peer iterator. `query/peers/closest.rs`
    contains the code that was formerly in `query.rs`. `query/peers/fixed.rs` contains
    a modified variant of `write.rs` (which is removed). The new `query.rs`
    provides an interface for working with a collection of queries, taking
    over some code from `behaviour.rs`.
  * Reduce code duplication in tests and use the current_thread runtime for
    polling swarms to avoid spurious errors in the test output due to aborted
    connections when a test finishes prematurely (e.g. because a quorum of
    results has been collected).
  * Some additions / improvements to the existing tests.

* Fix test.

* Fix rebase.

* Tweak kad-ipfs example.

* Incorporate some feedback.

* Provide easy access and conversion to keys in error results.
Author: Roman Borschel
Date: 2019-07-03 16:16:25 +02:00 (committed by GitHub)
Parent: 8af4a28152
Commit: ef9cb056b2
18 changed files with 2451 additions and 1662 deletions


@ -25,18 +25,24 @@
use futures::prelude::*;
use libp2p::{
Swarm,
PeerId,
identity
identity,
build_development_transport
};
use libp2p::kad::Kademlia;
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent};
use std::env;
use std::time::Duration;
fn main() {
env_logger::init();
// Create a random key for ourselves.
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
// Set up an encrypted DNS-enabled TCP Transport over the Mplex protocol
let transport = libp2p::build_development_transport(local_key);
let transport = build_development_transport(local_key);
// Create a swarm to manage peers and events.
let mut swarm = {
@ -45,42 +51,61 @@ fn main() {
// to insert our local node in the DHT. However here we use `without_init` because this
// example is very ephemeral and we don't want to pollute the DHT. In a real world
// application, you want to use `new` instead.
let mut behaviour: Kademlia<_> = libp2p::kad::Kademlia::new(local_peer_id.clone());
let mut cfg = KademliaConfig::new(local_peer_id.clone());
cfg.set_query_timeout(Duration::from_secs(5 * 60));
let mut behaviour: Kademlia<_> = Kademlia::new(cfg);
// TODO: the /dnsaddr/ scheme is not supported (https://github.com/libp2p/rust-libp2p/issues/967)
/*behaviour.add_address(&"QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
behaviour.add_address(&"QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
behaviour.add_address(&"QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
behaviour.add_address(&"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());*/
behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap(), "/ip4/104.131.131.82/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap());
libp2p::core::Swarm::new(transport, behaviour, local_peer_id)
/*behaviour.add_address(&"QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
behaviour.add_address(&"QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
behaviour.add_address(&"QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
behaviour.add_address(&"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());*/
// The only address that currently works.
behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap(), "/ip4/104.131.131.82/tcp/4001".parse().unwrap());
// The following addresses always fail signature verification, possibly due to
// RSA keys with < 2048 bits.
// behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap());
// The following addresses are permanently unreachable:
// Other(Other(A(Transport(A(Underlying(Os { code: 101, kind: Other, message: "Network is unreachable" }))))))
// behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap());
Swarm::new(transport, behaviour, local_peer_id)
};
// Order Kademlia to search for a peer.
let to_search: PeerId = if let Some(peer_id) = std::env::args().nth(1) {
let to_search: PeerId = if let Some(peer_id) = env::args().nth(1) {
peer_id.parse().expect("Failed to parse peer ID to find")
} else {
identity::Keypair::generate_ed25519().public().into()
};
println!("Searching for {:?}", to_search);
swarm.find_node(to_search);
println!("Searching for the closest peers to {:?}", to_search);
swarm.get_closest_peers(to_search);
// Kick it off!
tokio::run(futures::future::poll_fn(move || -> Result<_, ()> {
loop {
match swarm.poll().expect("Error while polling swarm") {
Async::Ready(Some(ev @ libp2p::kad::KademliaOut::FindNodeResult { .. })) => {
println!("Result: {:#?}", ev);
return Ok(Async::Ready(()));
Async::Ready(Some(KademliaEvent::GetClosestPeersResult(res))) => {
match res {
Ok(ok) => {
println!("Closest peers: {:#?}", ok.peers);
return Ok(Async::Ready(()));
}
Err(err) => {
println!("The search for closest peers failed: {:?}", err);
}
}
},
Async::Ready(Some(_)) => (),
Async::Ready(Some(_)) => {},
Async::Ready(None) | Async::NotReady => break,
}
}


@ -98,8 +98,8 @@ fn three_fields() {
}
}
impl<TSubstream> libp2p::core::swarm::NetworkBehaviourEventProcess<libp2p::kad::KademliaOut> for Foo<TSubstream> {
fn inject_event(&mut self, _: libp2p::kad::KademliaOut) {
impl<TSubstream> libp2p::core::swarm::NetworkBehaviourEventProcess<libp2p::kad::KademliaEvent> for Foo<TSubstream> {
fn inject_event(&mut self, _: libp2p::kad::KademliaEvent) {
}
}


@ -15,6 +15,7 @@ arrayvec = "0.4.7"
bs58 = "0.2.0"
bigint = "4.2"
bytes = "0.4"
either = "1.5"
fnv = "1.0"
futures = "0.1"
libp2p-core = { version = "0.10.0", path = "../../core" }

File diff suppressed because it is too large.


@ -20,16 +20,11 @@
#![cfg(test)]
use crate::{
GetValueResult,
Kademlia,
KademliaOut,
kbucket::{self, Distance},
record::{Record, RecordStore},
};
use futures::{future, prelude::*};
use super::*;
use crate::kbucket::Distance;
use futures::future;
use libp2p_core::{
PeerId,
Swarm,
Transport,
identity,
@ -41,10 +36,10 @@ use libp2p_core::{
};
use libp2p_secio::SecioConfig;
use libp2p_yamux as yamux;
use rand::random;
use rand::{Rng, random, thread_rng};
use std::{collections::HashSet, iter::FromIterator, io, num::NonZeroU8, u64};
use tokio::runtime::Runtime;
use multihash::Hash;
use tokio::runtime::current_thread;
use multihash::Hash::SHA2256;
type TestSwarm = Swarm<
Boxed<(PeerId, StreamMuxerBox), io::Error>,
@ -72,7 +67,8 @@ fn build_nodes(num: usize) -> (u64, Vec<TestSwarm>) {
.map_err(|e| panic!("Failed to create transport: {:?}", e))
.boxed();
let kad = Kademlia::new(local_public_key.clone().into_peer_id());
let cfg = KademliaConfig::new(local_public_key.clone().into_peer_id());
let kad = Kademlia::new(cfg);
result.push(Swarm::new(transport, kad, local_public_key.into_peer_id()));
}
@ -85,54 +81,48 @@ fn build_nodes(num: usize) -> (u64, Vec<TestSwarm>) {
(port_base, result)
}
#[test]
fn query_iter() {
fn distances(key: &kbucket::Key<PeerId>, peers: Vec<PeerId>) -> Vec<Distance> {
peers.into_iter()
.map(kbucket::Key::from)
.map(|k| k.distance(key))
.collect()
fn build_connected_nodes(total: usize, step: usize) -> (Vec<PeerId>, Vec<TestSwarm>) {
let (port_base, mut swarms) = build_nodes(total);
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
let mut i = 0;
for (j, peer) in swarm_ids.iter().enumerate().skip(1) {
if i < swarm_ids.len() {
swarms[i].add_address(&peer, Protocol::Memory(port_base + j as u64).into());
}
if j % step == 0 {
i += step;
}
}
fn run(n: usize) {
// Build `n` nodes. Node `n` knows about node `n-1`, node `n-1` knows about node `n-2`, etc.
// Node `n` is queried for a random peer and should return nodes `1..n-1` sorted by
// their distances to that peer.
(swarm_ids, swarms)
}
let (port_base, mut swarms) = build_nodes(n);
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
#[test]
fn bootstrap() {
fn run<G: rand::Rng>(rng: &mut G) {
let num_total = rng.gen_range(2, 20);
let num_group = rng.gen_range(1, num_total);
let (swarm_ids, mut swarms) = build_connected_nodes(num_total, num_group);
// Connect each swarm in the list to its predecessor in the list.
for (i, (swarm, peer)) in &mut swarms.iter_mut().skip(1).zip(swarm_ids.clone()).enumerate() {
swarm.add_address(&peer, Protocol::Memory(port_base + i as u64).into())
}
swarms[0].bootstrap();
// Ask the last peer in the list to search a random peer. The search should
// propagate backwards through the list of peers.
let search_target = PeerId::random();
let search_target_key = kbucket::Key::from(search_target.clone());
swarms.last_mut().unwrap().find_node(search_target.clone());
// Set up expectations.
let expected_swarm_id = swarm_ids.last().unwrap().clone();
let expected_peer_ids: Vec<_> = swarm_ids.iter().cloned().take(n - 1).collect();
let mut expected_distances = distances(&search_target_key, expected_peer_ids.clone());
expected_distances.sort();
// Expected known peers
let expected_known = swarm_ids.iter().skip(1).cloned().collect::<HashSet<_>>();
// Run test
Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
current_thread::run(
future::poll_fn(move || {
for (i, swarm) in swarms.iter_mut().enumerate() {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaOut::FindNodeResult {
key, closer_peers
})) => {
assert_eq!(key, search_target);
assert_eq!(swarm_ids[i], expected_swarm_id);
assert!(expected_peer_ids.iter().all(|p| closer_peers.contains(p)));
let key = kbucket::Key::from(key);
assert_eq!(expected_distances, distances(&key, closer_peers));
Async::Ready(Some(KademliaEvent::BootstrapResult(Ok(ok)))) => {
assert_eq!(i, 0);
assert_eq!(ok.peer, swarm_ids[0]);
let known = swarm.kbuckets.iter()
.map(|e| e.node.key.preimage().clone())
.collect::<HashSet<_>>();
assert_eq!(expected_known, known);
return Ok(Async::Ready(()));
}
Async::Ready(_) => (),
@ -142,10 +132,66 @@ fn query_iter() {
}
Ok(Async::NotReady)
}))
.unwrap()
}
for n in 2..=8 { run(n) }
let mut rng = thread_rng();
for _ in 0 .. 10 {
run(&mut rng)
}
}
#[test]
fn query_iter() {
fn distances<K>(key: &kbucket::Key<K>, peers: Vec<PeerId>) -> Vec<Distance> {
peers.into_iter()
.map(kbucket::Key::from)
.map(|k| k.distance(key))
.collect()
}
fn run<G: Rng>(rng: &mut G) {
let num_total = rng.gen_range(2, 20);
let (swarm_ids, mut swarms) = build_connected_nodes(num_total, 1);
// Ask the first peer in the list to search a random peer. The search should
// propagate forwards through the list of peers.
let search_target = PeerId::random();
let search_target_key = kbucket::Key::from(search_target.clone());
swarms[0].get_closest_peers(search_target.clone());
// Set up expectations.
let expected_swarm_id = swarm_ids[0].clone();
let expected_peer_ids: Vec<_> = swarm_ids.iter().skip(1).cloned().collect();
let mut expected_distances = distances(&search_target_key, expected_peer_ids.clone());
expected_distances.sort();
// Run test
current_thread::run(
future::poll_fn(move || {
for (i, swarm) in swarms.iter_mut().enumerate() {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(ok.key, search_target);
assert_eq!(swarm_ids[i], expected_swarm_id);
assert!(expected_peer_ids.iter().all(|p| ok.peers.contains(p)));
let key = kbucket::Key::new(ok.key);
assert_eq!(expected_distances, distances(&key, ok.peers));
return Ok(Async::Ready(()));
}
Async::Ready(_) => (),
Async::NotReady => break,
}
}
}
Ok(Async::NotReady)
}))
}
let mut rng = thread_rng();
for _ in 0 .. 10 {
run(&mut rng)
}
}
#[test]
@ -162,16 +208,16 @@ fn unresponsive_not_returned_direct() {
// Ask first to search a random value.
let search_target = PeerId::random();
swarms[0].find_node(search_target.clone());
swarms[0].get_closest_peers(search_target.clone());
Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
current_thread::run(
future::poll_fn(move || {
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaOut::FindNodeResult { key, closer_peers })) => {
assert_eq!(key, search_target);
assert_eq!(closer_peers.len(), 0);
Async::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(ok.key, search_target);
assert_eq!(ok.peers.len(), 0);
return Ok(Async::Ready(()));
}
Async::Ready(_) => (),
@ -182,7 +228,6 @@ fn unresponsive_not_returned_direct() {
Ok(Async::NotReady)
}))
.unwrap();
}
#[test]
@ -207,17 +252,17 @@ fn unresponsive_not_returned_indirect() {
// Ask second to search a random value.
let search_target = PeerId::random();
swarms[1].find_node(search_target.clone());
swarms[1].get_closest_peers(search_target.clone());
Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
current_thread::run(
future::poll_fn(move || {
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaOut::FindNodeResult { key, closer_peers })) => {
assert_eq!(key, search_target);
assert_eq!(closer_peers.len(), 1);
assert_eq!(closer_peers[0], first_peer_id);
Async::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(ok.key, search_target);
assert_eq!(ok.peers.len(), 1);
assert_eq!(ok.peers[0], first_peer_id);
return Ok(Async::Ready(()));
}
Async::Ready(_) => (),
@ -228,38 +273,34 @@ fn unresponsive_not_returned_indirect() {
Ok(Async::NotReady)
}))
.unwrap();
}
#[test]
fn get_value_not_found() {
fn get_record_not_found() {
let (port_base, mut swarms) = build_nodes(3);
let swarm_ids: Vec<_> = swarms.iter()
.map(|swarm| Swarm::local_peer_id(&swarm).clone()).collect();
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into());
swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into());
let target_key = multihash::encode(Hash::SHA2256, &vec![1,2,3]).unwrap();
let num_results = NonZeroU8::new(1).unwrap();
swarms[0].get_value(&target_key, num_results);
let target_key = multihash::encode(SHA2256, &vec![1,2,3]).unwrap();
swarms[0].get_record(&target_key, Quorum::One);
Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
current_thread::run(
future::poll_fn(move || {
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaOut::GetValueResult(result))) => {
if let GetValueResult::NotFound { key, closest_peers } = result {
Async::Ready(Some(KademliaEvent::GetRecordResult(Err(e)))) => {
if let GetRecordError::NotFound { key, closest_peers, } = e {
assert_eq!(key, target_key);
assert_eq!(closest_peers.len(), 2);
assert!(closest_peers.contains(&swarm_ids[1]));
assert!(closest_peers.contains(&swarm_ids[2]));
return Ok(Async::Ready(()));
} else {
panic!("Expected GetValueResult::NotFound event");
panic!("Unexpected error result: {:?}", e);
}
}
Async::Ready(_) => (),
@ -270,62 +311,37 @@ fn get_value_not_found() {
Ok(Async::NotReady)
}))
.unwrap()
}
#[test]
fn put_value() {
fn run() {
// Build a test that checks if PUT_VALUE gets correctly propagated in
// a nontrivial topology:
// [31]
// / \
// [29] [30]
// /|\ /|\
// [0]..[14] [15]..[28]
//
// Nodes [29] and [30] have less than kbuckets::MAX_NODES_PER_BUCKET
// peers to avoid the situation when the bucket may be overflowed and
// some of the connections are dropped from the routing table
let (port_base, mut swarms) = build_nodes(32);
fn run<G: rand::Rng>(rng: &mut G) {
let num_total = rng.gen_range(21, 40);
let num_group = rng.gen_range(1, usize::min(num_total, kbucket::K_VALUE));
let (swarm_ids, mut swarms) = build_connected_nodes(num_total, num_group);
let swarm_ids: Vec<_> = swarms.iter()
.map(|swarm| Swarm::local_peer_id(&swarm).clone()).collect();
// Connect swarm[30] to each swarm in swarms[..15]
for (i, peer) in swarm_ids.iter().take(15).enumerate() {
swarms[30].add_address(&peer, Protocol::Memory(port_base + i as u64).into());
}
// Connect swarm[29] to each swarm in swarms[15..29]
for (i, peer) in swarm_ids.iter().skip(15).take(14).enumerate() {
swarms[29].add_address(&peer, Protocol::Memory(port_base + (i + 15) as u64).into());
}
// Connect swarms[31] to swarms[29, 30]
swarms[31].add_address(&swarm_ids[30], Protocol::Memory(port_base + 30 as u64).into());
swarms[31].add_address(&swarm_ids[29], Protocol::Memory(port_base + 29 as u64).into());
let target_key = multihash::encode(Hash::SHA2256, &vec![1,2,3]).unwrap();
let key = multihash::encode(SHA2256, &vec![1,2,3]).unwrap();
let bucket_key = kbucket::Key::from(key.clone());
let mut sorted_peer_ids: Vec<_> = swarm_ids
.iter()
.map(|id| (id.clone(), kbucket::Key::from(id.clone()).distance(&kbucket::Key::from(target_key.clone()))))
.map(|id| (id.clone(), kbucket::Key::from(id.clone()).distance(&bucket_key)))
.collect();
sorted_peer_ids.sort_by(|(_, d1), (_, d2)| d1.cmp(d2));
let closest: HashSet<PeerId> = HashSet::from_iter(sorted_peer_ids.into_iter().map(|(id, _)| id));
let closest = HashSet::from_iter(sorted_peer_ids.into_iter().map(|(id, _)| id));
swarms[31].put_value(target_key.clone(), vec![4,5,6]);
let record = Record { key: key.clone(), value: vec![4,5,6] };
swarms[0].put_record(record, Quorum::All);
Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
current_thread::run(
future::poll_fn(move || {
let mut check_results = false;
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaOut::PutValueResult{ .. })) => {
Async::Ready(Some(KademliaEvent::PutRecordResult(Ok(_)))) => {
check_results = true;
}
Async::Ready(_) => (),
@ -337,25 +353,27 @@ fn put_value() {
if check_results {
let mut have: HashSet<_> = Default::default();
for (i, swarm) in swarms.iter().take(31).enumerate() {
if swarm.records.get(&target_key).is_some() {
for (i, swarm) in swarms.iter().skip(1).enumerate() {
if swarm.records.get(&key).is_some() {
have.insert(swarm_ids[i].clone());
}
}
let intersection: HashSet<_> = have.intersection(&closest).collect();
assert_eq!(have.len(), kbucket::MAX_NODES_PER_BUCKET);
assert_eq!(intersection.len(), kbucket::MAX_NODES_PER_BUCKET);
assert_eq!(have.len(), kbucket::K_VALUE);
assert_eq!(intersection.len(), kbucket::K_VALUE);
return Ok(Async::Ready(()));
}
Ok(Async::NotReady)
}))
.unwrap()
}
let mut rng = thread_rng();
for _ in 0 .. 10 {
run();
run(&mut rng);
}
}
@ -363,36 +381,28 @@ fn put_value() {
fn get_value() {
let (port_base, mut swarms) = build_nodes(3);
let swarm_ids: Vec<_> = swarms.iter()
.map(|swarm| Swarm::local_peer_id(&swarm).clone()).collect();
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into());
swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into());
let target_key = multihash::encode(Hash::SHA2256, &vec![1,2,3]).unwrap();
let target_value = vec![4,5,6];
let num_results = NonZeroU8::new(1).unwrap();
swarms[1].records.put(Record {
key: target_key.clone(),
value: target_value.clone()
}).unwrap();
swarms[0].get_value(&target_key, num_results);
let record = Record {
key: multihash::encode(SHA2256, &vec![1,2,3]).unwrap(),
value: vec![4,5,6]
};
Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
swarms[1].records.put(record.clone()).unwrap();
swarms[0].get_record(&record.key, Quorum::One);
current_thread::run(
future::poll_fn(move || {
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaOut::GetValueResult(result))) => {
if let GetValueResult::Found { results } = result {
assert_eq!(results.len(), 1);
let record = results.first().unwrap();
assert_eq!(record.key, target_key);
assert_eq!(record.value, target_value);
return Ok(Async::Ready(()));
} else {
panic!("Expected GetValueResult::Found event");
}
Async::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => {
assert_eq!(ok.records.len(), 1);
assert_eq!(ok.records.first(), Some(&record));
return Ok(Async::Ready(()));
}
Async::Ready(_) => (),
Async::NotReady => break,
@ -402,56 +412,43 @@ fn get_value() {
Ok(Async::NotReady)
}))
.unwrap()
}
#[test]
fn get_value_multiple() {
// Check that if we have responses from multiple peers, a correct number of
// results is returned.
let num_results = NonZeroU8::new(10).unwrap();
let (port_base, mut swarms) = build_nodes(2 + num_results.get() as usize);
let num_nodes = 12;
let (_swarm_ids, mut swarms) = build_connected_nodes(num_nodes, num_nodes);
let num_results = 10;
let swarm_ids: Vec<_> = swarms.iter()
.map(|swarm| Swarm::local_peer_id(&swarm).clone()).collect();
let record = Record {
key: multihash::encode(SHA2256, &vec![1,2,3]).unwrap(),
value: vec![4,5,6],
};
let target_key = multihash::encode(Hash::SHA2256, &vec![1,2,3]).unwrap();
let target_value = vec![4,5,6];
for (i, swarm_id) in swarm_ids.iter().skip(1).enumerate() {
swarms[i + 1].records.put(Record {
key: target_key.clone(),
value: target_value.clone()
}).unwrap();
swarms[0].add_address(&swarm_id, Protocol::Memory(port_base + (i + 1) as u64).into());
for i in 0 .. num_nodes {
swarms[i].records.put(record.clone()).unwrap();
}
swarms[0].records.put(Record { key: target_key.clone(), value: target_value.clone() }).unwrap();
swarms[0].get_value(&target_key, num_results);
let quorum = Quorum::N(NonZeroU8::new(num_results as u8).unwrap());
swarms[0].get_record(&record.key, quorum);
Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
current_thread::run(
future::poll_fn(move || {
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaOut::GetValueResult(result))) => {
if let GetValueResult::Found { results } = result {
assert_eq!(results.len(), num_results.get() as usize);
let record = results.first().unwrap();
assert_eq!(record.key, target_key);
assert_eq!(record.value, target_value);
return Ok(Async::Ready(()));
} else {
panic!("Expected GetValueResult::Found event");
}
Async::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => {
assert_eq!(ok.records.len(), num_results);
assert_eq!(ok.records.first(), Some(&record));
return Ok(Async::Ready(()));
}
Async::Ready(_) => (),
Async::NotReady => break,
}
}
}
Ok(Async::NotReady)
}))
.unwrap()
}


@ -31,7 +31,7 @@ use libp2p_core::protocols_handler::{
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};
use libp2p_core::{upgrade, either::EitherOutput, InboundUpgrade, OutboundUpgrade, PeerId, upgrade::Negotiated};
use libp2p_core::{upgrade, either::EitherOutput, InboundUpgrade, OutboundUpgrade, upgrade::Negotiated};
use multihash::Multihash;
use std::{borrow::Cow, error, fmt, io, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
@ -135,8 +135,8 @@ pub enum KademliaHandlerEvent<TUserData> {
/// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
/// returned is not specified, but should be around 20.
FindNodeReq {
/// Identifier of the node.
key: PeerId,
/// The key for which to locate the closest nodes.
key: Multihash,
/// Identifier of the request. Needs to be passed back when answering.
request_id: KademliaRequestId,
},
@ -473,15 +473,8 @@ where
fn inject_event(&mut self, message: KademliaHandlerIn<TUserData>) {
match message {
KademliaHandlerIn::FindNodeReq { key, user_data } => {
// FIXME: Change `KadRequestMsg::FindNode::key` to be a `Multihash`.
match PeerId::from_multihash(key.clone()) {
Ok(key) => {
let msg = KadRequestMsg::FindNode { key };
self.substreams
.push(SubstreamState::OutPendingOpen(msg, Some(user_data.clone())));
},
Err(_) => (),
}
let msg = KadRequestMsg::FindNode { key };
self.substreams.push(SubstreamState::OutPendingOpen(msg, Some(user_data.clone())));
}
KademliaHandlerIn::FindNodeRes {
closer_peers,


@ -95,7 +95,7 @@ pub struct KBucketsTable<TPeerId, TVal> {
/// A (type-safe) index into a `KBucketsTable`, i.e. a non-negative integer in the
/// interval `[0, NUM_BUCKETS)`.
#[derive(Copy, Clone)]
#[derive(Copy, Clone, PartialEq, Eq)]
struct BucketIndex(usize);
impl BucketIndex {
@ -116,6 +116,20 @@ impl BucketIndex {
fn get(&self) -> usize {
self.0
}
/// Generates a random distance that falls into the bucket for this index.
fn rand_distance(&self, rng: &mut impl rand::Rng) -> Distance {
let mut bytes = [0u8; 32];
let quot = self.0 / 8;
for i in 0 .. quot {
bytes[31 - i] = rng.gen();
}
let rem = self.0 % 8;
let lower = usize::pow(2, rem as u32);
let upper = usize::pow(2, rem as u32 + 1);
bytes[31 - quot] = rng.gen_range(lower, upper) as u8;
Distance(bigint::U256::from(bytes))
}
}
impl<TPeerId, TVal> KBucketsTable<TPeerId, TVal>
@ -182,11 +196,14 @@ where
/// bucket is the closest bucket (containing at most one key).
pub fn buckets<'a>(&'a mut self) -> impl Iterator<Item = KBucketRef<'a, TPeerId, TVal>> + 'a {
let applied_pending = &mut self.applied_pending;
self.buckets.iter_mut().map(move |b| {
self.buckets.iter_mut().enumerate().map(move |(i, b)| {
if let Some(applied) = b.apply_pending() {
applied_pending.push_back(applied)
}
KBucketRef(b)
KBucketRef {
index: BucketIndex(i),
bucket: b
}
})
}
@ -263,7 +280,7 @@ struct ClosestIter<'a, TTarget, TPeerId, TVal, TMap, TOut> {
/// distance of the local key to the target.
buckets_iter: ClosestBucketsIter,
/// The iterator over the entries in the currently traversed bucket.
iter: Option<arrayvec::IntoIter<[TOut; MAX_NODES_PER_BUCKET]>>,
iter: Option<arrayvec::IntoIter<[TOut; K_VALUE]>>,
/// The projection function / mapping applied on each bucket as
/// it is encountered, producing the next `iter`ator.
fmap: TMap
@ -304,7 +321,7 @@ impl ClosestBucketsIter {
fn new(distance: Distance) -> Self {
let state = match BucketIndex::new(&distance) {
Some(i) => ClosestBucketsIterState::Start(i),
None => ClosestBucketsIterState::Done
None => ClosestBucketsIterState::Start(BucketIndex(0))
};
Self { distance, state }
}
@ -363,7 +380,7 @@ impl<TTarget, TPeerId, TVal, TMap, TOut> Iterator
for ClosestIter<'_, TTarget, TPeerId, TVal, TMap, TOut>
where
TPeerId: Clone,
TMap: Fn(&KBucket<TPeerId, TVal>) -> ArrayVec<[TOut; MAX_NODES_PER_BUCKET]>,
TMap: Fn(&KBucket<TPeerId, TVal>) -> ArrayVec<[TOut; K_VALUE]>,
TOut: AsRef<Key<TPeerId>>
{
type Item = TOut;
@ -396,7 +413,10 @@ where
}
/// A reference to a bucket in a `KBucketsTable`.
pub struct KBucketRef<'a, TPeerId, TVal>(&'a mut KBucket<TPeerId, TVal>);
pub struct KBucketRef<'a, TPeerId, TVal> {
index: BucketIndex,
bucket: &'a mut KBucket<TPeerId, TVal>
}
impl<TPeerId, TVal> KBucketRef<'_, TPeerId, TVal>
where
@ -404,19 +424,49 @@ where
{
/// Returns the number of entries in the bucket.
pub fn num_entries(&self) -> usize {
self.0.num_entries()
self.bucket.num_entries()
}
/// Returns true if the bucket has a pending node.
pub fn has_pending(&self) -> bool {
self.0.pending().map_or(false, |n| !n.is_ready())
self.bucket.pending().map_or(false, |n| !n.is_ready())
}
pub fn contains(&self, d: &Distance) -> bool {
BucketIndex::new(d).map_or(false, |i| i == self.index)
}
/// Generates a random distance that falls into this bucket.
///
/// Together with a known key `a` (e.g. the local key), a random distance `d` for
/// this bucket w.r.t `k` gives rise to the corresponding (random) key `b` s.t.
/// the XOR distance between `a` and `b` is `d`. In other words, it gives
/// rise to a random key falling into this bucket. See [`Key::from_distance`].
pub fn rand_distance(&self, rng: &mut impl rand::Rng) -> Distance {
self.index.rand_distance(rng)
}
}
#[cfg(test)]
mod tests {
use bigint::U256;
use super::*;
use libp2p_core::PeerId;
use quickcheck::*;
#[test]
fn rand_distance() {
fn prop(ix: u8) -> bool {
let d = BucketIndex(ix as usize).rand_distance(&mut rand::thread_rng());
let n = U256::from(<[u8; 32]>::from(d.0));
let b = U256::from(2);
let e = U256::from(ix);
let lower = b.pow(e);
let upper = b.pow(e + U256::from(1)) - U256::from(1);
lower <= n && n <= upper
}
quickcheck(prop as fn(_) -> _);
}
#[test]
fn basic_closest() {


@ -25,11 +25,9 @@
//! > buckets in a `KBucketsTable` and hence is enforced by the public API
//! > of the `KBucketsTable` and in particular the public `Entry` API.
pub use crate::K_VALUE;
use super::*;
/// Maximum number of nodes in a bucket, i.e. the (fixed) `k` parameter.
pub const MAX_NODES_PER_BUCKET: usize = 20;
/// A `PendingNode` is a `Node` that is pending insertion into a `KBucket`.
#[derive(Debug, Clone)]
pub struct PendingNode<TPeerId, TVal> {
@ -90,16 +88,16 @@ pub struct Node<TPeerId, TVal> {
}
/// The position of a node in a `KBucket`, i.e. a non-negative integer
/// in the range `[0, MAX_NODES_PER_BUCKET)`.
/// in the range `[0, K_VALUE)`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Position(usize);
/// A `KBucket` is a list of up to `MAX_NODES_PER_BUCKET` `Key`s and associated values,
/// A `KBucket` is a list of up to `K_VALUE` `Key`s and associated values,
/// ordered from least-recently connected to most-recently connected.
#[derive(Debug, Clone)]
pub struct KBucket<TPeerId, TVal> {
/// The nodes contained in the bucket.
nodes: ArrayVec<[Node<TPeerId, TVal>; MAX_NODES_PER_BUCKET]>,
nodes: ArrayVec<[Node<TPeerId, TVal>; K_VALUE]>,
/// The position (index) in `nodes` that marks the first connected node.
///
@ -107,7 +105,7 @@ pub struct KBucket<TPeerId, TVal> {
/// most-recently connected, all entries above this index are also considered
/// connected, i.e. the range `[0, first_connected_pos)` marks the sub-list of entries
/// that are considered disconnected and the range
/// `[first_connected_pos, MAX_NODES_PER_BUCKET)` marks sub-list of entries that are
/// `[first_connected_pos, K_VALUE)` marks sub-list of entries that are
/// considered connected.
///
/// `None` indicates that there are no connected entries in the bucket, i.e.
@ -412,7 +410,7 @@ mod tests {
fn arbitrary<G: Gen>(g: &mut G) -> KBucket<PeerId, ()> {
let timeout = Duration::from_secs(g.gen_range(1, g.size() as u64));
let mut bucket = KBucket::<PeerId, ()>::new(timeout);
let num_nodes = g.gen_range(1, MAX_NODES_PER_BUCKET + 1);
let num_nodes = g.gen_range(1, K_VALUE + 1);
for _ in 0 .. num_nodes {
let key = Key::new(PeerId::random());
let node = Node { key: key.clone(), value: () };
@ -438,14 +436,14 @@ mod tests {
impl Arbitrary for Position {
fn arbitrary<G: Gen>(g: &mut G) -> Position {
Position(g.gen_range(0, MAX_NODES_PER_BUCKET))
Position(g.gen_range(0, K_VALUE))
}
}
// Fill a bucket with random nodes with the given status.
fn fill_bucket(bucket: &mut KBucket<PeerId, ()>, status: NodeStatus) {
let num_entries_start = bucket.num_entries();
for i in 0 .. MAX_NODES_PER_BUCKET - num_entries_start {
for i in 0 .. K_VALUE - num_entries_start {
let key = Key::new(PeerId::random());
let node = Node { key, value: () };
assert_eq!(InsertResult::Inserted, bucket.insert(node, status));
@ -466,7 +464,7 @@ mod tests {
for status in status {
let key = Key::new(PeerId::random());
let node = Node { key: key.clone(), value: () };
let full = bucket.num_entries() == MAX_NODES_PER_BUCKET;
let full = bucket.num_entries() == K_VALUE;
match bucket.insert(node, status) {
InsertResult::Inserted => {
let vec = match status {
@ -519,7 +517,7 @@ mod tests {
}
// One-by-one fill the bucket with connected nodes, replacing the disconnected ones.
for i in 0 .. MAX_NODES_PER_BUCKET {
for i in 0 .. K_VALUE {
let (first, first_status) = bucket.iter().next().unwrap();
let first_disconnected = first.clone();
assert_eq!(first_status, NodeStatus::Disconnected);
@ -552,11 +550,11 @@ mod tests {
}));
assert_eq!(Some((&node, NodeStatus::Connected)), bucket.iter().last());
assert!(bucket.pending().is_none());
assert_eq!(Some(MAX_NODES_PER_BUCKET - (i + 1)), bucket.first_connected_pos);
assert_eq!(Some(K_VALUE - (i + 1)), bucket.first_connected_pos);
}
assert!(bucket.pending().is_none());
assert_eq!(MAX_NODES_PER_BUCKET, bucket.num_entries());
assert_eq!(K_VALUE, bucket.num_entries());
// Trying to insert another connected node fails.
let key = Key::new(PeerId::random());
@ -595,7 +593,7 @@ mod tests {
assert_eq!(Some((&first_disconnected, NodeStatus::Connected)), bucket.iter().last());
assert_eq!(bucket.position(&first_disconnected.key).map(|p| p.0), bucket.first_connected_pos);
assert_eq!(1, bucket.num_connected());
assert_eq!(MAX_NODES_PER_BUCKET - 1, bucket.num_disconnected());
assert_eq!(K_VALUE - 1, bucket.num_disconnected());
}


@ -21,7 +21,7 @@
//! The `Entry` API for querying and modifying the entries of a `KBucketsTable`
//! representing the nodes participating in the Kademlia DHT.
pub use super::bucket::{Node, NodeStatus, InsertResult, AppliedPending, MAX_NODES_PER_BUCKET};
pub use super::bucket::{Node, NodeStatus, InsertResult, AppliedPending, K_VALUE};
pub use super::key::*;
use super::*;


@ -23,8 +23,8 @@ use libp2p_core::PeerId;
use multihash::Multihash;
use sha2::{Digest, Sha256, digest::generic_array::{GenericArray, typenum::U32}};
/// A `Key` is a cryptographic hash, identifying both the nodes participating in
/// the Kademlia DHT, as well as records stored in the DHT.
/// A `Key` identifies both the nodes participating in the Kademlia DHT, as well as
/// records stored in the DHT.
///
/// The set of all `Key`s defines the Kademlia keyspace.
///
@ -35,12 +35,40 @@ use sha2::{Digest, Sha256, digest::generic_array::{GenericArray, typenum::U32}};
#[derive(Clone, Debug)]
pub struct Key<T> {
preimage: T,
hash: GenericArray<u8, U32>,
bytes: KeyBytes,
}
impl<T> PartialEq for Key<T> {
fn eq(&self, other: &Key<T>) -> bool {
self.hash == other.hash
/// The raw bytes of a key in the DHT keyspace.
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct KeyBytes(GenericArray<u8, U32>);
impl KeyBytes {
/// Computes the distance of the keys according to the XOR metric.
pub fn distance<U>(&self, other: &U) -> Distance
where
U: AsRef<KeyBytes>
{
let a = U256::from(self.0.as_ref());
let b = U256::from(other.as_ref().0.as_ref());
Distance(a ^ b)
}
}
impl AsRef<KeyBytes> for KeyBytes {
fn as_ref(&self) -> &KeyBytes {
self
}
}
impl<T> AsRef<KeyBytes> for Key<T> {
fn as_ref(&self) -> &KeyBytes {
&self.bytes
}
}
impl<T, U> PartialEq<Key<U>> for Key<T> {
fn eq(&self, other: &Key<U>) -> bool {
self.bytes == other.bytes
}
}
@ -61,8 +89,18 @@ impl<T> Key<T> {
where
T: AsRef<[u8]>
{
let hash = Sha256::digest(preimage.as_ref());
Key { preimage, hash }
let bytes = KeyBytes(Sha256::digest(preimage.as_ref()));
Key { preimage, bytes }
}
/// Returns the uniquely determined key with the given distance to `self`.
///
/// This implements the following equivalence:
///
/// `self xor other = distance <==> other = self xor distance`
pub fn for_distance(&self, d: Distance) -> KeyBytes {
let key_int = U256::from(self.bytes.0.as_ref()) ^ d.0;
KeyBytes(GenericArray::from(<[u8; 32]>::from(key_int)))
}
/// Borrows the preimage of the key.
@ -76,23 +114,29 @@ impl<T> Key<T> {
}
/// Computes the distance of the keys according to the XOR metric.
pub fn distance<U>(&self, other: &Key<U>) -> Distance {
let a = U256::from(self.hash.as_ref());
let b = U256::from(other.hash.as_ref());
Distance(a ^ b)
pub fn distance<U>(&self, other: &U) -> Distance
where
U: AsRef<KeyBytes>
{
self.bytes.distance(other)
}
}
impl<T> Into<KeyBytes> for Key<T> {
fn into(self) -> KeyBytes {
self.bytes
}
}
impl From<Multihash> for Key<Multihash> {
fn from(h: Multihash) -> Self {
let k = Key::new(h.clone().into_bytes());
Key { preimage: h, hash: k.hash }
fn from(m: Multihash) -> Self {
Key::new(m)
}
}
impl From<PeerId> for Key<PeerId> {
fn from(peer_id: PeerId) -> Self {
Key::new(peer_id)
fn from(p: PeerId) -> Self {
Key::new(p)
}
}


@ -18,16 +18,12 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Implementation of the Kademlia protocol for libp2p.
//! Implementation of the libp2p-specific Kademlia protocol.
// TODO: we allow dead_code for now because this library contains a lot of unused code that will
// be useful later for record store
#![allow(dead_code)]
pub use self::behaviour::{Kademlia, KademliaOut, GetValueResult, PutValueResult};
pub use self::protocol::KadConnectionType;
pub use self::record::{RecordStore, RecordStorageError, MemoryRecordStorage};
pub mod handler;
pub mod kbucket;
pub mod protocol;
@ -37,4 +33,64 @@ mod addresses;
mod behaviour;
mod protobuf_structs;
mod query;
mod write;
pub use behaviour::{Kademlia, KademliaConfig, KademliaEvent, Quorum};
pub use behaviour::{
BootstrapResult,
BootstrapOk,
BootstrapError,
GetRecordResult,
GetRecordOk,
GetRecordError,
PutRecordResult,
PutRecordOk,
PutRecordError,
GetClosestPeersResult,
GetClosestPeersOk,
GetClosestPeersError,
AddProviderResult,
AddProviderOk,
AddProviderError,
GetProvidersResult,
GetProvidersOk,
GetProvidersError,
};
pub use protocol::KadConnectionType;
pub use record::{RecordStore, RecordStorageError, MemoryRecordStorage};
use std::time::Duration;
/// The `k` parameter of the Kademlia specification.
///
/// This parameter determines:
///
/// 1) The (fixed) maximum number of nodes in a bucket.
/// 2) The (default) replication factor, which in turn determines:
/// a) The number of closer peers returned in response to a request.
/// b) The number of closest peers to a key to search for in an iterative query.
///
/// The choice of (1) is fixed to this constant. The replication factor is configurable
/// but should generally be no greater than `K_VALUE`. All nodes in a Kademlia
/// DHT should agree on the choices made for (1) and (2).
///
/// The current value is `20`.
pub const K_VALUE: usize = 20;
/// The `α` parameter of the Kademlia specification.
///
/// This parameter determines the default parallelism for iterative queries,
/// i.e. the allowed number of in-flight requests that an iterative query is
/// waiting for at a particular time while it continues to make progress towards
/// locating the closest peers to a key.
///
/// The current value is `3`.
pub const ALPHA_VALUE: usize = 3;
const KBUCKET_PENDING_TIMEOUT: Duration = Duration::from_secs(60);
const ADD_PROVIDER_INTERVAL: Duration = Duration::from_secs(60);


@ -256,8 +256,8 @@ pub enum KadRequestMsg {
/// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
/// returned is not specified, but should be around 20.
FindNode {
/// Identifier of the node.
key: PeerId,
/// The key for which to locate the closest nodes.
key: Multihash,
},
/// Same as `FindNode`, but should also return the entries of the local providers list for
@ -467,8 +467,8 @@ fn proto_to_req_msg(mut message: proto::Message) -> Result<KadRequestMsg, io::Er
}
proto::Message_MessageType::FIND_NODE => {
let key = PeerId::from_bytes(message.take_key())
.map_err(|_| invalid_data("Invalid peer id in FIND_NODE"))?;
let key = Multihash::from_bytes(message.take_key())
.map_err(|_| invalid_data("Invalid key in FIND_NODE"))?;
Ok(KadRequestMsg::FindNode { key })
}

File diff suppressed because it is too large.


@ -0,0 +1,68 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Peer selection strategies for queries in the form of iterator-like state machines.
//!
//! Using a peer iterator in a query involves performing the following steps
//! repeatedly and in an alternating fashion:
//!
//! 1. Calling `next` to observe the next state of the iterator and determine
//! what to do, which is to either issue new requests to peers or continue
//! waiting for responses.
//!
//! 2. When responses are received or requests fail, providing input to the
//! iterator via the `on_success` and `on_failure` callbacks,
//! respectively, followed by repeating step (1).
//!
//! When a call to `next` returns [`Finished`], no more peers can be obtained
//! from the iterator and the results can be obtained from `into_result`.
//!
//! A peer iterator can be finished prematurely at any time through `finish`.
//!
//! [`Finished`]: peers::PeersIterState::Finished
pub mod closest;
pub mod fixed;
use libp2p_core::PeerId;
use std::borrow::Cow;
/// The state of a peer iterator.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PeersIterState<'a> {
/// The iterator is waiting for results.
///
/// `Some(peer)` indicates that the iterator is now waiting for a result
/// from `peer`, in addition to any other peers for which it is already
/// waiting for results.
///
/// `None` indicates that the iterator is waiting for results and there is no
/// new peer to contact, despite the iterator not being at capacity w.r.t.
/// the permitted parallelism.
Waiting(Option<Cow<'a, PeerId>>),
/// The iterator is waiting for results and is at capacity w.r.t. the
/// permitted parallelism.
WaitingAtCapacity,
/// The iterator finished.
Finished
}
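
To illustrate the alternation between `next` and `on_success` / `on_failure` described in the module documentation above, here is a minimal, synchronous sketch of driving a `ClosestPeersIter` (not part of this diff; crate-internal paths are assumed, and `send_find_node` is a hypothetical stand-in for issuing a FIND_NODE request and extracting the closer peers from the response):

    use crate::kbucket::Key;
    use crate::query::peers::{PeersIterState, closest::ClosestPeersIter};
    use libp2p_core::PeerId;
    use wasm_timer::Instant;

    // Hypothetical network request; the real behaviour performs this asynchronously.
    fn send_find_node(_peer: &PeerId) -> Result<Vec<PeerId>, ()> {
        Ok(Vec::new())
    }

    fn drive(target: Key<PeerId>, known_closest: Vec<Key<PeerId>>) -> Vec<PeerId> {
        let mut iter = ClosestPeersIter::new(target.into(), known_closest);
        loop {
            // Step 1: observe the next state of the iterator.
            let peer = match iter.next(Instant::now()) {
                PeersIterState::Waiting(Some(p)) => p.into_owned(),
                // With results delivered inline below, the iterator is never left
                // waiting; in the real behaviour these states mean "yield until an
                // in-flight request completes".
                PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => break,
                PeersIterState::Finished => break,
            };
            // Step 2: feed the outcome of the request back into the iterator.
            match send_find_node(&peer) {
                Ok(closer_peers) => iter.on_success(&peer, closer_peers),
                Err(()) => iter.on_failure(&peer),
            }
        }
        // The closest peers that responded successfully, closest first.
        iter.into_result().collect()
    }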


@ -0,0 +1,694 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use super::*;
use crate::{K_VALUE, ALPHA_VALUE};
use crate::kbucket::{Key, KeyBytes, Distance};
use libp2p_core::PeerId;
use std::{time::Duration, iter::FromIterator};
use std::collections::btree_map::{BTreeMap, Entry};
use wasm_timer::Instant;
/// A peer iterator for a dynamically changing list of peers, sorted by increasing
/// distance to a chosen target.
#[derive(Debug, Clone)]
pub struct ClosestPeersIter {
config: ClosestPeersIterConfig,
/// The target whose distance to any peer determines the position of
/// the peer in the iterator.
target: KeyBytes,
/// The internal iterator state.
state: State,
/// The closest peers to the target, ordered by increasing distance.
closest_peers: BTreeMap<Distance, Peer>,
/// The number of peers for which the iterator is currently waiting for results.
num_waiting: usize,
}
/// Configuration for a `ClosestPeersIter`.
#[derive(Debug, Clone)]
pub struct ClosestPeersIterConfig {
/// Allowed level of parallelism.
///
/// The `α` parameter in the Kademlia paper. The maximum number of peers that
/// the iterator is allowed to wait for in parallel while iterating towards the closest
/// nodes to a target. Defaults to `ALPHA_VALUE`.
pub parallelism: usize,
/// Number of results (closest peers) to search for.
///
/// The number of closest peers for which the iterator must obtain successful results
/// in order to finish successfully. Defaults to `K_VALUE`.
pub num_results: usize,
/// The timeout for a single peer.
///
/// If a successful result is not reported for a peer within this timeout
/// window, the iterator considers the peer unresponsive and will not wait for
/// the peer when evaluating the termination conditions, until and unless a
/// result is delivered. Defaults to `10` seconds.
pub peer_timeout: Duration,
}
impl Default for ClosestPeersIterConfig {
fn default() -> Self {
ClosestPeersIterConfig {
parallelism: ALPHA_VALUE as usize,
num_results: K_VALUE as usize,
peer_timeout: Duration::from_secs(10),
}
}
}
impl ClosestPeersIter {
/// Creates a new iterator with a default configuration.
pub fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
where
I: IntoIterator<Item = Key<PeerId>>
{
Self::with_config(ClosestPeersIterConfig::default(), target, known_closest_peers)
}
/// Creates a new iterator with the given configuration.
pub fn with_config<I, T>(config: ClosestPeersIterConfig, target: T, known_closest_peers: I) -> Self
where
I: IntoIterator<Item = Key<PeerId>>,
T: Into<KeyBytes>
{
let target = target.into();
// Initialise the closest peers to start the iterator with.
let closest_peers = BTreeMap::from_iter(
known_closest_peers
.into_iter()
.map(|key| {
let distance = key.distance(&target);
let state = PeerState::NotContacted;
(distance, Peer { key, state })
})
.take(config.num_results));
// The iterator initially makes progress by iterating towards the target.
let state = State::Iterating { no_progress : 0 };
ClosestPeersIter {
config,
target,
state,
closest_peers,
num_waiting: 0
}
}
/// Callback for delivering the result of a successful request to a peer
/// that the iterator is waiting on.
///
/// Delivering results of requests back to the iterator allows the iterator to make
/// progress. The iterator is said to make progress either when the given
/// `closer_peers` contain a peer closer to the target than any peer seen so far,
/// or when the iterator did not yet accumulate `num_results` closest peers and
/// `closer_peers` contains a new peer, regardless of its distance to the target.
///
/// After calling this function, `next` should eventually be called again
/// to advance the state of the iterator.
///
/// If the iterator is finished, it is not currently waiting for a
/// result from `peer`, or a result for `peer` has already been reported,
/// calling this function has no effect.
pub fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I)
where
I: IntoIterator<Item = PeerId>
{
if let State::Finished = self.state {
return
}
let key = Key::from(peer.clone());
let distance = key.distance(&self.target);
// Mark the peer as succeeded.
match self.closest_peers.entry(distance) {
Entry::Vacant(..) => return,
Entry::Occupied(mut e) => match e.get().state {
PeerState::Waiting(..) => {
debug_assert!(self.num_waiting > 0);
self.num_waiting -= 1;
e.get_mut().state = PeerState::Succeeded;
}
PeerState::Unresponsive => {
e.get_mut().state = PeerState::Succeeded;
}
PeerState::NotContacted
| PeerState::Failed
| PeerState::Succeeded => return
}
}
let num_closest = self.closest_peers.len();
let mut progress = false;
// Incorporate the reported closer peers into the iterator.
for peer in closer_peers {
let key = peer.into();
let distance = self.target.distance(&key);
let peer = Peer { key, state: PeerState::NotContacted };
self.closest_peers.entry(distance).or_insert(peer);
// The iterator makes progress if the new peer is either closer to the target
// than any peer seen so far (i.e. is the first entry), or the iterator did
// not yet accumulate enough closest peers.
progress = self.closest_peers.keys().next() == Some(&distance)
|| num_closest < self.config.num_results;
}
// Update the iterator state.
self.state = match self.state {
State::Iterating { no_progress } => {
let no_progress = if progress { 0 } else { no_progress + 1 };
if no_progress >= self.config.parallelism {
State::Stalled
} else {
State::Iterating { no_progress }
}
}
State::Stalled =>
if progress {
State::Iterating { no_progress: 0 }
} else {
State::Stalled
}
State::Finished => State::Finished
}
}
/// Callback for informing the iterator about a failed request to a peer
/// that the iterator is waiting on.
///
/// After calling this function, `next` should eventually be called again
/// to advance the state of the iterator.
///
/// If the iterator is finished, it is not currently waiting for a
/// result from `peer`, or a result for `peer` has already been reported,
/// calling this function has no effect.
pub fn on_failure(&mut self, peer: &PeerId) {
if let State::Finished = self.state {
return
}
let key = Key::from(peer.clone());
let distance = key.distance(&self.target);
match self.closest_peers.entry(distance) {
Entry::Vacant(_) => return,
Entry::Occupied(mut e) => match e.get().state {
PeerState::Waiting(_) => {
debug_assert!(self.num_waiting > 0);
self.num_waiting -= 1;
e.get_mut().state = PeerState::Failed
}
PeerState::Unresponsive => {
e.get_mut().state = PeerState::Failed
}
_ => {}
}
}
}
/// Returns the list of peers for which the iterator is currently waiting
/// for results.
pub fn waiting(&self) -> impl Iterator<Item = &PeerId> {
self.closest_peers.values().filter_map(|peer|
match peer.state {
PeerState::Waiting(..) => Some(peer.key.preimage()),
_ => None
})
}
/// Returns the number of peers for which the iterator is currently
/// waiting for results.
pub fn num_waiting(&self) -> usize {
self.num_waiting
}
/// Returns true if the iterator is waiting for a response from the given peer.
pub fn is_waiting(&self, peer: &PeerId) -> bool {
self.waiting().any(|p| peer == p)
}
/// Advances the state of the iterator, potentially getting a new peer to contact.
pub fn next(&mut self, now: Instant) -> PeersIterState {
if let State::Finished = self.state {
return PeersIterState::Finished
}
// Count the number of peers that returned a result. If there is a
// request in progress to one of the `num_results` closest peers, the
// counter is set to `None` as the iterator can only finish once
// `num_results` closest peers have responded (or there are no more
// peers to contact, see `num_waiting`).
let mut result_counter = Some(0);
// Check if the iterator is at capacity w.r.t. the allowed parallelism.
let at_capacity = self.at_capacity();
for peer in self.closest_peers.values_mut() {
match peer.state {
PeerState::Waiting(timeout) => {
if now >= timeout {
// Unresponsive peers no longer count towards the limit for the
// bounded parallelism, though they might still be ongoing and
// their results can still be delivered to the iterator.
debug_assert!(self.num_waiting > 0);
self.num_waiting -= 1;
peer.state = PeerState::Unresponsive
}
else if at_capacity {
// The iterator is still waiting for a result from a peer and is
// at capacity w.r.t. the maximum number of peers being waited on.
return PeersIterState::WaitingAtCapacity
}
else {
// The iterator is still waiting for a result from a peer and the
// `result_counter` did not yet reach `num_results`. Therefore
// the iterator is not yet done, regardless of already successful
// queries to peers farther from the target.
result_counter = None;
}
}
PeerState::Succeeded =>
if let Some(ref mut cnt) = result_counter {
*cnt += 1;
// If `num_results` successful results have been delivered for the
// closest peers, the iterator is done.
if *cnt >= self.config.num_results {
self.state = State::Finished;
return PeersIterState::Finished
}
}
PeerState::NotContacted =>
if !at_capacity {
let timeout = now + self.config.peer_timeout;
peer.state = PeerState::Waiting(timeout);
self.num_waiting += 1;
return PeersIterState::Waiting(Some(Cow::Borrowed(peer.key.preimage())))
} else {
return PeersIterState::WaitingAtCapacity
}
PeerState::Unresponsive | PeerState::Failed => {
// Skip over unresponsive or failed peers.
}
}
}
if self.num_waiting > 0 {
// The iterator is still waiting for results and not at capacity w.r.t.
// the allowed parallelism, but there are no new peers to contact
// at the moment.
PeersIterState::Waiting(None)
} else {
// The iterator is finished because all available peers have been contacted
// and the iterator is not waiting for any more results.
self.state = State::Finished;
PeersIterState::Finished
}
}
/// Immediately transitions the iterator to [`PeersIterState::Finished`].
pub fn finish(&mut self) {
self.state = State::Finished
}
/// Checks whether the iterator has finished.
pub fn finished(&self) -> bool {
self.state == State::Finished
}
/// Consumes the iterator, returning the target and the closest peers.
pub fn into_result(self) -> impl Iterator<Item = PeerId> {
self.closest_peers
.into_iter()
.filter_map(|(_, peer)| {
if let PeerState::Succeeded = peer.state {
Some(peer.key.into_preimage())
} else {
None
}
})
.take(self.config.num_results)
}
/// Checks if the iterator is at capacity w.r.t. the permitted parallelism.
///
/// While the iterator is stalled, up to `num_results` parallel requests
/// are allowed. This is a slightly more permissive variant of the
/// requirement that the initiator "resends the FIND_NODE to all of the
/// k closest nodes it has not already queried".
fn at_capacity(&self) -> bool {
match self.state {
State::Stalled => self.num_waiting >= self.config.num_results,
State::Iterating { .. } => self.num_waiting >= self.config.parallelism,
State::Finished => true
}
}
}
////////////////////////////////////////////////////////////////////////////////
// Private state
/// Internal state of the iterator.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum State {
/// The iterator is making progress by iterating towards `num_results` closest
/// peers to the target with a maximum of `parallelism` peers for which the
/// iterator is waiting for results at a time.
///
/// > **Note**: When the iterator switches back to `Iterating` after being
/// > `Stalled`, it may temporarily be waiting for more than `parallelism`
/// > results from peers, with new peers only being considered once
/// > the number of pending results drops below `parallelism`.
Iterating {
/// The number of consecutive results that did not yield a peer closer
/// to the target. When this number reaches `parallelism` and no new
/// peer was discovered or at least `num_results` peers are known to
/// the iterator, it is considered `Stalled`.
no_progress: usize,
},
/// An iterator is stalled when it did not make progress after `parallelism`
/// consecutive successful results (see `on_success`).
///
/// While the iterator is stalled, the maximum allowed parallelism for pending
/// results is increased to `num_results` in an attempt to finish the iterator.
/// If the iterator can make progress again upon receiving the remaining
/// results, it switches back to `Iterating`. Otherwise it will be finished.
Stalled,
/// The iterator is finished.
///
/// An iterator finishes either when it has collected `num_results` results
/// from the closest peers (not counting those that failed or are unresponsive)
/// or because the iterator ran out of peers that have not yet delivered
/// results (or failed).
Finished
}
/// Representation of a peer in the context of an iterator.
#[derive(Debug, Clone)]
struct Peer {
key: Key<PeerId>,
state: PeerState
}
/// The state of a single `Peer`.
#[derive(Debug, Copy, Clone)]
enum PeerState {
/// The peer has not yet been contacted.
///
/// This is the starting state for every peer.
NotContacted,
/// The iterator is waiting for a result from the peer.
Waiting(Instant),
/// A result was not delivered for the peer within the configured timeout.
///
/// The peer is not taken into account for the termination conditions
/// of the iterator until and unless it responds.
Unresponsive,
/// Obtaining a result from the peer has failed.
///
/// This is a final state, reached as a result of a call to `on_failure`.
Failed,
/// A successful result from the peer has been delivered.
///
/// This is a final state, reached as a result of a call to `on_success`.
Succeeded,
}
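// Summary of the per-peer state transitions driven by the methods above
// (an editorial sketch derived from `next`, `on_success` and `on_failure`;
// not part of the original source):
//
//   NotContacted --(next)--> Waiting(timeout)
//   Waiting --(on_success)--> Succeeded      Waiting --(on_failure)--> Failed
//   Waiting --(timeout elapsed in next)--> Unresponsive
//   Unresponsive --(on_success)--> Succeeded Unresponsive --(on_failure)--> Failed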
#[cfg(test)]
mod tests {
use super::*;
use libp2p_core::PeerId;
use quickcheck::*;
use multihash::Multihash;
use rand::{Rng, thread_rng};
use std::{iter, time::Duration};
fn random_peers(n: usize) -> impl Iterator<Item = PeerId> + Clone {
(0 .. n).map(|_| PeerId::random())
}
fn random_iter<G: Rng>(g: &mut G) -> ClosestPeersIter {
let known_closest_peers = random_peers(g.gen_range(1, 60)).map(Key::from);
let target = Key::from(Into::<Multihash>::into(PeerId::random()));
let config = ClosestPeersIterConfig {
parallelism: g.gen_range(1, 10),
num_results: g.gen_range(1, 25),
peer_timeout: Duration::from_secs(g.gen_range(10, 30)),
};
ClosestPeersIter::with_config(config, target, known_closest_peers)
}
fn sorted<T: AsRef<KeyBytes>>(target: &T, peers: &Vec<Key<PeerId>>) -> bool {
peers.windows(2).all(|w| w[0].distance(&target) < w[1].distance(&target))
}
impl Arbitrary for ClosestPeersIter {
fn arbitrary<G: Gen>(g: &mut G) -> ClosestPeersIter {
random_iter(g)
}
}
#[test]
fn new_iter() {
let iter = random_iter(&mut thread_rng());
let target = iter.target.clone();
let (keys, states): (Vec<_>, Vec<_>) = iter.closest_peers
.values()
.map(|e| (e.key.clone(), &e.state))
.unzip();
let none_contacted = states
.iter()
.all(|s| match s {
PeerState::NotContacted => true,
_ => false
});
assert!(none_contacted,
"Unexpected peer state in new iterator.");
assert!(sorted(&target, &keys),
"Closest peers in new iterator not sorted by distance to target.");
assert_eq!(iter.num_waiting(), 0,
"Unexpected peers in progress in new iterator.");
assert_eq!(iter.into_result().count(), 0,
"Unexpected closest peers in new iterator");
}
#[test]
fn termination_and_parallelism() {
fn prop(mut iter: ClosestPeersIter) {
let now = Instant::now();
let mut rng = thread_rng();
let mut expected = iter.closest_peers
.values()
.map(|e| e.key.clone())
.collect::<Vec<_>>();
let num_known = expected.len();
let max_parallelism = usize::min(iter.config.parallelism, num_known);
let target = iter.target.clone();
let mut remaining;
let mut num_failures = 0;
'finished: loop {
if expected.len() == 0 {
break;
}
// Split off the next up to `parallelism` expected peers.
else if expected.len() < max_parallelism {
remaining = Vec::new();
}
else {
remaining = expected.split_off(max_parallelism);
}
// Advance for maximum parallelism.
for k in expected.iter() {
match iter.next(now) {
PeersIterState::Finished => break 'finished,
PeersIterState::Waiting(Some(p)) => assert_eq!(&*p, k.preimage()),
PeersIterState::Waiting(None) => panic!("Expected another peer."),
PeersIterState::WaitingAtCapacity => panic!("Unexpectedly reached capacity.")
}
}
let num_waiting = iter.num_waiting();
assert_eq!(num_waiting, expected.len());
// Check the bounded parallelism.
if iter.at_capacity() {
assert_eq!(iter.next(now), PeersIterState::WaitingAtCapacity)
}
// Report results back to the iterator with a random number of "closer"
// peers or an error, thus finishing the "in-flight requests".
for (i, k) in expected.iter().enumerate() {
if rng.gen_bool(0.75) {
let num_closer = rng.gen_range(0, iter.config.num_results + 1);
let closer_peers = random_peers(num_closer).collect::<Vec<_>>();
remaining.extend(closer_peers.iter().cloned().map(Key::from));
iter.on_success(k.preimage(), closer_peers);
} else {
num_failures += 1;
iter.on_failure(k.preimage());
}
assert_eq!(iter.num_waiting(), num_waiting - (i + 1));
}
// Re-sort the remaining expected peers for the next "round".
remaining.sort_by_key(|k| target.distance(&k));
expected = remaining
}
// The iterator must be finished.
assert_eq!(iter.next(now), PeersIterState::Finished);
assert_eq!(iter.state, State::Finished);
// Determine if all peers have been contacted by the iterator. This _must_ be
// the case if the iterator finished with fewer than the requested number
// of results.
let all_contacted = iter.closest_peers.values().all(|e| match e.state {
PeerState::NotContacted | PeerState::Waiting { .. } => false,
_ => true
});
let target = iter.target.clone();
let num_results = iter.config.num_results;
let result = iter.into_result();
let closest = result.map(Key::from).collect::<Vec<_>>();
assert!(sorted(&target, &closest));
if closest.len() < num_results {
// The iterator returned fewer results than requested. Therefore
// either the initial number of known peers must have been
// less than the desired number of results, or there must
// have been failures.
assert!(num_known < num_results || num_failures > 0);
// All peers must have been contacted.
assert!(all_contacted, "Not all peers have been contacted.");
} else {
assert_eq!(num_results, closest.len(), "Too many results.");
}
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
}
#[test]
fn no_duplicates() {
fn prop(mut iter: ClosestPeersIter) -> bool {
let now = Instant::now();
let closer = random_peers(1).collect::<Vec<_>>();
// A first peer reports a "closer" peer.
let peer1 = match iter.next(now) {
PeersIterState::Waiting(Some(p)) => p.into_owned(),
_ => panic!("No peer.")
};
iter.on_success(&peer1, closer.clone());
// Duplicate result from the same peer.
iter.on_success(&peer1, closer.clone());
// If there is a second peer, let it also report the same "closer" peer.
match iter.next(now) {
PeersIterState::Waiting(Some(p)) => {
let peer2 = p.into_owned();
iter.on_success(&peer2, closer.clone())
}
PeersIterState::Finished => {}
_ => panic!("Unexpectedly iter state."),
};
// The "closer" peer must only be in the iterator once.
let n = iter.closest_peers.values().filter(|e| e.key.preimage() == &closer[0]).count();
assert_eq!(n, 1);
true
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
}
#[test]
fn timeout() {
fn prop(mut iter: ClosestPeersIter) -> bool {
let mut now = Instant::now();
let peer = iter.closest_peers.values().next().unwrap().key.clone().into_preimage();
// Poll the iterator for the first peer to be in progress.
match iter.next(now) {
PeersIterState::Waiting(Some(id)) => assert_eq!(&*id, &peer),
_ => panic!()
}
// Artificially advance the clock.
now = now + iter.config.peer_timeout;
// Advancing the iterator again should mark the first peer as unresponsive.
let _ = iter.next(now);
match &iter.closest_peers.values().next().unwrap() {
Peer { key, state: PeerState::Unresponsive } => {
assert_eq!(key.preimage(), &peer);
},
Peer { state, .. } => panic!("Unexpected peer state: {:?}", state)
}
let finished = iter.finished();
iter.on_success(&peer, iter::empty());
let closest = iter.into_result().collect::<Vec<_>>();
if finished {
// Delivering results when the iterator already finished must have
// no effect.
assert_eq!(Vec::<PeerId>::new(), closest)
} else {
// Unresponsive peers can still deliver results while the iterator
// is not finished.
assert_eq!(vec![peer], closest)
}
true
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
}
}
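
For context, a minimal sketch of how a caller might drive this iterator outside of the swarm machinery. This is not part of the commit; `drive` and `lookup` are hypothetical names, and `lookup` stands in for sending a FIND_NODE request and awaiting its result. The pattern is: poll `next`, issue a request for every `Waiting(Some(peer))`, and feed the outcome back via `on_success`/`on_failure` until `Finished` is returned.

fn drive<F>(mut iter: ClosestPeersIter, mut lookup: F) -> Vec<PeerId>
where
    // `Ok` carries the peers the contacted peer reported as closer to the target.
    F: FnMut(&PeerId) -> Result<Vec<PeerId>, ()>,
{
    loop {
        // Bind the next peer as an owned `PeerId` so that the borrow taken
        // by `next` ends before results are reported back to the iterator.
        let peer = match iter.next(Instant::now()) {
            PeersIterState::Waiting(Some(p)) => p.into_owned(),
            PeersIterState::Finished => break,
            // With a synchronous `lookup` no request is ever left pending
            // when `next` is called, so the wait states cannot occur here;
            // stop defensively if they ever do.
            PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => break,
        };
        match lookup(&peer) {
            Ok(closer) => iter.on_success(&peer, closer),
            Err(()) => iter.on_failure(&peer),
        }
    }
    // The peers that responded, closest to the target first.
    iter.into_result().collect()
}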

View File

@ -0,0 +1,135 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use super::*;
use fnv::FnvHashMap;
use libp2p_core::PeerId;
use std::{vec, collections::hash_map::Entry};
/// A peer iterator for a fixed set of peers.
pub struct FixedPeersIter {
/// The permitted parallelism, i.e. the maximum number of pending results.
parallelism: usize,
/// The state of peers emitted by the iterator.
peers: FnvHashMap<PeerId, PeerState>,
/// The backlog of peers that can still be emitted.
iter: vec::IntoIter<PeerId>,
/// The internal state of the iterator.
state: State,
}
enum State {
Waiting { num_waiting: usize },
Finished
}
#[derive(Copy, Clone, PartialEq, Eq)]
enum PeerState {
/// The iterator is waiting for a result to be reported back for the peer.
Waiting,
/// The iterator has been informed that the attempt to contact the peer failed.
Failed,
/// The iterator has been informed of a successful result from the peer.
Succeeded,
}
impl FixedPeersIter {
pub fn new(peers: Vec<PeerId>, parallelism: usize) -> Self {
Self {
parallelism,
peers: FnvHashMap::default(),
iter: peers.into_iter(),
state: State::Waiting { num_waiting: 0 },
}
}
pub fn on_success(&mut self, peer: &PeerId) {
if let State::Waiting { num_waiting } = &mut self.state {
if let Some(state @ PeerState::Waiting) = self.peers.get_mut(peer) {
*state = PeerState::Succeeded;
*num_waiting -= 1;
}
}
}
pub fn on_failure(&mut self, peer: &PeerId) {
if let State::Waiting { num_waiting } = &mut self.state {
if let Some(state @ PeerState::Waiting) = self.peers.get_mut(peer) {
*state = PeerState::Failed;
// A failed peer no longer occupies a slot of the bounded parallelism.
*num_waiting -= 1;
}
}
}
pub fn is_waiting(&self, peer: &PeerId) -> bool {
self.peers.get(peer) == Some(&PeerState::Waiting)
}
pub fn finish(&mut self) {
if let State::Waiting { .. } = self.state {
self.state = State::Finished
}
}
pub fn next(&mut self) -> PeersIterState {
match &mut self.state {
State::Finished => return PeersIterState::Finished,
State::Waiting { num_waiting } => {
if *num_waiting >= self.parallelism {
return PeersIterState::WaitingAtCapacity
}
loop {
match self.iter.next() {
None => if *num_waiting == 0 {
self.state = State::Finished;
return PeersIterState::Finished
} else {
return PeersIterState::Waiting(None)
}
Some(p) => match self.peers.entry(p.clone()) {
Entry::Occupied(_) => {} // skip duplicates
Entry::Vacant(e) => {
*num_waiting += 1;
e.insert(PeerState::Waiting);
return PeersIterState::Waiting(Some(Cow::Owned(p)))
}
}
}
}
}
}
}
pub fn into_result(self) -> impl Iterator<Item = PeerId> {
self.peers.into_iter()
.filter_map(|(p, s)|
if let PeerState::Succeeded = s {
Some(p)
} else {
None
})
}
}
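
As a rough illustration — not part of this commit, with `put_to_all` and `send_put` as hypothetical names — the same driving pattern applies to a fixed peer set, e.g. for the second phase of a PUT_VALUE query:

fn put_to_all<F>(peers: Vec<PeerId>, parallelism: usize, mut send_put: F) -> usize
where
    // `send_put` stands in for issuing the request and awaiting the reply;
    // it returns `true` if the peer acknowledged the write.
    F: FnMut(&PeerId) -> bool,
{
    let mut iter = FixedPeersIter::new(peers, parallelism);
    loop {
        let peer = match iter.next() {
            PeersIterState::Waiting(Some(p)) => p.into_owned(),
            PeersIterState::Finished => break,
            // No requests are left in flight in this synchronous sketch,
            // so there is nothing to wait for in these states.
            PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => break,
        };
        if send_put(&peer) {
            iter.on_success(&peer);
        } else {
            iter.on_failure(&peer);
        }
    }
    // The number of peers that acknowledged the write.
    iter.into_result().count()
}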

View File

@ -18,21 +18,12 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Abstracts the Kademlia record store behaviour and provides default in-memory store
//! Records and record storage abstraction of the libp2p Kademlia DHT.
use fnv::FnvHashMap;
use multihash::Multihash;
use std::borrow::Cow;
/// The error a record store may return
#[derive(Clone, Debug, PartialEq)]
pub enum RecordStorageError {
/// Store reached the capacity limit.
AtCapacity,
/// Value being put is larger than the limit.
ValueTooLarge,
}
/// The records that are kept in the dht.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Record {
@ -48,6 +39,15 @@ pub trait RecordStore {
fn put(&mut self, r: Record) -> Result<(), RecordStorageError>;
}
/// The error a record store may return
#[derive(Clone, Debug, PartialEq)]
pub enum RecordStorageError {
/// Store reached the capacity limit.
AtCapacity,
/// Value being put is larger than the limit.
ValueTooLarge,
}
/// In-memory implementation of the record store.
pub struct MemoryRecordStorage {
/// Maximum number of records we will store.

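For reference, a small hedged sketch of handling the two error variants when putting a record into a store. `store_or_log` is a hypothetical helper; `store` and `record` stand in for any `RecordStore` implementation (such as `MemoryRecordStorage`) and a `Record` built elsewhere:

fn store_or_log<S: RecordStore>(store: &mut S, record: Record) {
    match store.put(record) {
        Ok(()) => {}
        // The store refused the record because it already holds the
        // maximum number of records.
        Err(RecordStorageError::AtCapacity) => eprintln!("record store is full"),
        // The record's value exceeds the configured size limit.
        Err(RecordStorageError::ValueTooLarge) => eprintln!("record value too large"),
    }
}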
View File

@ -1,103 +0,0 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Contains the state of the second stage of the PUT_VALUE process of Kademlia.
use fnv::FnvHashMap;
/// The state of the single peer.
#[derive(Clone)]
enum PeerState {
/// We don't know yet.
Unknown,
/// Putting a value failed.
Failed,
/// Putting a value succeeded.
Succeeded,
}
/// State of the `PUT_VALUE` second stage.
///
/// Here we are gathering the results of all `PUT_VALUE` requests that we've
/// sent to the appropriate peers. We keep track of the set of peers that we've
/// sent the requests to and the counts of successful and failed responses.
pub struct WriteState<TPeerId, TTarget> {
/// The key that we're inserting into the dht.
target: TTarget,
/// The peers that we're asking to store our value.
peers: FnvHashMap<TPeerId, PeerState>,
/// The count of successful stores.
successes: usize,
/// The count of errors.
failures: usize,
}
impl<TPeerId, TTarget> WriteState<TPeerId, TTarget>
where
TPeerId: std::hash::Hash + Clone + Eq
{
/// Creates a new WriteState.
///
/// Stores the state of an ongoing second stage of a PUT_VALUE process
pub fn new(target: TTarget, peers: Vec<TPeerId>) -> Self {
use std::iter::FromIterator;
WriteState {
target,
peers: FnvHashMap::from_iter(peers
.into_iter()
.zip(std::iter::repeat(PeerState::Unknown))
),
successes: 0,
failures: 0,
}
}
/// Inform the state that writing to one of the target peers has succeeded
pub fn inject_write_success(&mut self, peer: &TPeerId) {
if let Some(state @ PeerState::Unknown) = self.peers.get_mut(peer) {
*state = PeerState::Succeeded;
self.successes += 1;
}
}
/// Inform the state that writing to one of the target peers has failed
pub fn inject_write_error(&mut self, peer: &TPeerId) {
if let Some(state @ PeerState::Unknown) = self.peers.get_mut(peer) {
*state = PeerState::Failed;
self.failures += 1;
}
}
/// Ask the state if it is done
// TODO: probably it should also be a poll() in the fashion of QueryState and have a timeout
pub fn done(&self) -> bool {
self.peers.len() == self.successes + self.failures
}
/// Consume the state and return the target together with the success/error counters
pub fn into_inner(self) -> (TTarget, usize, usize) {
(self.target, self.successes, self.failures)
}
}
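
For completeness, a small sketch of how the removed `WriteState` used to be driven before being replaced by `FixedPeersIter`. This is not part of the original file; `tally` is a hypothetical name and the peer/target types are simplified to `u8`/`&str` purely for illustration:

fn tally() -> (usize, usize) {
    let peers: Vec<u8> = vec![1, 2, 3];
    let mut state = WriteState::new("some-key", peers.clone());
    for p in &peers {
        // Report an outcome for every peer; even numbers "succeed" here.
        if *p % 2 == 0 {
            state.inject_write_success(p);
        } else {
            state.inject_write_error(p);
        }
    }
    // `done` is true once every peer has reported either way.
    assert!(state.done());
    let (_target, successes, failures) = state.into_inner();
    (successes, failures)
}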