diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml
index 40e7be16..12edc567 100644
--- a/protocols/kad/Cargo.toml
+++ b/protocols/kad/Cargo.toml
@@ -24,6 +24,7 @@ multihash = { package = "parity-multihash", version = "0.1.0", path = "../../mis
 parking_lot = "0.7"
 protobuf = "2.3"
 rand = "0.6.0"
+sha2 = "0.8.0"
 smallvec = "0.6"
 tokio-codec = "0.1"
 tokio-io = "0.1"
@@ -36,4 +37,6 @@ libp2p-mplex = { version = "0.8.0", path = "../../muxers/mplex" }
 libp2p-secio = { version = "0.8.0", path = "../secio" }
 libp2p-tcp = { version = "0.8.0", path = "../../transports/tcp" }
 libp2p-yamux = { version = "0.8.0", path = "../../muxers/yamux" }
+quickcheck = "0.8"
+rand = "0.6.0"
 tokio = "0.1"
diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs
index f4dfac85..2fe8d4e0 100644
--- a/protocols/kad/src/behaviour.rs
+++ b/protocols/kad/src/behaviour.rs
@@ -20,8 +20,7 @@
 use crate::addresses::Addresses;
 use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn};
-use crate::kad_hash::KadHash;
-use crate::kbucket::{self, KBucketsTable, KBucketsPeerId};
+use crate::kbucket::{self, KBucketsTable};
 use crate::protocol::{KadConnectionType, KadPeer};
 use crate::query::{QueryConfig, QueryState, QueryStatePollOut};
 use fnv::{FnvHashMap, FnvHashSet};
@@ -30,7 +29,7 @@ use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourActio
 use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId};
 use multihash::Multihash;
 use smallvec::SmallVec;
-use std::{borrow::Cow, error, marker::PhantomData, num::NonZeroUsize, time::Duration};
+use std::{borrow::Cow, error, marker::PhantomData, time::Duration};
 use tokio_io::{AsyncRead, AsyncWrite};
 use wasm_timer::{Instant, Interval};
@@ -39,7 +38,7 @@ mod test;
 
 /// Network behaviour that handles Kademlia.
 pub struct Kademlia<TSubstream> {
     /// Storage for the nodes. Contains the known multiaddresses for this node.
-    kbuckets: KBucketsTable<KadHash, Addresses>,
+    kbuckets: KBucketsTable<PeerId, Addresses>,
 
     /// If `Some`, we overwrite the Kademlia protocol name with this one.
     protocol_name_override: Option<Cow<'static, [u8]>>,
@@ -133,34 +132,23 @@ enum QueryInfoInner {
     },
 }
 
-impl KBucketsPeerId<PeerId> for QueryInfo {
-    fn distance_with(&self, other: &PeerId) -> u32 {
-        let other: &Multihash = other.as_ref();
-        self.as_ref().distance_with(other)
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        <Multihash as KBucketsPeerId>::max_distance()
+impl Into<kbucket::Key<QueryInfo>> for QueryInfo {
+    fn into(self) -> kbucket::Key<QueryInfo> {
+        kbucket::Key::new(self)
     }
 }
 
-impl AsRef<Multihash> for QueryInfo {
-    fn as_ref(&self) -> &Multihash {
+impl AsRef<[u8]> for QueryInfo {
+    fn as_ref(&self) -> &[u8] {
         match &self.inner {
             QueryInfoInner::Initialization { target } => target.as_ref(),
             QueryInfoInner::FindPeer(peer) => peer.as_ref(),
-            QueryInfoInner::GetProviders { target, .. } => target,
-            QueryInfoInner::AddProvider { target } => target,
+            QueryInfoInner::GetProviders { target, .. } => target.as_bytes(),
+            QueryInfoInner::AddProvider { target } => target.as_bytes(),
         }
     }
 }
 
-impl PartialEq<PeerId> for QueryInfo {
-    fn eq(&self, other: &PeerId) -> bool {
-        self.as_ref().eq(other)
-    }
-}
-
 impl QueryInfo {
     /// Creates the corresponding RPC request to send to remote.
     fn to_rpc_request<TUserData>(&self, user_data: TUserData) -> KademliaHandlerIn<TUserData> {
@@ -174,7 +162,7 @@ impl QueryInfo {
                 user_data,
             },
             QueryInfoInner::GetProviders { target, .. } => KademliaHandlerIn::GetProvidersReq {
-                key: target.clone().into(),
+                key: target.clone(),
                 user_data,
             },
             QueryInfoInner::AddProvider { .. } => KademliaHandlerIn::FindNodeReq {
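The hunks above are the crux of the behaviour-side change: a query target no longer implements the bespoke `KBucketsPeerId` metric, it merely exposes its bytes via `AsRef<[u8]>`, and the generic `kbucket::Key` introduced further down in kbucket.rs handles hashing and distance uniformly. A minimal, self-contained sketch of that pattern with the sha2 0.8 API this patch pins — the `Target` type is hypothetical, standing in for `QueryInfo`:

```rust
use sha2::{digest::generic_array::{typenum::U32, GenericArray}, Digest, Sha256};

// Hypothetical stand-in for `QueryInfo`: any type that can expose its bytes.
struct Target(Vec<u8>);

impl AsRef<[u8]> for Target {
    fn as_ref(&self) -> &[u8] { &self.0 }
}

// Same shape as the new `kbucket::Key<T>`: hash once at construction,
// keep the preimage around so it can be recovered later.
struct Key<T> {
    preimage: T,
    hash: GenericArray<u8, U32>,
}

impl<T: AsRef<[u8]>> Key<T> {
    fn new(preimage: T) -> Key<T> {
        let hash = Sha256::digest(preimage.as_ref());
        Key { preimage, hash }
    }
}

fn main() {
    let key = Key::new(Target(vec![1, 2, 3]));
    assert_eq!(key.hash.as_slice().len(), 32); // SHA-256 digest
    assert_eq!(key.preimage.0, [1, 2, 3]);     // preimage preserved
}
```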
@@ -227,9 +215,8 @@ impl<TSubstream> Kademlia<TSubstream> {
     /// Underlying implementation for `add_connected_address` and `add_not_connected_address`.
     fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, _connected: bool) {
-        let kad_hash = KadHash::from(peer_id.clone());
-
-        match self.kbuckets.entry(&kad_hash) {
+        let key = kbucket::Key::new(peer_id.clone());
+        match self.kbuckets.entry(&key) {
             kbucket::Entry::InKbucketConnected(mut entry) => entry.value().insert(address),
             kbucket::Entry::InKbucketConnectedPending(mut entry) => entry.value().insert(address),
             kbucket::Entry::InKbucketDisconnected(mut entry) => entry.value().insert(address),
@@ -248,7 +235,7 @@ impl<TSubstream> Kademlia<TSubstream> {
                     kbucket::InsertOutcome::Full => (),
                     kbucket::InsertOutcome::Pending { to_ping } => {
                         self.queued_events.push(NetworkBehaviourAction::DialPeer {
-                            peer_id: to_ping.peer_id().clone(),
+                            peer_id: to_ping.into_preimage(),
                         })
                     },
                 }
@@ -263,7 +250,7 @@ impl<TSubstream> Kademlia<TSubstream> {
         let parallelism = 3;
 
         Kademlia {
-            kbuckets: KBucketsTable::new(local_peer_id.into(), Duration::from_secs(60)),   // TODO: constant
+            kbuckets: KBucketsTable::new(kbucket::Key::new(local_peer_id), Duration::from_secs(60)),   // TODO: constant
             protocol_name_override: None,
             queued_events: SmallVec::new(),
             active_queries: Default::default(),
@@ -283,7 +270,7 @@ impl<TSubstream> Kademlia<TSubstream> {
 
     /// Returns an iterator to all the peer IDs in the bucket, without the pending nodes.
     pub fn kbuckets_entries(&self) -> impl Iterator<Item = &PeerId> {
-        self.kbuckets.entries_not_pending().map(|(kad_hash, _)| kad_hash.peer_id())
+        self.kbuckets.entries_not_pending().map(|(key, _)| key.preimage())
     }
 
     /// Starts an iterative `FIND_NODE` request.
@@ -310,9 +297,9 @@ impl<TSubstream> Kademlia<TSubstream> {
     pub fn add_providing(&mut self, key: Multihash) {
         self.providing_keys.insert(key.clone());
         let providers = self.values_providers.entry(key).or_insert_with(Default::default);
-        let my_id = self.kbuckets.my_id();
-        if !providers.iter().any(|peer_id| peer_id == my_id.peer_id()) {
-            providers.push(my_id.peer_id().clone());
+        let local_id = self.kbuckets.local_key().preimage();
+        if !providers.iter().any(|peer_id| peer_id == local_id) {
+            providers.push(local_id.clone());
         }
 
         // Trigger the next refresh now.
@@ -348,9 +335,8 @@ impl<TSubstream> Kademlia<TSubstream> {
         };
 
         let known_closest_peers = self.kbuckets
-            .find_closest(target.as_ref())
-            .take(self.num_results)
-            .map(|h| h.peer_id().clone());
+            .find_closest(&kbucket::Key::new(target.clone()))
+            .take(self.num_results);
 
         self.active_queries.insert(
             query_id,
@@ -369,7 +355,7 @@ impl<TSubstream> Kademlia<TSubstream> {
     where
         I: Iterator<Item = &'a KadPeer> + Clone
     {
-        let local_id = self.kbuckets.my_id().peer_id().clone();
+        let local_id = self.kbuckets.local_key().preimage().clone();
         let others_iter = peers.filter(|p| p.node_id != local_id);
 
         for peer in others_iter.clone() {
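Throughout these hunks the same accessor pair replaces the old `KadHash` getters: `preimage()` borrows the original `PeerId` back out of a key (as in `kbuckets_entries` and `add_providing`), while `into_preimage()` takes it by value once the key is no longer needed (as in the `DialPeer` issued for a pending entry). A tiny self-contained illustration of the borrow-versus-consume distinction:

```rust
// Minimal mirror of the accessor pair; the real type also carries the hash.
struct Key<T> {
    preimage: T,
}

impl<T> Key<T> {
    fn preimage(&self) -> &T { &self.preimage }
    fn into_preimage(self) -> T { self.preimage }
}

fn main() {
    let key = Key { preimage: String::from("QmPeer") };
    assert_eq!(key.preimage(), "QmPeer");    // borrow: the key stays usable
    let peer: String = key.into_preimage();  // consume: no clone required
    assert_eq!(peer, "QmPeer");
}
```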
@@ -411,7 +397,7 @@ where
         // We should order addresses from decreasing likelyhood of connectivity, so start with
         // the addresses of that peer in the k-buckets.
         let mut out_list = self.kbuckets
-            .entry(&KadHash::from(peer_id.clone()))
+            .entry(&kbucket::Key::new(peer_id.clone()))
             .value_not_pending()
             .map(|l| l.iter().cloned().collect::<Vec<_>>())
             .unwrap_or_else(Vec::new);
@@ -442,9 +428,9 @@ where
             ConnectedPoint::Listener { .. } => None,
         };
 
-        let id_kad_hash = KadHash::from(id.clone());
+        let key = kbucket::Key::new(id.clone());
 
-        match self.kbuckets.entry(&id_kad_hash) {
+        match self.kbuckets.entry(&key) {
             kbucket::Entry::InKbucketConnected(_) => {
                 unreachable!("Kbuckets are always kept in sync with the connection state; QED")
             },
@@ -482,7 +468,7 @@ where
                     kbucket::InsertOutcome::Full => (),
                     kbucket::InsertOutcome::Pending { to_ping } => {
                         self.queued_events.push(NetworkBehaviourAction::DialPeer {
-                            peer_id: to_ping.peer_id().clone(),
+                            peer_id: to_ping.into_preimage(),
                         })
                     },
                 }
@@ -498,16 +484,16 @@ where
 
     fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, _: &dyn error::Error) {
         if let Some(peer_id) = peer_id {
-            let id_kad_hash = KadHash::from(peer_id.clone());
+            let key = kbucket::Key::new(peer_id.clone());
 
-            if let Some(list) = self.kbuckets.entry(&id_kad_hash).value() {
+            if let Some(list) = self.kbuckets.entry(&key).value() {
                 // TODO: don't remove the address if the error is that we are already connected
                 //       to this peer
                 list.remove(addr);
             }
 
             for query in self.active_queries.values_mut() {
-                if let Some(addrs) = query.target_mut().untrusted_addresses.get_mut(id_kad_hash.peer_id()) {
+                if let Some(addrs) = query.target_mut().untrusted_addresses.get_mut(&peer_id) {
                     addrs.retain(|a| a != addr);
                 }
             }
@@ -528,13 +514,13 @@ where
             query.inject_rpc_error(id);
         }
 
-        match self.kbuckets.entry(&KadHash::from(id.clone())) {
+        match self.kbuckets.entry(&kbucket::Key::new(id.clone())) {
             kbucket::Entry::InKbucketConnected(entry) => {
                 match entry.set_disconnected() {
                     kbucket::SetDisconnectedOutcome::Kept(_) => {},
                     kbucket::SetDisconnectedOutcome::Replaced { replacement, .. } => {
                         let event = KademliaOut::KBucketAdded {
-                            peer_id: replacement.peer_id().clone(),
+                            peer_id: replacement.into_preimage(),
                             replaced: Some(id.clone()),
                         };
                         self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
@@ -568,7 +554,7 @@ where
             }
         }
 
-        if let Some(list) = self.kbuckets.entry(&KadHash::from(peer_id)).value() {
+        if let Some(list) = self.kbuckets.entry(&kbucket::Key::new(peer_id)).value() {
             if let ConnectedPoint::Dialer { address } = new_endpoint {
                 list.insert(address);
             }
@@ -579,10 +565,10 @@ where
         match event {
             KademliaHandlerEvent::FindNodeReq { key, request_id } => {
                 let closer_peers = self.kbuckets
-                    .find_closest(&KadHash::from(key.clone()))
-                    .filter(|p| p.peer_id() != &source)
+                    .find_closest(&kbucket::Key::new(key))
+                    .filter(|p| p.preimage() != &source)
                     .take(self.num_results)
-                    .map(|kad_hash| build_kad_peer(&kad_hash, &mut self.kbuckets))
+                    .map(|key| build_kad_peer(&key, &mut self.kbuckets))
                     .collect();
 
                 self.queued_events.push(NetworkBehaviourAction::SendEvent {
@@ -600,13 +586,6 @@ where
                 self.discovered(&user_data, &source, closer_peers.iter());
             }
             KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
-                let closer_peers = self.kbuckets
-                    .find_closest(&key)
-                    .filter(|p| p.peer_id() != &source)
-                    .take(self.num_results)
-                    .map(|kad_hash| build_kad_peer(&kad_hash, &mut self.kbuckets))
-                    .collect();
-
                 let provider_peers = {
                     let kbuckets = &mut self.kbuckets;
                     self.values_providers
@@ -614,10 +593,16 @@
                         .into_iter()
                         .flat_map(|peers| peers)
                         .filter(|p| *p != &source)
-                        .map(move |peer_id| build_kad_peer(&KadHash::from(peer_id.clone()), kbuckets))
+                        .map(move |peer_id| build_kad_peer(&kbucket::Key::new(peer_id.clone()), kbuckets))
                         .collect()
                 };
 
+                let closer_peers = self.kbuckets
+                    .find_closest(&kbucket::Key::from(key))
+                    .take(self.num_results)
+                    .map(|key| build_kad_peer(&key, &mut self.kbuckets))
+                    .collect();
+
                 self.queued_events.push(NetworkBehaviourAction::SendEvent {
                     peer_id: source,
                     event: KademliaHandlerIn::GetProvidersRes {
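A likely reason the `GetProvidersReq` arm now computes `provider_peers` before `closer_peers`: `kbucket::Key::from(key)` takes the `Multihash` by value, so the `values_providers` lookup has to run while `key` can still be borrowed. The same borrow-then-move ordering in miniature (all names here are illustrative, not the crate's API):

```rust
fn main() {
    let key = String::from("provider-key");
    // Borrow first: do the lookups that only need `&key` …
    let providers = lookup_providers(&key);
    // … then let the hashed table key take ownership of `key`.
    let table_key: Vec<u8> = key.into_bytes();
    assert_eq!(providers, 1);
    assert_eq!(table_key.len(), "provider-key".len());
}

// Stand-in for `self.values_providers.get(&key)`.
fn lookup_providers(key: &str) -> usize {
    if key.is_empty() { 0 } else { 1 }
}
```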
@@ -675,7 +660,7 @@ where
         // Flush the changes to the topology that we want to make.
         for (key, provider) in self.add_provider.drain() {
             // Don't add ourselves to the providers.
-            if provider == *self.kbuckets.my_id().peer_id() {
+            if provider == *self.kbuckets.local_key().preimage() {
                 continue;
             }
 
             let providers = self.values_providers.entry(key).or_insert_with(Default::default);
@@ -719,12 +704,12 @@ where
                     query_target,
                 }) => {
                     let rpc = query_target.to_rpc_request(query_id);
-                    if self.connected_peers.contains(&peer_id) {
+                    if self.connected_peers.contains(peer_id) {
                         return Async::Ready(NetworkBehaviourAction::SendEvent {
                             peer_id: peer_id.clone(),
                             event: rpc,
                         });
-                    } else if peer_id != self.kbuckets.my_id().peer_id() {
+                    } else if peer_id != self.kbuckets.local_key().preimage() {
                         self.pending_rpcs.push((peer_id.clone(), rpc));
                         return Async::Ready(NetworkBehaviourAction::DialPeer {
                             peer_id: peer_id.clone(),
@@ -766,12 +751,13 @@ where
                             break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
                         },
                         QueryInfoInner::AddProvider { target } => {
+                            let local_key = kbucket::Key::new(parameters.local_peer_id().clone());
                             for closest in closer_peers {
                                 let event = NetworkBehaviourAction::SendEvent {
                                     peer_id: closest,
                                     event: KademliaHandlerIn::AddProvider {
                                         key: target.clone(),
-                                        provider_peer: build_kad_peer(&KadHash::from(parameters.local_peer_id().clone()), &mut self.kbuckets),
+                                        provider_peer: build_kad_peer(&local_key, &mut self.kbuckets),
                                     },
                                 };
 
@@ -834,20 +820,20 @@ pub enum KademliaOut {
 ///
 /// > **Note**: This is just a convenience function that doesn't do anything note-worthy.
 fn build_kad_peer(
-    kad_hash: &KadHash,
-    kbuckets: &mut KBucketsTable<KadHash, Addresses>
+    key: &kbucket::Key<PeerId>,
+    kbuckets: &mut KBucketsTable<PeerId, Addresses>
 ) -> KadPeer {
-    let (multiaddrs, connection_ty) = match kbuckets.entry(kad_hash) {
+    let (multiaddrs, connection_ty) = match kbuckets.entry(key) {
         kbucket::Entry::NotInKbucket(_) => (Vec::new(), KadConnectionType::NotConnected),       // TODO: pending connection?
         kbucket::Entry::InKbucketConnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected),
         kbucket::Entry::InKbucketDisconnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected),
         kbucket::Entry::InKbucketConnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected),
         kbucket::Entry::InKbucketDisconnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected),
-        kbucket::Entry::SelfEntry => panic!("build_kad_peer expects not to be called with the KadHash of the local ID"),
+        kbucket::Entry::SelfEntry => panic!("build_kad_peer expects not to be called with the kbucket::Key of the local ID"),
     };
 
     KadPeer {
-        node_id: kad_hash.peer_id().clone(),
+        node_id: key.preimage().clone(),
         multiaddrs,
         connection_ty,
     }
diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs
index ecb82a2d..72d2cb0f 100644
--- a/protocols/kad/src/behaviour/test.rs
+++ b/protocols/kad/src/behaviour/test.rs
@@ -20,7 +20,7 @@
 
 #![cfg(test)]
 
-use crate::{Kademlia, KademliaOut, kbucket::KBucketsPeerId};
+use crate::{Kademlia, KademliaOut, kbucket::{self, Distance}};
 use futures::{future, prelude::*};
 use libp2p_core::{
     PeerId,
@@ -80,6 +80,13 @@ fn build_nodes(num: usize) -> (u64, Vec) {
 
 #[test]
 fn query_iter() {
+    fn distances(key: &kbucket::Key<PeerId>, peers: Vec<PeerId>) -> Vec<Distance> {
+        peers.into_iter()
+            .map(kbucket::Key::from)
+            .map(|k| k.distance(key))
+            .collect()
+    }
+
     fn run(n: usize) {
         // Build `n` nodes. Node `n` knows about node `n-1`, node `n-1` knows about node `n-2`, etc.
         // Node `n` is queried for a random peer and should return nodes `1..n-1` sorted by
         //
@@ -96,14 +103,13 @@
         // Ask the last peer in the list to search a random peer. The search should
         // propagate backwards through the list of peers.
         let search_target = PeerId::random();
+        let search_target_key = kbucket::Key::from(search_target.clone());
         swarms.last_mut().unwrap().find_node(search_target.clone());
 
         // Set up expectations.
         let expected_swarm_id = swarm_ids.last().unwrap().clone();
-        let expected_peer_ids: Vec<_> = swarm_ids
-            .iter().cloned().take(n - 1).collect();
-        let mut expected_distances: Vec<_> = expected_peer_ids
-            .iter().map(|p| p.distance_with(&search_target)).collect();
+        let expected_peer_ids: Vec<_> = swarm_ids.iter().cloned().take(n - 1).collect();
+        let mut expected_distances = distances(&search_target_key, expected_peer_ids.clone());
         expected_distances.sort();
 
         // Run test
@@ -118,10 +124,8 @@
                         assert_eq!(key, search_target);
                         assert_eq!(swarm_ids[i], expected_swarm_id);
                         assert!(expected_peer_ids.iter().all(|p| closer_peers.contains(p)));
-                        assert_eq!(expected_distances,
-                            closer_peers.iter()
-                                .map(|p| p.distance_with(&key))
-                                .collect::<Vec<_>>());
+                        let key = kbucket::Key::from(key);
+                        assert_eq!(expected_distances, distances(&key, closer_peers));
                         return Ok(Async::Ready(()));
                     }
                     Async::Ready(_) => (),
diff --git a/protocols/kad/src/kad_hash.rs b/protocols/kad/src/kad_hash.rs
deleted file mode 100644
index f630ea84..00000000
--- a/protocols/kad/src/kad_hash.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright 2019 Parity Technologies (UK) Ltd.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the "Software"),
-// to deal in the Software without restriction, including without limitation
-// the rights to use, copy, modify, merge, publish, distribute, sublicense,
-// and/or sell copies of the Software, and to permit persons to whom the
-// Software is furnished to do so, subject to the following conditions:
-//
-// The above copyright notice and this permission notice shall be included in
-// all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
-// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
-// DEALINGS IN THE SOFTWARE.
-
-//! Inside a KBucketsTable we would like to store the hash of a PeerId
-//! even if a PeerId is itself already a hash. When querying the table
-//! we may be interested in getting the PeerId back. This module provides
-//! a struct, KadHash that stores together a PeerId and its hash for
-//! convenience.
-
-use arrayref::array_ref;
-use libp2p_core::PeerId;
-
-/// Used as key in a KBucketsTable for Kademlia. Stores the hash of a
-/// PeerId, and the PeerId itself because it may need to be queried.
-#[derive(Clone, Debug, PartialEq)]
-pub struct KadHash {
-    peer_id: PeerId,
-    hash: [u8; 32],
-}
-
-/// Provide convenience getters.
-impl KadHash {
-    pub fn peer_id(&self) -> &PeerId {
-        &self.peer_id
-    }
-
-    pub fn hash(&self) -> &[u8; 32] {
-        &self.hash
-    }
-}
-
-impl From<PeerId> for KadHash {
-    fn from(peer_id: PeerId) -> Self {
-        let encoding = multihash::encode(multihash::Hash::SHA2256, peer_id.as_bytes()).expect("sha2-256 is always supported");
-
-        KadHash{
-            peer_id: peer_id,
-            hash: array_ref!(encoding.digest(), 0, 32).clone(),
-        }
-    }
-}
-
-impl PartialEq<multihash::Multihash> for KadHash {
-    #[inline]
-    fn eq(&self, other: &multihash::Multihash) -> bool {
-        self.hash() == other.digest()
-    }
-}
-
-
-impl PartialEq<KadHash> for multihash::Multihash {
-    #[inline]
-    fn eq(&self, other: &KadHash) -> bool {
-        self.digest() == other.hash()
-    }
-}
diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs
index 87c85f69..5d6c8dd6 100644
--- a/protocols/kad/src/kbucket.rs
+++ b/protocols/kad/src/kbucket.rs
@@ -26,30 +26,101 @@
 //! corresponding to its distance with the reference key.
 
 use arrayvec::ArrayVec;
-use bigint::{U512, U256};
-use crate::kad_hash::KadHash;
+use bigint::U256;
 use libp2p_core::PeerId;
 use multihash::Multihash;
-use std::num::NonZeroUsize;
+use sha2::{Digest, Sha256, digest::generic_array::{GenericArray, typenum::U32}};
 use std::slice::IterMut as SliceIterMut;
 use std::time::{Duration, Instant};
 use std::vec::IntoIter as VecIntoIter;
 
-/// Maximum number of nodes in a bucket.
-pub const MAX_NODES_PER_BUCKET: usize = 20;
+/// Maximum number of k-buckets.
+const NUM_BUCKETS: usize = 256;
+
+/// Maximum number of nodes in a bucket, i.e. `k`.
+const MAX_NODES_PER_BUCKET: usize = 20;
 
-/// Table of k-buckets.
+/// A table of `KBucket`s, i.e. a Kademlia routing table.
 #[derive(Debug, Clone)]
 pub struct KBucketsTable<TPeerId, TVal> {
     /// Peer ID of the local node.
-    my_id: TPeerId,
+    local_key: Key<TPeerId>,
 
     /// The actual tables that store peers or values.
     tables: Vec<KBucket<TPeerId, TVal>>,
 
     /// The timeout when trying to reach the youngest node after which we consider it unresponsive.
     unresponsive_timeout: Duration,
 }
 
-/// An individual table that stores peers or values.
+/// A `Key` is a cryptographic hash, stored with an associated value in a `KBucket`
+/// of a `KBucketsTable`.
+///
+/// `Key`s have an XOR metric as defined in the Kademlia paper, i.e. the bitwise XOR of
+/// the hash digests, interpreted as an integer. See [`Key::distance`].
+///
+/// A `Key` preserves the preimage of type `T` of the hash function. See [`Key::preimage`].
+#[derive(Clone, Debug)]
+pub struct Key<T> {
+    preimage: T,
+    hash: GenericArray<u8, U32>,
+}
+
+impl<T> PartialEq for Key<T> {
+    fn eq(&self, other: &Key<T>) -> bool {
+        self.hash == other.hash
+    }
+}
+
+impl<T> Eq for Key<T> {}
+
+impl<T> Key<T> {
+    /// Construct a new `Key` by hashing the bytes of the given `preimage`.
+    ///
+    /// The preimage of type `T` is preserved. See [`Key::preimage`] and
+    /// [`Key::into_preimage`].
+    pub fn new(preimage: T) -> Key<T>
+    where
+        T: AsRef<[u8]>
+    {
+        let hash = Sha256::digest(preimage.as_ref());
+        Key { preimage, hash }
+    }
+
+    /// Borrows the preimage of the key.
+    pub fn preimage(&self) -> &T {
+        &self.preimage
+    }
+
+    /// Converts the key into its preimage.
+    pub fn into_preimage(self) -> T {
+        self.preimage
+    }
+
+    /// Computes the distance of the keys according to the XOR metric.
+    pub fn distance<U>(&self, other: &Key<U>) -> Distance {
+        let a = U256::from(self.hash.as_ref());
+        let b = U256::from(other.hash.as_ref());
+        Distance(a ^ b)
+    }
+}
+
+impl From<Multihash> for Key<Multihash> {
+    fn from(h: Multihash) -> Self {
+        let k = Key::new(h.clone().into_bytes());
+        Key { preimage: h, hash: k.hash }
+    }
+}
+
+impl From<PeerId> for Key<PeerId> {
+    fn from(peer_id: PeerId) -> Self {
+        Key::new(peer_id)
+    }
+}
+
+/// A distance between two `Key`s.
+#[derive(Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord, Debug)]
+pub struct Distance(bigint::U256);
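The `Key`/`Distance` pair above is the heart of the refactoring: a distance is no longer a bucket index (`u32`) but the full 256-bit XOR of two SHA-256 digests, comparable as one big integer. Because XOR has no carries, `d(a, c) = d(a, b) XOR d(b, c)`, from which identity, symmetry, and the triangle inequality all follow. A self-contained sketch of the metric using `sha2` directly, with byte-wise XOR standing in for the `U256` arithmetic:

```rust
use sha2::{Digest, Sha256};

// XOR of the SHA-256 digests, byte by byte; `Distance` wraps the same
// 256 bits as a `U256` so the result can be ordered like an integer.
fn distance(a: &[u8], b: &[u8]) -> [u8; 32] {
    let (ha, hb) = (Sha256::digest(a), Sha256::digest(b));
    let (ha, hb) = (ha.as_slice(), hb.as_slice());
    let mut out = [0u8; 32];
    for i in 0..32 {
        out[i] = ha[i] ^ hb[i];
    }
    out
}

fn main() {
    // The properties asserted by the quickcheck tests further down:
    assert_eq!(distance(b"a", b"a"), [0u8; 32]);            // identity
    assert_eq!(distance(b"a", b"b"), distance(b"b", b"a")); // symmetry
}
```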
+
+/// A `KBucket` is a list of up to `MAX_NODES_PER_BUCKET` `Key`s and associated values,
+/// ordered from least recently used to most recently used.
 #[derive(Debug, Clone)]
 struct KBucket<TPeerId, TVal> {
     /// Nodes are always ordered from oldest to newest. The nodes we are connected to are always
@@ -79,131 +150,25 @@ struct PendingNode {
     replace: Instant,
 }
 
-/// A single node in a k-bucket.
+/// A single node in a `KBucket`.
 #[derive(Debug, Clone)]
 struct Node<TPeerId, TVal> {
     /// Id of the node.
-    id: TPeerId,
+    id: Key<TPeerId>,
 
     /// Value associated to it.
     value: TVal,
 }
 
-/// Trait that must be implemented on types that can be used as an identifier in a k-bucket.
-///
-/// If `TOther` is not the same as `Self`, it represents an entry already in the k-buckets that
-/// `Self` can compare against.
-pub trait KBucketsPeerId<TOther = Self>: PartialEq<TOther> {
-    /// Computes the XOR of this value and another one. The lower the closer.
-    fn distance_with(&self, other: &TOther) -> u32;
-
-    /// Returns then number of bits that are necessary to store the distance between peer IDs.
-    /// Used for pre-allocations.
-    fn max_distance() -> NonZeroUsize;
-}
-
-impl KBucketsPeerId for PeerId {
-    fn distance_with(&self, other: &Self) -> u32 {
-        <Multihash as KBucketsPeerId<Multihash>>::distance_with(self.as_ref(), other.as_ref())
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        <Multihash as KBucketsPeerId>::max_distance()
-    }
-}
-
-impl KBucketsPeerId<PeerId> for Multihash {
-    fn distance_with(&self, other: &PeerId) -> u32 {
-        <Multihash as KBucketsPeerId<Multihash>>::distance_with(self, other.as_ref())
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        <Multihash as KBucketsPeerId>::max_distance()
-    }
-}
-
-impl KBucketsPeerId for KadHash {
-    fn distance_with(&self, other: &Self) -> u32 {
-        // Note that we don't compare the hash functions because there's no chance of collision
-        // of the same value hashed with two different hash functions.
-        let my_hash = U256::from(self.hash());
-        let other_hash = U256::from(other.hash());
-        let xor = my_hash ^ other_hash;
-        256 - xor.leading_zeros()
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        // Hash is SHA2256, so fixed value
-        NonZeroUsize::new(256).expect("256 is not zero; QED")
-    }
-}
-
-impl KBucketsPeerId<KadHash> for Multihash {
-    fn distance_with(&self, other: &KadHash) -> u32 {
-        let my_hash = U512::from(self.digest());
-        let other_hash = U512::from(U256::from(other.hash()));
-
-        let xor = my_hash ^ other_hash;
-        512 - xor.leading_zeros()
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        NonZeroUsize::new(512).expect("512 is not zero; QED")
-    }
-}
-
-impl KBucketsPeerId for Multihash {
-    fn distance_with(&self, other: &Self) -> u32 {
-        // Note that we don't compare the hash functions because there's no chance of collision
-        // of the same value hashed with two different hash functions.
-        let my_hash = U512::from(self.digest());
-        let other_hash = U512::from(other.digest());
-        let xor = my_hash ^ other_hash;
-        512 - xor.leading_zeros()
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        NonZeroUsize::new(512).expect("512 is not zero; QED")
-    }
-}
-
-impl<A, B> KBucketsPeerId for (A, B)
-where
-    A: KBucketsPeerId<A> + PartialEq,
-    B: KBucketsPeerId<B> + PartialEq,
-{
-    fn distance_with(&self, other: &(A, B)) -> u32 {
-        A::distance_with(&self.0, &other.0) + B::distance_with(&self.1, &other.1)
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        let n = <A as KBucketsPeerId<A>>::max_distance().get()
-            .saturating_add(<B as KBucketsPeerId<B>>::max_distance().get());
-        NonZeroUsize::new(n).expect("Saturating-add of two non-zeros can't be zero; QED")
-    }
-}
-
-impl<'a, T> KBucketsPeerId for &'a T
-where
-    T: KBucketsPeerId,
-{
-    fn distance_with(&self, other: &&'a T) -> u32 {
-        T::distance_with(*self, *other)
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        <T as KBucketsPeerId>::max_distance()
-    }
-}
-
 impl<TPeerId, TVal> KBucketsTable<TPeerId, TVal>
 where
-    TPeerId: KBucketsPeerId + Clone,
+    TPeerId: Clone,
 {
-    /// Builds a new routing table.
-    pub fn new(my_id: TPeerId, unresponsive_timeout: Duration) -> Self {
+    /// Builds a new routing table whose keys are distributed over `KBucket`s as
+    /// per the relative distance to `local_key`.
+    pub fn new(local_key: Key<TPeerId>, unresponsive_timeout: Duration) -> Self {
        KBucketsTable {
-            my_id,
-            tables: (0..TPeerId::max_distance().get())
+            local_key,
+            tables: (0..NUM_BUCKETS)
                .map(|_| KBucket {
                    nodes: ArrayVec::new(),
                    first_connected_pos: 0,
@@ -214,20 +179,20 @@ where
         }
     }
 
-    /// Returns the ID of the local node.
-    pub fn my_id(&self) -> &TPeerId {
-        &self.my_id
+    /// Returns the local key.
+    pub fn local_key(&self) -> &Key<TPeerId> {
+        &self.local_key
     }
 
     /// Returns the id of the bucket that should contain the peer with the given ID.
     ///
     /// Returns `None` if out of range, which happens if `id` is the same as the local peer id.
-    fn bucket_num(&self, id: &TPeerId) -> Option<usize> {
-        (self.my_id.distance_with(id) as usize).checked_sub(1)
+    fn bucket_num(&self, key: &Key<TPeerId>) -> Option<usize> {
+        (NUM_BUCKETS - self.local_key.distance(key).0.leading_zeros() as usize).checked_sub(1)
     }
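With full-width distances, `bucket_num` recovers a bucket index as the position of the highest set bit: `NUM_BUCKETS - leading_zeros(distance) - 1`, where `checked_sub` maps the local key (distance zero, 256 leading zeros) to `None`. The same rule over a toy 16-bit keyspace:

```rust
// index = (BITS - leading_zeros(distance)) - 1, i.e. floor(log2(distance));
// a distance of zero (the local key itself) belongs to no bucket.
fn bucket_index(distance: u16) -> Option<usize> {
    (16 - distance.leading_zeros() as usize).checked_sub(1)
}

fn main() {
    assert_eq!(bucket_index(0), None);          // the local key: `SelfEntry`
    assert_eq!(bucket_index(1), Some(0));       // nearest bucket
    assert_eq!(bucket_index(3), Some(1));
    assert_eq!(bucket_index(0x8000), Some(15)); // most distant bucket
}
```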
 
     /// Returns an object containing the state of the given entry.
-    pub fn entry<'a>(&'a mut self, peer_id: &'a TPeerId) -> Entry<'a, TPeerId, TVal> {
+    pub fn entry<'a>(&'a mut self, peer_id: &'a Key<TPeerId>) -> Entry<'a, TPeerId, TVal> {
         let bucket_num = if let Some(num) = self.bucket_num(peer_id) {
             num
         } else {
@@ -293,7 +258,7 @@ where
     }
 
     /// Returns an iterator to all the peer IDs in the bucket, without the pending nodes.
-    pub fn entries_not_pending(&self) -> impl Iterator<Item = (&TPeerId, &TVal)> {
+    pub fn entries_not_pending(&self) -> impl Iterator<Item = (&Key<TPeerId>, &TVal)> {
         self.tables
             .iter()
             .flat_map(|table| table.nodes.iter())
@@ -308,10 +273,10 @@ where
         BucketsIter(self.tables.iter_mut(), self.unresponsive_timeout)
     }
 
-    /// Finds the nodes closest to `id`, ordered by distance.
+    /// Finds the keys closest to `key`, ordered by distance.
     ///
     /// Pending nodes are ignored.
-    pub fn find_closest(&mut self, id: &impl KBucketsPeerId<TPeerId>) -> VecIntoIter<TPeerId> {
+    pub fn find_closest<TOther>(&mut self, key: &Key<TOther>) -> VecIntoIter<Key<TPeerId>> {
         // TODO: optimize
         let mut out = Vec::new();
         for table in self.tables.iter_mut() {
@@ -331,7 +296,7 @@ where
                 }
             }
         }
-        out.sort_by(|a, b| id.distance_with(a).cmp(&id.distance_with(b)));
+        out.sort_by(|a, b| key.distance(a).cmp(&key.distance(b)));
         out.into_iter()
     }
 }
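`find_closest` still visits every bucket and sorts the collected keys by XOR distance to the target (hence the surviving `// TODO: optimize`). A quick numeric illustration of why sorting by XOR yields "closest first":

```rust
fn main() {
    let target = 0b1010_u8;
    let mut peers = vec![0b0010_u8, 0b1000, 0b1111];
    // Ascending XOR distance to the target …
    peers.sort_by_key(|p| p ^ target);
    // … 0b1000 differs in one low bit (d = 2), 0b1111 in two middling
    // bits (d = 5), 0b0010 in the high bit (d = 8).
    assert_eq!(peers, vec![0b1000, 0b1111, 0b0010]);
}
```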
@@ -354,7 +319,7 @@ pub enum Entry<'a, TPeerId, TVal> {
 
 impl<'a, TPeerId, TVal> Entry<'a, TPeerId, TVal>
 where
-    TPeerId: KBucketsPeerId + Clone,
+    TPeerId: Clone,
 {
     /// Returns the value associated to the entry in the bucket, including if the node is pending.
     pub fn value(&mut self) -> Option<&mut TVal> {
@@ -384,12 +349,12 @@
 /// Represents an entry in a k-bucket.
 pub struct EntryInKbucketConn<'a, TPeerId, TVal> {
     parent: &'a mut KBucketsTable<TPeerId, TVal>,
-    peer_id: &'a TPeerId,
+    peer_id: &'a Key<TPeerId>,
 }
 
 impl<'a, TPeerId, TVal> EntryInKbucketConn<'a, TPeerId, TVal>
 where
-    TPeerId: KBucketsPeerId + Clone,
+    TPeerId: Clone,
 {
     /// Returns the value associated to the entry in the bucket.
     pub fn value(&mut self) -> &mut TVal {
@@ -470,7 +435,7 @@ pub enum SetDisconnectedOutcome<'a, TPeerId, TVal> {
     Replaced {
         /// Node that replaced the node.
         // TODO: could be a EntryInKbucketConn, but we have borrow issues with the new peer id
-        replacement: TPeerId,
+        replacement: Key<TPeerId>,
         /// Value os the node that has been pushed out.
         old_val: TVal,
     },
@@ -479,12 +444,12 @@
 
 /// Represents an entry waiting for a slot to be available in its k-bucket.
 pub struct EntryInKbucketConnPending<'a, TPeerId, TVal> {
     parent: &'a mut KBucketsTable<TPeerId, TVal>,
-    peer_id: &'a TPeerId,
+    peer_id: &'a Key<TPeerId>,
 }
 
 impl<'a, TPeerId, TVal> EntryInKbucketConnPending<'a, TPeerId, TVal>
 where
-    TPeerId: KBucketsPeerId + Clone,
+    TPeerId: Clone,
 {
     /// Returns the value associated to the entry in the bucket.
     pub fn value(&mut self) -> &mut TVal {
@@ -526,12 +491,12 @@
 
 /// Represents an entry waiting for a slot to be available in its k-bucket.
 pub struct EntryInKbucketDiscPending<'a, TPeerId, TVal> {
     parent: &'a mut KBucketsTable<TPeerId, TVal>,
-    peer_id: &'a TPeerId,
+    peer_id: &'a Key<TPeerId>,
 }
 
 impl<'a, TPeerId, TVal> EntryInKbucketDiscPending<'a, TPeerId, TVal>
 where
-    TPeerId: KBucketsPeerId + Clone,
+    TPeerId: Clone,
 {
     /// Returns the value associated to the entry in the bucket.
     pub fn value(&mut self) -> &mut TVal {
@@ -573,12 +538,12 @@
 /// Represents an entry in a k-bucket.
 pub struct EntryInKbucketDisc<'a, TPeerId, TVal> {
     parent: &'a mut KBucketsTable<TPeerId, TVal>,
-    peer_id: &'a TPeerId,
+    peer_id: &'a Key<TPeerId>,
 }
 
 impl<'a, TPeerId, TVal> EntryInKbucketDisc<'a, TPeerId, TVal>
 where
-    TPeerId: KBucketsPeerId + Clone,
+    TPeerId: Clone,
 {
     /// Returns the value associated to the entry in the bucket.
     pub fn value(&mut self) -> &mut TVal {
@@ -640,12 +605,12 @@
 
 /// Represents an entry not in any k-bucket.
 pub struct EntryNotInKbucket<'a, TPeerId, TVal> {
     parent: &'a mut KBucketsTable<TPeerId, TVal>,
-    peer_id: &'a TPeerId,
+    peer_id: &'a Key<TPeerId>,
 }
 
 impl<'a, TPeerId, TVal> EntryNotInKbucket<'a, TPeerId, TVal>
 where
-    TPeerId: KBucketsPeerId + Clone,
+    TPeerId: Clone,
 {
     /// Inserts the node as connected, if possible.
     pub fn insert_connected(self, value: TVal) -> InsertOutcome<TPeerId> {
@@ -703,14 +668,14 @@ where
 
 /// Outcome of calling `insert`.
 #[must_use]
-#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub enum InsertOutcome<TPeerId> {
     /// The entry has been successfully inserted.
     Inserted,
     /// The entry has been inserted as a pending node.
     Pending {
         /// We have to try connect to the returned node.
-        to_ping: TPeerId,
+        to_ping: Key<TPeerId>,
     },
     /// The entry was not inserted because the bucket was full of connected nodes.
     Full,
@@ -760,17 +725,68 @@ impl<'a, TPeerId, TVal> Bucket<'a, TPeerId, TVal> {
 
 #[cfg(test)]
 mod tests {
-    use crate::kbucket::{Entry, InsertOutcome, KBucketsPeerId, KBucketsTable, MAX_NODES_PER_BUCKET};
-    use multihash::{Multihash, Hash};
-    use std::thread;
+    use super::*;
+    use quickcheck::*;
+    use libp2p_core::PeerId;
+    use crate::kbucket::{Entry, InsertOutcome, KBucketsTable, MAX_NODES_PER_BUCKET};
     use std::time::Duration;
 
+    impl Arbitrary for Key<PeerId> {
+        fn arbitrary<G: Gen>(_: &mut G) -> Key<PeerId> {
+            Key::from(PeerId::random())
+        }
+    }
+
+    #[test]
+    fn identity() {
+        fn prop(a: Key<PeerId>) -> bool {
+            a.distance(&a) == Distance::default()
+        }
+        quickcheck(prop as fn(_) -> _)
+    }
+
+    #[test]
+    fn symmetry() {
+        fn prop(a: Key<PeerId>, b: Key<PeerId>) -> bool {
+            a.distance(&b) == b.distance(&a)
+        }
+        quickcheck(prop as fn(_,_) -> _)
+    }
+
+    #[test]
+    fn triangle_inequality() {
+        fn prop(a: Key<PeerId>, b: Key<PeerId>, c: Key<PeerId>) -> TestResult {
+            let ab = a.distance(&b);
+            let bc = b.distance(&c);
+            let (ab_plus_bc, overflow) = ab.0.overflowing_add(bc.0);
+            if overflow {
+                TestResult::discard()
+            } else {
+                TestResult::from_bool(a.distance(&c) <= Distance(ab_plus_bc))
+            }
+        }
+        quickcheck(prop as fn(_,_,_) -> _)
+    }
+
+    #[test]
+    fn unidirectionality() {
+        fn prop(a: Key<PeerId>, b: Key<PeerId>) -> bool {
+            let d = a.distance(&b);
+            (0..100).all(|_| {
+                let c = Key::from(PeerId::random());
+                a.distance(&c) != d || b == c
+            })
+        }
+        quickcheck(prop as fn(_,_) -> _)
+    }
+
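One detail of `triangle_inequality` worth spelling out: `Distance` wraps a fixed-width `U256`, so `ab + bc` can overflow, and the test discards those cases rather than compare against a wrapped sum. (Mathematically the inequality always holds for XOR, since `d(a, c) = d(a, b) ^ d(b, c)` and XOR never exceeds addition.) The overflow concern in miniature, with `u8` standing in for `U256`:

```rust
fn main() {
    let (ab, bc) = (200_u8, 100_u8);
    let (sum, overflowed) = ab.overflowing_add(bc);
    assert!(overflowed); // 300 does not fit in a u8 …
    assert_eq!(sum, 44); // … and 300 mod 256 = 44 would make the
                         // comparison meaningless, hence the discard.
}
```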
 
     #[test]
     fn basic_closest() {
-        let my_id = Multihash::random(Hash::SHA2256);
-        let other_id = Multihash::random(Hash::SHA2256);
+        let my_key = Key::from(PeerId::random());
+        let other_id = Key::from(PeerId::random());
 
-        let mut table = KBucketsTable::<_, ()>::new(my_id, Duration::from_secs(5));
+        let mut table = KBucketsTable::<_, ()>::new(my_key, Duration::from_secs(5));
         if let Entry::NotInKbucket(entry) = table.entry(&other_id) {
             match entry.insert_connected(()) {
                 InsertOutcome::Inserted => (),
                 _ => panic!()
@@ -787,10 +803,10 @@
 
     #[test]
     fn update_local_id_fails() {
-        let my_id = Multihash::random(Hash::SHA2256);
+        let my_key = Key::from(PeerId::random());
 
-        let mut table = KBucketsTable::<_, ()>::new(my_id.clone(), Duration::from_secs(5));
-        match table.entry(&my_id) {
+        let mut table = KBucketsTable::<_, ()>::new(my_key.clone(), Duration::from_secs(5));
+        match table.entry(&my_key) {
             Entry::SelfEntry => (),
             _ => panic!(),
         }
@@ -798,23 +814,32 @@
 
     #[test]
     fn full_kbucket() {
-        let my_id = Multihash::random(Hash::SHA2256);
+        let my_key = Key::from(PeerId::random());
+        let mut table = KBucketsTable::<_, ()>::new(my_key.clone(), Duration::from_secs(5));
+
+        // Step 1: Fill the most distant bucket, i.e. bucket index `NUM_BUCKETS - 1`,
+        // with "disconnected" peers.
 
+        // Prepare `MAX_NODES_PER_BUCKET` keys to fill the bucket, plus 2
+        // additional keys which will be used to test the behavior on a full
+        // bucket.
         assert!(MAX_NODES_PER_BUCKET <= 251);    // Test doesn't work otherwise.
         let mut fill_ids = (0..MAX_NODES_PER_BUCKET + 3)
             .map(|n| {
-                let mut id = my_id.clone().into_bytes();
-                id[2] ^= 0x80; // Flip the first bit so that we get in the most distant bucket.
-                id[33] = id[33].wrapping_add(n as u8);
-                Multihash::from_bytes(id).unwrap()
+                let mut id = my_key.clone();
+                // Flip the first bit so that we get in the most distant bucket.
+                id.hash[0] ^= 0x80;
+                // Each ID gets a unique low-order byte (i.e. counter).
+                id.hash[31] = id.hash[31].wrapping_add(n as u8);
+                id
             })
             .collect::<Vec<_>>();
 
         let first_node = fill_ids[0].clone();
         let second_node = fill_ids[1].clone();
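The key construction above relies on two facts about the metric: flipping the top bit of the digest forces the XOR distance's highest bit to 1, which pins the key to the most distant bucket (index 255), and perturbing a low-order byte yields distinct keys without leaving that bucket. In a toy 8-bit keyspace:

```rust
fn main() {
    let local: u8 = 0b0110_1010;
    // Flip the top bit: the XOR distance now has its highest bit set, so
    // the key lands in the most distant bucket (index 7 of 8 in this toy).
    let flipped = local ^ 0x80;
    assert_eq!((flipped ^ local).leading_zeros(), 0);
    // Small low-order increments produce distinct keys in the same bucket.
    for n in 1..=3 {
        let id = flipped.wrapping_add(n);
        assert_eq!((id ^ local).leading_zeros(), 0);
    }
}
```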
 
+        // Fill the bucket, consuming all but the last 3 test keys.
         for (num, id) in fill_ids.drain(..MAX_NODES_PER_BUCKET).enumerate() {
             if let Entry::NotInKbucket(entry) = table.entry(&id) {
                 match entry.insert_disconnected(()) {
@@ -826,12 +851,16 @@
             }
             assert_eq!(table.buckets().nth(255).unwrap().num_entries(), num + 1);
         }
-
         assert_eq!(
             table.buckets().nth(255).unwrap().num_entries(),
             MAX_NODES_PER_BUCKET
         );
         assert!(!table.buckets().nth(255).unwrap().has_pending());
+
+        // Step 2: Insert another key on the full bucket. It must be marked as
+        // pending and the first (i.e. "least recently used") entry scheduled
+        // for replacement.
+
         if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) {
             match entry.insert_connected(()) {
                 InsertOutcome::Pending { ref to_ping } if *to_ping == first_node => (),
@@ -840,13 +869,13 @@
         } else {
             panic!()
         }
-
         assert_eq!(
             table.buckets().nth(255).unwrap().num_entries(),
             MAX_NODES_PER_BUCKET
         );
         assert!(table.buckets().nth(255).unwrap().has_pending());
-        if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) {
+
+        // Trying to insert yet another key is rejected.
+        if let Entry::NotInKbucket(entry) = table.entry(&Key::from(fill_ids.remove(0))) {
             match entry.insert_connected(()) {
                 InsertOutcome::Full => (),
                 _ => panic!()
@@ -855,7 +884,12 @@
             panic!()
         }
 
-        thread::sleep(Duration::from_secs(2));
+        // Step 3: Make the pending nodes eligible for replacing existing nodes.
+        // The pending node must be consumed and replace the first (i.e. "least
+        // recently used") node.
+
+        let elapsed = Instant::now() - Duration::from_secs(1);
+        table.tables[255].pending_node.as_mut().map(|n| n.replace = elapsed);
         assert!(!table.buckets().nth(255).unwrap().has_pending());
         if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) {
             match entry.insert_connected(()) {
@@ -866,18 +900,4 @@
             panic!()
         }
     }
-
-    #[test]
-    fn self_distance_zero() {
-        let a = Multihash::random(Hash::SHA2256);
-        assert_eq!(a.distance_with(&a), 0);
-    }
-
-    #[test]
-    fn distance_correct_order() {
-        let a = Multihash::random(Hash::SHA2256);
-        let b = Multihash::random(Hash::SHA2256);
-        assert!(a.distance_with(&a) < b.distance_with(&a));
-        assert!(a.distance_with(&b) > b.distance_with(&b));
-    }
 }
diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs
index 37c336de..3d8ef2a7 100644
--- a/protocols/kad/src/lib.rs
+++ b/protocols/kad/src/lib.rs
@@ -25,7 +25,6 @@
 #![allow(dead_code)]
 
 pub use self::behaviour::{Kademlia, KademliaOut};
-pub use self::kbucket::KBucketsPeerId;
 pub use self::protocol::KadConnectionType;
 
 pub mod handler;
@@ -34,6 +33,5 @@ pub mod protocol;
 
 mod addresses;
 mod behaviour;
-mod kad_hash;
 mod protobuf_structs;
 mod query;
diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs
index 7bf6a0cb..01c34f0e 100644
--- a/protocols/kad/src/query.rs
+++ b/protocols/kad/src/query.rs
@@ -23,7 +23,7 @@
 //! This allows one to create queries that iterate on the DHT on nodes that become closer and
 //! closer to the target.
 
-use crate::kbucket::KBucketsPeerId;
+use crate::kbucket;
 use futures::prelude::*;
 use smallvec::SmallVec;
 use std::{cmp::PartialEq, time::Duration};
@@ -44,13 +44,16 @@ pub struct QueryState<TTarget, TPeerId> {
     /// Target we're looking for.
     target: TTarget,
 
+    /// The `kbucket::Key` representation of the `target`.
+    target_key: kbucket::Key<TTarget>,
+
     /// Stage of the query. See the documentation of `QueryStage`.
     stage: QueryStage,
 
     /// Ordered list of the peers closest to the result we're looking for.
     /// Entries that are `InProgress` shouldn't be removed from the list before they complete.
     /// Must never contain two entries with the same peer IDs.
-    closest_peers: SmallVec<[(TPeerId, QueryPeerState); 32]>,
+    closest_peers: SmallVec<[(kbucket::Key<TPeerId>, QueryPeerState); 32]>,
 
     /// Allowed level of parallelism.
     parallelism: usize,
@@ -100,25 +103,27 @@ enum QueryStage {
 
 impl<TTarget, TPeerId> QueryState<TTarget, TPeerId>
 where
-    TPeerId: Eq,
-    TTarget: KBucketsPeerId<TPeerId>
+    TTarget: Into<kbucket::Key<TTarget>> + Clone,
+    TPeerId: Into<kbucket::Key<TPeerId>> + Eq
 {
     /// Creates a new query.
     ///
     /// You should call `poll()` this function returns in order to know what to do.
-    pub fn new(config: QueryConfig<impl Iterator<Item = TPeerId>, TTarget>) -> Self {
+    pub fn new(config: QueryConfig<impl Iterator<Item = kbucket::Key<TPeerId>>, TTarget>) -> Self {
         let mut closest_peers: SmallVec<[_; 32]> = config
             .known_closest_peers
             .into_iter()
-            .map(|peer_id| (peer_id, QueryPeerState::NotContacted))
+            .map(|key| (key, QueryPeerState::NotContacted))
             .take(config.num_results)
             .collect();
-        let target = config.target;
-        closest_peers.sort_by_key(|e| target.distance_with(&e.0));
+
+        let target_key = config.target.clone().into();
+        closest_peers.sort_by_key(|e| target_key.distance(&e.0));
         closest_peers.dedup_by(|a, b| a.0 == b.0);
 
         QueryState {
-            target,
+            target: config.target,
+            target_key,
             stage: QueryStage::Iterating {
                 no_closer_in_a_row: 0,
             },
@@ -158,7 +163,7 @@ where
     ) {
         // Mark the peer as succeeded.
         for (peer_id, state) in self.closest_peers.iter_mut() {
-            if result_source == peer_id {
+            if result_source == peer_id.preimage() {
                 if let state @ QueryPeerState::InProgress(_) = state {
                     *state = QueryPeerState::Succeeded;
                 }
@@ -172,14 +177,16 @@ where
             ref mut no_closer_in_a_row,
         } = self.stage
         {
+            let target = &self.target_key;
+
             // We increment now, and reset to 0 if we find a closer node.
             *no_closer_in_a_row += 1;
 
-            for elem_to_add in closer_peers {
-                let target = &self.target;
-                let elem_to_add_distance = target.distance_with(&elem_to_add);
-                let insert_pos_start = self.closest_peers.iter().position(|(id, _)| {
-                    target.distance_with(&id) >= elem_to_add_distance
+            for peer in closer_peers {
+                let peer_key = peer.into();
+                let peer_distance = target.distance(&peer_key);
+                let insert_pos_start = self.closest_peers.iter().position(|(key, _)| {
+                    target.distance(&key) >= peer_distance
                 });
 
                 if let Some(insert_pos_start) = insert_pos_start {
@@ -187,30 +194,29 @@ where
                     // `insert_pos_start + insert_pos_size`.
                     let insert_pos_size = self.closest_peers.iter()
                         .skip(insert_pos_start)
-                        .position(|(id, _)| {
-                            target.distance_with(&id) > elem_to_add_distance
+                        .position(|(key, _)| {
+                            target.distance(&key) > peer_distance
                         });
 
                     // Make sure we don't insert duplicates.
                     let mut iter_start = self.closest_peers.iter().skip(insert_pos_start);
                     let duplicate = if let Some(insert_pos_size) = insert_pos_size {
-                        iter_start.take(insert_pos_size).any(|e| e.0 == elem_to_add)
+                        iter_start.take(insert_pos_size).any(|e| e.0 == peer_key)
                     } else {
-                        iter_start.any(|e| e.0 == elem_to_add)
+                        iter_start.any(|e| e.0 == peer_key)
                     };
 
                     if !duplicate {
                         if insert_pos_start == 0 {
                             *no_closer_in_a_row = 0;
                         }
-                        debug_assert!(self.closest_peers.iter().all(|e| e.0 != elem_to_add));
+                        debug_assert!(self.closest_peers.iter().all(|e| e.0 != peer_key));
                         self.closest_peers
-                            .insert(insert_pos_start, (elem_to_add, QueryPeerState::NotContacted));
+                            .insert(insert_pos_start, (peer_key, QueryPeerState::NotContacted));
                     }
                 } else if num_closest < self.num_results {
-                    debug_assert!(self.closest_peers.iter().all(|e| e.0 != elem_to_add));
-                    self.closest_peers
-                        .push((elem_to_add, QueryPeerState::NotContacted));
+                    debug_assert!(self.closest_peers.iter().all(|e| e.0 != peer_key));
+                    self.closest_peers.push((peer_key, QueryPeerState::NotContacted));
                 }
             }
         }
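The rewritten loop preserves the old invariant with the new types: `closest_peers` stays sorted by distance to the target, duplicates are skipped by comparing keys within the equal-distance span, and the list only grows at the tail while it still holds fewer than `num_results` entries. The same logic in a toy form, with `u8` XOR distances standing in for `Distance`:

```rust
// Insert `peer` into `list`, kept sorted by XOR distance to `target`.
fn insert_closer(list: &mut Vec<u8>, peer: u8, target: u8, num_results: usize) {
    let d = peer ^ target;
    match list.iter().position(|p| (p ^ target) >= d) {
        Some(pos) => {
            // The duplicate check is confined to the run of equal distances.
            let dup = list[pos..].iter()
                .take_while(|p| (*p ^ target) == d)
                .any(|p| *p == peer);
            if !dup {
                list.insert(pos, peer);
            }
        }
        // Farther than everything: only append while below the cap.
        None if list.len() < num_results => list.push(peer),
        None => {}
    }
}

fn main() {
    let target = 0;
    let mut list = vec![1, 4, 8]; // with target 0, value == distance
    insert_closer(&mut list, 2, target, 4);
    assert_eq!(list, vec![1, 2, 4, 8]);
    insert_closer(&mut list, 2, target, 4); // duplicate is ignored
    assert_eq!(list, vec![1, 2, 4, 8]);
    insert_closer(&mut list, 9, target, 4); // full and farthest: dropped
    assert_eq!(list, vec![1, 2, 4, 8]);
}
```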
@@ -246,7 +252,7 @@ where
                     QueryPeerState::Failed => false,
                 }
             })
-            .map(|(id, _)| id)
+            .map(|(key, _)| key.preimage())
     }
 
     /// Returns true if we are waiting for a query answer from that peer.
@@ -268,7 +274,7 @@ where
             .closest_peers
             .iter_mut()
             .find_map(|(peer_id, state)|
-                if peer_id == id {
+                if peer_id.preimage() == id {
                     Some(state)
                 } else {
                     None
@@ -303,7 +309,7 @@ where
                 match timeout.poll() {
                     Ok(Async::Ready(_)) | Err(_) => {
                         *state = QueryPeerState::Failed;
-                        return Async::Ready(QueryStatePollOut::CancelRpc { peer_id });
+                        return Async::Ready(QueryStatePollOut::CancelRpc { peer_id: peer_id.preimage() });
                     }
                     Ok(Async::NotReady) => {
                         active_counter += 1
@@ -328,7 +334,7 @@ where
                 let delay = Delay::new(Instant::now() + self.rpc_timeout);
                 *state = QueryPeerState::InProgress(delay);
                 return Async::Ready(QueryStatePollOut::SendRpc {
-                    peer_id,
+                    peer_id: peer_id.preimage(),
                     query_target: &self.target,
                 });
             }
@@ -353,7 +359,7 @@ where
             .into_iter()
             .filter_map(|(peer_id, state)| {
                 if let QueryPeerState::Succeeded = state {
-                    Some(peer_id)
+                    Some(peer_id.into_preimage())
                 } else {
                     None
                 }
             })
@@ -423,7 +429,7 @@ enum QueryPeerState {
 
 #[cfg(test)]
 mod tests {
-    use super::{QueryConfig, QueryState, QueryStatePollOut};
+    use super::{kbucket, QueryConfig, QueryState, QueryStatePollOut};
     use futures::{self, try_ready, prelude::*};
     use libp2p_core::PeerId;
     use std::{iter, time::Duration, sync::Arc, sync::Mutex, thread};
@@ -432,11 +438,12 @@ mod tests {
     #[test]
     fn start_by_sending_rpc_to_known_peers() {
         let random_id = PeerId::random();
+        let random_key = kbucket::Key::new(random_id.clone());
         let target = PeerId::random();
 
         let mut query = QueryState::new(QueryConfig {
             target,
-            known_closest_peers: iter::once(random_id.clone()),
+            known_closest_peers: iter::once(random_key),
             parallelism: 3,
             num_results: 100,
             rpc_timeout: Duration::from_secs(10),
@@ -455,12 +462,13 @@ mod tests {
     #[test]
     fn continue_second_result() {
         let random_id = PeerId::random();
+        let random_key = kbucket::Key::from(random_id.clone());
         let random_id2 = PeerId::random();
         let target = PeerId::random();
 
         let query = Arc::new(Mutex::new(QueryState::new(QueryConfig {
             target,
-            known_closest_peers: iter::once(random_id.clone()),
+            known_closest_peers: iter::once(random_key),
             parallelism: 3,
             num_results: 100,
             rpc_timeout: Duration::from_secs(10),
@@ -500,10 +508,11 @@ mod tests {
     #[test]
     fn timeout_works() {
         let random_id = PeerId::random();
+        let random_key = kbucket::Key::from(random_id.clone());
 
         let query = Arc::new(Mutex::new(QueryState::new(QueryConfig {
             target: PeerId::random(),
-            known_closest_peers: iter::once(random_id.clone()),
+            known_closest_peers: iter::once(random_key),
             parallelism: 3,
             num_results: 100,
             rpc_timeout: Duration::from_millis(100),