diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index 014418d3..71773c00 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -33,13 +33,14 @@ use async_std::{io, task}; use futures::prelude::*; use libp2p::kad::record::store::MemoryStore; use libp2p::kad::{ - record::Key, Kademlia, KademliaEvent, + PeerRecord, PutRecordOk, QueryResult, Quorum, - Record + Record, + record::Key, }; use libp2p::{ NetworkBehaviour, @@ -86,7 +87,7 @@ fn main() -> Result<(), Box> { match message { KademliaEvent::QueryResult { result, .. } => match result { QueryResult::GetRecord(Ok(ok)) => { - for Record { key, value, .. } in ok.records { + for PeerRecord { record: Record { key, value, .. }, ..} in ok.records { println!( "Got record {:?} {:?}", std::str::from_utf8(key.as_ref()).unwrap(), diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 4225975f..54aac662 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -30,6 +30,7 @@ unsigned-varint = { version = "0.3", features = ["futures-codec"] } void = "1.0" [dev-dependencies] +futures-timer = "3.0" libp2p-secio = { path = "../secio" } libp2p-yamux = { path = "../../muxers/yamux" } quickcheck = "0.9.0" diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 6bfcdba7..57811434 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -127,8 +127,9 @@ impl Default for KademliaConfig { impl KademliaConfig { /// Sets a custom protocol name. /// - /// Kademlia nodes only communicate with other nodes using the same protocol name. Using a - /// custom name therefore allows to segregate the DHT from others, if that is desired. + /// Kademlia nodes only communicate with other nodes using the same protocol + /// name. Using a custom name therefore allows to segregate the DHT from + /// others, if that is desired. pub fn set_protocol_name(&mut self, name: impl Into>) -> &mut Self { self.protocol_config.set_protocol_name(name); self @@ -154,10 +155,41 @@ impl KademliaConfig { self } + /// Sets the allowed level of parallelism for iterative queries. + /// + /// The `α` parameter in the Kademlia paper. The maximum number of peers + /// that an iterative query is allowed to wait for in parallel while + /// iterating towards the closest nodes to a target. Defaults to + /// `ALPHA_VALUE`. + /// + /// This only controls the level of parallelism of an iterative query, not + /// the level of parallelism of a query to a fixed set of peers. + /// + /// When used with [`KademliaConfig::disjoint_query_paths`] it equals + /// the amount of disjoint paths used. + pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self { + self.query_config.parallelism = parallelism; + self + } + + /// Require iterative queries to use disjoint paths for increased resiliency + /// in the presence of potentially adversarial nodes. + /// + /// When enabled the number of disjoint paths used equals the configured + /// parallelism. + /// + /// See the S/Kademlia paper for more information on the high level design + /// as well as its security improvements. + pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self { + self.query_config.disjoint_query_paths = enabled; + self + } + /// Sets the TTL for stored records. /// /// The TTL should be significantly longer than the (re-)publication - /// interval, to avoid premature expiration of records. The default is 36 hours. 
+ /// interval, to avoid premature expiration of records. The default is 36 + /// hours. /// /// `None` means records never expire. /// @@ -191,10 +223,10 @@ impl KademliaConfig { /// Sets the (re-)publication interval of stored records. /// - /// Records persist in the DHT until they expire. By default, published records - /// are re-published in regular intervals for as long as the record exists - /// in the local storage of the original publisher, thereby extending the - /// records lifetime. + /// Records persist in the DHT until they expire. By default, published + /// records are re-published in regular intervals for as long as the record + /// exists in the local storage of the original publisher, thereby extending + /// the records lifetime. /// /// This interval should be significantly shorter than the record TTL, to /// ensure records do not expire prematurely. The default is 24 hours. @@ -220,7 +252,8 @@ impl KademliaConfig { /// Sets the interval at which provider records for keys provided /// by the local node are re-published. /// - /// `None` means that stored provider records are never automatically re-published. + /// `None` means that stored provider records are never automatically + /// re-published. /// /// Must be significantly less than the provider record TTL. pub fn set_provider_publication_interval(&mut self, interval: Option) -> &mut Self { @@ -236,7 +269,8 @@ impl KademliaConfig { /// Modifies the maximum allowed size of individual Kademlia packets. /// - /// It might be necessary to increase this value if trying to put large records. + /// It might be necessary to increase this value if trying to put large + /// records. pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self { self.protocol_config.set_max_packet_size(size); self @@ -247,7 +281,7 @@ impl Kademlia where for<'a> TStore: RecordStore<'a> { - /// Creates a new `Kademlia` network behaviour with the given configuration. + /// Creates a new `Kademlia` network behaviour with a default configuration. pub fn new(id: PeerId, store: TStore) -> Self { Self::with_config(id, store, Default::default()) } @@ -430,7 +464,7 @@ where if record.is_expired(Instant::now()) { self.store.remove(key) } else { - records.push(record.into_owned()); + records.push(PeerRecord{ peer: None, record: record.into_owned()}); } } @@ -892,7 +926,7 @@ where if let Some(cache_key) = cache_at { // Cache the record at the closest node to the key that // did not return the record. 
- let record = records.first().expect("[not empty]").clone(); + let record = records.first().expect("[not empty]").record.clone(); let quorum = NonZeroUsize::new(1).expect("1 > 0"); let context = PutRecordContext::Cache; let info = QueryInfo::PutRecord { @@ -900,7 +934,7 @@ where record, quorum, phase: PutRecordPhase::PutRecord { - num_results: 0, + success: vec![], get_closest_peers_stats: QueryStats::empty() } }; @@ -934,7 +968,7 @@ where record, quorum, phase: PutRecordPhase::PutRecord { - num_results: 0, + success: vec![], get_closest_peers_stats: result.stats } }; @@ -947,13 +981,13 @@ where context, record, quorum, - phase: PutRecordPhase::PutRecord { num_results, get_closest_peers_stats } + phase: PutRecordPhase::PutRecord { success, get_closest_peers_stats } } => { let mk_result = |key: record::Key| { - if num_results >= quorum.get() { + if success.len() >= quorum.get() { Ok(PutRecordOk { key }) } else { - Err(PutRecordError::QuorumFailed { key, quorum, num_results }) + Err(PutRecordError::QuorumFailed { key, quorum, success }) } }; match context { @@ -1050,9 +1084,9 @@ where let err = Err(PutRecordError::Timeout { key: record.key, quorum, - num_results: match phase { - PutRecordPhase::GetClosestPeers => 0, - PutRecordPhase::PutRecord { num_results, .. } => num_results + success: match phase { + PutRecordPhase::GetClosestPeers => vec![], + PutRecordPhase::PutRecord { ref success, .. } => success.clone(), } }); match context { @@ -1098,7 +1132,7 @@ where id: query_id, stats: result.stats, result: QueryResult::GetRecord(Err( - GetRecordError::Timeout { key, records, quorum } + GetRecordError::Timeout { key, records, quorum }, )) }), @@ -1475,9 +1509,24 @@ where key, records, quorum, cache_at } = &mut query.inner.info { if let Some(record) = record { - records.push(record); - if records.len() >= quorum.get() { - query.finish() + records.push(PeerRecord{ peer: Some(source.clone()), record }); + + let quorum = quorum.get(); + if records.len() >= quorum { + // Desired quorum reached. The query may finish. See + // [`Query::try_finish`] for details. + let peers = records.iter() + .filter_map(|PeerRecord{ peer, .. }| peer.as_ref()) + .cloned() + .collect::>(); + let finished = query.try_finish(peers.iter()); + if !finished { + debug!( + "GetRecord query ({:?}) reached quorum ({}/{}) with \ + response from peer {} but could not yet finish.", + user_data, peers.len(), quorum, source, + ); + } } } else if quorum.get() == 1 { // It is a "standard" Kademlia query, for which the @@ -1513,11 +1562,21 @@ where if let Some(query) = self.queries.get_mut(&user_data) { query.on_success(&source, vec![]); if let QueryInfo::PutRecord { - phase: PutRecordPhase::PutRecord { num_results, .. }, quorum, .. + phase: PutRecordPhase::PutRecord { success, .. }, quorum, .. } = &mut query.inner.info { - *num_results += 1; - if *num_results >= quorum.get() { - query.finish() + success.push(source.clone()); + + let quorum = quorum.get(); + if success.len() >= quorum { + let peers = success.clone(); + let finished = query.try_finish(peers.iter()); + if !finished { + debug!( + "PutRecord query ({:?}) reached quorum ({}/{}) with response \ + from peer {} but could not yet finish.", + user_data, peers.len(), quorum, source, + ); + } } } } @@ -1659,6 +1718,16 @@ impl Quorum { } } +/// A record either received by the given peer or retrieved from the local +/// record store. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PeerRecord { + /// The peer from whom the record was received. 
`None` if the record was + /// retrieved from local storage. + pub peer: Option, + pub record: Record, +} + ////////////////////////////////////////////////////////////////////////////// // Events @@ -1742,7 +1811,7 @@ pub type GetRecordResult = Result; /// The successful result of [`Kademlia::get_record`]. #[derive(Debug, Clone)] pub struct GetRecordOk { - pub records: Vec + pub records: Vec } /// The error result of [`Kademlia::get_record`]. @@ -1754,12 +1823,12 @@ pub enum GetRecordError { }, QuorumFailed { key: record::Key, - records: Vec, + records: Vec, quorum: NonZeroUsize }, Timeout { key: record::Key, - records: Vec, + records: Vec, quorum: NonZeroUsize } } @@ -1799,12 +1868,14 @@ pub struct PutRecordOk { pub enum PutRecordError { QuorumFailed { key: record::Key, - num_results: usize, + /// [`PeerId`]s of the peers the record was successfully stored on. + success: Vec, quorum: NonZeroUsize }, Timeout { key: record::Key, - num_results: usize, + /// [`PeerId`]s of the peers the record was successfully stored on. + success: Vec, quorum: NonZeroUsize }, } @@ -2061,8 +2132,9 @@ pub enum QueryInfo { GetRecord { /// The key to look for. key: record::Key, - /// The records found so far. - records: Vec, + /// The records with the id of the peer that returned them. `None` when + /// the record was found in the local store. + records: Vec, /// The number of records to look for. quorum: NonZeroUsize, /// The closest peer to `key` that did not return a record. @@ -2150,8 +2222,8 @@ pub enum PutRecordPhase { /// The query is replicating the record to the closest nodes to the key. PutRecord { - /// The number of successful replication requests so far. - num_results: usize, + /// A list of peers the given record has been successfully replicated to. + success: Vec, /// Query statistics from the finished `GetClosestPeers` phase. get_closest_peers_stats: QueryStats, }, diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 6e64f67a..6b5358c3 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -22,14 +22,15 @@ use super::*; -use crate::{ALPHA_VALUE, K_VALUE}; +use crate::K_VALUE; use crate::kbucket::Distance; -use crate::record::store::MemoryStore; +use crate::record::{Key, store::MemoryStore}; use futures::{ prelude::*, executor::block_on, future::poll_fn, }; +use futures_timer::Delay; use libp2p_core::{ PeerId, Transport, @@ -43,8 +44,8 @@ use libp2p_secio::SecioConfig; use libp2p_swarm::Swarm; use libp2p_yamux as yamux; use quickcheck::*; -use rand::{Rng, random, thread_rng}; -use std::{collections::{HashSet, HashMap}, io, num::NonZeroUsize, u64}; +use rand::{Rng, random, thread_rng, rngs::StdRng, SeedableRng}; +use std::{collections::{HashSet, HashMap}, time::Duration, io, num::NonZeroUsize, u64}; use multihash::{wrap, Code, Multihash}; type TestSwarm = Swarm>; @@ -132,21 +133,45 @@ fn random_multihash() -> Multihash { wrap(Code::Sha2_256, &thread_rng().gen::<[u8; 32]>()) } +#[derive(Clone, Debug)] +struct Seed([u8; 32]); + +impl Arbitrary for Seed { + fn arbitrary(g: &mut G) -> Seed { + Seed(g.gen()) + } +} + #[test] fn bootstrap() { - fn run(rng: &mut impl Rng) { - let num_total = rng.gen_range(2, 20); - // When looking for the closest node to a key, Kademlia considers ALPHA_VALUE nodes to query - // at initialization. If `num_groups` is larger than ALPHA_VALUE the remaining locally known - // nodes will not be considered. Given that no other node is aware of them, they would be - // lost entirely. 
To prevent the above restrict `num_groups` to be equal or smaller than - // ALPHA_VALUE. - let num_group = rng.gen_range(1, (num_total % ALPHA_VALUE.get()) + 2); + fn prop(seed: Seed) { + let mut rng = StdRng::from_seed(seed.0); - let mut swarms = build_connected_nodes(num_total, num_group).into_iter() + let num_total = rng.gen_range(2, 20); + // When looking for the closest node to a key, Kademlia considers + // K_VALUE nodes to query at initialization. If `num_group` is larger + // than K_VALUE the remaining locally known nodes will not be + // considered. Given that no other node is aware of them, they would be + // lost entirely. To prevent the above restrict `num_group` to be equal + // or smaller than K_VALUE. + let num_group = rng.gen_range(1, (num_total % K_VALUE.get()) + 2); + + let mut cfg = KademliaConfig::default(); + if rng.gen() { + cfg.disjoint_query_paths(true); + } + + let mut swarms = build_connected_nodes_with_config( + num_total, + num_group, + cfg, + ).into_iter() .map(|(_a, s)| s) .collect::>(); - let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect(); + let swarm_ids: Vec<_> = swarms.iter() + .map(Swarm::local_peer_id) + .cloned() + .collect(); let qid = swarms[0].bootstrap().unwrap(); @@ -190,10 +215,7 @@ fn bootstrap() { ) } - let mut rng = thread_rng(); - for _ in 0 .. 10 { - run(&mut rng) - } + QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _) } #[test] @@ -415,16 +437,22 @@ fn get_record_not_found() { ) } -/// A node joining a fully connected network via a single bootnode should be able to put a record to -/// the X closest nodes of the network where X is equal to the configured replication factor. +/// A node joining a fully connected network via three (ALPHA_VALUE) bootnodes +/// should be able to put a record to the X closest nodes of the network where X +/// is equal to the configured replication factor. #[test] fn put_record() { - fn prop(replication_factor: usize, records: Vec) { - let replication_factor = NonZeroUsize::new(replication_factor % (K_VALUE.get() / 2) + 1).unwrap(); - let num_total = replication_factor.get() * 2; + fn prop(records: Vec, seed: Seed) { + let mut rng = StdRng::from_seed(seed.0); + let replication_factor = NonZeroUsize::new(rng.gen_range(1, (K_VALUE.get() / 2) + 1)).unwrap(); + // At least 4 nodes, 1 under test + 3 bootnodes. + let num_total = usize::max(4, replication_factor.get() * 2); let mut config = KademliaConfig::default(); config.set_replication_factor(replication_factor); + if rng.gen() { + config.disjoint_query_paths(true); + } let mut swarms = { let mut fully_connected_swarms = build_fully_connected_nodes_with_config( @@ -433,10 +461,13 @@ fn put_record() { ); let mut single_swarm = build_node_with_config(config); - single_swarm.1.add_address( - Swarm::local_peer_id(&fully_connected_swarms[0].1), - fully_connected_swarms[0].0.clone(), - ); + // Connect `single_swarm` to three bootnodes. + for i in 0..3 { + single_swarm.1.add_address( + Swarm::local_peer_id(&fully_connected_swarms[i].1), + fully_connected_swarms[i].0.clone(), + ); + } let mut swarms = vec![single_swarm]; swarms.append(&mut fully_connected_swarms); @@ -618,11 +649,13 @@ fn get_record() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(KademliaEvent::QueryResult { - id, result: QueryResult::GetRecord(Ok(ok)), .. + id, + result: QueryResult::GetRecord(Ok(GetRecordOk { records })), + .. 
})) => { assert_eq!(id, qid); - assert_eq!(ok.records.len(), 1); - assert_eq!(ok.records.first(), Some(&record)); + assert_eq!(records.len(), 1); + assert_eq!(records.first().unwrap().record, record); return Poll::Ready(()); } // Ignore any other event. @@ -662,11 +695,13 @@ fn get_record_many() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(KademliaEvent::QueryResult { - id, result: QueryResult::GetRecord(Ok(ok)), .. + id, + result: QueryResult::GetRecord(Ok(GetRecordOk { records })), + .. })) => { assert_eq!(id, qid); - assert_eq!(ok.records.len(), num_results); - assert_eq!(ok.records.first(), Some(&record)); + assert_eq!(records.len(), num_results); + assert_eq!(records.first().unwrap().record, record); return Poll::Ready(()); } // Ignore any other event. @@ -681,17 +716,22 @@ fn get_record_many() { ) } -/// A node joining a fully connected network via a single bootnode should be able to add itself as a -/// provider to the X closest nodes of the network where X is equal to the configured replication -/// factor. +/// A node joining a fully connected network via three (ALPHA_VALUE) bootnodes +/// should be able to add itself as a provider to the X closest nodes of the +/// network where X is equal to the configured replication factor. #[test] fn add_provider() { - fn prop(replication_factor: usize, keys: Vec) { - let replication_factor = NonZeroUsize::new(replication_factor % (K_VALUE.get() / 2) + 1).unwrap(); - let num_total = replication_factor.get() * 2; + fn prop(keys: Vec, seed: Seed) { + let mut rng = StdRng::from_seed(seed.0); + let replication_factor = NonZeroUsize::new(rng.gen_range(1, (K_VALUE.get() / 2) + 1)).unwrap(); + // At least 4 nodes, 1 under test + 3 bootnodes. + let num_total = usize::max(4, replication_factor.get() * 2); let mut config = KademliaConfig::default(); config.set_replication_factor(replication_factor); + if rng.gen() { + config.disjoint_query_paths(true); + } let mut swarms = { let mut fully_connected_swarms = build_fully_connected_nodes_with_config( @@ -700,10 +740,13 @@ fn add_provider() { ); let mut single_swarm = build_node_with_config(config); - single_swarm.1.add_address( - Swarm::local_peer_id(&fully_connected_swarms[0].1), - fully_connected_swarms[0].0.clone(), - ); + // Connect `single_swarm` to three bootnodes. + for i in 0..3 { + single_swarm.1.add_address( + Swarm::local_peer_id(&fully_connected_swarms[i].1), + fully_connected_swarms[i].0.clone(), + ); + } let mut swarms = vec![single_swarm]; swarms.append(&mut fully_connected_swarms); @@ -877,3 +920,135 @@ fn exp_decr_expiration_overflow() { quickcheck(prop_no_panic as fn(_, _)) } + +#[test] +fn disjoint_query_does_not_finish_before_all_paths_did() { + let mut config = KademliaConfig::default(); + config.disjoint_query_paths(true); + // I.e. setting the amount disjoint paths to be explored to 2. + config.set_parallelism(NonZeroUsize::new(2).unwrap()); + + let mut alice = build_node_with_config(config); + let mut trudy = build_node(); // Trudy the intrudor, an adversary. + let mut bob = build_node(); + + let key = Key::new(&multihash::Sha2_256::digest(&thread_rng().gen::<[u8; 32]>())); + let record_bob = Record::new(key.clone(), b"bob".to_vec()); + let record_trudy = Record::new(key.clone(), b"trudy".to_vec()); + + // Make `bob` and `trudy` aware of their version of the record searched by + // `alice`. + bob.1.store.put(record_bob.clone()).unwrap(); + trudy.1.store.put(record_trudy.clone()).unwrap(); + + // Make `trudy` and `bob` known to `alice`. 
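+    // With the parallelism (and thus the number of disjoint paths) set to 2,
+    // each of these two peers seeds its own path, so `alice` may only finish
+    // the query once both paths have produced a result.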
+ alice.1.add_address(Swarm::local_peer_id(&trudy.1), trudy.0.clone()); + alice.1.add_address(Swarm::local_peer_id(&bob.1), bob.0.clone()); + + // Drop the swarm addresses. + let (mut alice, mut bob, mut trudy) = (alice.1, bob.1, trudy.1); + + // Have `alice` query the Dht for `key` with a quorum of 1. + alice.get_record(&key, Quorum::One); + + // The default peer timeout is 10 seconds. Choosing 1 seconds here should + // give enough head room to prevent connections to `bob` to time out. + let mut before_timeout = Delay::new(Duration::from_secs(1)); + + // Poll only `alice` and `trudy` expecting `alice` not yet to return a query + // result as it is not able to connect to `bob` just yet. + block_on( + poll_fn(|ctx| { + for (i, swarm) in [&mut alice, &mut trudy].iter_mut().enumerate() { + loop { + match swarm.poll_next_unpin(ctx) { + Poll::Ready(Some(KademliaEvent::QueryResult{ + result: QueryResult::GetRecord(result), + .. + })) => { + if i != 0 { + panic!("Expected `QueryResult` from Alice.") + } + + match result { + Ok(_) => panic!( + "Expected query not to finish until all \ + disjoint paths have been explored.", + ), + Err(e) => panic!("{:?}", e), + } + } + // Ignore any other event. + Poll::Ready(Some(_)) => (), + Poll::Ready(None) => panic!("Expected Kademlia behaviour not to finish."), + Poll::Pending => break, + } + } + } + + // Make sure not to wait until connections to `bob` time out. + before_timeout.poll_unpin(ctx) + }) + ); + + // Make sure `alice` has exactly one query with `trudy`'s record only. + assert_eq!(1, alice.queries.iter().count()); + alice.queries.iter().for_each(|q| { + match &q.inner.info { + QueryInfo::GetRecord{ records, .. } => { + assert_eq!( + *records, + vec![PeerRecord { + peer: Some(Swarm::local_peer_id(&trudy).clone()), + record: record_trudy.clone(), + }], + ); + }, + i @ _ => panic!("Unexpected query info: {:?}", i), + } + }); + + // Poll `alice` and `bob` expecting `alice` to return a successful query + // result as it is now able to explore the second disjoint path. + let records = block_on( + poll_fn(|ctx| { + for (i, swarm) in [&mut alice, &mut bob].iter_mut().enumerate() { + loop { + match swarm.poll_next_unpin(ctx) { + Poll::Ready(Some(KademliaEvent::QueryResult{ + result: QueryResult::GetRecord(result), + .. + })) => { + if i != 0 { + panic!("Expected `QueryResult` from Alice.") + } + + match result { + Ok(ok) => return Poll::Ready(ok.records), + Err(e) => unreachable!("{:?}", e), + } + } + // Ignore any other event. + Poll::Ready(Some(_)) => (), + Poll::Ready(None) => panic!( + "Expected Kademlia behaviour not to finish.", + ), + Poll::Pending => break, + } + } + } + + Poll::Pending + }) + ); + + assert_eq!(2, records.len()); + assert!(records.contains(&PeerRecord { + peer: Some(Swarm::local_peer_id(&bob).clone()), + record: record_bob, + })); + assert!(records.contains(&PeerRecord { + peer: Some(Swarm::local_peer_id(&trudy).clone()), + record: record_trudy, + })); +} diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index 28d005cd..368ed587 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -48,6 +48,8 @@ pub use behaviour::{ QueryInfo, QueryStats, + PeerRecord, + BootstrapResult, BootstrapOk, BootstrapError, @@ -107,4 +109,3 @@ pub const K_VALUE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(20) }; /// /// The current value is `3`. 
pub const ALPHA_VALUE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(3) }; - diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index 706ca622..67d9fb58 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -21,10 +21,10 @@ mod peers; use peers::PeersIterState; -use peers::closest::{ClosestPeersIter, ClosestPeersIterConfig}; +use peers::closest::{ClosestPeersIterConfig, ClosestPeersIter, disjoint::ClosestDisjointPeersIter}; use peers::fixed::FixedPeersIter; -use crate::K_VALUE; +use crate::{ALPHA_VALUE, K_VALUE}; use crate::kbucket::{Key, KeyBytes}; use either::Either; use fnv::FnvHashMap; @@ -104,7 +104,7 @@ impl QueryPool { I: IntoIterator { assert!(!self.queries.contains_key(&id)); - let parallelism = self.config.replication_factor.get(); + let parallelism = self.config.replication_factor; let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism)); let query = Query::new(id, peer_iter, inner); self.queries.insert(id, query); @@ -113,7 +113,7 @@ impl QueryPool { /// Adds a query to the pool that iterates towards the closest peers to the target. pub fn add_iter_closest(&mut self, target: T, peers: I, inner: TInner) -> QueryId where - T: Into, + T: Into + Clone, I: IntoIterator> { let id = self.next_query_id(); @@ -124,14 +124,23 @@ impl QueryPool { /// Adds a query to the pool that iterates towards the closest peers to the target. pub fn continue_iter_closest(&mut self, id: QueryId, target: T, peers: I, inner: TInner) where - T: Into, + T: Into + Clone, I: IntoIterator> { let cfg = ClosestPeersIterConfig { - num_results: self.config.replication_factor.get(), + num_results: self.config.replication_factor, + parallelism: self.config.parallelism, .. ClosestPeersIterConfig::default() }; - let peer_iter = QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers)); + + let peer_iter = if self.config.disjoint_query_paths { + QueryPeerIter::ClosestDisjoint( + ClosestDisjointPeersIter::with_config(cfg, target, peers), + ) + } else { + QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers)) + }; + let query = Query::new(id, peer_iter, inner); self.queries.insert(id, query); } @@ -212,15 +221,34 @@ pub struct QueryId(usize); /// The configuration for queries in a `QueryPool`. #[derive(Debug, Clone)] pub struct QueryConfig { + /// Timeout of a single query. + /// + /// See [`crate::behaviour::KademliaConfig::set_query_timeout`] for details. pub timeout: Duration, + + /// The replication factor to use. + /// + /// See [`crate::behaviour::KademliaConfig::set_replication_factor`] for details. pub replication_factor: NonZeroUsize, + + /// Allowed level of parallelism for iterative queries. + /// + /// See [`crate::behaviour::KademliaConfig::set_parallelism`] for details. + pub parallelism: NonZeroUsize, + + /// Whether to use disjoint paths on iterative lookups. + /// + /// See [`crate::behaviour::KademliaConfig::disjoint_query_paths`] for details. + pub disjoint_query_paths: bool, } impl Default for QueryConfig { fn default() -> Self { QueryConfig { timeout: Duration::from_secs(60), - replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0") + replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"), + parallelism: ALPHA_VALUE, + disjoint_query_paths: false, } } } @@ -240,6 +268,7 @@ pub struct Query { /// The peer selection strategies that can be used by queries. 
enum QueryPeerIter { Closest(ClosestPeersIter), + ClosestDisjoint(ClosestDisjointPeersIter), Fixed(FixedPeersIter) } @@ -263,6 +292,7 @@ impl Query { pub fn on_failure(&mut self, peer: &PeerId) { let updated = match &mut self.peer_iter { QueryPeerIter::Closest(iter) => iter.on_failure(peer), + QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer), QueryPeerIter::Fixed(iter) => iter.on_failure(peer) }; if updated { @@ -279,6 +309,7 @@ impl Query { { let updated = match &mut self.peer_iter { QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers), + QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers), QueryPeerIter::Fixed(iter) => iter.on_success(peer) }; if updated { @@ -290,6 +321,7 @@ impl Query { pub fn is_waiting(&self, peer: &PeerId) -> bool { match &self.peer_iter { QueryPeerIter::Closest(iter) => iter.is_waiting(peer), + QueryPeerIter::ClosestDisjoint(iter) => iter.is_waiting(peer), QueryPeerIter::Fixed(iter) => iter.is_waiting(peer) } } @@ -298,6 +330,7 @@ impl Query { fn next(&mut self, now: Instant) -> PeersIterState { let state = match &mut self.peer_iter { QueryPeerIter::Closest(iter) => iter.next(now), + QueryPeerIter::ClosestDisjoint(iter) => iter.next(now), QueryPeerIter::Fixed(iter) => iter.next() }; @@ -308,6 +341,34 @@ impl Query { state } + /// Tries to (gracefully) finish the query prematurely, providing the peers + /// that are no longer of interest for further progress of the query. + /// + /// A query may require that in order to finish gracefully a certain subset + /// of peers must be contacted. E.g. in the case of disjoint query paths a + /// query may only finish gracefully if every path contacted a peer whose + /// response permits termination of the query. The given peers are those for + /// which this is considered to be the case, i.e. for which a termination + /// condition is satisfied. + /// + /// Returns `true` if the query did indeed finish, `false` otherwise. In the + /// latter case, a new attempt at finishing the query may be made with new + /// `peers`. + /// + /// A finished query immediately stops yielding new peers to contact and + /// will be reported by [`QueryPool::poll`] via + /// [`QueryPoolState::Finished`]. + pub fn try_finish<'a, I>(&mut self, peers: I) -> bool + where + I: IntoIterator + { + match &mut self.peer_iter { + QueryPeerIter::Closest(iter) => { iter.finish(); true }, + QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers), + QueryPeerIter::Fixed(iter) => { iter.finish(); true } + } + } + /// Finishes the query prematurely. /// /// A finished query immediately stops yielding new peers to contact and will be @@ -315,6 +376,7 @@ impl Query { pub fn finish(&mut self) { match &mut self.peer_iter { QueryPeerIter::Closest(iter) => iter.finish(), + QueryPeerIter::ClosestDisjoint(iter) => iter.finish(), QueryPeerIter::Fixed(iter) => iter.finish() } } @@ -326,6 +388,7 @@ impl Query { pub fn is_finished(&self) -> bool { match &self.peer_iter { QueryPeerIter::Closest(iter) => iter.is_finished(), + QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(), QueryPeerIter::Fixed(iter) => iter.is_finished() } } @@ -333,7 +396,8 @@ impl Query { /// Consumes the query, producing the final `QueryResult`. 
pub fn into_result(self) -> QueryResult> { let peers = match self.peer_iter { - QueryPeerIter::Closest(iter) => Either::Left(iter.into_result()), + QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())), + QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())), QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()) }; QueryResult { peers, inner: self.inner, stats: self.stats } diff --git a/protocols/kad/src/query/peers.rs b/protocols/kad/src/query/peers.rs index 049ffe58..964068aa 100644 --- a/protocols/kad/src/query/peers.rs +++ b/protocols/kad/src/query/peers.rs @@ -65,4 +65,3 @@ pub enum PeersIterState<'a> { /// The iterator finished. Finished } - diff --git a/protocols/kad/src/query/peers/closest.rs b/protocols/kad/src/query/peers/closest.rs index dda9b716..7b751a4f 100644 --- a/protocols/kad/src/query/peers/closest.rs +++ b/protocols/kad/src/query/peers/closest.rs @@ -23,10 +23,12 @@ use super::*; use crate::{K_VALUE, ALPHA_VALUE}; use crate::kbucket::{Key, KeyBytes, Distance}; use libp2p_core::PeerId; -use std::{time::Duration, iter::FromIterator}; +use std::{time::Duration, iter::FromIterator, num::NonZeroUsize}; use std::collections::btree_map::{BTreeMap, Entry}; use wasm_timer::Instant; +pub mod disjoint; + /// A peer iterator for a dynamically changing list of peers, sorted by increasing /// distance to a chosen target. #[derive(Debug, Clone)] @@ -55,13 +57,13 @@ pub struct ClosestPeersIterConfig { /// The `α` parameter in the Kademlia paper. The maximum number of peers that /// the iterator is allowed to wait for in parallel while iterating towards the closest /// nodes to a target. Defaults to `ALPHA_VALUE`. - pub parallelism: usize, + pub parallelism: NonZeroUsize, /// Number of results (closest peers) to search for. /// /// The number of closest peers for which the iterator must obtain successful results /// in order to finish successfully. Defaults to `K_VALUE`. - pub num_results: usize, + pub num_results: NonZeroUsize, /// The timeout for a single peer. /// @@ -75,8 +77,8 @@ pub struct ClosestPeersIterConfig { impl Default for ClosestPeersIterConfig { fn default() -> Self { ClosestPeersIterConfig { - parallelism: ALPHA_VALUE.get(), - num_results: K_VALUE.get(), + parallelism: ALPHA_VALUE, + num_results: K_VALUE, peer_timeout: Duration::from_secs(10), } } @@ -180,14 +182,14 @@ impl ClosestPeersIter { // than any peer seen so far (i.e. is the first entry), or the iterator did // not yet accumulate enough closest peers. progress = self.closest_peers.keys().next() == Some(&distance) - || num_closest < self.config.num_results; + || num_closest < self.config.num_results.get(); } // Update the iterator state. self.state = match self.state { State::Iterating { no_progress } => { let no_progress = if progress { 0 } else { no_progress + 1 }; - if no_progress >= self.config.parallelism { + if no_progress >= self.config.parallelism.get() { State::Stalled } else { State::Iterating { no_progress } @@ -310,7 +312,7 @@ impl ClosestPeersIter { *cnt += 1; // If `num_results` successful results have been delivered for the // closest peers, the iterator is done. - if *cnt >= self.config.num_results { + if *cnt >= self.config.num_results.get() { self.state = State::Finished; return PeersIterState::Finished } @@ -355,7 +357,7 @@ impl ClosestPeersIter { self.state == State::Finished } - /// Consumes the iterator, returning the target and the closest peers. + /// Consumes the iterator, returning the closest peers. 
pub fn into_result(self) -> impl Iterator { self.closest_peers .into_iter() @@ -366,7 +368,7 @@ impl ClosestPeersIter { None } }) - .take(self.config.num_results) + .take(self.config.num_results.get()) } /// Checks if the iterator is at capacity w.r.t. the permitted parallelism. @@ -378,9 +380,9 @@ impl ClosestPeersIter { fn at_capacity(&self) -> bool { match self.state { State::Stalled => self.num_waiting >= usize::max( - self.config.num_results, self.config.parallelism + self.config.num_results.get(), self.config.parallelism.get() ), - State::Iterating { .. } => self.num_waiting >= self.config.parallelism, + State::Iterating { .. } => self.num_waiting >= self.config.parallelism.get(), State::Finished => true } } @@ -487,8 +489,8 @@ mod tests { .map(Key::from); let target = Key::from(Into::::into(PeerId::random())); let config = ClosestPeersIterConfig { - parallelism: g.gen_range(1, 10), - num_results: g.gen_range(1, 25), + parallelism: NonZeroUsize::new(g.gen_range(1, 10)).unwrap(), + num_results: NonZeroUsize::new(g.gen_range(1, 25)).unwrap(), peer_timeout: Duration::from_secs(g.gen_range(10, 30)), }; ClosestPeersIter::with_config(config, target, known_closest_peers) @@ -545,7 +547,7 @@ mod tests { .map(|e| e.key.clone()) .collect::>(); let num_known = expected.len(); - let max_parallelism = usize::min(iter.config.parallelism, num_known); + let max_parallelism = usize::min(iter.config.parallelism.get(), num_known); let target = iter.target.clone(); let mut remaining; @@ -584,7 +586,7 @@ mod tests { // peers or an error, thus finishing the "in-flight requests". for (i, k) in expected.iter().enumerate() { if rng.gen_bool(0.75) { - let num_closer = rng.gen_range(0, iter.config.num_results + 1); + let num_closer = rng.gen_range(0, iter.config.num_results.get() + 1); let closer_peers = random_peers(num_closer, &mut rng); remaining.extend(closer_peers.iter().cloned().map(Key::from)); iter.on_success(k.preimage(), closer_peers); @@ -620,16 +622,16 @@ mod tests { assert!(sorted(&target, &closest)); - if closest.len() < num_results { + if closest.len() < num_results.get() { // The iterator returned fewer results than requested. Therefore // either the initial number of known peers must have been // less than the desired number of results, or there must // have been failures. - assert!(num_known < num_results || num_failures > 0); + assert!(num_known < num_results.get() || num_failures > 0); // All peers must have been contacted. 
assert!(all_contacted, "Not all peers have been contacted."); } else { - assert_eq!(num_results, closest.len(), "Too many results."); + assert_eq!(num_results.get(), closest.len(), "Too many results."); } } @@ -741,7 +743,7 @@ mod tests { fn prop(mut iter: ClosestPeersIter) { iter.state = State::Stalled; - for i in 0..usize::max(iter.config.parallelism, iter.config.num_results) { + for i in 0..usize::max(iter.config.parallelism.get(), iter.config.num_results.get()) { iter.num_waiting = i; assert!( !iter.at_capacity(), @@ -750,7 +752,10 @@ mod tests { ) } - iter.num_waiting = usize::max(iter.config.parallelism, iter.config.num_results); + iter.num_waiting = usize::max( + iter.config.parallelism.get(), + iter.config.num_results.get(), + ); assert!( iter.at_capacity(), "Iterator should be at capacity if `max(parallelism, num_results)` requests are \ diff --git a/protocols/kad/src/query/peers/closest/disjoint.rs b/protocols/kad/src/query/peers/closest/disjoint.rs new file mode 100644 index 00000000..2487d8cc --- /dev/null +++ b/protocols/kad/src/query/peers/closest/disjoint.rs @@ -0,0 +1,991 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use super::*; +use crate::kbucket::{Key, KeyBytes}; +use libp2p_core::PeerId; +use std::{ + collections::HashMap, + iter::{Cycle, Map, Peekable}, + ops::{Add, Index, IndexMut, Range}, +}; +use wasm_timer::Instant; + +/// Wraps around a set of [`ClosestPeersIter`], enforcing a disjoint discovery +/// path per configured parallelism according to the S/Kademlia paper. +pub struct ClosestDisjointPeersIter { + config: ClosestPeersIterConfig, + target: KeyBytes, + + /// The set of wrapped [`ClosestPeersIter`]. + iters: Vec, + /// Order in which to query the iterators ensuring fairness across + /// [`ClosestPeersIter::next`] calls. + iter_order: Cycle, fn(usize) -> IteratorIndex>>, + + /// Mapping of contacted peers by their [`PeerId`] to [`PeerState`] + /// containing the corresponding iterator indices as well as the response + /// state. + /// + /// Used to track which iterator contacted which peer. See [`PeerState`] + /// for details. + contacted_peers: HashMap, +} + +impl ClosestDisjointPeersIter { + /// Creates a new iterator with a default configuration. 
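+    ///
+    /// One wrapped [`ClosestPeersIter`] is created per unit of the configured
+    /// parallelism, each exploring its own disjoint path.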
+ pub fn new(target: KeyBytes, known_closest_peers: I) -> Self + where + I: IntoIterator>, + { + Self::with_config( + ClosestPeersIterConfig::default(), + target, + known_closest_peers, + ) + } + + /// Creates a new iterator with the given configuration. + pub fn with_config( + config: ClosestPeersIterConfig, + target: T, + known_closest_peers: I, + ) -> Self + where + I: IntoIterator>, + T: Into + Clone, + { + let peers = known_closest_peers.into_iter().take(K_VALUE.get()).collect::>(); + let iters = (0..config.parallelism.get()) + // NOTE: All [`ClosestPeersIter`] share the same set of peers at + // initialization. The [`ClosestDisjointPeersIter.contacted_peers`] + // mapping ensures that a successful response from a peer is only + // ever passed to a single [`ClosestPeersIter`]. See + // [`ClosestDisjointPeersIter::on_success`] for details. + .map(|_| ClosestPeersIter::with_config(config.clone(), target.clone(), peers.clone())) + .collect::>(); + + let iters_len = iters.len(); + + ClosestDisjointPeersIter { + config, + target: target.into(), + iters, + iter_order: (0..iters_len).map(IteratorIndex as fn(usize) -> IteratorIndex).cycle(), + contacted_peers: HashMap::new(), + } + } + + /// Callback for informing the iterator about a failed request to a peer. + /// + /// If the iterator is currently waiting for a result from `peer`, + /// the iterator state is updated and `true` is returned. In that + /// case, after calling this function, `next` should eventually be + /// called again to obtain the new state of the iterator. + /// + /// If the iterator is finished, it is not currently waiting for a + /// result from `peer`, or a result for `peer` has already been reported, + /// calling this function has no effect and `false` is returned. + pub fn on_failure(&mut self, peer: &PeerId) -> bool { + let mut updated = false; + + if let Some(PeerState{ initiated_by, response }) = self.contacted_peers.get_mut(peer) { + updated = self.iters[*initiated_by].on_failure(peer); + + if updated { + *response = ResponseState::Failed; + } + + for (i, iter) in &mut self.iters.iter_mut().enumerate() { + if i != (*initiated_by).into() { + // This iterator never triggered an actual request to the + // given peer - thus ignore the returned boolean. + iter.on_failure(peer); + } + } + } + + updated + } + + /// Callback for delivering the result of a successful request to a peer. + /// + /// Delivering results of requests back to the iterator allows the iterator + /// to make progress. The iterator is said to make progress either when the + /// given `closer_peers` contain a peer closer to the target than any peer + /// seen so far, or when the iterator did not yet accumulate `num_results` + /// closest peers and `closer_peers` contains a new peer, regardless of its + /// distance to the target. + /// + /// If the iterator is currently waiting for a result from `peer`, + /// the iterator state is updated and `true` is returned. In that + /// case, after calling this function, `next` should eventually be + /// called again to obtain the new state of the iterator. + /// + /// If the iterator is finished, it is not currently waiting for a + /// result from `peer`, or a result for `peer` has already been reported, + /// calling this function has no effect and `false` is returned. 
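+    ///
+    /// Note that `closer_peers` are only passed on to the iterator that first
+    /// yielded `peer`. All other iterators are merely informed of the success,
+    /// which is what keeps the discovered paths disjoint.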
+ pub fn on_success(&mut self, peer: &PeerId, closer_peers: I) -> bool + where + I: IntoIterator, + { + let mut updated = false; + + if let Some(PeerState{ initiated_by, response }) = self.contacted_peers.get_mut(peer) { + // Pass the new `closer_peers` to the iterator that first yielded + // the peer. + updated = self.iters[*initiated_by].on_success(peer, closer_peers); + + if updated { + // Mark the response as succeeded for future iterators yielding + // this peer. There is no need to keep the `closer_peers` + // around, given that they are only passed to the first + // iterator. + *response = ResponseState::Succeeded; + } + + for (i, iter) in &mut self.iters.iter_mut().enumerate() { + if i != (*initiated_by).into() { + // Only report the success to all remaining not-first + // iterators. Do not pass the `closer_peers` in order to + // uphold the S/Kademlia disjoint paths guarantee. + // + // This iterator never triggered an actual request to the + // given peer - thus ignore the returned boolean. + iter.on_success(peer, std::iter::empty()); + } + } + } + + updated + } + + pub fn is_waiting(&self, peer: &PeerId) -> bool { + self.iters.iter().any(|i| i.is_waiting(peer)) + } + + pub fn next(&mut self, now: Instant) -> PeersIterState { + let mut state = None; + + // Ensure querying each iterator at most once. + for _ in 0 .. self.iters.len() { + let i = self.iter_order.next().expect("Cycle never ends."); + let iter = &mut self.iters[i]; + + loop { + match iter.next(now) { + PeersIterState::Waiting(None) => { + match state { + Some(PeersIterState::Waiting(Some(_))) => { + // [`ClosestDisjointPeersIter::next`] returns immediately once a + // [`ClosestPeersIter`] yielded a peer. Thus this state is + // unreachable. + unreachable!(); + }, + Some(PeersIterState::Waiting(None)) => {} + Some(PeersIterState::WaitingAtCapacity) => { + // At least one ClosestPeersIter is no longer at capacity, thus the + // composite ClosestDisjointPeersIter is no longer at capacity. + state = Some(PeersIterState::Waiting(None)) + } + Some(PeersIterState::Finished) => { + // `state` is never set to `Finished`. + unreachable!(); + } + None => state = Some(PeersIterState::Waiting(None)), + + }; + + break; + } + PeersIterState::Waiting(Some(peer)) => { + match self.contacted_peers.get_mut(&*peer) { + Some(PeerState{ response, .. }) => { + // Another iterator already contacted this peer. + let peer = peer.into_owned(); + + match response { + // The iterator will be notified later whether the given node + // was successfully contacted or not. See + // [`ClosestDisjointPeersIter::on_success`] for details. + ResponseState::Waiting => {}, + ResponseState::Succeeded => { + // Given that iterator was not the first to contact the peer + // it will not be made aware of the closer peers discovered + // to uphold the S/Kademlia disjoint paths guarantee. See + // [`ClosestDisjointPeersIter::on_success`] for details. + iter.on_success(&peer, std::iter::empty()); + }, + ResponseState::Failed => { + iter.on_failure(&peer); + }, + } + }, + None => { + // The iterator is the first to contact this peer. + self.contacted_peers.insert( + peer.clone().into_owned(), + PeerState::new(i), + ); + return PeersIterState::Waiting(Some(Cow::Owned(peer.into_owned()))); + }, + } + } + PeersIterState::WaitingAtCapacity => { + match state { + Some(PeersIterState::Waiting(Some(_))) => { + // [`ClosestDisjointPeersIter::next`] returns immediately once a + // [`ClosestPeersIter`] yielded a peer. Thus this state is + // unreachable. 
+ unreachable!(); + }, + Some(PeersIterState::Waiting(None)) => {} + Some(PeersIterState::WaitingAtCapacity) => {} + Some(PeersIterState::Finished) => { + // `state` is never set to `Finished`. + unreachable!(); + }, + None => state = Some(PeersIterState::WaitingAtCapacity), + }; + + break; + } + PeersIterState::Finished => break, + } + } + } + + state.unwrap_or(PeersIterState::Finished) + } + + /// Finishes all paths containing one of the given peers. + /// + /// See [`crate::query::Query::try_finish`] for details. + pub fn finish_paths<'a, I>(&mut self, peers: I) -> bool + where + I: IntoIterator + { + for peer in peers { + if let Some(PeerState{ initiated_by, .. }) = self.contacted_peers.get_mut(peer) { + self.iters[*initiated_by].finish(); + } + } + + self.is_finished() + } + + /// Immediately transitions the iterator to [`PeersIterState::Finished`]. + pub fn finish(&mut self) { + for iter in &mut self.iters { + iter.finish(); + } + } + + /// Checks whether the iterator has finished. + pub fn is_finished(&self) -> bool { + self.iters.iter().all(|i| i.is_finished()) + } + + /// Note: In the case of no adversarial peers or connectivity issues along + /// any path, all paths return the same result, deduplicated through + /// the `ResultIter`, thus overall `into_result` returns + /// `num_results`. In the case of adversarial peers or connectivity + /// issues `ClosestDisjointPeersIter` tries to return the + /// `num_results` closest benign peers, but as it can not + /// differentiate benign from faulty paths it as well returns faulty + /// peers and thus overall returns more than `num_results` peers. + pub fn into_result(self) -> impl Iterator { + let result_per_path= self.iters.into_iter() + .map(|iter| iter.into_result().map(Key::from)); + + ResultIter::new(self.target, result_per_path).map(Key::into_preimage) + } +} + +/// Index into the [`ClosestDisjointPeersIter`] `iters` vector. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct IteratorIndex(usize); + +impl From for IteratorIndex { + fn from(i: usize) -> Self { + IteratorIndex(i) + } +} + +impl From for usize { + fn from(i: IteratorIndex) -> Self { + i.0 + } +} + +impl Add for IteratorIndex { + type Output = Self; + + fn add(self, rhs: usize) -> Self::Output { + (self.0 + rhs).into() + } +} + +impl Index for Vec { + type Output = ClosestPeersIter; + + fn index(&self, index: IteratorIndex) -> &Self::Output { + &self[index.0] + } +} + +impl IndexMut for Vec { + fn index_mut(&mut self, index: IteratorIndex) -> &mut Self::Output { + &mut self[index.0] + } +} + +/// State tracking the iterator that yielded (i.e. tried to contact) a peer. See +/// [`ClosestDisjointPeersIter::on_success`] for details. +#[derive(Debug, PartialEq, Eq)] +struct PeerState { + /// First iterator to yield the peer. Will be notified both of the outcome + /// (success/failure) as well as the closer peers. + initiated_by: IteratorIndex, + /// Keeping track of the response state. In case other iterators later on + /// yield the same peer, they can be notified of the response outcome. + response: ResponseState, +} + +impl PeerState { + fn new(initiated_by: IteratorIndex) -> Self { + PeerState { + initiated_by, + response: ResponseState::Waiting, + } + } +} + +#[derive(Debug, PartialEq, Eq)] +enum ResponseState { + Waiting, + Succeeded, + Failed, +} + +/// Iterator combining the result of multiple [`ClosestPeersIter`] into a single +/// deduplicated ordered iterator. +// +// Note: This operates under the assumption that `I` is ordered. 
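+//
+// Each call to `next` peeks at the head of every per-path iterator, skips
+// duplicates and yields the remaining peer closest to the target, i.e. it
+// performs an ordered, deduplicating merge of the per-path results.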
+#[derive(Clone, Debug)] +struct ResultIter where + I: Iterator>, +{ + target: KeyBytes, + iters: Vec>, +} + +impl>> ResultIter { + fn new(target: KeyBytes, iters: impl Iterator) -> Self { + ResultIter{ + target, + iters: iters.map(Iterator::peekable).collect(), + } + } +} + +impl>> Iterator for ResultIter { + type Item = I::Item; + + fn next(&mut self) -> Option { + let target = &self.target; + + self.iters.iter_mut() + // Find the iterator with the next closest peer. + .fold( + Option::<&mut Peekable<_>>::None, + |iter_a, iter_b| { + let iter_a = match iter_a { + Some(iter_a) => iter_a, + None => return Some(iter_b), + }; + + match (iter_a.peek(), iter_b.peek()) { + (Some(next_a), Some(next_b)) => { + if next_a == next_b { + // Remove from one for deduplication. + iter_b.next(); + return Some(iter_a) + } + + if target.distance(next_a) < target.distance(next_b) { + Some(iter_a) + } else { + Some(iter_b) + } + }, + (Some(_), None) => Some(iter_a), + (None, Some(_)) => Some(iter_b), + (None, None) => None, + } + }, + ) + // Pop off the next closest peer from that iterator. + .and_then(Iterator::next) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::K_VALUE; + use quickcheck::*; + use rand::{Rng, seq::SliceRandom}; + use std::collections::HashSet; + use std::iter; + + impl Arbitrary for ResultIter>> { + fn arbitrary(g: &mut G) -> Self { + let target = Target::arbitrary(g).0; + let num_closest_iters = g.gen_range(0, 20 + 1); + let peers = random_peers( + g.gen_range(0, 20 * num_closest_iters + 1), + g, + ); + + let iters: Vec<_> = (0..num_closest_iters) + .map(|_| { + let num_peers = g.gen_range(0, 20 + 1); + let mut peers = peers.choose_multiple(g, num_peers) + .cloned() + .map(Key::from) + .collect::>(); + + peers.sort_unstable_by(|a, b| { + target.distance(a).cmp(&target.distance(b)) + }); + + peers.into_iter() + }) + .collect(); + + ResultIter::new(target, iters.into_iter()) + } + + fn shrink(&self) -> Box> { + let peers = self.iters + .clone() + .into_iter() + .flatten() + .collect::>() + .into_iter() + .collect::>(); + + let iters = self.iters.clone() + .into_iter() + .map(|iter| iter.collect::>()) + .collect(); + + Box::new(ResultIterShrinker { + target: self.target.clone(), + peers, + iters, + }) + } + } + + struct ResultIterShrinker { + target: KeyBytes, + peers: Vec>, + iters: Vec>>, + } + + impl Iterator for ResultIterShrinker { + type Item = ResultIter>>; + + /// Return an iterator of [`ResultIter`]s with each of them missing a + /// different peer from the original set. + fn next(&mut self) -> Option { + // The peer that should not be included. + let peer = self.peers.pop()?; + + let iters = self.iters.clone().into_iter() + .filter_map(|mut iter| { + iter.retain(|p| p != &peer); + if iter.is_empty() { + return None; + } + Some(iter.into_iter()) + }).collect::>(); + + Some(ResultIter::new(self.target.clone(), iters.into_iter())) + } + } + + #[derive(Clone, Debug)] + struct Target(KeyBytes); + + impl Arbitrary for Target { + fn arbitrary(g: &mut G) -> Self { + Target(Key::from(random_peers(1, g).pop().unwrap()).into()) + } + } + + fn random_peers(n: usize, g: &mut R) -> Vec { + (0 .. 
n).map(|_| PeerId::from_multihash( + multihash::wrap(multihash::Code::Sha2_256, &g.gen::<[u8; 32]>()) + ).unwrap()).collect() + } + + #[test] + fn result_iter_returns_deduplicated_ordered_peer_id_stream() { + fn prop(result_iter: ResultIter>>) { + let expected = { + let mut deduplicated = result_iter.clone() + .iters + .into_iter() + .flatten() + .collect::>() + .into_iter() + .map(Key::from) + .collect::>(); + + deduplicated.sort_unstable_by(|a, b| { + result_iter.target.distance(a).cmp(&result_iter.target.distance(b)) + }); + + deduplicated + }; + + assert_eq!(expected, result_iter.collect::>()); + } + + QuickCheck::new().quickcheck(prop as fn(_)) + } + + #[derive(Debug, Clone)] + struct Parallelism(NonZeroUsize); + + impl Arbitrary for Parallelism{ + fn arbitrary(g: &mut G) -> Self { + Parallelism(NonZeroUsize::new(g.gen_range(1, 10)).unwrap()) + } + } + + #[derive(Debug, Clone)] + struct NumResults(NonZeroUsize); + + impl Arbitrary for NumResults{ + fn arbitrary(g: &mut G) -> Self { + NumResults(NonZeroUsize::new(g.gen_range(1, K_VALUE.get())).unwrap()) + } + } + + impl Arbitrary for ClosestPeersIterConfig { + fn arbitrary(g: &mut G) -> Self { + ClosestPeersIterConfig { + parallelism: Parallelism::arbitrary(g).0, + num_results: NumResults::arbitrary(g).0, + peer_timeout: Duration::from_secs(1), + } + } + } + + #[derive(Debug, Clone)] + struct PeerVec(pub Vec>); + + impl Arbitrary for PeerVec { + fn arbitrary(g: &mut G) -> Self { + PeerVec( + (0..g.gen_range(1, 60)) + .map(|_| PeerId::random()) + .map(Key::from) + .collect(), + ) + } + } + + #[test] + fn s_kademlia_disjoint_paths() { + let now = Instant::now(); + let target: KeyBytes = Key::from(PeerId::random()).into(); + + let mut pool = [0; 12].iter() + .map(|_| Key::from(PeerId::random())) + .collect::>(); + + pool.sort_unstable_by(|a, b| { + target.distance(a).cmp(&target.distance(b)) + }); + + let known_closest_peers = pool.split_off(pool.len() - 3); + + let config = ClosestPeersIterConfig { + parallelism: NonZeroUsize::new(3).unwrap(), + num_results: NonZeroUsize::new(3).unwrap(), + ..ClosestPeersIterConfig::default() + }; + + let mut peers_iter = ClosestDisjointPeersIter::with_config( + config.clone(), + target, + known_closest_peers.clone(), + ); + + //////////////////////////////////////////////////////////////////////// + // First round. + + for _ in 0..3 { + if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) { + assert!(known_closest_peers.contains(&Key::from(peer))); + } else { + panic!("Expected iterator to return peer to query."); + } + } + + assert_eq!( + PeersIterState::WaitingAtCapacity, + peers_iter.next(now), + ); + + let response_2 = pool.split_off(pool.len() - 3); + let response_3 = pool.split_off(pool.len() - 3); + // Keys are closer than any of the previous two responses from honest + // node 1 and 2. + let malicious_response_1 = pool.split_off(pool.len() - 3); + + // Response from malicious peer 1. + peers_iter.on_success( + known_closest_peers[0].preimage(), + malicious_response_1.clone().into_iter().map(|k| k.preimage().clone()), + ); + + // Response from peer 2. + peers_iter.on_success( + known_closest_peers[1].preimage(), + response_2.clone().into_iter().map(|k| k.preimage().clone()), + ); + + // Response from peer 3. + peers_iter.on_success( + known_closest_peers[2].preimage(), + response_3.clone().into_iter().map(|k| k.preimage().clone()), + ); + + //////////////////////////////////////////////////////////////////////// + // Second round. 
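+        // Each disjoint path is expected to yield the closest peer from the
+        // response it received in the first round, one peer per path.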
+ + let mut next_to_query = vec![]; + for _ in 0..3 { + if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) { + next_to_query.push(peer) + } else { + panic!("Expected iterator to return peer to query."); + } + }; + + // Expect a peer from each disjoint path. + assert!(next_to_query.contains(malicious_response_1[0].preimage())); + assert!(next_to_query.contains(response_2[0].preimage())); + assert!(next_to_query.contains(response_3[0].preimage())); + + for peer in next_to_query { + peers_iter.on_success(&peer, vec![]); + } + + // Mark all remaining peers as succeeded. + for _ in 0..6 { + if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) { + peers_iter.on_success(&peer, vec![]); + } else { + panic!("Expected iterator to return peer to query."); + } + } + + assert_eq!( + PeersIterState::Finished, + peers_iter.next(now), + ); + + let final_peers: Vec<_> = peers_iter.into_result().collect(); + + // Expect final result to contain peer from each disjoint path, even + // though not all are among the best ones. + assert!(final_peers.contains(malicious_response_1[0].preimage())); + assert!(final_peers.contains(response_2[0].preimage())); + assert!(final_peers.contains(response_3[0].preimage())); + } + + #[derive(Clone)] + struct Graph(HashMap); + + impl std::fmt::Debug for Graph { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + fmt.debug_list().entries(self.0.iter().map(|(id, _)| id)).finish() + } + } + + impl Arbitrary for Graph { + fn arbitrary(g: &mut G) -> Self { + let mut peer_ids = random_peers(g.gen_range(K_VALUE.get(), 200), g) + .into_iter() + .map(|peer_id| (peer_id.clone(), Key::from(peer_id))) + .collect::>(); + + // Make each peer aware of its direct neighborhood. + let mut peers = peer_ids.clone().into_iter() + .map(|(peer_id, key)| { + peer_ids.sort_unstable_by(|(_, a), (_, b)| { + key.distance(a).cmp(&key.distance(b)) + }); + + assert_eq!(peer_id, peer_ids[0].0); + + let known_peers = peer_ids.iter() + // Skip itself. + .skip(1) + .take(K_VALUE.get()) + .cloned() + .collect::>(); + + (peer_id, Peer{ known_peers }) + }) + .collect::>(); + + // Make each peer aware of a random set of other peers within the graph. + for (peer_id, peer) in peers.iter_mut() { + peer_ids.shuffle(g); + + let num_peers = g.gen_range(K_VALUE.get(), peer_ids.len() + 1); + let mut random_peer_ids = peer_ids.choose_multiple(g, num_peers) + // Make sure not to include itself. + .filter(|(id, _)| peer_id != id) + .cloned() + .collect::>(); + + peer.known_peers.append(&mut random_peer_ids); + peer.known_peers = std::mem::replace(&mut peer.known_peers, vec![]) + // Deduplicate peer ids. 
+                    .into_iter().collect::<HashMap<_, _>>().into_iter().collect();
+            }
+
+            Graph(peers)
+        }
+    }
+
+    impl Graph {
+        fn get_closest_peer(&self, target: &KeyBytes) -> PeerId {
+            self.0.iter()
+                .map(|(peer_id, _)| (target.distance(&Key::new(peer_id.clone())), peer_id))
+                .fold(None, |acc, (distance_b, peer_id_b)| {
+                    match acc {
+                        None => Some((distance_b, peer_id_b)),
+                        Some((distance_a, peer_id_a)) => if distance_a < distance_b {
+                            Some((distance_a, peer_id_a))
+                        } else {
+                            Some((distance_b, peer_id_b))
+                        }
+                    }
+                })
+                .expect("Graph to have at least one peer.")
+                .1.clone()
+        }
+    }
+
+    #[derive(Debug, Clone)]
+    struct Peer {
+        known_peers: Vec<(PeerId, Key<PeerId>)>,
+    }
+
+    impl Peer {
+        fn get_closest_peers(&mut self, target: &KeyBytes) -> Vec<PeerId> {
+            self.known_peers.sort_unstable_by(|(_, a), (_, b)| {
+                target.distance(a).cmp(&target.distance(b))
+            });
+
+            self.known_peers.iter().take(K_VALUE.get()).map(|(id, _)| id).cloned().collect()
+        }
+    }
+
+    enum PeerIterator {
+        Disjoint(ClosestDisjointPeersIter),
+        Closest(ClosestPeersIter),
+    }
+
+    impl PeerIterator {
+        fn next(&mut self, now: Instant) -> PeersIterState {
+            match self {
+                PeerIterator::Disjoint(iter) => iter.next(now),
+                PeerIterator::Closest(iter) => iter.next(now),
+            }
+        }
+
+        fn on_success(&mut self, peer: &PeerId, closer_peers: Vec<PeerId>) {
+            match self {
+                PeerIterator::Disjoint(iter) => iter.on_success(peer, closer_peers),
+                PeerIterator::Closest(iter) => iter.on_success(peer, closer_peers),
+            };
+        }
+
+        fn into_result(self) -> Vec<PeerId> {
+            match self {
+                PeerIterator::Disjoint(iter) => iter.into_result().collect(),
+                PeerIterator::Closest(iter) => iter.into_result().collect(),
+            }
+        }
+    }
+
+    /// Ensure [`ClosestPeersIter`] and [`ClosestDisjointPeersIter`] yield same closest peers.
+    #[test]
+    fn closest_and_disjoint_closest_yield_same_result() {
+        fn prop(
+            target: Target,
+            graph: Graph,
+            parallelism: Parallelism,
+            num_results: NumResults,
+        ) -> TestResult {
+            if parallelism.0 > num_results.0 {
+                return TestResult::discard();
+            }
+
+            let target: KeyBytes = target.0;
+            let closest_peer = graph.get_closest_peer(&target);
+
+            let mut known_closest_peers = graph.0.iter()
+                .take(K_VALUE.get())
+                .map(|(key, _peers)| Key::new(key.clone()))
+                .collect::<Vec<_>>();
+            known_closest_peers.sort_unstable_by(|a, b| {
+                target.distance(a).cmp(&target.distance(b))
+            });
+
+            let cfg = ClosestPeersIterConfig {
+                parallelism: parallelism.0,
+                num_results: num_results.0,
+                ..ClosestPeersIterConfig::default()
+            };
+
+            let closest = drive_to_finish(
+                PeerIterator::Closest(ClosestPeersIter::with_config(
+                    cfg.clone(),
+                    target.clone(),
+                    known_closest_peers.clone(),
+                )),
+                graph.clone(),
+                &target,
+            );
+
+            let disjoint = drive_to_finish(
+                PeerIterator::Disjoint(ClosestDisjointPeersIter::with_config(
+                    cfg,
+                    target.clone(),
+                    known_closest_peers.clone(),
+                )),
+                graph.clone(),
+                &target,
+            );
+
+            assert!(
+                closest.contains(&closest_peer),
+                "Expected `ClosestPeersIter` to find closest peer.",
+            );
+            assert!(
+                disjoint.contains(&closest_peer),
+                "Expected `ClosestDisjointPeersIter` to find closest peer.",
+            );
+
+            assert!(
+                closest.len() == num_results.0.get(),
+                "Expected `ClosestPeersIter` to find `num_results` closest \
+                 peers."
+            );
+            assert!(
+                disjoint.len() >= num_results.0.get(),
+                "Expected `ClosestDisjointPeersIter` to find at least \
+                 `num_results` closest peers."
+            );
+
+            if closest.len() > disjoint.len() {
+                let closest_only = closest.difference(&disjoint).collect::<Vec<_>>();
+
+                panic!(
+                    "Expected `ClosestDisjointPeersIter` to find all peers \
+                     found by `ClosestPeersIter`, but it did not find {:?}.",
+                    closest_only,
+                );
+            };
+
+            TestResult::passed()
+        }
+
+        fn drive_to_finish(
+            mut iter: PeerIterator,
+            mut graph: Graph,
+            target: &KeyBytes,
+        ) -> HashSet<PeerId> {
+            let now = Instant::now();
+            loop {
+                match iter.next(now) {
+                    PeersIterState::Waiting(Some(peer_id)) => {
+                        let peer_id = peer_id.clone().into_owned();
+                        let closest_peers = graph.0.get_mut(&peer_id)
+                            .unwrap()
+                            .get_closest_peers(&target);
+                        iter.on_success(&peer_id, closest_peers);
+                    },
+                    PeersIterState::WaitingAtCapacity | PeersIterState::Waiting(None) =>
+                        panic!("There is never more than one request in flight."),
+                    PeersIterState::Finished => break,
+                }
+            }
+
+            let mut result = iter.into_result().into_iter().map(Key::new).collect::<Vec<_>>();
+            result.sort_unstable_by(|a, b| {
+                target.distance(a).cmp(&target.distance(b))
+            });
+            result.into_iter().map(|k| k.into_preimage()).collect()
+        }
+
+        QuickCheck::new().tests(10).quickcheck(prop as fn(_, _, _, _) -> _)
+    }
+
+    #[test]
+    fn failure_can_not_overwrite_previous_success() {
+        let now = Instant::now();
+        let peer = PeerId::random();
+        let mut iter = ClosestDisjointPeersIter::new(
+            Key::from(PeerId::random()).into(),
+            iter::once(Key::from(peer.clone())),
+        );
+
+        assert!(matches!(iter.next(now), PeersIterState::Waiting(Some(_))));
+
+        // Expect peer to be marked as succeeded.
+        assert!(iter.on_success(&peer, iter::empty()));
+        assert_eq!(iter.contacted_peers.get(&peer), Some(&PeerState {
+            initiated_by: IteratorIndex(0),
+            response: ResponseState::Succeeded,
+        }));
+
+        // Expect peer to stay marked as succeeded.
+        assert!(!iter.on_failure(&peer));
+        assert_eq!(iter.contacted_peers.get(&peer), Some(&PeerState {
+            initiated_by: IteratorIndex(0),
+            response: ResponseState::Succeeded,
+        }));
+    }
+}
diff --git a/protocols/kad/src/query/peers/fixed.rs b/protocols/kad/src/query/peers/fixed.rs
index edb86ef4..723ce414 100644
--- a/protocols/kad/src/query/peers/fixed.rs
+++ b/protocols/kad/src/query/peers/fixed.rs
@@ -22,12 +22,12 @@ use super::*;
 
 use fnv::FnvHashMap;
 use libp2p_core::PeerId;
-use std::{vec, collections::hash_map::Entry};
+use std::{vec, collections::hash_map::Entry, num::NonZeroUsize};
 
 /// A peer iterator for a fixed set of peers.
 pub struct FixedPeersIter {
     /// Ther permitted parallelism, i.e. number of pending results.
-    parallelism: usize,
+    parallelism: NonZeroUsize,
 
     /// The state of peers emitted by the iterator.
     peers: FnvHashMap<PeerId, PeerState>,
@@ -58,7 +58,7 @@ enum PeerState {
 }
 
 impl FixedPeersIter {
-    pub fn new<I>(peers: I, parallelism: usize) -> Self
+    pub fn new<I>(peers: I, parallelism: NonZeroUsize) -> Self
     where
         I: IntoIterator<Item = PeerId>
     {
@@ -133,7 +133,7 @@ impl FixedPeersIter {
         match &mut self.state {
             State::Finished => return PeersIterState::Finished,
             State::Waiting { num_waiting } => {
-                if *num_waiting >= self.parallelism {
+                if *num_waiting >= self.parallelism.get() {
                     return PeersIterState::WaitingAtCapacity
                 }
                 loop {
@@ -175,7 +175,10 @@ mod test {
 
     #[test]
     fn decrease_num_waiting_on_failure() {
-        let mut iter = FixedPeersIter::new(vec![PeerId::random(), PeerId::random()], 1);
+        let mut iter = FixedPeersIter::new(
+            vec![PeerId::random(), PeerId::random()],
+            NonZeroUsize::new(1).unwrap(),
+        );
 
         match iter.next() {
             PeersIterState::Waiting(Some(peer)) => {
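Both `ClosestPeersIterConfig` and `FixedPeersIter` now carry their parallelism as a `std::num::NonZeroUsize`, so a parallelism of zero is unrepresentable at the type level instead of being a runtime error. A minimal sketch of what a call site outside this patch might do when it still receives the value as a plain `usize` (the helper name and the fallback of 3 are illustrative assumptions, not part of the patch):

    use std::num::NonZeroUsize;

    // Sketch only: turn a plain `usize` from configuration into the
    // `NonZeroUsize` that `FixedPeersIter::new` and `ClosestPeersIterConfig`
    // now expect. `NonZeroUsize::new(0)` returns `None`, so zero must either
    // be rejected or mapped to a default; falling back to 3 here is purely an
    // illustrative choice.
    fn parallelism_from_usize(requested: usize) -> NonZeroUsize {
        NonZeroUsize::new(requested)
            .unwrap_or_else(|| NonZeroUsize::new(3).expect("3 is non-zero"))
    }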