Improve XOR metric. (#1108)

There are two issues with the current definition and use of Kademlia's
XOR metric:

  1. The distance is currently equated with the bucket index, i.e.
     `distance(a,b) - 1` is the index of the bucket into which either
     peer is put by the other. The resulting metric is not unidirectional
     as defined in the Kademlia paper: any two keys falling into the same
     bucket of a node have the same "distance" to that node (see the toy
     example after this list). Unidirectionality requires interpreting
     the result of the XOR as an integer in its entirety, which is how
     e.g. libp2p-go and libp2p-js implement the metric.

  2. The current `KBucketsPeerId` trait and its instances allow computing
     distances between types with differing bit lengths, as well as between
     types that hash all inputs again (i.e. `KadHash`) and "plain" `PeerId`s
     or `Multihash`es. The computed distances can thus be incorrect with
     respect to the libp2p specs, which require all distances to be computed
     from the XOR of the SHA-256 of the input keys, or can even fall outside
     of the image of the metric used for the `KBucketsTable`. In the latter
     case such distances are never used as a bucket index (they only occur
     when comparing distances for the purpose of sorting peers), but that
     still seems undesirable.
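
To make issue 1 concrete, here is a toy example with 4-bit keys (values
made up for illustration, not taken from the codebase): under the
bucket-index "distance", distinct keys can be at the same "distance" from
a key `a`, whereas the full XOR integer always distinguishes them.

    fn main() {
        let a = 0b1000u8;
        let b = 0b0001u8;
        let c = 0b0010u8;

        // Bucket-index style "distance": position of the highest differing bit.
        let bucket = |x: u8, y: u8| 8 - (x ^ y).leading_zeros();
        assert_eq!(bucket(a, b), bucket(a, c)); // both 4: b and c collide

        // Full XOR metric: for a fixed `a`, a distance `d` identifies exactly
        // one key, since `x = a ^ d` inverts `d = a ^ x`.
        assert_ne!(a ^ b, a ^ c);
    }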

These issues are addressed here as follows:

  * Unidirectionality of the XOR metric is restored by keeping the "full"
    integer representation of the bitwise XOR. The result is an XOR metric
    as defined in the paper. This also opens the door to avoiding the
    "full table scan" when searching for the keys closest to a given key:
    the ideal order in which to visit the buckets can be computed with the
    help of the distance bit string (see the sketch after this list).

  * As a simplification, and to make it easy to "do the right thing", the
    XOR metric is only defined on an opaque `kbucket::Key` type, partially
    derived from the current `KadHash`. `KadHash` and `KBucketsPeerId`
    are removed.
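
For illustration, a minimal sketch of the restored metric and of how a
bucket index can be derived from it. The helper names below are made up;
the actual implementation is the `kbucket::Key` type in this diff, and the
sketch only assumes the `sha2` and `bigint` crates that the diff itself
uses.

    use bigint::U256;
    use sha2::{Digest, Sha256};

    /// Full XOR metric: the bitwise XOR of the SHA-256 digests of the two
    /// input keys, interpreted as a 256-bit integer (cf. `Key::distance`).
    fn distance(a: &[u8], b: &[u8]) -> U256 {
        U256::from(Sha256::digest(a).as_slice())
            ^ U256::from(Sha256::digest(b).as_slice())
    }

    /// The bucket index is derived from the distance instead of being the
    /// distance itself: it is the position of the highest set bit of `d`
    /// (cf. `bucket_num` in the new kbucket.rs). `None` iff the keys are equal.
    fn bucket_index(d: U256) -> Option<usize> {
        (256 - d.leading_zeros() as usize).checked_sub(1)
    }

Since bucket `i` holds exactly the keys whose distance to the local key
has its highest bit at position `i`, the bit string of a distance also
yields the ideal order in which to visit the buckets during a lookup.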
Author:       Roman Borschel
Date:         2019-05-17 17:27:57 +02:00
Committed by: GitHub
Parent:       93d89964e1
Commit:       c80205454a

7 changed files with 310 additions and 363 deletions

View File

@@ -24,6 +24,7 @@ multihash = { package = "parity-multihash", version = "0.1.0", path = "../../mis
 parking_lot = "0.7"
 protobuf = "2.3"
 rand = "0.6.0"
+sha2 = "0.8.0"
 smallvec = "0.6"
 tokio-codec = "0.1"
 tokio-io = "0.1"
@@ -36,4 +37,6 @@ libp2p-mplex = { version = "0.8.0", path = "../../muxers/mplex" }
 libp2p-secio = { version = "0.8.0", path = "../secio" }
 libp2p-tcp = { version = "0.8.0", path = "../../transports/tcp" }
 libp2p-yamux = { version = "0.8.0", path = "../../muxers/yamux" }
+quickcheck = "0.8"
+rand = "0.6.0"
 tokio = "0.1"

View File

@@ -20,8 +20,7 @@
 use crate::addresses::Addresses;
 use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn};
-use crate::kad_hash::KadHash;
-use crate::kbucket::{self, KBucketsTable, KBucketsPeerId};
+use crate::kbucket::{self, KBucketsTable};
 use crate::protocol::{KadConnectionType, KadPeer};
 use crate::query::{QueryConfig, QueryState, QueryStatePollOut};
 use fnv::{FnvHashMap, FnvHashSet};
@@ -30,7 +29,7 @@ use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourActio
 use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId};
 use multihash::Multihash;
 use smallvec::SmallVec;
-use std::{borrow::Cow, error, marker::PhantomData, num::NonZeroUsize, time::Duration};
+use std::{borrow::Cow, error, marker::PhantomData, time::Duration};
 use tokio_io::{AsyncRead, AsyncWrite};
 use wasm_timer::{Instant, Interval};
@@ -39,7 +38,7 @@ mod test;
 /// Network behaviour that handles Kademlia.
 pub struct Kademlia<TSubstream> {
     /// Storage for the nodes. Contains the known multiaddresses for this node.
-    kbuckets: KBucketsTable<KadHash, Addresses>,
+    kbuckets: KBucketsTable<PeerId, Addresses>,

     /// If `Some`, we overwrite the Kademlia protocol name with this one.
     protocol_name_override: Option<Cow<'static, [u8]>>,
@@ -133,34 +132,23 @@ enum QueryInfoInner {
     },
 }

-impl KBucketsPeerId<PeerId> for QueryInfo {
-    fn distance_with(&self, other: &PeerId) -> u32 {
-        let other: &Multihash = other.as_ref();
-        self.as_ref().distance_with(other)
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        <PeerId as KBucketsPeerId>::max_distance()
+impl Into<kbucket::Key<QueryInfo>> for QueryInfo {
+    fn into(self) -> kbucket::Key<QueryInfo> {
+        kbucket::Key::new(self)
     }
 }

-impl AsRef<Multihash> for QueryInfo {
-    fn as_ref(&self) -> &Multihash {
+impl AsRef<[u8]> for QueryInfo {
+    fn as_ref(&self) -> &[u8] {
         match &self.inner {
             QueryInfoInner::Initialization { target } => target.as_ref(),
             QueryInfoInner::FindPeer(peer) => peer.as_ref(),
-            QueryInfoInner::GetProviders { target, .. } => target,
-            QueryInfoInner::AddProvider { target } => target,
+            QueryInfoInner::GetProviders { target, .. } => target.as_bytes(),
+            QueryInfoInner::AddProvider { target } => target.as_bytes(),
         }
     }
 }

-impl PartialEq<PeerId> for QueryInfo {
-    fn eq(&self, other: &PeerId) -> bool {
-        self.as_ref().eq(other)
-    }
-}
-
 impl QueryInfo {
     /// Creates the corresponding RPC request to send to remote.
     fn to_rpc_request<TUserData>(&self, user_data: TUserData) -> KademliaHandlerIn<TUserData> {
@@ -174,7 +162,7 @@ impl QueryInfo {
                 user_data,
             },
             QueryInfoInner::GetProviders { target, .. } => KademliaHandlerIn::GetProvidersReq {
-                key: target.clone().into(),
+                key: target.clone(),
                 user_data,
             },
             QueryInfoInner::AddProvider { .. } => KademliaHandlerIn::FindNodeReq {
@@ -227,9 +215,8 @@ impl<TSubstream> Kademlia<TSubstream> {
     /// Underlying implementation for `add_connected_address` and `add_not_connected_address`.
     fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, _connected: bool) {
-        let kad_hash = KadHash::from(peer_id.clone());
-
-        match self.kbuckets.entry(&kad_hash) {
+        let key = kbucket::Key::new(peer_id.clone());
+        match self.kbuckets.entry(&key) {
             kbucket::Entry::InKbucketConnected(mut entry) => entry.value().insert(address),
             kbucket::Entry::InKbucketConnectedPending(mut entry) => entry.value().insert(address),
             kbucket::Entry::InKbucketDisconnected(mut entry) => entry.value().insert(address),
@@ -248,7 +235,7 @@ impl<TSubstream> Kademlia<TSubstream> {
             kbucket::InsertOutcome::Full => (),
             kbucket::InsertOutcome::Pending { to_ping } => {
                 self.queued_events.push(NetworkBehaviourAction::DialPeer {
-                    peer_id: to_ping.peer_id().clone(),
+                    peer_id: to_ping.into_preimage(),
                 })
             },
         }
@@ -263,7 +250,7 @@ impl<TSubstream> Kademlia<TSubstream> {
         let parallelism = 3;

         Kademlia {
-            kbuckets: KBucketsTable::new(local_peer_id.into(), Duration::from_secs(60)), // TODO: constant
+            kbuckets: KBucketsTable::new(kbucket::Key::new(local_peer_id), Duration::from_secs(60)), // TODO: constant
             protocol_name_override: None,
             queued_events: SmallVec::new(),
             active_queries: Default::default(),
@@ -283,7 +270,7 @@ impl<TSubstream> Kademlia<TSubstream> {
     /// Returns an iterator to all the peer IDs in the bucket, without the pending nodes.
     pub fn kbuckets_entries(&self) -> impl Iterator<Item = &PeerId> {
-        self.kbuckets.entries_not_pending().map(|(kad_hash, _)| kad_hash.peer_id())
+        self.kbuckets.entries_not_pending().map(|(key, _)| key.preimage())
     }

     /// Starts an iterative `FIND_NODE` request.
@@ -310,9 +297,9 @@ impl<TSubstream> Kademlia<TSubstream> {
     pub fn add_providing(&mut self, key: Multihash) {
         self.providing_keys.insert(key.clone());
         let providers = self.values_providers.entry(key).or_insert_with(Default::default);
-        let my_id = self.kbuckets.my_id();
-        if !providers.iter().any(|peer_id| peer_id == my_id.peer_id()) {
-            providers.push(my_id.peer_id().clone());
+        let local_id = self.kbuckets.local_key().preimage();
+        if !providers.iter().any(|peer_id| peer_id == local_id) {
+            providers.push(local_id.clone());
         }

         // Trigger the next refresh now.
@@ -348,9 +335,8 @@ impl<TSubstream> Kademlia<TSubstream> {
         };

         let known_closest_peers = self.kbuckets
-            .find_closest(target.as_ref())
-            .take(self.num_results)
-            .map(|h| h.peer_id().clone());
+            .find_closest(&kbucket::Key::new(target.clone()))
+            .take(self.num_results);

         self.active_queries.insert(
             query_id,
@@ -369,7 +355,7 @@ impl<TSubstream> Kademlia<TSubstream> {
     where
         I: Iterator<Item=&'a KadPeer> + Clone
     {
-        let local_id = self.kbuckets.my_id().peer_id().clone();
+        let local_id = self.kbuckets.local_key().preimage().clone();
         let others_iter = peers.filter(|p| p.node_id != local_id);

         for peer in others_iter.clone() {
@@ -411,7 +397,7 @@ where
         // We should order addresses from decreasing likelyhood of connectivity, so start with
         // the addresses of that peer in the k-buckets.
         let mut out_list = self.kbuckets
-            .entry(&KadHash::from(peer_id.clone()))
+            .entry(&kbucket::Key::new(peer_id.clone()))
             .value_not_pending()
             .map(|l| l.iter().cloned().collect::<Vec<_>>())
             .unwrap_or_else(Vec::new);
@@ -442,9 +428,9 @@ where
             ConnectedPoint::Listener { .. } => None,
         };

-        let id_kad_hash = KadHash::from(id.clone());
-        match self.kbuckets.entry(&id_kad_hash) {
+        let key = kbucket::Key::new(id.clone());
+        match self.kbuckets.entry(&key) {
             kbucket::Entry::InKbucketConnected(_) => {
                 unreachable!("Kbuckets are always kept in sync with the connection state; QED")
             },
@@ -482,7 +468,7 @@ where
             kbucket::InsertOutcome::Full => (),
             kbucket::InsertOutcome::Pending { to_ping } => {
                 self.queued_events.push(NetworkBehaviourAction::DialPeer {
-                    peer_id: to_ping.peer_id().clone(),
+                    peer_id: to_ping.into_preimage(),
                 })
             },
         }
@@ -498,16 +484,16 @@ where
     fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, _: &dyn error::Error) {
         if let Some(peer_id) = peer_id {
-            let id_kad_hash = KadHash::from(peer_id.clone());
-            if let Some(list) = self.kbuckets.entry(&id_kad_hash).value() {
+            let key = kbucket::Key::new(peer_id.clone());
+            if let Some(list) = self.kbuckets.entry(&key).value() {
                 // TODO: don't remove the address if the error is that we are already connected
                 // to this peer
                 list.remove(addr);
             }

             for query in self.active_queries.values_mut() {
-                if let Some(addrs) = query.target_mut().untrusted_addresses.get_mut(id_kad_hash.peer_id()) {
+                if let Some(addrs) = query.target_mut().untrusted_addresses.get_mut(&peer_id) {
                     addrs.retain(|a| a != addr);
                 }
             }
@@ -528,13 +514,13 @@ where
             query.inject_rpc_error(id);
         }

-        match self.kbuckets.entry(&KadHash::from(id.clone())) {
+        match self.kbuckets.entry(&kbucket::Key::new(id.clone())) {
             kbucket::Entry::InKbucketConnected(entry) => {
                 match entry.set_disconnected() {
                     kbucket::SetDisconnectedOutcome::Kept(_) => {},
                     kbucket::SetDisconnectedOutcome::Replaced { replacement, .. } => {
                         let event = KademliaOut::KBucketAdded {
-                            peer_id: replacement.peer_id().clone(),
+                            peer_id: replacement.into_preimage(),
                             replaced: Some(id.clone()),
                         };
                         self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
@@ -568,7 +554,7 @@ where
             }
         }

-        if let Some(list) = self.kbuckets.entry(&KadHash::from(peer_id)).value() {
+        if let Some(list) = self.kbuckets.entry(&kbucket::Key::new(peer_id)).value() {
             if let ConnectedPoint::Dialer { address } = new_endpoint {
                 list.insert(address);
             }
@@ -579,10 +565,10 @@ where
         match event {
             KademliaHandlerEvent::FindNodeReq { key, request_id } => {
                 let closer_peers = self.kbuckets
-                    .find_closest(&KadHash::from(key.clone()))
-                    .filter(|p| p.peer_id() != &source)
+                    .find_closest(&kbucket::Key::new(key))
+                    .filter(|p| p.preimage() != &source)
                     .take(self.num_results)
-                    .map(|kad_hash| build_kad_peer(&kad_hash, &mut self.kbuckets))
+                    .map(|key| build_kad_peer(&key, &mut self.kbuckets))
                     .collect();

                 self.queued_events.push(NetworkBehaviourAction::SendEvent {
@@ -600,13 +586,6 @@ where
                 self.discovered(&user_data, &source, closer_peers.iter());
             }
             KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
-                let closer_peers = self.kbuckets
-                    .find_closest(&key)
-                    .filter(|p| p.peer_id() != &source)
-                    .take(self.num_results)
-                    .map(|kad_hash| build_kad_peer(&kad_hash, &mut self.kbuckets))
-                    .collect();
-
                 let provider_peers = {
                     let kbuckets = &mut self.kbuckets;
                     self.values_providers
@@ -614,10 +593,16 @@ where
                         .into_iter()
                         .flat_map(|peers| peers)
                         .filter(|p| *p != &source)
-                        .map(move |peer_id| build_kad_peer(&KadHash::from(peer_id.clone()), kbuckets))
+                        .map(move |peer_id| build_kad_peer(&kbucket::Key::new(peer_id.clone()), kbuckets))
                         .collect()
                 };

+                let closer_peers = self.kbuckets
+                    .find_closest(&kbucket::Key::from(key))
+                    .take(self.num_results)
+                    .map(|key| build_kad_peer(&key, &mut self.kbuckets))
+                    .collect();
+
                 self.queued_events.push(NetworkBehaviourAction::SendEvent {
                     peer_id: source,
                     event: KademliaHandlerIn::GetProvidersRes {
@@ -675,7 +660,7 @@ where
         // Flush the changes to the topology that we want to make.
         for (key, provider) in self.add_provider.drain() {
             // Don't add ourselves to the providers.
-            if provider == *self.kbuckets.my_id().peer_id() {
+            if provider == *self.kbuckets.local_key().preimage() {
                 continue;
             }
             let providers = self.values_providers.entry(key).or_insert_with(Default::default);
@@ -719,12 +704,12 @@ where
                     query_target,
                 }) => {
                     let rpc = query_target.to_rpc_request(query_id);
-                    if self.connected_peers.contains(&peer_id) {
+                    if self.connected_peers.contains(peer_id) {
                         return Async::Ready(NetworkBehaviourAction::SendEvent {
                             peer_id: peer_id.clone(),
                             event: rpc,
                         });
-                    } else if peer_id != self.kbuckets.my_id().peer_id() {
+                    } else if peer_id != self.kbuckets.local_key().preimage() {
                         self.pending_rpcs.push((peer_id.clone(), rpc));
                         return Async::Ready(NetworkBehaviourAction::DialPeer {
                             peer_id: peer_id.clone(),
@@ -766,12 +751,13 @@ where
                                 break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
                             },
                             QueryInfoInner::AddProvider { target } => {
+                                let local_key = kbucket::Key::new(parameters.local_peer_id().clone());
                                 for closest in closer_peers {
                                     let event = NetworkBehaviourAction::SendEvent {
                                         peer_id: closest,
                                         event: KademliaHandlerIn::AddProvider {
                                             key: target.clone(),
-                                            provider_peer: build_kad_peer(&KadHash::from(parameters.local_peer_id().clone()), &mut self.kbuckets),
+                                            provider_peer: build_kad_peer(&local_key, &mut self.kbuckets),
                                         },
                                     };
@@ -834,20 +820,20 @@ pub enum KademliaOut {
 ///
 /// > **Note**: This is just a convenience function that doesn't do anything note-worthy.
 fn build_kad_peer(
-    kad_hash: &KadHash,
-    kbuckets: &mut KBucketsTable<KadHash, Addresses>
+    key: &kbucket::Key<PeerId>,
+    kbuckets: &mut KBucketsTable<PeerId, Addresses>
 ) -> KadPeer {
-    let (multiaddrs, connection_ty) = match kbuckets.entry(kad_hash) {
+    let (multiaddrs, connection_ty) = match kbuckets.entry(key) {
         kbucket::Entry::NotInKbucket(_) => (Vec::new(), KadConnectionType::NotConnected),      // TODO: pending connection?
         kbucket::Entry::InKbucketConnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected),
         kbucket::Entry::InKbucketDisconnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected),
         kbucket::Entry::InKbucketConnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected),
         kbucket::Entry::InKbucketDisconnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected),
-        kbucket::Entry::SelfEntry => panic!("build_kad_peer expects not to be called with the KadHash of the local ID"),
+        kbucket::Entry::SelfEntry => panic!("build_kad_peer expects not to be called with the kbucket::Key of the local ID"),
     };

     KadPeer {
-        node_id: kad_hash.peer_id().clone(),
+        node_id: key.preimage().clone(),
         multiaddrs,
         connection_ty,
     }

View File

@@ -20,7 +20,7 @@
 #![cfg(test)]

-use crate::{Kademlia, KademliaOut, kbucket::KBucketsPeerId};
+use crate::{Kademlia, KademliaOut, kbucket::{self, Distance}};
 use futures::{future, prelude::*};
 use libp2p_core::{
     PeerId,
@@ -80,6 +80,13 @@ fn build_nodes(num: usize) -> (u64, Vec<TestSwarm>) {
 #[test]
 fn query_iter() {
+    fn distances(key: &kbucket::Key<PeerId>, peers: Vec<PeerId>) -> Vec<Distance> {
+        peers.into_iter()
+            .map(kbucket::Key::from)
+            .map(|k| k.distance(key))
+            .collect()
+    }
+
     fn run(n: usize) {
         // Build `n` nodes. Node `n` knows about node `n-1`, node `n-1` knows about node `n-2`, etc.
         // Node `n` is queried for a random peer and should return nodes `1..n-1` sorted by
@@ -96,14 +103,13 @@ fn query_iter() {
         // Ask the last peer in the list to search a random peer. The search should
         // propagate backwards through the list of peers.
         let search_target = PeerId::random();
+        let search_target_key = kbucket::Key::from(search_target.clone());
         swarms.last_mut().unwrap().find_node(search_target.clone());

         // Set up expectations.
         let expected_swarm_id = swarm_ids.last().unwrap().clone();
-        let expected_peer_ids: Vec<_> = swarm_ids
-            .iter().cloned().take(n - 1).collect();
-        let mut expected_distances: Vec<_> = expected_peer_ids
-            .iter().map(|p| p.distance_with(&search_target)).collect();
+        let expected_peer_ids: Vec<_> = swarm_ids.iter().cloned().take(n - 1).collect();
+        let mut expected_distances = distances(&search_target_key, expected_peer_ids.clone());
         expected_distances.sort();

         // Run test
@@ -118,10 +124,8 @@ fn query_iter() {
                         assert_eq!(key, search_target);
                         assert_eq!(swarm_ids[i], expected_swarm_id);
                         assert!(expected_peer_ids.iter().all(|p| closer_peers.contains(p)));
-                        assert_eq!(expected_distances,
-                                   closer_peers.iter()
-                                       .map(|p| p.distance_with(&key))
-                                       .collect::<Vec<_>>());
+                        let key = kbucket::Key::from(key);
+                        assert_eq!(expected_distances, distances(&key, closer_peers));
                         return Ok(Async::Ready(()));
                     }
                     Async::Ready(_) => (),

View File

@@ -1,73 +0,0 @@
-// Copyright 2019 Parity Technologies (UK) Ltd.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the "Software"),
-// to deal in the Software without restriction, including without limitation
-// the rights to use, copy, modify, merge, publish, distribute, sublicense,
-// and/or sell copies of the Software, and to permit persons to whom the
-// Software is furnished to do so, subject to the following conditions:
-//
-// The above copyright notice and this permission notice shall be included in
-// all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
-// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
-// DEALINGS IN THE SOFTWARE.
-
-//! Inside a KBucketsTable we would like to store the hash of a PeerId
-//! even if a PeerId is itself already a hash. When querying the table
-//! we may be interested in getting the PeerId back. This module provides
-//! a struct, KadHash that stores together a PeerId and its hash for
-//! convenience.
-
-use arrayref::array_ref;
-use libp2p_core::PeerId;
-
-/// Used as key in a KBucketsTable for Kademlia. Stores the hash of a
-/// PeerId, and the PeerId itself because it may need to be queried.
-#[derive(Clone, Debug, PartialEq)]
-pub struct KadHash {
-    peer_id: PeerId,
-    hash: [u8; 32],
-}
-
-/// Provide convenience getters.
-impl KadHash {
-    pub fn peer_id(&self) -> &PeerId {
-        &self.peer_id
-    }
-
-    pub fn hash(&self) -> &[u8; 32] {
-        &self.hash
-    }
-}
-
-impl From<PeerId> for KadHash {
-    fn from(peer_id: PeerId) -> Self {
-        let encoding = multihash::encode(multihash::Hash::SHA2256, peer_id.as_bytes()).expect("sha2-256 is always supported");
-        KadHash {
-            peer_id: peer_id,
-            hash: array_ref!(encoding.digest(), 0, 32).clone(),
-        }
-    }
-}
-
-impl PartialEq<multihash::Multihash> for KadHash {
-    #[inline]
-    fn eq(&self, other: &multihash::Multihash) -> bool {
-        self.hash() == other.digest()
-    }
-}
-
-impl PartialEq<KadHash> for multihash::Multihash {
-    #[inline]
-    fn eq(&self, other: &KadHash) -> bool {
-        self.digest() == other.hash()
-    }
-}

View File

@@ -26,30 +26,101 @@
 //! corresponding to its distance with the reference key.

 use arrayvec::ArrayVec;
-use bigint::{U512, U256};
-use crate::kad_hash::KadHash;
+use bigint::U256;
 use libp2p_core::PeerId;
 use multihash::Multihash;
-use std::num::NonZeroUsize;
+use sha2::{Digest, Sha256, digest::generic_array::{GenericArray, typenum::U32}};
 use std::slice::IterMut as SliceIterMut;
 use std::time::{Duration, Instant};
 use std::vec::IntoIter as VecIntoIter;

-/// Maximum number of nodes in a bucket.
-pub const MAX_NODES_PER_BUCKET: usize = 20;
+/// Maximum number of k-buckets.
+const NUM_BUCKETS: usize = 256;
+
+/// Maximum number of nodes in a bucket, i.e. `k`.
+const MAX_NODES_PER_BUCKET: usize = 20;

-/// Table of k-buckets.
+/// A table of `KBucket`s, i.e. a Kademlia routing table.
 #[derive(Debug, Clone)]
 pub struct KBucketsTable<TPeerId, TVal> {
     /// Peer ID of the local node.
-    my_id: TPeerId,
+    local_key: Key<TPeerId>,
     /// The actual tables that store peers or values.
     tables: Vec<KBucket<TPeerId, TVal>>,
     /// The timeout when trying to reach the youngest node after which we consider it unresponsive.
     unresponsive_timeout: Duration,
 }

-/// An individual table that stores peers or values.
+/// A `Key` is a cryptographic hash, stored with an associated value in a `KBucket`
+/// of a `KBucketsTable`.
+///
+/// `Key`s have an XOR metric as defined in the Kademlia paper, i.e. the bitwise XOR of
+/// the hash digests, interpreted as an integer. See [`Key::distance`].
+///
+/// A `Key` preserves the preimage of type `T` of the hash function. See [`Key::preimage`].
+#[derive(Clone, Debug)]
+pub struct Key<T> {
+    preimage: T,
+    hash: GenericArray<u8, U32>,
+}
+
+impl<T> PartialEq for Key<T> {
+    fn eq(&self, other: &Key<T>) -> bool {
+        self.hash == other.hash
+    }
+}
+
+impl<T> Eq for Key<T> {}
+
+impl<T> Key<T> {
+    /// Construct a new `Key` by hashing the bytes of the given `preimage`.
+    ///
+    /// The preimage of type `T` is preserved. See [`Key::preimage`] and
+    /// [`Key::into_preimage`].
+    pub fn new(preimage: T) -> Key<T>
+    where
+        T: AsRef<[u8]>
+    {
+        let hash = Sha256::digest(preimage.as_ref());
+        Key { preimage, hash }
+    }
+
+    /// Borrows the preimage of the key.
+    pub fn preimage(&self) -> &T {
+        &self.preimage
+    }
+
+    /// Converts the key into its preimage.
+    pub fn into_preimage(self) -> T {
+        self.preimage
+    }
+
+    /// Computes the distance of the keys according to the XOR metric.
+    pub fn distance<U>(&self, other: &Key<U>) -> Distance {
+        let a = U256::from(self.hash.as_ref());
+        let b = U256::from(other.hash.as_ref());
+        Distance(a ^ b)
+    }
+}
+
+impl From<Multihash> for Key<Multihash> {
+    fn from(h: Multihash) -> Self {
+        let k = Key::new(h.clone().into_bytes());
+        Key { preimage: h, hash: k.hash }
+    }
+}
+
+impl From<PeerId> for Key<PeerId> {
+    fn from(peer_id: PeerId) -> Self {
+        Key::new(peer_id)
+    }
+}
+
+/// A distance between two `Key`s.
+#[derive(Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord, Debug)]
+pub struct Distance(bigint::U256);
+
+/// A `KBucket` is a list of up to `MAX_NODES_PER_BUCKET` `Key`s and associated values,
+/// ordered from least recently used to most recently used.
 #[derive(Debug, Clone)]
 struct KBucket<TPeerId, TVal> {
     /// Nodes are always ordered from oldest to newest. The nodes we are connected to are always
@@ -79,131 +150,25 @@ struct PendingNode<TPeerId, TVal> {
     replace: Instant,
 }

-/// A single node in a k-bucket.
+/// A single node in a `KBucket`.
 #[derive(Debug, Clone)]
 struct Node<TPeerId, TVal> {
     /// Id of the node.
-    id: TPeerId,
+    id: Key<TPeerId>,
     /// Value associated to it.
     value: TVal,
 }

-/// Trait that must be implemented on types that can be used as an identifier in a k-bucket.
-///
-/// If `TOther` is not the same as `Self`, it represents an entry already in the k-buckets that
-/// `Self` can compare against.
-pub trait KBucketsPeerId<TOther = Self>: PartialEq<TOther> {
-    /// Computes the XOR of this value and another one. The lower the closer.
-    fn distance_with(&self, other: &TOther) -> u32;
-
-    /// Returns then number of bits that are necessary to store the distance between peer IDs.
-    /// Used for pre-allocations.
-    fn max_distance() -> NonZeroUsize;
-}
-
-impl KBucketsPeerId for PeerId {
-    fn distance_with(&self, other: &Self) -> u32 {
-        <Multihash as KBucketsPeerId<Multihash>>::distance_with(self.as_ref(), other.as_ref())
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        <Multihash as KBucketsPeerId>::max_distance()
-    }
-}
-
-impl KBucketsPeerId<PeerId> for Multihash {
-    fn distance_with(&self, other: &PeerId) -> u32 {
-        <Multihash as KBucketsPeerId<Multihash>>::distance_with(self, other.as_ref())
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        <PeerId as KBucketsPeerId>::max_distance()
-    }
-}
-
-impl KBucketsPeerId for KadHash {
-    fn distance_with(&self, other: &Self) -> u32 {
-        // Note that we don't compare the hash functions because there's no chance of collision
-        // of the same value hashed with two different hash functions.
-        let my_hash = U256::from(self.hash());
-        let other_hash = U256::from(other.hash());
-        let xor = my_hash ^ other_hash;
-        256 - xor.leading_zeros()
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        // Hash is SHA2256, so fixed value
-        NonZeroUsize::new(256).expect("256 is not zero; QED")
-    }
-}
-
-impl KBucketsPeerId<KadHash> for Multihash {
-    fn distance_with(&self, other: &KadHash) -> u32 {
-        let my_hash = U512::from(self.digest());
-        let other_hash = U512::from(U256::from(other.hash()));
-        let xor = my_hash ^ other_hash;
-        512 - xor.leading_zeros()
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        NonZeroUsize::new(512).expect("512 is not zero; QED")
-    }
-}
-
-impl KBucketsPeerId for Multihash {
-    fn distance_with(&self, other: &Self) -> u32 {
-        // Note that we don't compare the hash functions because there's no chance of collision
-        // of the same value hashed with two different hash functions.
-        let my_hash = U512::from(self.digest());
-        let other_hash = U512::from(other.digest());
-        let xor = my_hash ^ other_hash;
-        512 - xor.leading_zeros()
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        NonZeroUsize::new(512).expect("512 is not zero; QED")
-    }
-}
-
-impl<A, B> KBucketsPeerId for (A, B)
-where
-    A: KBucketsPeerId + PartialEq,
-    B: KBucketsPeerId + PartialEq,
-{
-    fn distance_with(&self, other: &(A, B)) -> u32 {
-        A::distance_with(&self.0, &other.0) + B::distance_with(&self.1, &other.1)
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        let n = <A as KBucketsPeerId<A>>::max_distance().get()
-            .saturating_add(<B as KBucketsPeerId<B>>::max_distance().get());
-        NonZeroUsize::new(n).expect("Saturating-add of two non-zeros can't be zero; QED")
-    }
-}
-
-impl<'a, T> KBucketsPeerId for &'a T
-where
-    T: KBucketsPeerId,
-{
-    fn distance_with(&self, other: &&'a T) -> u32 {
-        T::distance_with(*self, *other)
-    }
-
-    fn max_distance() -> NonZeroUsize {
-        <T as KBucketsPeerId>::max_distance()
-    }
-}
-
 impl<TPeerId, TVal> KBucketsTable<TPeerId, TVal>
 where
-    TPeerId: KBucketsPeerId + Clone,
+    TPeerId: Clone,
 {
-    /// Builds a new routing table.
-    pub fn new(my_id: TPeerId, unresponsive_timeout: Duration) -> Self {
+    /// Builds a new routing table whose keys are distributed over `KBucket`s as
+    /// per the relative distance to `local_key`.
+    pub fn new(local_key: Key<TPeerId>, unresponsive_timeout: Duration) -> Self {
         KBucketsTable {
-            my_id,
-            tables: (0..TPeerId::max_distance().get())
+            local_key,
+            tables: (0..NUM_BUCKETS)
                 .map(|_| KBucket {
                     nodes: ArrayVec::new(),
                     first_connected_pos: 0,
@@ -214,20 +179,20 @@ where
         }
     }

-    /// Returns the ID of the local node.
-    pub fn my_id(&self) -> &TPeerId {
-        &self.my_id
+    /// Returns the local key.
+    pub fn local_key(&self) -> &Key<TPeerId> {
+        &self.local_key
     }

     /// Returns the id of the bucket that should contain the peer with the given ID.
     ///
     /// Returns `None` if out of range, which happens if `id` is the same as the local peer id.
-    fn bucket_num(&self, id: &TPeerId) -> Option<usize> {
-        (self.my_id.distance_with(id) as usize).checked_sub(1)
+    fn bucket_num(&self, key: &Key<TPeerId>) -> Option<usize> {
+        (NUM_BUCKETS - self.local_key.distance(key).0.leading_zeros() as usize).checked_sub(1)
     }

     /// Returns an object containing the state of the given entry.
-    pub fn entry<'a>(&'a mut self, peer_id: &'a TPeerId) -> Entry<'a, TPeerId, TVal> {
+    pub fn entry<'a>(&'a mut self, peer_id: &'a Key<TPeerId>) -> Entry<'a, TPeerId, TVal> {
         let bucket_num = if let Some(num) = self.bucket_num(peer_id) {
             num
         } else {
@@ -293,7 +258,7 @@ where
     }

     /// Returns an iterator to all the peer IDs in the bucket, without the pending nodes.
-    pub fn entries_not_pending(&self) -> impl Iterator<Item = (&TPeerId, &TVal)> {
+    pub fn entries_not_pending(&self) -> impl Iterator<Item = (&Key<TPeerId>, &TVal)> {
         self.tables
             .iter()
             .flat_map(|table| table.nodes.iter())
@@ -308,10 +273,10 @@ where
         BucketsIter(self.tables.iter_mut(), self.unresponsive_timeout)
     }

-    /// Finds the nodes closest to `id`, ordered by distance.
+    /// Finds the keys closest to `key`, ordered by distance.
     ///
     /// Pending nodes are ignored.
-    pub fn find_closest(&mut self, id: &impl KBucketsPeerId<TPeerId>) -> VecIntoIter<TPeerId> {
+    pub fn find_closest<T>(&mut self, key: &Key<T>) -> VecIntoIter<Key<TPeerId>> {
         // TODO: optimize
         let mut out = Vec::new();
         for table in self.tables.iter_mut() {
@@ -331,7 +296,7 @@ where
                 }
             }
         }
-        out.sort_by(|a, b| id.distance_with(a).cmp(&id.distance_with(b)));
+        out.sort_by(|a, b| key.distance(a).cmp(&key.distance(b)));
         out.into_iter()
     }
 }
@@ -354,7 +319,7 @@ pub enum Entry<'a, TPeerId, TVal> {

 impl<'a, TPeerId, TVal> Entry<'a, TPeerId, TVal>
 where
-    TPeerId: KBucketsPeerId + Clone,
+    TPeerId: Clone,
 {
     /// Returns the value associated to the entry in the bucket, including if the node is pending.
     pub fn value(&mut self) -> Option<&mut TVal> {
@@ -384,12 +349,12 @@ where
 /// Represents an entry in a k-bucket.
 pub struct EntryInKbucketConn<'a, TPeerId, TVal> {
     parent: &'a mut KBucketsTable<TPeerId, TVal>,
-    peer_id: &'a TPeerId,
+    peer_id: &'a Key<TPeerId>,
 }

 impl<'a, TPeerId, TVal> EntryInKbucketConn<'a, TPeerId, TVal>
 where
-    TPeerId: KBucketsPeerId + Clone,
+    TPeerId: Clone,
 {
     /// Returns the value associated to the entry in the bucket.
     pub fn value(&mut self) -> &mut TVal {
@@ -470,7 +435,7 @@ pub enum SetDisconnectedOutcome<'a, TPeerId, TVal> {
     Replaced {
         /// Node that replaced the node.
         // TODO: could be a EntryInKbucketConn, but we have borrow issues with the new peer id
-        replacement: TPeerId,
+        replacement: Key<TPeerId>,
         /// Value os the node that has been pushed out.
         old_val: TVal,
     },
@@ -479,12 +444,12 @@ pub enum SetDisconnectedOutcome<'a, TPeerId, TVal> {
 /// Represents an entry waiting for a slot to be available in its k-bucket.
 pub struct EntryInKbucketConnPending<'a, TPeerId, TVal> {
     parent: &'a mut KBucketsTable<TPeerId, TVal>,
-    peer_id: &'a TPeerId,
+    peer_id: &'a Key<TPeerId>,
 }

 impl<'a, TPeerId, TVal> EntryInKbucketConnPending<'a, TPeerId, TVal>
 where
-    TPeerId: KBucketsPeerId + Clone,
+    TPeerId: Clone,
 {
     /// Returns the value associated to the entry in the bucket.
     pub fn value(&mut self) -> &mut TVal {
@@ -526,12 +491,12 @@ where
 /// Represents an entry waiting for a slot to be available in its k-bucket.
 pub struct EntryInKbucketDiscPending<'a, TPeerId, TVal> {
     parent: &'a mut KBucketsTable<TPeerId, TVal>,
-    peer_id: &'a TPeerId,
+    peer_id: &'a Key<TPeerId>,
 }

 impl<'a, TPeerId, TVal> EntryInKbucketDiscPending<'a, TPeerId, TVal>
 where
-    TPeerId: KBucketsPeerId + Clone,
+    TPeerId: Clone,
 {
     /// Returns the value associated to the entry in the bucket.
     pub fn value(&mut self) -> &mut TVal {
@@ -573,12 +538,12 @@ where
 /// Represents an entry in a k-bucket.
 pub struct EntryInKbucketDisc<'a, TPeerId, TVal> {
     parent: &'a mut KBucketsTable<TPeerId, TVal>,
-    peer_id: &'a TPeerId,
+    peer_id: &'a Key<TPeerId>,
 }

 impl<'a, TPeerId, TVal> EntryInKbucketDisc<'a, TPeerId, TVal>
 where
-    TPeerId: KBucketsPeerId + Clone,
+    TPeerId: Clone,
 {
     /// Returns the value associated to the entry in the bucket.
     pub fn value(&mut self) -> &mut TVal {
@@ -640,12 +605,12 @@ where
 /// Represents an entry not in any k-bucket.
 pub struct EntryNotInKbucket<'a, TPeerId, TVal> {
     parent: &'a mut KBucketsTable<TPeerId, TVal>,
-    peer_id: &'a TPeerId,
+    peer_id: &'a Key<TPeerId>,
 }

 impl<'a, TPeerId, TVal> EntryNotInKbucket<'a, TPeerId, TVal>
 where
-    TPeerId: KBucketsPeerId + Clone,
+    TPeerId: Clone,
 {
     /// Inserts the node as connected, if possible.
     pub fn insert_connected(self, value: TVal) -> InsertOutcome<TPeerId> {
@@ -703,14 +668,14 @@ where
 /// Outcome of calling `insert`.
 #[must_use]
-#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub enum InsertOutcome<TPeerId> {
     /// The entry has been successfully inserted.
     Inserted,
     /// The entry has been inserted as a pending node.
     Pending {
         /// We have to try connect to the returned node.
-        to_ping: TPeerId,
+        to_ping: Key<TPeerId>,
     },
     /// The entry was not inserted because the bucket was full of connected nodes.
     Full,
@@ -760,17 +725,68 @@ impl<'a, TPeerId, TVal> Bucket<'a, TPeerId, TVal> {
 #[cfg(test)]
 mod tests {
-    use crate::kbucket::{Entry, InsertOutcome, KBucketsPeerId, KBucketsTable, MAX_NODES_PER_BUCKET};
-    use multihash::{Multihash, Hash};
-    use std::thread;
+    use super::*;
+    use quickcheck::*;
+    use libp2p_core::PeerId;
+    use crate::kbucket::{Entry, InsertOutcome, KBucketsTable, MAX_NODES_PER_BUCKET};
     use std::time::Duration;

+    impl Arbitrary for Key<PeerId> {
+        fn arbitrary<G: Gen>(_: &mut G) -> Key<PeerId> {
+            Key::from(PeerId::random())
+        }
+    }
+
+    #[test]
+    fn identity() {
+        fn prop(a: Key<PeerId>) -> bool {
+            a.distance(&a) == Distance::default()
+        }
+        quickcheck(prop as fn(_) -> _)
+    }
+
+    #[test]
+    fn symmetry() {
+        fn prop(a: Key<PeerId>, b: Key<PeerId>) -> bool {
+            a.distance(&b) == b.distance(&a)
+        }
+        quickcheck(prop as fn(_,_) -> _)
+    }
+
+    #[test]
+    fn triangle_inequality() {
+        fn prop(a: Key<PeerId>, b: Key<PeerId>, c: Key<PeerId>) -> TestResult {
+            let ab = a.distance(&b);
+            let bc = b.distance(&c);
+            let (ab_plus_bc, overflow) = ab.0.overflowing_add(bc.0);
+            if overflow {
+                TestResult::discard()
+            } else {
+                TestResult::from_bool(a.distance(&c) <= Distance(ab_plus_bc))
+            }
+        }
+        quickcheck(prop as fn(_,_,_) -> _)
+    }
+
+    #[test]
+    fn unidirectionality() {
+        fn prop(a: Key<PeerId>, b: Key<PeerId>) -> bool {
+            let d = a.distance(&b);
+            (0..100).all(|_| {
+                let c = Key::from(PeerId::random());
+                a.distance(&c) != d || b == c
+            })
+        }
+        quickcheck(prop as fn(_,_) -> _)
+    }
+
     #[test]
     fn basic_closest() {
-        let my_id = Multihash::random(Hash::SHA2256);
-        let other_id = Multihash::random(Hash::SHA2256);
-        let mut table = KBucketsTable::<_, ()>::new(my_id, Duration::from_secs(5));
+        let my_key = Key::from(PeerId::random());
+        let other_id = Key::from(PeerId::random());
+        let mut table = KBucketsTable::<_, ()>::new(my_key, Duration::from_secs(5));
         if let Entry::NotInKbucket(entry) = table.entry(&other_id) {
             match entry.insert_connected(()) {
                 InsertOutcome::Inserted => (),
@@ -787,10 +803,10 @@ mod tests {
     #[test]
     fn update_local_id_fails() {
-        let my_id = Multihash::random(Hash::SHA2256);
-        let mut table = KBucketsTable::<_, ()>::new(my_id.clone(), Duration::from_secs(5));
-        match table.entry(&my_id) {
+        let my_key = Key::from(PeerId::random());
+        let mut table = KBucketsTable::<_, ()>::new(my_key.clone(), Duration::from_secs(5));
+        match table.entry(&my_key) {
             Entry::SelfEntry => (),
             _ => panic!(),
         }
@@ -798,23 +814,32 @@ mod tests {
     #[test]
     fn full_kbucket() {
-        let my_id = Multihash::random(Hash::SHA2256);
+        let my_key = Key::from(PeerId::random());
+        let mut table = KBucketsTable::<_, ()>::new(my_key.clone(), Duration::from_secs(5));
+
+        // Step 1: Fill the most distant bucket, i.e. bucket index `NUM_BUCKETS - 1`,
+        // with "disconnected" peers.

+        // Prepare `MAX_NODES_PER_BUCKET` keys to fill the bucket, plus 2
+        // additional keys which will be used to test the behavior on a full
+        // bucket.
         assert!(MAX_NODES_PER_BUCKET <= 251); // Test doesn't work otherwise.
         let mut fill_ids = (0..MAX_NODES_PER_BUCKET + 3)
             .map(|n| {
-                let mut id = my_id.clone().into_bytes();
-                id[2] ^= 0x80; // Flip the first bit so that we get in the most distant bucket.
-                id[33] = id[33].wrapping_add(n as u8);
-                Multihash::from_bytes(id).unwrap()
+                let mut id = my_key.clone();
+                // Flip the first bit so that we get in the most distant bucket.
+                id.hash[0] ^= 0x80;
+                // Each ID gets a unique low-order byte (i.e. counter).
+                id.hash[31] = id.hash[31].wrapping_add(n as u8);
+                id
             })
             .collect::<Vec<_>>();

         let first_node = fill_ids[0].clone();
         let second_node = fill_ids[1].clone();

-        let mut table = KBucketsTable::<_, ()>::new(my_id.clone(), Duration::from_secs(1));
-
+        // Fill the bucket, consuming all but the last 3 test keys.
         for (num, id) in fill_ids.drain(..MAX_NODES_PER_BUCKET).enumerate() {
             if let Entry::NotInKbucket(entry) = table.entry(&id) {
                 match entry.insert_disconnected(()) {
@@ -826,12 +851,16 @@ mod tests {
             }
             assert_eq!(table.buckets().nth(255).unwrap().num_entries(), num + 1);
         }

         assert_eq!(
             table.buckets().nth(255).unwrap().num_entries(),
             MAX_NODES_PER_BUCKET
         );
         assert!(!table.buckets().nth(255).unwrap().has_pending());
+
+        // Step 2: Insert another key on the full bucket. It must be marked as
+        // pending and the first (i.e. "least recently used") entry scheduled
+        // for replacement.
         if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) {
             match entry.insert_connected(()) {
                 InsertOutcome::Pending { ref to_ping } if *to_ping == first_node => (),
@@ -840,13 +869,13 @@ mod tests {
         } else {
             panic!()
         }

         assert_eq!(
             table.buckets().nth(255).unwrap().num_entries(),
             MAX_NODES_PER_BUCKET
         );
         assert!(table.buckets().nth(255).unwrap().has_pending());
-        if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) {
+
+        // Trying to insert yet another key is rejected.
+        if let Entry::NotInKbucket(entry) = table.entry(&Key::from(fill_ids.remove(0))) {
             match entry.insert_connected(()) {
                 InsertOutcome::Full => (),
                 _ => panic!()
@@ -855,7 +884,12 @@ mod tests {
             panic!()
         }

-        thread::sleep(Duration::from_secs(2));
+        // Step 3: Make the pending nodes eligible for replacing existing nodes.
+        // The pending node must be consumed and replace the first (i.e. "least
+        // recently used") node.
+        let elapsed = Instant::now() - Duration::from_secs(1);
+        table.tables[255].pending_node.as_mut().map(|n| n.replace = elapsed);
+
         assert!(!table.buckets().nth(255).unwrap().has_pending());
         if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) {
             match entry.insert_connected(()) {
@@ -866,18 +900,4 @@ mod tests {
             panic!()
         }
     }
-
-    #[test]
-    fn self_distance_zero() {
-        let a = Multihash::random(Hash::SHA2256);
-        assert_eq!(a.distance_with(&a), 0);
-    }
-
-    #[test]
-    fn distance_correct_order() {
-        let a = Multihash::random(Hash::SHA2256);
-        let b = Multihash::random(Hash::SHA2256);
-        assert!(a.distance_with(&a) < b.distance_with(&a));
-        assert!(a.distance_with(&b) > b.distance_with(&b));
-    }
 }

View File

@@ -25,7 +25,6 @@
 #![allow(dead_code)]

 pub use self::behaviour::{Kademlia, KademliaOut};
-pub use self::kbucket::KBucketsPeerId;
 pub use self::protocol::KadConnectionType;

 pub mod handler;
@@ -34,6 +33,5 @@ pub mod protocol;

 mod addresses;
 mod behaviour;
-mod kad_hash;
 mod protobuf_structs;
 mod query;

View File

@@ -23,7 +23,7 @@
 //! This allows one to create queries that iterate on the DHT on nodes that become closer and
 //! closer to the target.

-use crate::kbucket::KBucketsPeerId;
+use crate::kbucket;
 use futures::prelude::*;
 use smallvec::SmallVec;
 use std::{cmp::PartialEq, time::Duration};
@@ -44,13 +44,16 @@ pub struct QueryState<TTarget, TPeerId> {
     /// Target we're looking for.
     target: TTarget,

+    /// The `kbucket::Key` representation of the `target`.
+    target_key: kbucket::Key<TTarget>,
+
     /// Stage of the query. See the documentation of `QueryStage`.
     stage: QueryStage,

     /// Ordered list of the peers closest to the result we're looking for.
     /// Entries that are `InProgress` shouldn't be removed from the list before they complete.
     /// Must never contain two entries with the same peer IDs.
-    closest_peers: SmallVec<[(TPeerId, QueryPeerState); 32]>,
+    closest_peers: SmallVec<[(kbucket::Key<TPeerId>, QueryPeerState); 32]>,

     /// Allowed level of parallelism.
     parallelism: usize,
@@ -100,25 +103,27 @@ enum QueryStage {
 impl<TTarget, TPeerId> QueryState<TTarget, TPeerId>
 where
-    TPeerId: Eq,
-    TTarget: KBucketsPeerId<TPeerId>
+    TTarget: Into<kbucket::Key<TTarget>> + Clone,
+    TPeerId: Into<kbucket::Key<TPeerId>> + Eq
 {
     /// Creates a new query.
     ///
     /// You should call `poll()` this function returns in order to know what to do.
-    pub fn new(config: QueryConfig<impl IntoIterator<Item = TPeerId>, TTarget>) -> Self {
+    pub fn new(config: QueryConfig<impl IntoIterator<Item = kbucket::Key<TPeerId>>, TTarget>) -> Self {
         let mut closest_peers: SmallVec<[_; 32]> = config
             .known_closest_peers
             .into_iter()
-            .map(|peer_id| (peer_id, QueryPeerState::NotContacted))
+            .map(|key| (key, QueryPeerState::NotContacted))
             .take(config.num_results)
             .collect();
-        let target = config.target;
-        closest_peers.sort_by_key(|e| target.distance_with(&e.0));
+
+        let target_key = config.target.clone().into();
+        closest_peers.sort_by_key(|e| target_key.distance(&e.0));
         closest_peers.dedup_by(|a, b| a.0 == b.0);

         QueryState {
-            target,
+            target: config.target,
+            target_key,
             stage: QueryStage::Iterating {
                 no_closer_in_a_row: 0,
             },
@@ -158,7 +163,7 @@ where
     ) {
         // Mark the peer as succeeded.
         for (peer_id, state) in self.closest_peers.iter_mut() {
-            if result_source == peer_id {
+            if result_source == peer_id.preimage() {
                 if let state @ QueryPeerState::InProgress(_) = state {
                     *state = QueryPeerState::Succeeded;
                 }
@@ -172,14 +177,16 @@ where
             ref mut no_closer_in_a_row,
         } = self.stage
         {
+            let target = &self.target_key;
+
             // We increment now, and reset to 0 if we find a closer node.
             *no_closer_in_a_row += 1;

-            for elem_to_add in closer_peers {
-                let target = &self.target;
-                let elem_to_add_distance = target.distance_with(&elem_to_add);
-                let insert_pos_start = self.closest_peers.iter().position(|(id, _)| {
-                    target.distance_with(&id) >= elem_to_add_distance
+            for peer in closer_peers {
+                let peer_key = peer.into();
+                let peer_distance = target.distance(&peer_key);
+                let insert_pos_start = self.closest_peers.iter().position(|(key, _)| {
+                    target.distance(&key) >= peer_distance
                 });

                 if let Some(insert_pos_start) = insert_pos_start {
@@ -187,30 +194,29 @@ where
                     // `insert_pos_start + insert_pos_size`.
                     let insert_pos_size = self.closest_peers.iter()
                         .skip(insert_pos_start)
-                        .position(|(id, _)| {
-                            target.distance_with(&id) > elem_to_add_distance
+                        .position(|(key, _)| {
+                            target.distance(&key) > peer_distance
                         });

                     // Make sure we don't insert duplicates.
                     let mut iter_start = self.closest_peers.iter().skip(insert_pos_start);
                     let duplicate = if let Some(insert_pos_size) = insert_pos_size {
-                        iter_start.take(insert_pos_size).any(|e| e.0 == elem_to_add)
+                        iter_start.take(insert_pos_size).any(|e| e.0 == peer_key)
                     } else {
-                        iter_start.any(|e| e.0 == elem_to_add)
+                        iter_start.any(|e| e.0 == peer_key)
                     };

                     if !duplicate {
                         if insert_pos_start == 0 {
                             *no_closer_in_a_row = 0;
                         }
-                        debug_assert!(self.closest_peers.iter().all(|e| e.0 != elem_to_add));
+                        debug_assert!(self.closest_peers.iter().all(|e| e.0 != peer_key));
                         self.closest_peers
-                            .insert(insert_pos_start, (elem_to_add, QueryPeerState::NotContacted));
+                            .insert(insert_pos_start, (peer_key, QueryPeerState::NotContacted));
                     }
                 } else if num_closest < self.num_results {
-                    debug_assert!(self.closest_peers.iter().all(|e| e.0 != elem_to_add));
-                    self.closest_peers
-                        .push((elem_to_add, QueryPeerState::NotContacted));
+                    debug_assert!(self.closest_peers.iter().all(|e| e.0 != peer_key));
+                    self.closest_peers.push((peer_key, QueryPeerState::NotContacted));
                 }
             }
         }
@@ -246,7 +252,7 @@ where
                     QueryPeerState::Failed => false,
                 }
             })
-            .map(|(id, _)| id)
+            .map(|(key, _)| key.preimage())
     }

     /// Returns true if we are waiting for a query answer from that peer.
@@ -268,7 +274,7 @@ where
             .closest_peers
             .iter_mut()
             .find_map(|(peer_id, state)|
-                if peer_id == id {
+                if peer_id.preimage() == id {
                     Some(state)
                 } else {
                     None
@@ -303,7 +309,7 @@ where
                 match timeout.poll() {
                     Ok(Async::Ready(_)) | Err(_) => {
                         *state = QueryPeerState::Failed;
-                        return Async::Ready(QueryStatePollOut::CancelRpc { peer_id });
+                        return Async::Ready(QueryStatePollOut::CancelRpc { peer_id: peer_id.preimage() });
                     }
                     Ok(Async::NotReady) => {
                         active_counter += 1
@@ -328,7 +334,7 @@ where
                 let delay = Delay::new(Instant::now() + self.rpc_timeout);
                 *state = QueryPeerState::InProgress(delay);
                 return Async::Ready(QueryStatePollOut::SendRpc {
-                    peer_id,
+                    peer_id: peer_id.preimage(),
                     query_target: &self.target,
                 });
             }
@@ -353,7 +359,7 @@ where
             .into_iter()
             .filter_map(|(peer_id, state)| {
                 if let QueryPeerState::Succeeded = state {
-                    Some(peer_id)
+                    Some(peer_id.into_preimage())
                 } else {
                     None
                 }
@@ -423,7 +429,7 @@ enum QueryPeerState {

 #[cfg(test)]
 mod tests {
-    use super::{QueryConfig, QueryState, QueryStatePollOut};
+    use super::{kbucket, QueryConfig, QueryState, QueryStatePollOut};
     use futures::{self, try_ready, prelude::*};
     use libp2p_core::PeerId;
     use std::{iter, time::Duration, sync::Arc, sync::Mutex, thread};
@@ -432,11 +438,12 @@ mod tests {
     #[test]
     fn start_by_sending_rpc_to_known_peers() {
         let random_id = PeerId::random();
+        let random_key = kbucket::Key::new(random_id.clone());
         let target = PeerId::random();

         let mut query = QueryState::new(QueryConfig {
             target,
-            known_closest_peers: iter::once(random_id.clone()),
+            known_closest_peers: iter::once(random_key),
             parallelism: 3,
             num_results: 100,
             rpc_timeout: Duration::from_secs(10),
@@ -455,12 +462,13 @@ mod tests {
     #[test]
     fn continue_second_result() {
         let random_id = PeerId::random();
+        let random_key = kbucket::Key::from(random_id.clone());
         let random_id2 = PeerId::random();
         let target = PeerId::random();

         let query = Arc::new(Mutex::new(QueryState::new(QueryConfig {
             target,
-            known_closest_peers: iter::once(random_id.clone()),
+            known_closest_peers: iter::once(random_key),
             parallelism: 3,
             num_results: 100,
             rpc_timeout: Duration::from_secs(10),
@@ -500,10 +508,11 @@ mod tests {
     #[test]
     fn timeout_works() {
         let random_id = PeerId::random();
+        let random_key = kbucket::Key::from(random_id.clone());

         let query = Arc::new(Mutex::new(QueryState::new(QueryConfig {
             target: PeerId::random(),
-            known_closest_peers: iter::once(random_id.clone()),
+            known_closest_peers: iter::once(random_key),
             parallelism: 3,
             num_results: 100,
             rpc_timeout: Duration::from_millis(100),