protocols/kad: Implement S-Kademlia's lookup over disjoint paths v2 (#1473)

The extension paper S-Kademlia includes a proposal for lookups over
disjoint paths. Within vanilla Kademlia, queries keep track of the
closest nodes in a single bucket. Any adversary along the path can thus
influence all future hops, as long as it can supply the next-closest
(not overall closest) nodes. S-Kademlia counters this attack by
querying over disjoint paths, using multiple buckets.

To adjust the libp2p Kademlia implementation accordingly, this
change-set introduces an additional peers iterator:
`ClosestDisjointPeersIter`. The new iterator wraps a set of
`ClosestPeersIter` and enforces that the individual `ClosestPeersIter`
explore disjoint paths by having each peer that was already queried by
a different iterator return instantly.
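
For downstream users the feature boils down to two new configuration knobs. The following is a minimal, hypothetical setup sketch (the `disjoint_kademlia` helper, the `local_peer_id` argument and the choice of three paths are illustrative assumptions, not part of this change-set):

    use libp2p::kad::{Kademlia, KademliaConfig, record::store::MemoryStore};
    use std::num::NonZeroUsize;

    fn disjoint_kademlia(local_peer_id: libp2p::PeerId) -> Kademlia<MemoryStore> {
        let mut cfg = KademliaConfig::default();
        // With disjoint paths enabled, the parallelism equals the
        // number of disjoint paths explored per query.
        cfg.set_parallelism(NonZeroUsize::new(3).expect("3 > 0"));
        cfg.disjoint_query_paths(true);

        let store = MemoryStore::new(local_peer_id.clone());
        Kademlia::with_config(local_peer_id, store, cfg)
    }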
Commit 9dd2d662e9 (parent 00fc223487), authored by Max Inden on 2020-06-19 12:22:26 +02:00, committed by GitHub.
10 changed files with 1432 additions and 120 deletions.

examples/distributed-key-value-store.rs

@@ -33,13 +33,14 @@ use async_std::{io, task};
 use futures::prelude::*;
 use libp2p::kad::record::store::MemoryStore;
 use libp2p::kad::{
-    record::Key,
     Kademlia,
     KademliaEvent,
+    PeerRecord,
     PutRecordOk,
     QueryResult,
     Quorum,
-    Record
+    Record,
+    record::Key,
 };
 use libp2p::{
     NetworkBehaviour,
@@ -86,7 +87,7 @@ fn main() -> Result<(), Box<dyn Error>> {
             match message {
                 KademliaEvent::QueryResult { result, .. } => match result {
                     QueryResult::GetRecord(Ok(ok)) => {
-                        for Record { key, value, .. } in ok.records {
+                        for PeerRecord { record: Record { key, value, .. }, ..} in ok.records {
                             println!(
                                 "Got record {:?} {:?}",
                                 std::str::from_utf8(key.as_ref()).unwrap(),

protocols/kad/Cargo.toml

@@ -30,6 +30,7 @@ unsigned-varint = { version = "0.3", features = ["futures-codec"] }
 void = "1.0"
 
 [dev-dependencies]
+futures-timer = "3.0"
 libp2p-secio = { path = "../secio" }
 libp2p-yamux = { path = "../../muxers/yamux" }
 quickcheck = "0.9.0"

protocols/kad/src/behaviour.rs

@@ -127,8 +127,9 @@ impl Default for KademliaConfig {
 impl KademliaConfig {
     /// Sets a custom protocol name.
     ///
-    /// Kademlia nodes only communicate with other nodes using the same protocol name. Using a
-    /// custom name therefore allows to segregate the DHT from others, if that is desired.
+    /// Kademlia nodes only communicate with other nodes using the same protocol
+    /// name. Using a custom name therefore allows to segregate the DHT from
+    /// others, if that is desired.
     pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) -> &mut Self {
         self.protocol_config.set_protocol_name(name);
         self
@@ -154,10 +155,41 @@ impl KademliaConfig {
         self
     }
 
+    /// Sets the allowed level of parallelism for iterative queries.
+    ///
+    /// The `α` parameter in the Kademlia paper. The maximum number of peers
+    /// that an iterative query is allowed to wait for in parallel while
+    /// iterating towards the closest nodes to a target. Defaults to
+    /// `ALPHA_VALUE`.
+    ///
+    /// This only controls the level of parallelism of an iterative query, not
+    /// the level of parallelism of a query to a fixed set of peers.
+    ///
+    /// When used with [`KademliaConfig::disjoint_query_paths`] it equals
+    /// the amount of disjoint paths used.
+    pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
+        self.query_config.parallelism = parallelism;
+        self
+    }
+
+    /// Require iterative queries to use disjoint paths for increased resiliency
+    /// in the presence of potentially adversarial nodes.
+    ///
+    /// When enabled the number of disjoint paths used equals the configured
+    /// parallelism.
+    ///
+    /// See the S/Kademlia paper for more information on the high level design
+    /// as well as its security improvements.
+    pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
+        self.query_config.disjoint_query_paths = enabled;
+        self
+    }
+
     /// Sets the TTL for stored records.
     ///
     /// The TTL should be significantly longer than the (re-)publication
-    /// interval, to avoid premature expiration of records. The default is 36 hours.
+    /// interval, to avoid premature expiration of records. The default is 36
+    /// hours.
     ///
     /// `None` means records never expire.
     ///
@@ -191,10 +223,10 @@ impl KademliaConfig {
     /// Sets the (re-)publication interval of stored records.
     ///
-    /// Records persist in the DHT until they expire. By default, published records
-    /// are re-published in regular intervals for as long as the record exists
-    /// in the local storage of the original publisher, thereby extending the
-    /// records lifetime.
+    /// Records persist in the DHT until they expire. By default, published
+    /// records are re-published in regular intervals for as long as the record
+    /// exists in the local storage of the original publisher, thereby extending
+    /// the records lifetime.
     ///
     /// This interval should be significantly shorter than the record TTL, to
     /// ensure records do not expire prematurely. The default is 24 hours.
@@ -220,7 +252,8 @@ impl KademliaConfig {
     /// Sets the interval at which provider records for keys provided
     /// by the local node are re-published.
     ///
-    /// `None` means that stored provider records are never automatically re-published.
+    /// `None` means that stored provider records are never automatically
+    /// re-published.
     ///
     /// Must be significantly less than the provider record TTL.
     pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
@@ -236,7 +269,8 @@ impl KademliaConfig {
     /// Modifies the maximum allowed size of individual Kademlia packets.
     ///
-    /// It might be necessary to increase this value if trying to put large records.
+    /// It might be necessary to increase this value if trying to put large
+    /// records.
     pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
         self.protocol_config.set_max_packet_size(size);
         self
@@ -247,7 +281,7 @@ impl<TStore> Kademlia<TStore>
 where
     for<'a> TStore: RecordStore<'a>
 {
-    /// Creates a new `Kademlia` network behaviour with the given configuration.
+    /// Creates a new `Kademlia` network behaviour with a default configuration.
     pub fn new(id: PeerId, store: TStore) -> Self {
         Self::with_config(id, store, Default::default())
     }
@@ -430,7 +464,7 @@ where
             if record.is_expired(Instant::now()) {
                 self.store.remove(key)
             } else {
-                records.push(record.into_owned());
+                records.push(PeerRecord{ peer: None, record: record.into_owned()});
             }
         }
@@ -892,7 +926,7 @@ where
                 if let Some(cache_key) = cache_at {
                     // Cache the record at the closest node to the key that
                     // did not return the record.
-                    let record = records.first().expect("[not empty]").clone();
+                    let record = records.first().expect("[not empty]").record.clone();
                     let quorum = NonZeroUsize::new(1).expect("1 > 0");
                     let context = PutRecordContext::Cache;
                     let info = QueryInfo::PutRecord {
@@ -900,7 +934,7 @@ where
                         record,
                         quorum,
                         phase: PutRecordPhase::PutRecord {
-                            num_results: 0,
+                            success: vec![],
                             get_closest_peers_stats: QueryStats::empty()
                         }
                     };
@@ -934,7 +968,7 @@ where
                     record,
                     quorum,
                     phase: PutRecordPhase::PutRecord {
-                        num_results: 0,
+                        success: vec![],
                         get_closest_peers_stats: result.stats
                     }
                 };
@@ -947,13 +981,13 @@ where
                 context,
                 record,
                 quorum,
-                phase: PutRecordPhase::PutRecord { num_results, get_closest_peers_stats }
+                phase: PutRecordPhase::PutRecord { success, get_closest_peers_stats }
             } => {
                 let mk_result = |key: record::Key| {
-                    if num_results >= quorum.get() {
+                    if success.len() >= quorum.get() {
                         Ok(PutRecordOk { key })
                     } else {
-                        Err(PutRecordError::QuorumFailed { key, quorum, num_results })
+                        Err(PutRecordError::QuorumFailed { key, quorum, success })
                     }
                 };
                 match context {
@@ -1050,9 +1084,9 @@ where
                 let err = Err(PutRecordError::Timeout {
                     key: record.key,
                     quorum,
-                    num_results: match phase {
-                        PutRecordPhase::GetClosestPeers => 0,
-                        PutRecordPhase::PutRecord { num_results, .. } => num_results
+                    success: match phase {
+                        PutRecordPhase::GetClosestPeers => vec![],
+                        PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
                     }
                 });
                 match context {
@@ -1098,7 +1132,7 @@ where
                     id: query_id,
                     stats: result.stats,
                     result: QueryResult::GetRecord(Err(
-                        GetRecordError::Timeout { key, records, quorum }
+                        GetRecordError::Timeout { key, records, quorum },
                     ))
                 }),
@@ -1475,9 +1509,24 @@ where
             key, records, quorum, cache_at
         } = &mut query.inner.info {
             if let Some(record) = record {
-                records.push(record);
-                if records.len() >= quorum.get() {
-                    query.finish()
+                records.push(PeerRecord{ peer: Some(source.clone()), record });
+
+                let quorum = quorum.get();
+                if records.len() >= quorum {
+                    // Desired quorum reached. The query may finish. See
+                    // [`Query::try_finish`] for details.
+                    let peers = records.iter()
+                        .filter_map(|PeerRecord{ peer, .. }| peer.as_ref())
+                        .cloned()
+                        .collect::<Vec<_>>();
+                    let finished = query.try_finish(peers.iter());
+                    if !finished {
+                        debug!(
+                            "GetRecord query ({:?}) reached quorum ({}/{}) with \
+                             response from peer {} but could not yet finish.",
+                            user_data, peers.len(), quorum, source,
+                        );
+                    }
                 }
             } else if quorum.get() == 1 {
                 // It is a "standard" Kademlia query, for which the
@@ -1513,11 +1562,21 @@ where
         if let Some(query) = self.queries.get_mut(&user_data) {
             query.on_success(&source, vec![]);
             if let QueryInfo::PutRecord {
-                phase: PutRecordPhase::PutRecord { num_results, .. }, quorum, ..
+                phase: PutRecordPhase::PutRecord { success, .. }, quorum, ..
             } = &mut query.inner.info {
-                *num_results += 1;
-                if *num_results >= quorum.get() {
-                    query.finish()
+                success.push(source.clone());
+
+                let quorum = quorum.get();
+                if success.len() >= quorum {
+                    let peers = success.clone();
+                    let finished = query.try_finish(peers.iter());
+                    if !finished {
+                        debug!(
+                            "PutRecord query ({:?}) reached quorum ({}/{}) with response \
+                             from peer {} but could not yet finish.",
+                            user_data, peers.len(), quorum, source,
+                        );
+                    }
                 }
             }
         }
@@ -1659,6 +1718,16 @@ impl Quorum {
     }
 }
 
+/// A record either received by the given peer or retrieved from the local
+/// record store.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct PeerRecord {
+    /// The peer from whom the record was received. `None` if the record was
+    /// retrieved from local storage.
+    pub peer: Option<PeerId>,
+    pub record: Record,
+}
+
 //////////////////////////////////////////////////////////////////////////////
 // Events
@@ -1742,7 +1811,7 @@ pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
 /// The successful result of [`Kademlia::get_record`].
 #[derive(Debug, Clone)]
 pub struct GetRecordOk {
-    pub records: Vec<Record>
+    pub records: Vec<PeerRecord>
 }
@@ -1754,12 +1823,12 @@ pub enum GetRecordError {
     },
     QuorumFailed {
         key: record::Key,
-        records: Vec<Record>,
+        records: Vec<PeerRecord>,
         quorum: NonZeroUsize
     },
     Timeout {
         key: record::Key,
-        records: Vec<Record>,
+        records: Vec<PeerRecord>,
         quorum: NonZeroUsize
     }
 }
@@ -1799,12 +1868,14 @@ pub struct PutRecordOk {
 pub enum PutRecordError {
     QuorumFailed {
         key: record::Key,
-        num_results: usize,
+        /// [`PeerId`]s of the peers the record was successfully stored on.
+        success: Vec<PeerId>,
         quorum: NonZeroUsize
     },
     Timeout {
         key: record::Key,
-        num_results: usize,
+        /// [`PeerId`]s of the peers the record was successfully stored on.
+        success: Vec<PeerId>,
         quorum: NonZeroUsize
     },
 }
@@ -2061,8 +2132,9 @@ pub enum QueryInfo {
     GetRecord {
         /// The key to look for.
         key: record::Key,
-        /// The records found so far.
-        records: Vec<Record>,
+        /// The records with the id of the peer that returned them. `None` when
+        /// the record was found in the local store.
+        records: Vec<PeerRecord>,
         /// The number of records to look for.
         quorum: NonZeroUsize,
         /// The closest peer to `key` that did not return a record.
@@ -2150,8 +2222,8 @@ pub enum PutRecordPhase {
     /// The query is replicating the record to the closest nodes to the key.
     PutRecord {
-        /// The number of successful replication requests so far.
-        num_results: usize,
+        /// A list of peers the given record has been successfully replicated to.
+        success: Vec<PeerId>,
         /// Query statistics from the finished `GetClosestPeers` phase.
         get_closest_peers_stats: QueryStats,
     },
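
Since `GetRecordOk::records` now yields `PeerRecord`s instead of bare `Record`s, callers can tell which peer served each record. A small consumption sketch under the new API (the `handle_get_record` function is hypothetical; only the types are from this change-set):

    use libp2p::kad::{GetRecordOk, PeerRecord, Record};

    fn handle_get_record(ok: GetRecordOk) {
        for PeerRecord { peer, record: Record { key, value, .. } } in ok.records {
            match peer {
                // `None`: the record came from the local store.
                None => println!("local record {:?} => {:?}", key, value),
                // `Some`: the record was received from that peer.
                Some(peer) => println!("record {:?} => {:?} from {}", key, value, peer),
            }
        }
    }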

protocols/kad/src/behaviour/test.rs

@@ -22,14 +22,15 @@
 
 use super::*;
 
-use crate::{ALPHA_VALUE, K_VALUE};
+use crate::K_VALUE;
 use crate::kbucket::Distance;
-use crate::record::store::MemoryStore;
+use crate::record::{Key, store::MemoryStore};
 use futures::{
     prelude::*,
     executor::block_on,
     future::poll_fn,
 };
+use futures_timer::Delay;
 use libp2p_core::{
     PeerId,
     Transport,
@@ -43,8 +44,8 @@ use libp2p_secio::SecioConfig;
 use libp2p_swarm::Swarm;
 use libp2p_yamux as yamux;
 use quickcheck::*;
-use rand::{Rng, random, thread_rng};
-use std::{collections::{HashSet, HashMap}, io, num::NonZeroUsize, u64};
+use rand::{Rng, random, thread_rng, rngs::StdRng, SeedableRng};
+use std::{collections::{HashSet, HashMap}, time::Duration, io, num::NonZeroUsize, u64};
 use multihash::{wrap, Code, Multihash};
 
 type TestSwarm = Swarm<Kademlia<MemoryStore>>;
@@ -132,21 +133,45 @@ fn random_multihash() -> Multihash {
     wrap(Code::Sha2_256, &thread_rng().gen::<[u8; 32]>())
 }
 
+#[derive(Clone, Debug)]
+struct Seed([u8; 32]);
+
+impl Arbitrary for Seed {
+    fn arbitrary<G: Gen>(g: &mut G) -> Seed {
+        Seed(g.gen())
+    }
+}
+
 #[test]
 fn bootstrap() {
-    fn run(rng: &mut impl Rng) {
-        let num_total = rng.gen_range(2, 20);
-        // When looking for the closest node to a key, Kademlia considers ALPHA_VALUE nodes to query
-        // at initialization. If `num_groups` is larger than ALPHA_VALUE the remaining locally known
-        // nodes will not be considered. Given that no other node is aware of them, they would be
-        // lost entirely. To prevent the above restrict `num_groups` to be equal or smaller than
-        // ALPHA_VALUE.
-        let num_group = rng.gen_range(1, (num_total % ALPHA_VALUE.get()) + 2);
-
-        let mut swarms = build_connected_nodes(num_total, num_group).into_iter()
+    fn prop(seed: Seed) {
+        let mut rng = StdRng::from_seed(seed.0);
+
+        let num_total = rng.gen_range(2, 20);
+        // When looking for the closest node to a key, Kademlia considers
+        // K_VALUE nodes to query at initialization. If `num_group` is larger
+        // than K_VALUE the remaining locally known nodes will not be
+        // considered. Given that no other node is aware of them, they would be
+        // lost entirely. To prevent the above restrict `num_group` to be equal
+        // or smaller than K_VALUE.
+        let num_group = rng.gen_range(1, (num_total % K_VALUE.get()) + 2);
+
+        let mut cfg = KademliaConfig::default();
+        if rng.gen() {
+            cfg.disjoint_query_paths(true);
+        }
+
+        let mut swarms = build_connected_nodes_with_config(
+            num_total,
+            num_group,
+            cfg,
+        ).into_iter()
            .map(|(_a, s)| s)
            .collect::<Vec<_>>();
-        let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
+
+        let swarm_ids: Vec<_> = swarms.iter()
+            .map(Swarm::local_peer_id)
+            .cloned()
+            .collect();
 
         let qid = swarms[0].bootstrap().unwrap();
@@ -190,10 +215,7 @@ fn bootstrap() {
         )
     }
 
-    let mut rng = thread_rng();
-    for _ in 0 .. 10 {
-        run(&mut rng)
-    }
+    QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
 }
@@ -415,16 +437,22 @@ fn get_record_not_found() {
     )
 }
 
-/// A node joining a fully connected network via a single bootnode should be able to put a record to
-/// the X closest nodes of the network where X is equal to the configured replication factor.
+/// A node joining a fully connected network via three (ALPHA_VALUE) bootnodes
+/// should be able to put a record to the X closest nodes of the network where X
+/// is equal to the configured replication factor.
 #[test]
 fn put_record() {
-    fn prop(replication_factor: usize, records: Vec<Record>) {
-        let replication_factor = NonZeroUsize::new(replication_factor % (K_VALUE.get() / 2) + 1).unwrap();
-        let num_total = replication_factor.get() * 2;
+    fn prop(records: Vec<Record>, seed: Seed) {
+        let mut rng = StdRng::from_seed(seed.0);
+        let replication_factor = NonZeroUsize::new(rng.gen_range(1, (K_VALUE.get() / 2) + 1)).unwrap();
+        // At least 4 nodes, 1 under test + 3 bootnodes.
+        let num_total = usize::max(4, replication_factor.get() * 2);
 
         let mut config = KademliaConfig::default();
         config.set_replication_factor(replication_factor);
+        if rng.gen() {
+            config.disjoint_query_paths(true);
+        }
 
         let mut swarms = {
             let mut fully_connected_swarms = build_fully_connected_nodes_with_config(
@@ -433,10 +461,13 @@ fn put_record() {
             );
 
             let mut single_swarm = build_node_with_config(config);
-            single_swarm.1.add_address(
-                Swarm::local_peer_id(&fully_connected_swarms[0].1),
-                fully_connected_swarms[0].0.clone(),
-            );
+            // Connect `single_swarm` to three bootnodes.
+            for i in 0..3 {
+                single_swarm.1.add_address(
+                    Swarm::local_peer_id(&fully_connected_swarms[i].1),
+                    fully_connected_swarms[i].0.clone(),
+                );
+            }
 
             let mut swarms = vec![single_swarm];
             swarms.append(&mut fully_connected_swarms);
@@ -618,11 +649,13 @@ fn get_record() {
         loop {
             match swarm.poll_next_unpin(ctx) {
                 Poll::Ready(Some(KademliaEvent::QueryResult {
-                    id, result: QueryResult::GetRecord(Ok(ok)), ..
+                    id,
+                    result: QueryResult::GetRecord(Ok(GetRecordOk { records })),
+                    ..
                 })) => {
                     assert_eq!(id, qid);
-                    assert_eq!(ok.records.len(), 1);
-                    assert_eq!(ok.records.first(), Some(&record));
+                    assert_eq!(records.len(), 1);
+                    assert_eq!(records.first().unwrap().record, record);
                     return Poll::Ready(());
                 }
                 // Ignore any other event.
@@ -662,11 +695,13 @@ fn get_record_many() {
         loop {
             match swarm.poll_next_unpin(ctx) {
                 Poll::Ready(Some(KademliaEvent::QueryResult {
-                    id, result: QueryResult::GetRecord(Ok(ok)), ..
+                    id,
+                    result: QueryResult::GetRecord(Ok(GetRecordOk { records })),
+                    ..
                 })) => {
                     assert_eq!(id, qid);
-                    assert_eq!(ok.records.len(), num_results);
-                    assert_eq!(ok.records.first(), Some(&record));
+                    assert_eq!(records.len(), num_results);
+                    assert_eq!(records.first().unwrap().record, record);
                     return Poll::Ready(());
                 }
                 // Ignore any other event.
@@ -681,17 +716,22 @@ fn get_record_many() {
     )
 }
 
-/// A node joining a fully connected network via a single bootnode should be able to add itself as a
-/// provider to the X closest nodes of the network where X is equal to the configured replication
-/// factor.
+/// A node joining a fully connected network via three (ALPHA_VALUE) bootnodes
+/// should be able to add itself as a provider to the X closest nodes of the
+/// network where X is equal to the configured replication factor.
 #[test]
 fn add_provider() {
-    fn prop(replication_factor: usize, keys: Vec<record::Key>) {
-        let replication_factor = NonZeroUsize::new(replication_factor % (K_VALUE.get() / 2) + 1).unwrap();
-        let num_total = replication_factor.get() * 2;
+    fn prop(keys: Vec<record::Key>, seed: Seed) {
+        let mut rng = StdRng::from_seed(seed.0);
+        let replication_factor = NonZeroUsize::new(rng.gen_range(1, (K_VALUE.get() / 2) + 1)).unwrap();
+        // At least 4 nodes, 1 under test + 3 bootnodes.
+        let num_total = usize::max(4, replication_factor.get() * 2);
 
         let mut config = KademliaConfig::default();
         config.set_replication_factor(replication_factor);
+        if rng.gen() {
+            config.disjoint_query_paths(true);
+        }
 
         let mut swarms = {
             let mut fully_connected_swarms = build_fully_connected_nodes_with_config(
@@ -700,10 +740,13 @@ fn add_provider() {
             );
 
             let mut single_swarm = build_node_with_config(config);
-            single_swarm.1.add_address(
-                Swarm::local_peer_id(&fully_connected_swarms[0].1),
-                fully_connected_swarms[0].0.clone(),
-            );
+            // Connect `single_swarm` to three bootnodes.
+            for i in 0..3 {
+                single_swarm.1.add_address(
+                    Swarm::local_peer_id(&fully_connected_swarms[i].1),
+                    fully_connected_swarms[i].0.clone(),
+                );
+            }
 
             let mut swarms = vec![single_swarm];
             swarms.append(&mut fully_connected_swarms);
@@ -877,3 +920,135 @@ fn exp_decr_expiration_overflow() {
     quickcheck(prop_no_panic as fn(_, _))
 }
#[test]
fn disjoint_query_does_not_finish_before_all_paths_did() {
let mut config = KademliaConfig::default();
config.disjoint_query_paths(true);
// I.e. setting the number of disjoint paths to be explored to 2.
config.set_parallelism(NonZeroUsize::new(2).unwrap());
let mut alice = build_node_with_config(config);
let mut trudy = build_node(); // Trudy the intruder, an adversary.
let mut bob = build_node();
let key = Key::new(&multihash::Sha2_256::digest(&thread_rng().gen::<[u8; 32]>()));
let record_bob = Record::new(key.clone(), b"bob".to_vec());
let record_trudy = Record::new(key.clone(), b"trudy".to_vec());
// Make `bob` and `trudy` aware of their version of the record searched by
// `alice`.
bob.1.store.put(record_bob.clone()).unwrap();
trudy.1.store.put(record_trudy.clone()).unwrap();
// Make `trudy` and `bob` known to `alice`.
alice.1.add_address(Swarm::local_peer_id(&trudy.1), trudy.0.clone());
alice.1.add_address(Swarm::local_peer_id(&bob.1), bob.0.clone());
// Drop the swarm addresses.
let (mut alice, mut bob, mut trudy) = (alice.1, bob.1, trudy.1);
// Have `alice` query the Dht for `key` with a quorum of 1.
alice.get_record(&key, Quorum::One);
// The default peer timeout is 10 seconds. Choosing 1 second here should
// give enough head room to prevent connections to `bob` from timing out.
let mut before_timeout = Delay::new(Duration::from_secs(1));
// Poll only `alice` and `trudy` expecting `alice` not yet to return a query
// result as it is not able to connect to `bob` just yet.
block_on(
poll_fn(|ctx| {
for (i, swarm) in [&mut alice, &mut trudy].iter_mut().enumerate() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult{
result: QueryResult::GetRecord(result),
..
})) => {
if i != 0 {
panic!("Expected `QueryResult` from Alice.")
}
match result {
Ok(_) => panic!(
"Expected query not to finish until all \
disjoint paths have been explored.",
),
Err(e) => panic!("{:?}", e),
}
}
// Ignore any other event.
Poll::Ready(Some(_)) => (),
Poll::Ready(None) => panic!("Expected Kademlia behaviour not to finish."),
Poll::Pending => break,
}
}
}
// Make sure not to wait until connections to `bob` time out.
before_timeout.poll_unpin(ctx)
})
);
// Make sure `alice` has exactly one query with `trudy`'s record only.
assert_eq!(1, alice.queries.iter().count());
alice.queries.iter().for_each(|q| {
match &q.inner.info {
QueryInfo::GetRecord{ records, .. } => {
assert_eq!(
*records,
vec![PeerRecord {
peer: Some(Swarm::local_peer_id(&trudy).clone()),
record: record_trudy.clone(),
}],
);
},
i @ _ => panic!("Unexpected query info: {:?}", i),
}
});
// Poll `alice` and `bob` expecting `alice` to return a successful query
// result as it is now able to explore the second disjoint path.
let records = block_on(
poll_fn(|ctx| {
for (i, swarm) in [&mut alice, &mut bob].iter_mut().enumerate() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult{
result: QueryResult::GetRecord(result),
..
})) => {
if i != 0 {
panic!("Expected `QueryResult` from Alice.")
}
match result {
Ok(ok) => return Poll::Ready(ok.records),
Err(e) => unreachable!("{:?}", e),
}
}
// Ignore any other event.
Poll::Ready(Some(_)) => (),
Poll::Ready(None) => panic!(
"Expected Kademlia behaviour not to finish.",
),
Poll::Pending => break,
}
}
}
Poll::Pending
})
);
assert_eq!(2, records.len());
assert!(records.contains(&PeerRecord {
peer: Some(Swarm::local_peer_id(&bob).clone()),
record: record_bob,
}));
assert!(records.contains(&PeerRecord {
peer: Some(Swarm::local_peer_id(&trudy).clone()),
record: record_trudy,
}));
}

protocols/kad/src/lib.rs

@@ -48,6 +48,8 @@ pub use behaviour::{
     QueryInfo,
     QueryStats,
+    PeerRecord,
     BootstrapResult,
     BootstrapOk,
     BootstrapError,
@@ -107,4 +109,3 @@ pub const K_VALUE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(20) };
 ///
 /// The current value is `3`.
 pub const ALPHA_VALUE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(3) };

protocols/kad/src/query.rs

@@ -21,10 +21,10 @@
 mod peers;
 
 use peers::PeersIterState;
-use peers::closest::{ClosestPeersIter, ClosestPeersIterConfig};
+use peers::closest::{ClosestPeersIterConfig, ClosestPeersIter, disjoint::ClosestDisjointPeersIter};
 use peers::fixed::FixedPeersIter;
 
-use crate::K_VALUE;
+use crate::{ALPHA_VALUE, K_VALUE};
 use crate::kbucket::{Key, KeyBytes};
 use either::Either;
 use fnv::FnvHashMap;
@@ -104,7 +104,7 @@ impl<TInner> QueryPool<TInner> {
         I: IntoIterator<Item = PeerId>
     {
         assert!(!self.queries.contains_key(&id));
-        let parallelism = self.config.replication_factor.get();
+        let parallelism = self.config.replication_factor;
         let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
         let query = Query::new(id, peer_iter, inner);
         self.queries.insert(id, query);
@@ -113,7 +113,7 @@ impl<TInner> QueryPool<TInner> {
     /// Adds a query to the pool that iterates towards the closest peers to the target.
     pub fn add_iter_closest<T, I>(&mut self, target: T, peers: I, inner: TInner) -> QueryId
     where
-        T: Into<KeyBytes>,
+        T: Into<KeyBytes> + Clone,
         I: IntoIterator<Item = Key<PeerId>>
     {
         let id = self.next_query_id();
@@ -124,14 +124,23 @@ impl<TInner> QueryPool<TInner> {
     /// Adds a query to the pool that iterates towards the closest peers to the target.
     pub fn continue_iter_closest<T, I>(&mut self, id: QueryId, target: T, peers: I, inner: TInner)
     where
-        T: Into<KeyBytes>,
+        T: Into<KeyBytes> + Clone,
         I: IntoIterator<Item = Key<PeerId>>
     {
         let cfg = ClosestPeersIterConfig {
-            num_results: self.config.replication_factor.get(),
+            num_results: self.config.replication_factor,
+            parallelism: self.config.parallelism,
             .. ClosestPeersIterConfig::default()
         };
-        let peer_iter = QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers));
+
+        let peer_iter = if self.config.disjoint_query_paths {
+            QueryPeerIter::ClosestDisjoint(
+                ClosestDisjointPeersIter::with_config(cfg, target, peers),
+            )
+        } else {
+            QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers))
+        };
+
         let query = Query::new(id, peer_iter, inner);
         self.queries.insert(id, query);
     }
@@ -212,15 +221,34 @@ pub struct QueryId(usize);
 /// The configuration for queries in a `QueryPool`.
 #[derive(Debug, Clone)]
 pub struct QueryConfig {
+    /// Timeout of a single query.
+    ///
+    /// See [`crate::behaviour::KademliaConfig::set_query_timeout`] for details.
     pub timeout: Duration,
+
+    /// The replication factor to use.
+    ///
+    /// See [`crate::behaviour::KademliaConfig::set_replication_factor`] for details.
     pub replication_factor: NonZeroUsize,
+
+    /// Allowed level of parallelism for iterative queries.
+    ///
+    /// See [`crate::behaviour::KademliaConfig::set_parallelism`] for details.
+    pub parallelism: NonZeroUsize,
+
+    /// Whether to use disjoint paths on iterative lookups.
+    ///
+    /// See [`crate::behaviour::KademliaConfig::disjoint_query_paths`] for details.
+    pub disjoint_query_paths: bool,
 }
 
 impl Default for QueryConfig {
     fn default() -> Self {
         QueryConfig {
             timeout: Duration::from_secs(60),
-            replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0")
+            replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
+            parallelism: ALPHA_VALUE,
+            disjoint_query_paths: false,
         }
     }
 }
@@ -240,6 +268,7 @@ pub struct Query<TInner> {
 /// The peer selection strategies that can be used by queries.
 enum QueryPeerIter {
     Closest(ClosestPeersIter),
+    ClosestDisjoint(ClosestDisjointPeersIter),
     Fixed(FixedPeersIter)
 }
@@ -263,6 +292,7 @@ impl<TInner> Query<TInner> {
     pub fn on_failure(&mut self, peer: &PeerId) {
         let updated = match &mut self.peer_iter {
             QueryPeerIter::Closest(iter) => iter.on_failure(peer),
+            QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
             QueryPeerIter::Fixed(iter) => iter.on_failure(peer)
         };
         if updated {
@@ -279,6 +309,7 @@ impl<TInner> Query<TInner> {
     {
         let updated = match &mut self.peer_iter {
             QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
+            QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers),
             QueryPeerIter::Fixed(iter) => iter.on_success(peer)
         };
         if updated {
@@ -290,6 +321,7 @@ impl<TInner> Query<TInner> {
     pub fn is_waiting(&self, peer: &PeerId) -> bool {
         match &self.peer_iter {
             QueryPeerIter::Closest(iter) => iter.is_waiting(peer),
+            QueryPeerIter::ClosestDisjoint(iter) => iter.is_waiting(peer),
             QueryPeerIter::Fixed(iter) => iter.is_waiting(peer)
         }
     }
@@ -298,6 +330,7 @@ impl<TInner> Query<TInner> {
     fn next(&mut self, now: Instant) -> PeersIterState {
         let state = match &mut self.peer_iter {
             QueryPeerIter::Closest(iter) => iter.next(now),
+            QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
             QueryPeerIter::Fixed(iter) => iter.next()
         };
 
@@ -308,6 +341,34 @@ impl<TInner> Query<TInner> {
         state
     }
 
+    /// Tries to (gracefully) finish the query prematurely, providing the peers
+    /// that are no longer of interest for further progress of the query.
+    ///
+    /// A query may require that in order to finish gracefully a certain subset
+    /// of peers must be contacted. E.g. in the case of disjoint query paths a
+    /// query may only finish gracefully if every path contacted a peer whose
+    /// response permits termination of the query. The given peers are those for
+    /// which this is considered to be the case, i.e. for which a termination
+    /// condition is satisfied.
+    ///
+    /// Returns `true` if the query did indeed finish, `false` otherwise. In the
+    /// latter case, a new attempt at finishing the query may be made with new
+    /// `peers`.
+    ///
+    /// A finished query immediately stops yielding new peers to contact and
+    /// will be reported by [`QueryPool::poll`] via
+    /// [`QueryPoolState::Finished`].
+    pub fn try_finish<'a, I>(&mut self, peers: I) -> bool
+    where
+        I: IntoIterator<Item = &'a PeerId>
+    {
+        match &mut self.peer_iter {
+            QueryPeerIter::Closest(iter) => { iter.finish(); true },
+            QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
+            QueryPeerIter::Fixed(iter) => { iter.finish(); true }
+        }
+    }
+
     /// Finishes the query prematurely.
     ///
     /// A finished query immediately stops yielding new peers to contact and will be
@@ -315,6 +376,7 @@ impl<TInner> Query<TInner> {
     pub fn finish(&mut self) {
         match &mut self.peer_iter {
             QueryPeerIter::Closest(iter) => iter.finish(),
+            QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
             QueryPeerIter::Fixed(iter) => iter.finish()
         }
     }
@@ -326,6 +388,7 @@ impl<TInner> Query<TInner> {
     pub fn is_finished(&self) -> bool {
         match &self.peer_iter {
             QueryPeerIter::Closest(iter) => iter.is_finished(),
+            QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
             QueryPeerIter::Fixed(iter) => iter.is_finished()
         }
     }
@@ -333,7 +396,8 @@ impl<TInner> Query<TInner> {
     /// Consumes the query, producing the final `QueryResult`.
     pub fn into_result(self) -> QueryResult<TInner, impl Iterator<Item = PeerId>> {
         let peers = match self.peer_iter {
-            QueryPeerIter::Closest(iter) => Either::Left(iter.into_result()),
+            QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
+            QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
             QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result())
         };
         QueryResult { peers, inner: self.inner, stats: self.stats }
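
The interplay of `Query::try_finish` with the disjoint iterator is the crux of this change-set: reaching a quorum of responses alone no longer suffices; every disjoint path must have contacted one of the satisfying peers. A condensed, crate-internal sketch of the caller pattern from `behaviour.rs` above (`satisfied` is a hypothetical name for the peers whose responses meet the quorum; not runnable outside this crate):

    // Offer the satisfied peers to the query. With disjoint paths this
    // finishes only those paths that contacted one of `satisfied` and
    // returns `true` once all paths are finished; otherwise the query
    // keeps running and the call may be retried with more peers later.
    let finished = query.try_finish(satisfied.iter());
    if !finished {
        debug!("quorum reached, but some disjoint paths are still active");
    }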

protocols/kad/src/query/peers.rs

@@ -65,4 +65,3 @@ pub enum PeersIterState<'a> {
     /// The iterator finished.
     Finished
 }

protocols/kad/src/query/peers/closest.rs

@@ -23,10 +23,12 @@
 use super::*;
 
 use crate::{K_VALUE, ALPHA_VALUE};
 use crate::kbucket::{Key, KeyBytes, Distance};
 use libp2p_core::PeerId;
-use std::{time::Duration, iter::FromIterator};
+use std::{time::Duration, iter::FromIterator, num::NonZeroUsize};
 use std::collections::btree_map::{BTreeMap, Entry};
 use wasm_timer::Instant;
 
+pub mod disjoint;
+
 /// A peer iterator for a dynamically changing list of peers, sorted by increasing
 /// distance to a chosen target.
 #[derive(Debug, Clone)]
@@ -55,13 +57,13 @@ pub struct ClosestPeersIterConfig {
     /// The `α` parameter in the Kademlia paper. The maximum number of peers that
     /// the iterator is allowed to wait for in parallel while iterating towards the closest
     /// nodes to a target. Defaults to `ALPHA_VALUE`.
-    pub parallelism: usize,
+    pub parallelism: NonZeroUsize,
 
     /// Number of results (closest peers) to search for.
     ///
     /// The number of closest peers for which the iterator must obtain successful results
     /// in order to finish successfully. Defaults to `K_VALUE`.
-    pub num_results: usize,
+    pub num_results: NonZeroUsize,
 
     /// The timeout for a single peer.
     ///
@@ -75,8 +77,8 @@ pub struct ClosestPeersIterConfig {
 impl Default for ClosestPeersIterConfig {
     fn default() -> Self {
         ClosestPeersIterConfig {
-            parallelism: ALPHA_VALUE.get(),
-            num_results: K_VALUE.get(),
+            parallelism: ALPHA_VALUE,
+            num_results: K_VALUE,
             peer_timeout: Duration::from_secs(10),
         }
     }
@@ -180,14 +182,14 @@ impl ClosestPeersIter {
             // than any peer seen so far (i.e. is the first entry), or the iterator did
             // not yet accumulate enough closest peers.
             progress = self.closest_peers.keys().next() == Some(&distance)
-                || num_closest < self.config.num_results;
+                || num_closest < self.config.num_results.get();
         }
 
         // Update the iterator state.
         self.state = match self.state {
             State::Iterating { no_progress } => {
                 let no_progress = if progress { 0 } else { no_progress + 1 };
-                if no_progress >= self.config.parallelism {
+                if no_progress >= self.config.parallelism.get() {
                     State::Stalled
                 } else {
                     State::Iterating { no_progress }
@@ -310,7 +312,7 @@ impl ClosestPeersIter {
             *cnt += 1;
             // If `num_results` successful results have been delivered for the
             // closest peers, the iterator is done.
-            if *cnt >= self.config.num_results {
+            if *cnt >= self.config.num_results.get() {
                 self.state = State::Finished;
                 return PeersIterState::Finished
             }
@@ -355,7 +357,7 @@ impl ClosestPeersIter {
         self.state == State::Finished
     }
 
-    /// Consumes the iterator, returning the target and the closest peers.
+    /// Consumes the iterator, returning the closest peers.
     pub fn into_result(self) -> impl Iterator<Item = PeerId> {
         self.closest_peers
             .into_iter()
@@ -366,7 +368,7 @@ impl ClosestPeersIter {
                 None
             }
         })
-        .take(self.config.num_results)
+        .take(self.config.num_results.get())
     }
 
     /// Checks if the iterator is at capacity w.r.t. the permitted parallelism.
@@ -378,9 +380,9 @@ impl ClosestPeersIter {
     fn at_capacity(&self) -> bool {
         match self.state {
             State::Stalled => self.num_waiting >= usize::max(
-                self.config.num_results, self.config.parallelism
+                self.config.num_results.get(), self.config.parallelism.get()
             ),
-            State::Iterating { .. } => self.num_waiting >= self.config.parallelism,
+            State::Iterating { .. } => self.num_waiting >= self.config.parallelism.get(),
             State::Finished => true
         }
     }
@@ -487,8 +489,8 @@ mod tests {
             .map(Key::from);
         let target = Key::from(Into::<Multihash>::into(PeerId::random()));
         let config = ClosestPeersIterConfig {
-            parallelism: g.gen_range(1, 10),
-            num_results: g.gen_range(1, 25),
+            parallelism: NonZeroUsize::new(g.gen_range(1, 10)).unwrap(),
+            num_results: NonZeroUsize::new(g.gen_range(1, 25)).unwrap(),
             peer_timeout: Duration::from_secs(g.gen_range(10, 30)),
         };
         ClosestPeersIter::with_config(config, target, known_closest_peers)
@@ -545,7 +547,7 @@ mod tests {
             .map(|e| e.key.clone())
             .collect::<Vec<_>>();
         let num_known = expected.len();
-        let max_parallelism = usize::min(iter.config.parallelism, num_known);
+        let max_parallelism = usize::min(iter.config.parallelism.get(), num_known);
 
         let target = iter.target.clone();
         let mut remaining;
@@ -584,7 +586,7 @@ mod tests {
             // peers or an error, thus finishing the "in-flight requests".
             for (i, k) in expected.iter().enumerate() {
                 if rng.gen_bool(0.75) {
-                    let num_closer = rng.gen_range(0, iter.config.num_results + 1);
+                    let num_closer = rng.gen_range(0, iter.config.num_results.get() + 1);
                     let closer_peers = random_peers(num_closer, &mut rng);
                     remaining.extend(closer_peers.iter().cloned().map(Key::from));
                     iter.on_success(k.preimage(), closer_peers);
@@ -620,16 +622,16 @@ mod tests {
 
             assert!(sorted(&target, &closest));
 
-            if closest.len() < num_results {
+            if closest.len() < num_results.get() {
                 // The iterator returned fewer results than requested. Therefore
                 // either the initial number of known peers must have been
                 // less than the desired number of results, or there must
                 // have been failures.
-                assert!(num_known < num_results || num_failures > 0);
+                assert!(num_known < num_results.get() || num_failures > 0);
                 // All peers must have been contacted.
                 assert!(all_contacted, "Not all peers have been contacted.");
             } else {
-                assert_eq!(num_results, closest.len(), "Too many results.");
+                assert_eq!(num_results.get(), closest.len(), "Too many results.");
             }
         }
@@ -741,7 +743,7 @@ mod tests {
     fn prop(mut iter: ClosestPeersIter) {
         iter.state = State::Stalled;
 
-        for i in 0..usize::max(iter.config.parallelism, iter.config.num_results) {
+        for i in 0..usize::max(iter.config.parallelism.get(), iter.config.num_results.get()) {
             iter.num_waiting = i;
             assert!(
                 !iter.at_capacity(),
@@ -750,7 +752,10 @@ mod tests {
             )
         }
 
-        iter.num_waiting = usize::max(iter.config.parallelism, iter.config.num_results);
+        iter.num_waiting = usize::max(
+            iter.config.parallelism.get(),
+            iter.config.num_results.get(),
+        );
         assert!(
             iter.at_capacity(),
             "Iterator should be at capacity if `max(parallelism, num_results)` requests are \

protocols/kad/src/query/peers/closest/disjoint.rs (new file)

@@ -0,0 +1,991 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use super::*;
use crate::kbucket::{Key, KeyBytes};
use libp2p_core::PeerId;
use std::{
collections::HashMap,
iter::{Cycle, Map, Peekable},
ops::{Add, Index, IndexMut, Range},
};
use wasm_timer::Instant;
/// Wraps around a set of [`ClosestPeersIter`], enforcing a disjoint discovery
/// path per configured parallelism according to the S/Kademlia paper.
pub struct ClosestDisjointPeersIter {
config: ClosestPeersIterConfig,
target: KeyBytes,
/// The set of wrapped [`ClosestPeersIter`].
iters: Vec<ClosestPeersIter>,
/// Order in which to query the iterators ensuring fairness across
/// [`ClosestPeersIter::next`] calls.
iter_order: Cycle<Map<Range<usize>, fn(usize) -> IteratorIndex>>,
/// Mapping of contacted peers by their [`PeerId`] to [`PeerState`]
/// containing the corresponding iterator indices as well as the response
/// state.
///
/// Used to track which iterator contacted which peer. See [`PeerState`]
/// for details.
contacted_peers: HashMap<PeerId, PeerState>,
}
impl ClosestDisjointPeersIter {
/// Creates a new iterator with a default configuration.
pub fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
where
I: IntoIterator<Item = Key<PeerId>>,
{
Self::with_config(
ClosestPeersIterConfig::default(),
target,
known_closest_peers,
)
}
/// Creates a new iterator with the given configuration.
pub fn with_config<I, T>(
config: ClosestPeersIterConfig,
target: T,
known_closest_peers: I,
) -> Self
where
I: IntoIterator<Item = Key<PeerId>>,
T: Into<KeyBytes> + Clone,
{
let peers = known_closest_peers.into_iter().take(K_VALUE.get()).collect::<Vec<_>>();
let iters = (0..config.parallelism.get())
// NOTE: All [`ClosestPeersIter`] share the same set of peers at
// initialization. The [`ClosestDisjointPeersIter.contacted_peers`]
// mapping ensures that a successful response from a peer is only
// ever passed to a single [`ClosestPeersIter`]. See
// [`ClosestDisjointPeersIter::on_success`] for details.
.map(|_| ClosestPeersIter::with_config(config.clone(), target.clone(), peers.clone()))
.collect::<Vec<_>>();
let iters_len = iters.len();
ClosestDisjointPeersIter {
config,
target: target.into(),
iters,
iter_order: (0..iters_len).map(IteratorIndex as fn(usize) -> IteratorIndex).cycle(),
contacted_peers: HashMap::new(),
}
}
/// Callback for informing the iterator about a failed request to a peer.
///
/// If the iterator is currently waiting for a result from `peer`,
/// the iterator state is updated and `true` is returned. In that
/// case, after calling this function, `next` should eventually be
/// called again to obtain the new state of the iterator.
///
/// If the iterator is finished, it is not currently waiting for a
/// result from `peer`, or a result for `peer` has already been reported,
/// calling this function has no effect and `false` is returned.
pub fn on_failure(&mut self, peer: &PeerId) -> bool {
let mut updated = false;
if let Some(PeerState{ initiated_by, response }) = self.contacted_peers.get_mut(peer) {
updated = self.iters[*initiated_by].on_failure(peer);
if updated {
*response = ResponseState::Failed;
}
for (i, iter) in &mut self.iters.iter_mut().enumerate() {
if i != (*initiated_by).into() {
// This iterator never triggered an actual request to the
// given peer - thus ignore the returned boolean.
iter.on_failure(peer);
}
}
}
updated
}
/// Callback for delivering the result of a successful request to a peer.
///
/// Delivering results of requests back to the iterator allows the iterator
/// to make progress. The iterator is said to make progress either when the
/// given `closer_peers` contain a peer closer to the target than any peer
/// seen so far, or when the iterator did not yet accumulate `num_results`
/// closest peers and `closer_peers` contains a new peer, regardless of its
/// distance to the target.
///
/// If the iterator is currently waiting for a result from `peer`,
/// the iterator state is updated and `true` is returned. In that
/// case, after calling this function, `next` should eventually be
/// called again to obtain the new state of the iterator.
///
/// If the iterator is finished, it is not currently waiting for a
/// result from `peer`, or a result for `peer` has already been reported,
/// calling this function has no effect and `false` is returned.
pub fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
where
I: IntoIterator<Item = PeerId>,
{
let mut updated = false;
if let Some(PeerState{ initiated_by, response }) = self.contacted_peers.get_mut(peer) {
// Pass the new `closer_peers` to the iterator that first yielded
// the peer.
updated = self.iters[*initiated_by].on_success(peer, closer_peers);
if updated {
// Mark the response as succeeded for future iterators yielding
// this peer. There is no need to keep the `closer_peers`
// around, given that they are only passed to the first
// iterator.
*response = ResponseState::Succeeded;
}
for (i, iter) in &mut self.iters.iter_mut().enumerate() {
if i != (*initiated_by).into() {
// Only report the success to all remaining not-first
// iterators. Do not pass the `closer_peers` in order to
// uphold the S/Kademlia disjoint paths guarantee.
//
// This iterator never triggered an actual request to the
// given peer - thus ignore the returned boolean.
iter.on_success(peer, std::iter::empty());
}
}
}
updated
}
pub fn is_waiting(&self, peer: &PeerId) -> bool {
self.iters.iter().any(|i| i.is_waiting(peer))
}
pub fn next(&mut self, now: Instant) -> PeersIterState {
let mut state = None;
// Ensure querying each iterator at most once.
for _ in 0 .. self.iters.len() {
let i = self.iter_order.next().expect("Cycle never ends.");
let iter = &mut self.iters[i];
loop {
match iter.next(now) {
PeersIterState::Waiting(None) => {
match state {
Some(PeersIterState::Waiting(Some(_))) => {
// [`ClosestDisjointPeersIter::next`] returns immediately once a
// [`ClosestPeersIter`] yielded a peer. Thus this state is
// unreachable.
unreachable!();
},
Some(PeersIterState::Waiting(None)) => {}
Some(PeersIterState::WaitingAtCapacity) => {
// At least one ClosestPeersIter is no longer at capacity, thus the
// composite ClosestDisjointPeersIter is no longer at capacity.
state = Some(PeersIterState::Waiting(None))
}
Some(PeersIterState::Finished) => {
// `state` is never set to `Finished`.
unreachable!();
}
None => state = Some(PeersIterState::Waiting(None)),
};
break;
}
PeersIterState::Waiting(Some(peer)) => {
match self.contacted_peers.get_mut(&*peer) {
Some(PeerState{ response, .. }) => {
// Another iterator already contacted this peer.
let peer = peer.into_owned();
match response {
// The iterator will be notified later whether the given node
// was successfully contacted or not. See
// [`ClosestDisjointPeersIter::on_success`] for details.
ResponseState::Waiting => {},
ResponseState::Succeeded => {
// Given that iterator was not the first to contact the peer
// it will not be made aware of the closer peers discovered
// to uphold the S/Kademlia disjoint paths guarantee. See
// [`ClosestDisjointPeersIter::on_success`] for details.
iter.on_success(&peer, std::iter::empty());
},
ResponseState::Failed => {
iter.on_failure(&peer);
},
}
},
None => {
// The iterator is the first to contact this peer.
self.contacted_peers.insert(
peer.clone().into_owned(),
PeerState::new(i),
);
return PeersIterState::Waiting(Some(Cow::Owned(peer.into_owned())));
},
}
}
PeersIterState::WaitingAtCapacity => {
match state {
Some(PeersIterState::Waiting(Some(_))) => {
// [`ClosestDisjointPeersIter::next`] returns immediately once a
// [`ClosestPeersIter`] yielded a peer. Thus this state is
// unreachable.
unreachable!();
},
Some(PeersIterState::Waiting(None)) => {}
Some(PeersIterState::WaitingAtCapacity) => {}
Some(PeersIterState::Finished) => {
// `state` is never set to `Finished`.
unreachable!();
},
None => state = Some(PeersIterState::WaitingAtCapacity),
};
break;
}
PeersIterState::Finished => break,
}
}
}
state.unwrap_or(PeersIterState::Finished)
}
/// Finishes all paths containing one of the given peers.
///
/// See [`crate::query::Query::try_finish`] for details.
pub fn finish_paths<'a, I>(&mut self, peers: I) -> bool
where
I: IntoIterator<Item = &'a PeerId>
{
for peer in peers {
if let Some(PeerState{ initiated_by, .. }) = self.contacted_peers.get_mut(peer) {
self.iters[*initiated_by].finish();
}
}
self.is_finished()
}
/// Immediately transitions the iterator to [`PeersIterState::Finished`].
pub fn finish(&mut self) {
for iter in &mut self.iters {
iter.finish();
}
}
/// Checks whether the iterator has finished.
pub fn is_finished(&self) -> bool {
self.iters.iter().all(|i| i.is_finished())
}
    /// Consumes the iterator, returning the peers found across all paths,
    /// deduplicated and ordered by distance to the target.
    ///
    /// Note: In the absence of adversarial peers or connectivity issues along
    /// any path, all paths return the same result, deduplicated through the
    /// `ResultIter`, and `into_result` thus returns `num_results` peers
    /// overall. In the presence of adversarial peers or connectivity issues,
    /// `ClosestDisjointPeersIter` tries to return the `num_results` closest
    /// benign peers, but as it cannot differentiate benign from faulty paths,
    /// it returns faulty peers as well, and thus more than `num_results`
    /// peers overall.
pub fn into_result(self) -> impl Iterator<Item = PeerId> {
        let result_per_path = self.iters.into_iter()
.map(|iter| iter.into_result().map(Key::from));
ResultIter::new(self.target, result_per_path).map(Key::into_preimage)
}
}
/// Index into the [`ClosestDisjointPeersIter`] `iters` vector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct IteratorIndex(usize);
impl From<usize> for IteratorIndex {
fn from(i: usize) -> Self {
IteratorIndex(i)
}
}
impl From<IteratorIndex> for usize {
fn from(i: IteratorIndex) -> Self {
i.0
}
}
impl Add<usize> for IteratorIndex {
type Output = Self;
fn add(self, rhs: usize) -> Self::Output {
(self.0 + rhs).into()
}
}
impl Index<IteratorIndex> for Vec<ClosestPeersIter> {
type Output = ClosestPeersIter;
fn index(&self, index: IteratorIndex) -> &Self::Output {
&self[index.0]
}
}
impl IndexMut<IteratorIndex> for Vec<ClosestPeersIter> {
fn index_mut(&mut self, index: IteratorIndex) -> &mut Self::Output {
&mut self[index.0]
}
}
/// State tracking the iterator that yielded (i.e. tried to contact) a peer. See
/// [`ClosestDisjointPeersIter::on_success`] for details.
#[derive(Debug, PartialEq, Eq)]
struct PeerState {
    /// The first iterator to yield the peer. It is notified both of the
    /// outcome (success/failure) and of the closer peers discovered.
    initiated_by: IteratorIndex,
    /// The state of the response. In case other iterators later yield the
    /// same peer, they can be notified of the response outcome.
    response: ResponseState,
}
impl PeerState {
fn new(initiated_by: IteratorIndex) -> Self {
PeerState {
initiated_by,
response: ResponseState::Waiting,
}
}
}
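/// The state of a response from a contacted peer, kept around in order to
/// replay the outcome to iterators that yield the same peer later on.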
#[derive(Debug, PartialEq, Eq)]
enum ResponseState {
Waiting,
Succeeded,
Failed,
}
/// Iterator combining the results of multiple [`ClosestPeersIter`] into a
/// single deduplicated, ordered iterator.
//
// Note: This operates under the assumption that `I` is ordered.
#[derive(Clone, Debug)]
struct ResultIter<I> where
I: Iterator<Item = Key<PeerId>>,
{
target: KeyBytes,
iters: Vec<Peekable<I>>,
}
impl<I: Iterator<Item = Key<PeerId>>> ResultIter<I> {
fn new(target: KeyBytes, iters: impl Iterator<Item = I>) -> Self {
        ResultIter {
target,
iters: iters.map(Iterator::peekable).collect(),
}
}
}
impl<I: Iterator<Item = Key<PeerId>>> Iterator for ResultIter<I> {
type Item = I::Item;
fn next(&mut self) -> Option<Self::Item> {
let target = &self.target;
self.iters.iter_mut()
// Find the iterator with the next closest peer.
.fold(
Option::<&mut Peekable<_>>::None,
|iter_a, iter_b| {
let iter_a = match iter_a {
Some(iter_a) => iter_a,
None => return Some(iter_b),
};
match (iter_a.peek(), iter_b.peek()) {
(Some(next_a), Some(next_b)) => {
if next_a == next_b {
// Remove from one for deduplication.
iter_b.next();
return Some(iter_a)
}
if target.distance(next_a) < target.distance(next_b) {
Some(iter_a)
} else {
Some(iter_b)
}
},
(Some(_), None) => Some(iter_a),
(None, Some(_)) => Some(iter_b),
(None, None) => None,
}
},
)
// Pop off the next closest peer from that iterator.
.and_then(Iterator::next)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::K_VALUE;
use quickcheck::*;
use rand::{Rng, seq::SliceRandom};
use std::collections::HashSet;
use std::iter;
impl Arbitrary for ResultIter<std::vec::IntoIter<Key<PeerId>>> {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
let target = Target::arbitrary(g).0;
let num_closest_iters = g.gen_range(0, 20 + 1);
let peers = random_peers(
g.gen_range(0, 20 * num_closest_iters + 1),
g,
);
let iters: Vec<_> = (0..num_closest_iters)
.map(|_| {
let num_peers = g.gen_range(0, 20 + 1);
let mut peers = peers.choose_multiple(g, num_peers)
.cloned()
.map(Key::from)
.collect::<Vec<_>>();
peers.sort_unstable_by(|a, b| {
target.distance(a).cmp(&target.distance(b))
});
peers.into_iter()
})
.collect();
ResultIter::new(target, iters.into_iter())
}
fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
let peers = self.iters
.clone()
.into_iter()
.flatten()
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
let iters = self.iters.clone()
.into_iter()
.map(|iter| iter.collect::<Vec<_>>())
.collect();
Box::new(ResultIterShrinker {
target: self.target.clone(),
peers,
iters,
})
}
}
struct ResultIterShrinker {
target: KeyBytes,
peers: Vec<Key<PeerId>>,
iters: Vec<Vec<Key<PeerId>>>,
}
impl Iterator for ResultIterShrinker {
type Item = ResultIter<std::vec::IntoIter<Key<PeerId>>>;
        /// Yields a sequence of [`ResultIter`]s, each of them missing a
        /// different peer from the original set.
fn next(&mut self) -> Option<Self::Item> {
// The peer that should not be included.
let peer = self.peers.pop()?;
let iters = self.iters.clone().into_iter()
.filter_map(|mut iter| {
iter.retain(|p| p != &peer);
if iter.is_empty() {
return None;
}
Some(iter.into_iter())
}).collect::<Vec<_>>();
Some(ResultIter::new(self.target.clone(), iters.into_iter()))
}
}
#[derive(Clone, Debug)]
struct Target(KeyBytes);
impl Arbitrary for Target {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
Target(Key::from(random_peers(1, g).pop().unwrap()).into())
}
}
fn random_peers<R: Rng>(n: usize, g: &mut R) -> Vec<PeerId> {
(0 .. n).map(|_| PeerId::from_multihash(
multihash::wrap(multihash::Code::Sha2_256, &g.gen::<[u8; 32]>())
).unwrap()).collect()
}
#[test]
fn result_iter_returns_deduplicated_ordered_peer_id_stream() {
fn prop(result_iter: ResultIter<std::vec::IntoIter<Key<PeerId>>>) {
let expected = {
let mut deduplicated = result_iter.clone()
.iters
.into_iter()
.flatten()
.collect::<HashSet<_>>()
.into_iter()
.map(Key::from)
.collect::<Vec<_>>();
deduplicated.sort_unstable_by(|a, b| {
result_iter.target.distance(a).cmp(&result_iter.target.distance(b))
});
deduplicated
};
assert_eq!(expected, result_iter.collect::<Vec<_>>());
}
QuickCheck::new().quickcheck(prop as fn(_))
}
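    // A hand-rolled companion to the property test above, added as an
    // illustrative sketch: two paths that discovered an overlapping peer are
    // merged into a single ordered stream with the duplicate removed. Peers
    // and target are random, so the expected order is computed from the
    // target distance rather than assumed.
    #[test]
    fn result_iter_merges_overlapping_paths() {
        let target: KeyBytes = Key::from(PeerId::random()).into();
        let mut peers = random_peers(3, &mut rand::thread_rng())
            .into_iter()
            .map(Key::from)
            .collect::<Vec<_>>();
        peers.sort_unstable_by(|a, b| {
            target.distance(a).cmp(&target.distance(b))
        });
        // Path 1 found the two closest peers, path 2 found the closest and
        // the third-closest peer, i.e. both paths share `peers[0]`.
        let path_1 = vec![peers[0].clone(), peers[1].clone()];
        let path_2 = vec![peers[0].clone(), peers[2].clone()];
        let merged = ResultIter::new(
            target,
            vec![path_1.into_iter(), path_2.into_iter()].into_iter(),
        ).collect::<Vec<_>>();
        assert_eq!(peers, merged);
    }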
#[derive(Debug, Clone)]
struct Parallelism(NonZeroUsize);
    impl Arbitrary for Parallelism {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
Parallelism(NonZeroUsize::new(g.gen_range(1, 10)).unwrap())
}
}
#[derive(Debug, Clone)]
struct NumResults(NonZeroUsize);
    impl Arbitrary for NumResults {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
NumResults(NonZeroUsize::new(g.gen_range(1, K_VALUE.get())).unwrap())
}
}
impl Arbitrary for ClosestPeersIterConfig {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
ClosestPeersIterConfig {
parallelism: Parallelism::arbitrary(g).0,
num_results: NumResults::arbitrary(g).0,
peer_timeout: Duration::from_secs(1),
}
}
}
#[derive(Debug, Clone)]
struct PeerVec(pub Vec<Key<PeerId>>);
impl Arbitrary for PeerVec {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
PeerVec(
(0..g.gen_range(1, 60))
.map(|_| PeerId::random())
.map(Key::from)
.collect(),
)
}
}
#[test]
fn s_kademlia_disjoint_paths() {
let now = Instant::now();
let target: KeyBytes = Key::from(PeerId::random()).into();
let mut pool = [0; 12].iter()
.map(|_| Key::from(PeerId::random()))
.collect::<Vec<_>>();
pool.sort_unstable_by(|a, b| {
target.distance(a).cmp(&target.distance(b))
});
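        // `pool` is now sorted closest-first, so each `split_off(pool.len() - 3)`
        // below takes the three keys currently farthest from the target. Every
        // subsequent split thus yields keys closer to the target than the
        // previous one.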
let known_closest_peers = pool.split_off(pool.len() - 3);
let config = ClosestPeersIterConfig {
parallelism: NonZeroUsize::new(3).unwrap(),
num_results: NonZeroUsize::new(3).unwrap(),
..ClosestPeersIterConfig::default()
};
let mut peers_iter = ClosestDisjointPeersIter::with_config(
config.clone(),
target,
known_closest_peers.clone(),
);
////////////////////////////////////////////////////////////////////////
// First round.
for _ in 0..3 {
if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
assert!(known_closest_peers.contains(&Key::from(peer)));
} else {
panic!("Expected iterator to return peer to query.");
}
}
assert_eq!(
PeersIterState::WaitingAtCapacity,
peers_iter.next(now),
);
let response_2 = pool.split_off(pool.len() - 3);
let response_3 = pool.split_off(pool.len() - 3);
        // These keys are closer to the target than any of the keys in
        // `response_2` and `response_3` from the honest peers 2 and 3.
let malicious_response_1 = pool.split_off(pool.len() - 3);
// Response from malicious peer 1.
peers_iter.on_success(
known_closest_peers[0].preimage(),
malicious_response_1.clone().into_iter().map(|k| k.preimage().clone()),
);
// Response from peer 2.
peers_iter.on_success(
known_closest_peers[1].preimage(),
response_2.clone().into_iter().map(|k| k.preimage().clone()),
);
// Response from peer 3.
peers_iter.on_success(
known_closest_peers[2].preimage(),
response_3.clone().into_iter().map(|k| k.preimage().clone()),
);
////////////////////////////////////////////////////////////////////////
// Second round.
let mut next_to_query = vec![];
for _ in 0..3 {
if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
next_to_query.push(peer)
} else {
panic!("Expected iterator to return peer to query.");
}
        }
// Expect a peer from each disjoint path.
assert!(next_to_query.contains(malicious_response_1[0].preimage()));
assert!(next_to_query.contains(response_2[0].preimage()));
assert!(next_to_query.contains(response_3[0].preimage()));
for peer in next_to_query {
peers_iter.on_success(&peer, vec![]);
}
// Mark all remaining peers as succeeded.
for _ in 0..6 {
if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
peers_iter.on_success(&peer, vec![]);
} else {
panic!("Expected iterator to return peer to query.");
}
}
assert_eq!(
PeersIterState::Finished,
peers_iter.next(now),
);
let final_peers: Vec<_> = peers_iter.into_result().collect();
        // Expect the final result to contain a peer from each disjoint path,
        // even though not all of them are among the overall closest peers.
assert!(final_peers.contains(malicious_response_1[0].preimage()));
assert!(final_peers.contains(response_2[0].preimage()));
assert!(final_peers.contains(response_3[0].preimage()));
}
#[derive(Clone)]
struct Graph(HashMap<PeerId, Peer>);
impl std::fmt::Debug for Graph {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
fmt.debug_list().entries(self.0.iter().map(|(id, _)| id)).finish()
}
}
impl Arbitrary for Graph {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
let mut peer_ids = random_peers(g.gen_range(K_VALUE.get(), 200), g)
.into_iter()
.map(|peer_id| (peer_id.clone(), Key::from(peer_id)))
.collect::<Vec<_>>();
// Make each peer aware of its direct neighborhood.
let mut peers = peer_ids.clone().into_iter()
.map(|(peer_id, key)| {
peer_ids.sort_unstable_by(|(_, a), (_, b)| {
key.distance(a).cmp(&key.distance(b))
});
assert_eq!(peer_id, peer_ids[0].0);
let known_peers = peer_ids.iter()
// Skip itself.
.skip(1)
.take(K_VALUE.get())
.cloned()
.collect::<Vec<_>>();
(peer_id, Peer{ known_peers })
})
.collect::<HashMap<_, _>>();
// Make each peer aware of a random set of other peers within the graph.
for (peer_id, peer) in peers.iter_mut() {
peer_ids.shuffle(g);
let num_peers = g.gen_range(K_VALUE.get(), peer_ids.len() + 1);
let mut random_peer_ids = peer_ids.choose_multiple(g, num_peers)
// Make sure not to include itself.
.filter(|(id, _)| peer_id != id)
.cloned()
.collect::<Vec<_>>();
peer.known_peers.append(&mut random_peer_ids);
peer.known_peers = std::mem::replace(&mut peer.known_peers, vec![])
// Deduplicate peer ids.
.into_iter().collect::<HashSet<_>>().into_iter().collect();
}
Graph(peers)
}
}
impl Graph {
fn get_closest_peer(&self, target: &KeyBytes) -> PeerId {
self.0.iter()
.map(|(peer_id, _)| (target.distance(&Key::new(peer_id.clone())), peer_id))
.fold(None, |acc, (distance_b, peer_id_b)| {
match acc {
None => Some((distance_b, peer_id_b)),
Some((distance_a, peer_id_a)) => if distance_a < distance_b {
Some((distance_a, peer_id_a))
} else {
Some((distance_b, peer_id_b))
}
}
})
.expect("Graph to have at least one peer.")
.1.clone()
}
}
#[derive(Debug, Clone)]
struct Peer {
known_peers: Vec<(PeerId, Key<PeerId>)>,
}
impl Peer {
fn get_closest_peers(&mut self, target: &KeyBytes) -> Vec<PeerId> {
self.known_peers.sort_unstable_by(|(_, a), (_, b)| {
target.distance(a).cmp(&target.distance(b))
});
self.known_peers.iter().take(K_VALUE.get()).map(|(id, _)| id).cloned().collect()
}
}
enum PeerIterator {
Disjoint(ClosestDisjointPeersIter),
Closest(ClosestPeersIter),
}
impl PeerIterator {
fn next(&mut self, now: Instant) -> PeersIterState {
match self {
PeerIterator::Disjoint(iter) => iter.next(now),
PeerIterator::Closest(iter) => iter.next(now),
}
}
fn on_success(&mut self, peer: &PeerId, closer_peers: Vec<PeerId>) {
match self {
PeerIterator::Disjoint(iter) => iter.on_success(peer, closer_peers),
PeerIterator::Closest(iter) => iter.on_success(peer, closer_peers),
};
}
fn into_result(self) -> Vec<PeerId> {
match self {
PeerIterator::Disjoint(iter) => iter.into_result().collect(),
PeerIterator::Closest(iter) => iter.into_result().collect(),
}
}
}
    /// Ensure [`ClosestPeersIter`] and [`ClosestDisjointPeersIter`] yield the
    /// same closest peers.
#[test]
fn closest_and_disjoint_closest_yield_same_result() {
fn prop(
target: Target,
graph: Graph,
parallelism: Parallelism,
num_results: NumResults,
) -> TestResult {
if parallelism.0 > num_results.0 {
return TestResult::discard();
}
let target: KeyBytes = target.0;
let closest_peer = graph.get_closest_peer(&target);
let mut known_closest_peers = graph.0.iter()
.take(K_VALUE.get())
.map(|(key, _peers)| Key::new(key.clone()))
.collect::<Vec<_>>();
known_closest_peers.sort_unstable_by(|a, b| {
target.distance(a).cmp(&target.distance(b))
});
            let cfg = ClosestPeersIterConfig {
parallelism: parallelism.0,
num_results: num_results.0,
..ClosestPeersIterConfig::default()
};
let closest = drive_to_finish(
PeerIterator::Closest(ClosestPeersIter::with_config(
cfg.clone(),
target.clone(),
known_closest_peers.clone(),
)),
graph.clone(),
&target,
);
let disjoint = drive_to_finish(
PeerIterator::Disjoint(ClosestDisjointPeersIter::with_config(
cfg,
target.clone(),
known_closest_peers.clone(),
)),
graph.clone(),
&target,
);
assert!(
closest.contains(&closest_peer),
"Expected `ClosestPeersIter` to find closest peer.",
);
assert!(
disjoint.contains(&closest_peer),
"Expected `ClosestDisjointPeersIter` to find closest peer.",
);
assert!(
closest.len() == num_results.0.get(),
"Expected `ClosestPeersIter` to find `num_results` closest \
peers."
);
assert!(
disjoint.len() >= num_results.0.get(),
"Expected `ClosestDisjointPeersIter` to find at least \
`num_results` closest peers."
);
if closest.len() > disjoint.len() {
let closest_only = closest.difference(&disjoint).collect::<Vec<_>>();
panic!(
"Expected `ClosestDisjointPeersIter` to find all peers \
found by `ClosestPeersIter`, but it did not find {:?}.",
closest_only,
);
            }
TestResult::passed()
}
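        // Drives the given iterator to completion against the in-memory
        // graph, immediately answering each emitted request with the
        // contacted peer's closest known peers.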
fn drive_to_finish(
mut iter: PeerIterator,
mut graph: Graph,
target: &KeyBytes,
) -> HashSet<PeerId> {
let now = Instant::now();
loop {
match iter.next(now) {
PeersIterState::Waiting(Some(peer_id)) => {
let peer_id = peer_id.clone().into_owned();
let closest_peers = graph.0.get_mut(&peer_id)
.unwrap()
.get_closest_peers(&target);
iter.on_success(&peer_id, closest_peers);
                },
PeersIterState::WaitingAtCapacity | PeersIterState::Waiting(None) =>
panic!("There is never more than one request in flight."),
PeersIterState::Finished => break,
}
}
let mut result = iter.into_result().into_iter().map(Key::new).collect::<Vec<_>>();
result.sort_unstable_by(|a, b| {
target.distance(a).cmp(&target.distance(b))
});
result.into_iter().map(|k| k.into_preimage()).collect()
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_, _, _, _) -> _)
}
#[test]
fn failure_can_not_overwrite_previous_success() {
let now = Instant::now();
let peer = PeerId::random();
let mut iter = ClosestDisjointPeersIter::new(
Key::from(PeerId::random()).into(),
iter::once(Key::from(peer.clone())),
);
assert!(matches!(iter.next(now), PeersIterState::Waiting(Some(_))));
// Expect peer to be marked as succeeded.
assert!(iter.on_success(&peer, iter::empty()));
assert_eq!(iter.contacted_peers.get(&peer), Some(&PeerState {
initiated_by: IteratorIndex(0),
response: ResponseState::Succeeded,
}));
// Expect peer to stay marked as succeeded.
assert!(!iter.on_failure(&peer));
assert_eq!(iter.contacted_peers.get(&peer), Some(&PeerState {
initiated_by: IteratorIndex(0),
response: ResponseState::Succeeded,
}));
}
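    // A minimal illustrative sketch, not part of the original test suite of
    // this change-set: `finish` transitions every wrapped iterator, and with
    // that the composite iterator as a whole, into the finished state.
    #[test]
    fn finish_finishes_all_paths() {
        let mut iter = ClosestDisjointPeersIter::new(
            Key::from(PeerId::random()).into(),
            iter::once(Key::from(PeerId::random())),
        );
        assert!(!iter.is_finished());
        iter.finish();
        assert!(iter.is_finished());
    }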
}

View File

@ -22,12 +22,12 @@ use super::*;
 use fnv::FnvHashMap;
 use libp2p_core::PeerId;
-use std::{vec, collections::hash_map::Entry};
+use std::{vec, collections::hash_map::Entry, num::NonZeroUsize};
 /// A peer iterator for a fixed set of peers.
 pub struct FixedPeersIter {
     /// The permitted parallelism, i.e. number of pending results.
-    parallelism: usize,
+    parallelism: NonZeroUsize,
     /// The state of peers emitted by the iterator.
     peers: FnvHashMap<PeerId, PeerState>,
@ -58,7 +58,7 @@ enum PeerState {
 }
 impl FixedPeersIter {
-    pub fn new<I>(peers: I, parallelism: usize) -> Self
+    pub fn new<I>(peers: I, parallelism: NonZeroUsize) -> Self
     where
         I: IntoIterator<Item = PeerId>
     {
@ -133,7 +133,7 @@ impl FixedPeersIter {
         match &mut self.state {
             State::Finished => return PeersIterState::Finished,
             State::Waiting { num_waiting } => {
-                if *num_waiting >= self.parallelism {
+                if *num_waiting >= self.parallelism.get() {
                     return PeersIterState::WaitingAtCapacity
                 }
                 loop {
@ -175,7 +175,10 @@ mod test {
     #[test]
     fn decrease_num_waiting_on_failure() {
-        let mut iter = FixedPeersIter::new(vec![PeerId::random(), PeerId::random()], 1);
+        let mut iter = FixedPeersIter::new(
+            vec![PeerId::random(), PeerId::random()],
+            NonZeroUsize::new(1).unwrap(),
+        );
         match iter.next() {
             PeersIterState::Waiting(Some(peer)) => {