From 09f54df44d475d98cec333cd401d6bc20680bc13 Mon Sep 17 00:00:00 2001 From: Roman Borschel Date: Wed, 22 May 2019 14:49:38 +0200 Subject: [PATCH] Kademlia: Optimise iteration over closest keys / entries. (#1117) * Kademlia: Optimise iteration over closest entries. The current implementation for finding the entries whose keys are closest to some target key in the Kademlia routing table involves copying the keys of all buckets into a new `Vec` which is then sorted based on the distances to the target and turned into an iterator from which only a small number of elements (by default 20) are drawn. This commit introduces an iterator over buckets for finding the closest keys to a target that visits the buckets in the optimal order, based on the information contained in the distance bit-string representing the distance between the local key and the target. Correctness is tested against full-table scans. Also included: * Updated documentation. * The `Entry` API was moved to the `kbucket::entry` sub-module for ease of maintenance. * The pending node handling has been slightly refactored in order to bring code and documentation in agreement and clarify the semantics a little. * Rewrite pending node handling and add tests. 
--- examples/ipfs-kad.rs | 20 +- protocols/kad/src/addresses.rs | 7 +- protocols/kad/src/behaviour.rs | 329 ++++---- protocols/kad/src/behaviour/test.rs | 8 +- protocols/kad/src/kbucket.rs | 1200 ++++++++++----------------- protocols/kad/src/kbucket/bucket.rs | 552 ++++++++++++ protocols/kad/src/kbucket/entry.rs | 254 ++++++ protocols/kad/src/kbucket/key.rs | 156 ++++ protocols/kad/src/lib.rs | 2 +- protocols/kad/src/protocol.rs | 4 + protocols/kad/src/query.rs | 2 +- 11 files changed, 1558 insertions(+), 976 deletions(-) create mode 100644 protocols/kad/src/kbucket/bucket.rs create mode 100644 protocols/kad/src/kbucket/entry.rs create mode 100644 protocols/kad/src/kbucket/key.rs diff --git a/examples/ipfs-kad.rs b/examples/ipfs-kad.rs index 79b3c2f0..afcd1b22 100644 --- a/examples/ipfs-kad.rs +++ b/examples/ipfs-kad.rs @@ -44,16 +44,16 @@ fn main() { // to insert our local node in the DHT. However here we use `without_init` because this // example is very ephemeral and we don't want to pollute the DHT. In a real world // application, you want to use `new` instead. 
- let mut behaviour = libp2p::kad::Kademlia::without_init(local_peer_id.clone()); - behaviour.add_connected_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap(), "/ip4/104.131.131.82/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap()); + let mut behaviour = libp2p::kad::Kademlia::new(local_peer_id.clone()); + behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap(), "/ip4/104.131.131.82/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap()); + 
behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap()); libp2p::core::Swarm::new(transport, behaviour, local_peer_id) }; diff --git a/protocols/kad/src/addresses.rs b/protocols/kad/src/addresses.rs index 4efb1423..3a1d6d57 100644 --- a/protocols/kad/src/addresses.rs +++ b/protocols/kad/src/addresses.rs @@ -36,11 +36,16 @@ impl Addresses { } } - /// Returns the list of addresses. + /// Returns an iterator over the list of addresses. pub fn iter(&self) -> impl Iterator { self.addrs.iter() } + /// Converts the addresses into a `Vec`. + pub fn into_vec(self) -> Vec { + self.addrs.into_vec() + } + /// Returns true if the list of addresses is empty. 
pub fn is_empty(&self) -> bool { self.addrs.is_empty() diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 2fe8d4e0..2ca90e3c 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -20,7 +20,7 @@ use crate::addresses::Addresses; use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn}; -use crate::kbucket::{self, KBucketsTable}; +use crate::kbucket::{self, KBucketsTable, NodeStatus}; use crate::protocol::{KadConnectionType, KadPeer}; use crate::query::{QueryConfig, QueryState, QueryStatePollOut}; use fnv::{FnvHashMap, FnvHashSet}; @@ -74,13 +74,14 @@ pub struct Kademlia { /// perform in parallel. parallelism: usize, - /// `k` in the Kademlia reference papers. Number of results in a find node query. + /// The number of results to return from a query. Defaults to the maximum number + /// of entries in a single k-bucket, i.e. the `k` parameter. num_results: usize, - /// Timeout for each individual RPC query. + /// Timeout for a single RPC. rpc_timeout: Duration, - /// Events to return when polling. + /// Queued events to return when the behaviour is being polled. queued_events: SmallVec<[NetworkBehaviourAction, KademliaOut>; 32]>, /// List of providers to add to the topology as soon as we are in `poll()`. @@ -200,42 +201,37 @@ impl Kademlia { Self::new_inner(local_peer_id) } - /// Adds a known address for the given `PeerId`. We are connected to this address. - // TODO: report if the address was inserted? also, semantics unclear - pub fn add_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) { - self.add_address(peer_id, address, true) - } - - /// Adds a known address for the given `PeerId`. We are not connected or don't know whether we - /// are connected to this address. - // TODO: report if the address was inserted? 
also, semantics unclear - pub fn add_not_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) { - self.add_address(peer_id, address, false) - } - - /// Underlying implementation for `add_connected_address` and `add_not_connected_address`. - fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, _connected: bool) { + /// Adds a known address of a peer participating in the Kademlia DHT to the + /// routing table. + /// + /// This allows prepopulating the Kademlia routing table with known + /// addresses, so that they can be used immediately in following DHT + /// operations involving one of these peers, without having to dial + /// them upfront. + pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) { let key = kbucket::Key::new(peer_id.clone()); match self.kbuckets.entry(&key) { - kbucket::Entry::InKbucketConnected(mut entry) => entry.value().insert(address), - kbucket::Entry::InKbucketConnectedPending(mut entry) => entry.value().insert(address), - kbucket::Entry::InKbucketDisconnected(mut entry) => entry.value().insert(address), - kbucket::Entry::InKbucketDisconnectedPending(mut entry) => entry.value().insert(address), - kbucket::Entry::NotInKbucket(entry) => { + kbucket::Entry::Present(mut entry, _) => { + entry.value().insert(address); + } + kbucket::Entry::Pending(mut entry, _) => { + entry.value().insert(address); + } + kbucket::Entry::Absent(entry) => { let mut addresses = Addresses::new(); addresses.insert(address); - match entry.insert_disconnected(addresses) { - kbucket::InsertOutcome::Inserted => { + match entry.insert(addresses, NodeStatus::Disconnected) { + kbucket::InsertResult::Inserted => { let event = KademliaOut::KBucketAdded { peer_id: peer_id.clone(), replaced: None, }; self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event)); }, - kbucket::InsertOutcome::Full => (), - kbucket::InsertOutcome::Pending { to_ping } => { + kbucket::InsertResult::Full => (), + kbucket::InsertResult::Pending { 
disconnected } => { self.queued_events.push(NetworkBehaviourAction::DialPeer { - peer_id: to_ping.into_preimage(), + peer_id: disconnected.into_preimage(), }) }, } @@ -261,16 +257,17 @@ impl Kademlia { providing_keys: FnvHashSet::default(), refresh_add_providers: Interval::new_interval(Duration::from_secs(60)).fuse(), // TODO: constant parallelism, - num_results: 20, + num_results: kbucket::MAX_NODES_PER_BUCKET, rpc_timeout: Duration::from_secs(8), add_provider: SmallVec::new(), marker: PhantomData, } } - /// Returns an iterator to all the peer IDs in the bucket, without the pending nodes. - pub fn kbuckets_entries(&self) -> impl Iterator { - self.kbuckets.entries_not_pending().map(|(key, _)| key.preimage()) + /// Returns an iterator over all peer IDs of nodes currently contained in a bucket + /// of the Kademlia routing table. + pub fn kbuckets_entries(&mut self) -> impl Iterator { + self.kbuckets.iter().map(|entry| entry.node.key.preimage()) } /// Starts an iterative `FIND_NODE` request. @@ -334,8 +331,10 @@ impl Kademlia { untrusted_addresses: Default::default(), }; + let target_key = kbucket::Key::new(target.clone()); + let known_closest_peers = self.kbuckets - .find_closest(&kbucket::Key::new(target.clone())) + .closest_keys(&target_key) .take(self.num_results); self.active_queries.insert( @@ -376,6 +375,83 @@ impl Kademlia { query.inject_rpc_result(source, others_iter.cloned().map(|kp| kp.node_id)) } } + + /// Finds the closest peers to a `target` in the context of a request by + /// the `source` peer, such that the `source` peer is never included in the + /// result. + fn find_closest(&mut self, target: &kbucket::Key, source: &PeerId) -> Vec { + self.kbuckets + .closest(target) + .filter(|e| e.node.key.preimage() != source) + .take(self.num_results) + .map(KadPeer::from) + .collect() + } + + /// Collects all peers who are known to be providers of the value for a given `Multihash`. 
+ fn provider_peers(&mut self, key: &Multihash, source: &PeerId) -> Vec { + let kbuckets = &mut self.kbuckets; + self.values_providers + .get(key) + .into_iter() + .flat_map(|peers| peers) + .filter_map(move |p| + if p != source { + let key = kbucket::Key::new(p.clone()); + kbuckets.entry(&key).view().map(|e| KadPeer::from(e.to_owned())) + } else { + None + }) + .collect() + } + + /// Update the connection status of a peer in the Kademlia routing table. + fn connection_updated(&mut self, peer: PeerId, address: Option, new_status: NodeStatus) { + let key = kbucket::Key::new(peer.clone()); + match self.kbuckets.entry(&key) { + kbucket::Entry::Present(mut entry, old_status) => { + if let Some(address) = address { + entry.value().insert(address); + } + if old_status != new_status { + entry.update(new_status); + } + }, + + kbucket::Entry::Pending(mut entry, old_status) => { + if let Some(address) = address { + entry.value().insert(address); + } + if old_status != new_status { + entry.update(new_status); + } + }, + + kbucket::Entry::Absent(entry) => if new_status == NodeStatus::Connected { + let mut addresses = Addresses::new(); + if let Some(address) = address { + addresses.insert(address); + } + match entry.insert(addresses, new_status) { + kbucket::InsertResult::Inserted => { + let event = KademliaOut::KBucketAdded { + peer_id: peer.clone(), + replaced: None, + }; + self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event)); + }, + kbucket::InsertResult::Full => (), + kbucket::InsertResult::Pending { disconnected } => { + debug_assert!(!self.connected_peers.contains(disconnected.preimage())); + self.queued_events.push(NetworkBehaviourAction::DialPeer { + peer_id: disconnected.into_preimage(), + }) + }, + } + }, + _ => {} + } + } } impl NetworkBehaviour for Kademlia @@ -396,11 +472,13 @@ where fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { // We should order addresses from decreasing likelyhood of connectivity, so start with // the addresses of 
that peer in the k-buckets. - let mut out_list = self.kbuckets - .entry(&kbucket::Key::new(peer_id.clone())) - .value_not_pending() - .map(|l| l.iter().cloned().collect::>()) - .unwrap_or_else(Vec::new); + let key = kbucket::Key::new(peer_id.clone()); + let mut out_list = + if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) { + entry.value().iter().cloned().collect::>() + } else { + Vec::new() + }; // We add to that a temporary list of addresses from the ongoing queries. for query in self.active_queries.values() { @@ -428,57 +506,7 @@ where ConnectedPoint::Listener { .. } => None, }; - let key = kbucket::Key::new(id.clone()); - - match self.kbuckets.entry(&key) { - kbucket::Entry::InKbucketConnected(_) => { - unreachable!("Kbuckets are always kept in sync with the connection state; QED") - }, - kbucket::Entry::InKbucketConnectedPending(_) => { - unreachable!("Kbuckets are always kept in sync with the connection state; QED") - }, - - kbucket::Entry::InKbucketDisconnected(mut entry) => { - if let Some(address) = address { - entry.value().insert(address); - } - entry.set_connected(); - }, - - kbucket::Entry::InKbucketDisconnectedPending(mut entry) => { - if let Some(address) = address { - entry.value().insert(address); - } - entry.set_connected(); - }, - - kbucket::Entry::NotInKbucket(entry) => { - let mut addresses = Addresses::new(); - if let Some(address) = address { - addresses.insert(address); - } - match entry.insert_connected(addresses) { - kbucket::InsertOutcome::Inserted => { - let event = KademliaOut::KBucketAdded { - peer_id: id.clone(), - replaced: None, - }; - self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event)); - }, - kbucket::InsertOutcome::Full => (), - kbucket::InsertOutcome::Pending { to_ping } => { - self.queued_events.push(NetworkBehaviourAction::DialPeer { - peer_id: to_ping.into_preimage(), - }) - }, - } - }, - - kbucket::Entry::SelfEntry => { - unreachable!("Guaranteed to never receive disconnected even 
for self; QED") - }, - } - + self.connection_updated(id.clone(), address, NodeStatus::Connected); self.connected_peers.insert(id); } @@ -486,10 +514,10 @@ where if let Some(peer_id) = peer_id { let key = kbucket::Key::new(peer_id.clone()); - if let Some(list) = self.kbuckets.entry(&key).value() { + if let Some(addrs) = self.kbuckets.entry(&key).value() { // TODO: don't remove the address if the error is that we are already connected // to this peer - list.remove(addr); + addrs.remove(addr); } for query in self.active_queries.values_mut() { @@ -507,40 +535,11 @@ where } fn inject_disconnected(&mut self, id: &PeerId, _old_endpoint: ConnectedPoint) { - let was_in = self.connected_peers.remove(id); - debug_assert!(was_in); - for query in self.active_queries.values_mut() { query.inject_rpc_error(id); } - - match self.kbuckets.entry(&kbucket::Key::new(id.clone())) { - kbucket::Entry::InKbucketConnected(entry) => { - match entry.set_disconnected() { - kbucket::SetDisconnectedOutcome::Kept(_) => {}, - kbucket::SetDisconnectedOutcome::Replaced { replacement, .. 
} => { - let event = KademliaOut::KBucketAdded { - peer_id: replacement.into_preimage(), - replaced: Some(id.clone()), - }; - self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event)); - }, - } - }, - kbucket::Entry::InKbucketConnectedPending(entry) => { - entry.set_disconnected(); - }, - kbucket::Entry::InKbucketDisconnected(_) => { - unreachable!("Kbuckets are always kept in sync with the connection state; QED") - }, - kbucket::Entry::InKbucketDisconnectedPending(_) => { - unreachable!("Kbuckets are always kept in sync with the connection state; QED") - }, - kbucket::Entry::NotInKbucket(_) => {}, - kbucket::Entry::SelfEntry => { - unreachable!("Guaranteed to never receive disconnected even for self; QED") - }, - } + self.connection_updated(id.clone(), None, NodeStatus::Disconnected); + self.connected_peers.remove(id); } fn inject_replaced(&mut self, peer_id: PeerId, _old: ConnectedPoint, new_endpoint: ConnectedPoint) { @@ -554,9 +553,9 @@ where } } - if let Some(list) = self.kbuckets.entry(&kbucket::Key::new(peer_id)).value() { + if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::new(peer_id)).value() { if let ConnectedPoint::Dialer { address } = new_endpoint { - list.insert(address); + addrs.insert(address); } } } @@ -564,13 +563,7 @@ where fn inject_node_event(&mut self, source: PeerId, event: KademliaHandlerEvent) { match event { KademliaHandlerEvent::FindNodeReq { key, request_id } => { - let closer_peers = self.kbuckets - .find_closest(&kbucket::Key::new(key)) - .filter(|p| p.preimage() != &source) - .take(self.num_results) - .map(|key| build_kad_peer(&key, &mut self.kbuckets)) - .collect(); - + let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); self.queued_events.push(NetworkBehaviourAction::SendEvent { peer_id: source, event: KademliaHandlerIn::FindNodeRes { @@ -586,23 +579,8 @@ where self.discovered(&user_data, &source, closer_peers.iter()); } KademliaHandlerEvent::GetProvidersReq { key, request_id } => { - let 
provider_peers = { - let kbuckets = &mut self.kbuckets; - self.values_providers - .get(&key) - .into_iter() - .flat_map(|peers| peers) - .filter(|p| *p != &source) - .map(move |peer_id| build_kad_peer(&kbucket::Key::new(peer_id.clone()), kbuckets)) - .collect() - }; - - let closer_peers = self.kbuckets - .find_closest(&kbucket::Key::from(key)) - .take(self.num_results) - .map(|key| build_kad_peer(&key, &mut self.kbuckets)) - .collect(); - + let provider_peers = self.provider_peers(&key, &source); + let closer_peers = self.find_closest(&kbucket::Key::from(key), &source); self.queued_events.push(NetworkBehaviourAction::SendEvent { peer_id: source, event: KademliaHandlerIn::GetProvidersRes { @@ -660,7 +638,7 @@ where // Flush the changes to the topology that we want to make. for (key, provider) in self.add_provider.drain() { // Don't add ourselves to the providers. - if provider == *self.kbuckets.local_key().preimage() { + if &provider == self.kbuckets.local_key().preimage() { continue; } let providers = self.values_providers.entry(key).or_insert_with(Default::default); @@ -683,12 +661,21 @@ where } loop { - // Handle events queued by other parts of this struct + // Drain queued events first. if !self.queued_events.is_empty() { return Async::Ready(self.queued_events.remove(0)); } self.queued_events.shrink_to_fit(); + // Drain applied pending entries from the routing table. + if let Some(entry) = self.kbuckets.take_applied_pending() { + let event = KademliaOut::KBucketAdded { + peer_id: entry.inserted.into_preimage(), + replaced: entry.evicted.map(|n| n.key.into_preimage()) + }; + return Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) + } + // If iterating finds a query that is finished, stores it here and stops looping. 
let mut finished_query = None; @@ -751,16 +738,18 @@ where break Async::Ready(NetworkBehaviourAction::GenerateEvent(event)); }, QueryInfoInner::AddProvider { target } => { - let local_key = kbucket::Key::new(parameters.local_peer_id().clone()); for closest in closer_peers { let event = NetworkBehaviourAction::SendEvent { peer_id: closest, event: KademliaHandlerIn::AddProvider { key: target.clone(), - provider_peer: build_kad_peer(&local_key, &mut self.kbuckets), + provider_peer: KadPeer { + node_id: parameters.local_peer_id().clone(), + multiaddrs: parameters.external_addresses().cloned().collect(), + connection_ty: KadConnectionType::Connected, + } }, }; - self.queued_events.push(event); } }, @@ -815,26 +804,16 @@ pub enum KademliaOut { }, } -/// Builds a `KadPeer` struct corresponding to the given `PeerId`. -/// The `PeerId` cannot be the same as the local one. -/// -/// > **Note**: This is just a convenience function that doesn't do anything note-worthy. -fn build_kad_peer( - key: &kbucket::Key, - kbuckets: &mut KBucketsTable -) -> KadPeer { - let (multiaddrs, connection_ty) = match kbuckets.entry(key) { - kbucket::Entry::NotInKbucket(_) => (Vec::new(), KadConnectionType::NotConnected), // TODO: pending connection? 
- kbucket::Entry::InKbucketConnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected), - kbucket::Entry::InKbucketDisconnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected), - kbucket::Entry::InKbucketConnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected), - kbucket::Entry::InKbucketDisconnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected), - kbucket::Entry::SelfEntry => panic!("build_kad_peer expects not to be called with the kbucket::Key of the local ID"), - }; - - KadPeer { - node_id: key.preimage().clone(), - multiaddrs, - connection_ty, +impl From> for KadPeer { + fn from(e: kbucket::EntryView) -> KadPeer { + KadPeer { + node_id: e.node.key.into_preimage(), + multiaddrs: e.node.value.into_vec(), + connection_ty: match e.status { + NodeStatus::Connected => KadConnectionType::Connected, + NodeStatus::Disconnected => KadConnectionType::NotConnected + } + } } } + diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 72d2cb0f..4747e889 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -97,7 +97,7 @@ fn query_iter() { // Connect each swarm in the list to its predecessor in the list. for (i, (swarm, peer)) in &mut swarms.iter_mut().skip(1).zip(swarm_ids.clone()).enumerate() { - swarm.add_not_connected_address(&peer, Protocol::Memory(port_base + i as u64).into()) + swarm.add_address(&peer, Protocol::Memory(port_base + i as u64).into()) } // Ask the last peer in the list to search a random peer. The search should @@ -150,7 +150,7 @@ fn unresponsive_not_returned_direct() { // Add fake addresses. for _ in 0 .. 10 { - swarms[0].add_not_connected_address(&PeerId::random(), Protocol::Udp(10u16).into()); + swarms[0].add_address(&PeerId::random(), Protocol::Udp(10u16).into()); } // Ask first to search a random value. 
@@ -189,14 +189,14 @@ fn unresponsive_not_returned_indirect() { // Add fake addresses to first. let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone(); for _ in 0 .. 10 { - swarms[0].add_not_connected_address( + swarms[0].add_address( &PeerId::random(), multiaddr![Udp(10u16)] ); } // Connect second to first. - swarms[1].add_not_connected_address(&first_peer_id, Protocol::Memory(port_base).into()); + swarms[1].add_address(&first_peer_id, Protocol::Memory(port_base).into()); // Ask second to search a random value. let search_target = PeerId::random(); diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs index 5d6c8dd6..4d84f3b4 100644 --- a/protocols/kad/src/kbucket.rs +++ b/protocols/kad/src/kbucket.rs @@ -18,164 +18,121 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! A k-buckets table allows one to store a value identified by keys, ordered by their distance -//! to a reference key passed to the constructor. +//! Implementation of a Kademlia routing table as used by a single peer +//! participating in a Kademlia DHT. //! -//! If the local ID has `N` bits, then the k-buckets table contains `N` *buckets* each containing -//! a constant number of entries. Storing a key in the k-buckets table adds it to the bucket -//! corresponding to its distance with the reference key. +//! The entry point for the API of this module is a [`KBucketsTable`]. +//! +//! ## Pending Insertions +//! +//! When the bucket associated with the `Key` of an inserted entry is full +//! but contains disconnected nodes, it accepts a [`PendingEntry`]. +//! Pending entries are inserted lazily when their timeout is found to be expired +//! upon querying the `KBucketsTable`. When that happens, the `KBucketsTable` records +//! an [`AppliedPending`] result which must be consumed by calling [`take_applied_pending`] +//! regularly and / or after performing lookup operations like [`entry`] and [`closest`]. +//! +//! 
[`entry`]: kbucket::KBucketsTable::entry +//! [`closest`]: kbucket::KBucketsTable::closest +//! [`AppliedPending`]: kbucket::AppliedPending +//! [`KBucketsTable`]: kbucket::KBucketsTable +//! [`take_applied_pending`]: kbucket::KBucketsTable::take_applied_pending +//! [`PendingEntry`]: kbucket::PendingEntry -use arrayvec::ArrayVec; -use bigint::U256; -use libp2p_core::PeerId; -use multihash::Multihash; -use sha2::{Digest, Sha256, digest::generic_array::{GenericArray, typenum::U32}}; -use std::slice::IterMut as SliceIterMut; +// [Implementation Notes] +// +// 1. Routing Table Layout +// +// The routing table is currently implemented as a fixed-size "array" of +// buckets, ordered by increasing distance relative to a local key +// that identifies the local peer. This is an often-used, simplified +// implementation that approximates the properties of the b-tree (or prefix tree) +// implementation described in the full paper [0], whereby buckets are split on-demand. +// This should be treated as an implementation detail, however, so that the +// implementation may change in the future without breaking the API. +// +// 2. Replacement Cache +// +// In this implementation, the "replacement cache" for unresponsive peers +// consists of a single entry per bucket. Furthermore, this implementation is +// currently tailored to connection-oriented transports, meaning that the +// "LRU"-based ordering of entries in a bucket is actually based on the last reported +// connection status of the corresponding peers, from least-recently (dis)connected to +// most-recently (dis)connected, and controlled through the `Entry` API. As a result, +// the nodes in the buckets are not reordered as a result of RPC activity, but only as a +// result of nodes being marked as connected or disconnected. In particular, +// if a bucket is full and contains only entries for peers that are considered +// connected, no pending entry is accepted. See the `bucket` submodule for +// further details. 
+// +// [0]: https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf + +mod bucket; +mod entry; +mod key; + +pub use entry::*; + +use arrayvec::{self, ArrayVec}; +use bucket::KBucket; +use std::collections::VecDeque; use std::time::{Duration, Instant}; -use std::vec::IntoIter as VecIntoIter; /// Maximum number of k-buckets. const NUM_BUCKETS: usize = 256; -/// Maximum number of nodes in a bucket, i.e. `k`. -const MAX_NODES_PER_BUCKET: usize = 20; -/// A table of `KBucket`s, i.e. a Kademlia routing table. +/// A `KBucketsTable` represents a Kademlia routing table. #[derive(Debug, Clone)] pub struct KBucketsTable { - /// Peer ID of the local node. + /// The key identifying the local peer that owns the routing table. local_key: Key, - /// The actual tables that store peers or values. - tables: Vec>, - /// The timeout when trying to reach the youngest node after which we consider it unresponsive. - unresponsive_timeout: Duration, + /// The buckets comprising the routing table. + buckets: Vec>, + /// The list of evicted entries that have been replaced with pending + /// entries since the last call to [`KBucketsTable::take_applied_pending`]. + applied_pending: VecDeque> } -/// A `Key` is a cryptographic hash, stored with an associated value in a `KBucket` -/// of a `KBucketsTable`. -/// -/// `Key`s have an XOR metric as defined in the Kademlia paper, i.e. the bitwise XOR of -/// the hash digests, interpreted as an integer. See [`Key::distance`]. -/// -/// A `Key` preserves the preimage of type `T` of the hash function. See [`Key::preimage`]. -#[derive(Clone, Debug)] -pub struct Key { - preimage: T, - hash: GenericArray, -} +/// A (type-safe) index into a `KBucketsTable`, i.e. a non-negative integer in the +/// interval `[0, NUM_BUCKETS)`. 
+#[derive(Copy, Clone)] +struct BucketIndex(usize); -impl PartialEq for Key { - fn eq(&self, other: &Key) -> bool { - self.hash == other.hash - } -} - -impl Eq for Key {} - -impl Key { - /// Construct a new `Key` by hashing the bytes of the given `preimage`. +impl BucketIndex { + /// Creates a new `BucketIndex` for a `Distance`. /// - /// The preimage of type `T` is preserved. See [`Key::preimage`] and - /// [`Key::into_preimage`]. - pub fn new(preimage: T) -> Key - where - T: AsRef<[u8]> - { - let hash = Sha256::digest(preimage.as_ref()); - Key { preimage, hash } + /// The given distance is interpreted as the distance from a `local_key` of + /// a `KBucketsTable`. If the distance is zero, `None` is returned, in + /// recognition of the fact that the only key with distance `0` to a + /// `local_key` is the `local_key` itself, which does not belong in any + /// bucket. + fn new(d: &Distance) -> Option { + (NUM_BUCKETS - d.0.leading_zeros() as usize) + .checked_sub(1) + .map(BucketIndex) } - /// Borrows the preimage of the key. - pub fn preimage(&self) -> &T { - &self.preimage + /// Gets the index value as an unsigned integer. + fn get(&self) -> usize { + self.0 } - - /// Converts the key into its preimage. - pub fn into_preimage(self) -> T { - self.preimage - } - - /// Computes the distance of the keys according to the XOR metric. - pub fn distance(&self, other: &Key) -> Distance { - let a = U256::from(self.hash.as_ref()); - let b = U256::from(other.hash.as_ref()); - Distance(a ^ b) - } -} - -impl From for Key { - fn from(h: Multihash) -> Self { - let k = Key::new(h.clone().into_bytes()); - Key { preimage: h, hash: k.hash } - } -} - -impl From for Key { - fn from(peer_id: PeerId) -> Self { - Key::new(peer_id) - } -} - -/// A distance between two `Key`s. 
-#[derive(Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord, Debug)] -pub struct Distance(bigint::U256); - -/// A `KBucket` is a list of up to `MAX_NODES_PER_BUCKET` `Key`s and associated values, -/// ordered from least recently used to most recently used. -#[derive(Debug, Clone)] -struct KBucket { - /// Nodes are always ordered from oldest to newest. The nodes we are connected to are always - /// all on top (ie. have higher indices) of the nodes we are not connected to. - nodes: ArrayVec<[Node; MAX_NODES_PER_BUCKET]>, - - /// Index in `nodes` over which all nodes are connected. Must always be <= to the length - /// of `nodes`. - first_connected_pos: usize, - - /// Node received when the bucket was full. Will be added to the list if the youngest node - /// doesn't respond in time to our reach attempt. - pending_node: Option>, -} - -/// State of the pending node. -#[derive(Debug, Clone)] -struct PendingNode { - /// Node to insert. - node: Node, - - /// If true, we are connected to the pending node. - connected: bool, - - /// When the pending node will replace an existing node, provided that the youngest node - /// doesn't become responsive before. - replace: Instant, -} - -/// A single node in a `KBucket`. -#[derive(Debug, Clone)] -struct Node { - /// Id of the node. - id: Key, - /// Value associated to it. - value: TVal, } impl KBucketsTable where TPeerId: Clone, { - /// Builds a new routing table whose keys are distributed over `KBucket`s as - /// per the relative distance to `local_key`. - pub fn new(local_key: Key, unresponsive_timeout: Duration) -> Self { + /// Creates a new, empty Kademlia routing table with entries partitioned + /// into buckets as per the Kademlia protocol. + /// + /// The given `pending_timeout` specifies the duration after creation of + /// a [`PendingEntry`] after which it becomes eligible for insertion into + /// a full bucket, replacing the least-recently (dis)connected node. 
+ pub fn new(local_key: Key, pending_timeout: Duration) -> Self { KBucketsTable { local_key, - tables: (0..NUM_BUCKETS) - .map(|_| KBucket { - nodes: ArrayVec::new(), - first_connected_pos: 0, - pending_node: None, - }) - .collect(), - unresponsive_timeout, + buckets: (0 .. NUM_BUCKETS).map(|_| KBucket::new(pending_timeout)).collect(), + applied_pending: VecDeque::new() } } @@ -184,720 +141,395 @@ where &self.local_key } - /// Returns the id of the bucket that should contain the peer with the given ID. - /// - /// Returns `None` if out of range, which happens if `id` is the same as the local peer id. - fn bucket_num(&self, key: &Key) -> Option { - (NUM_BUCKETS - self.local_key.distance(key).0.leading_zeros() as usize).checked_sub(1) + /// Returns an `Entry` for the given key, representing the state of the entry + /// in the routing table. + pub fn entry<'a>(&'a mut self, key: &'a Key) -> Entry<'a, TPeerId, TVal> { + let index = BucketIndex::new(&self.local_key.distance(key)); + if let Some(i) = index { + let bucket = &mut self.buckets[i.get()]; + if let Some(applied) = bucket.apply_pending() { + self.applied_pending.push_back(applied) + } + Entry::new(bucket, key) + } else { + Entry::SelfEntry + } } - /// Returns an object containing the state of the given entry. - pub fn entry<'a>(&'a mut self, peer_id: &'a Key) -> Entry<'a, TPeerId, TVal> { - let bucket_num = if let Some(num) = self.bucket_num(peer_id) { - num - } else { - return Entry::SelfEntry; - }; - - // Update the pending node state. - // TODO: must be reported to the user somehow, in a non-annoying API - if let Some(pending) = self.tables[bucket_num].pending_node.take() { - if pending.replace < Instant::now() { - let table = &mut self.tables[bucket_num]; - let first_connected_pos = &mut table.first_connected_pos; - // If all the nodes in the bucket are connected, then there shouldn't be any - // pending node. 
- debug_assert!(*first_connected_pos >= 1); - table.nodes.remove(0); - if pending.connected { - *first_connected_pos -= 1; - table.nodes.insert(*first_connected_pos, pending.node); - } else { - table.nodes.insert(*first_connected_pos - 1, pending.node); + /// Returns an iterator over all the entries in the routing table. + pub fn iter<'a>(&'a mut self) -> impl Iterator> { + let applied_pending = &mut self.applied_pending; + self.buckets.iter_mut().flat_map(move |table| { + if let Some(applied) = table.apply_pending() { + applied_pending.push_back(applied) + } + let table = &*table; + table.iter().map(move |(n, status)| { + EntryRefView { + node: NodeRefView { + key: &n.key, + value: &n.value + }, + status } - } else { - self.tables[bucket_num].pending_node = Some(pending); - } - } - - // Try to find the node in the bucket. - if let Some(pos) = self.tables[bucket_num].nodes.iter().position(|p| p.id == *peer_id) { - if pos >= self.tables[bucket_num].first_connected_pos { - Entry::InKbucketConnected(EntryInKbucketConn { - parent: self, - peer_id, - }) - - } else { - Entry::InKbucketDisconnected(EntryInKbucketDisc { - parent: self, - peer_id, - }) - } - - } else if self.tables[bucket_num].pending_node.as_ref().map(|p| p.node.id == *peer_id).unwrap_or(false) { - // Node is pending. - if self.tables[bucket_num].pending_node.as_ref().map(|p| p.connected).unwrap_or(false) { - Entry::InKbucketConnectedPending(EntryInKbucketConnPending { - parent: self, - peer_id, - }) - } else { - Entry::InKbucketDisconnectedPending(EntryInKbucketDiscPending { - parent: self, - peer_id, - }) - } - - } else { - Entry::NotInKbucket(EntryNotInKbucket { - parent: self, - peer_id, }) - } - } - - /// Returns an iterator to all the peer IDs in the bucket, without the pending nodes. 
- pub fn entries_not_pending(&self) -> impl Iterator, &TVal)> { - self.tables - .iter() - .flat_map(|table| table.nodes.iter()) - .map(|node| (&node.id, &node.value)) - } - - /// Returns an iterator to all the buckets of this table. - /// - /// Ordered by proximity to the local node. Closest bucket (with max. one node in it) comes - /// first. - pub fn buckets(&mut self) -> BucketsIter<'_, TPeerId, TVal> { - BucketsIter(self.tables.iter_mut(), self.unresponsive_timeout) - } - - /// Finds the keys closest to `key`, ordered by distance. - /// - /// Pending nodes are ignored. - pub fn find_closest(&mut self, key: &Key) -> VecIntoIter> { - // TODO: optimize - let mut out = Vec::new(); - for table in self.tables.iter_mut() { - for node in table.nodes.iter() { - out.push(node.id.clone()); - } - - // TODO: this code that handles the pending_node should normally be shared with - // the one in `entry()`; however right now there's no mechanism to notify the - // user when a pending node has been inserted in the table, and thus we need to - // rework this pending node handling code anyway; when that is being done, we - // should rewrite this code properly - if let Some(ref pending) = table.pending_node { - if pending.replace <= Instant::now() && pending.connected { - out.pop(); - out.push(pending.node.id.clone()); - } - } - } - out.sort_by(|a, b| key.distance(a).cmp(&key.distance(b))); - out.into_iter() - } -} - -/// Represents an entry or a potential entry in the k-buckets. -pub enum Entry<'a, TPeerId, TVal> { - /// Entry in a k-bucket that we're connected to. - InKbucketConnected(EntryInKbucketConn<'a, TPeerId, TVal>), - /// Entry pending waiting for a free slot to enter a k-bucket. We're connected to it. - InKbucketConnectedPending(EntryInKbucketConnPending<'a, TPeerId, TVal>), - /// Entry in a k-bucket but that we're not connected to. - InKbucketDisconnected(EntryInKbucketDisc<'a, TPeerId, TVal>), - /// Entry pending waiting for a free slot to enter a k-bucket. 
We're not connected to it. - InKbucketDisconnectedPending(EntryInKbucketDiscPending<'a, TPeerId, TVal>), - /// Entry is not present in any k-bucket. - NotInKbucket(EntryNotInKbucket<'a, TPeerId, TVal>), - /// Entry is the local peer ID. - SelfEntry, -} - -impl<'a, TPeerId, TVal> Entry<'a, TPeerId, TVal> -where - TPeerId: Clone, -{ - /// Returns the value associated to the entry in the bucket, including if the node is pending. - pub fn value(&mut self) -> Option<&mut TVal> { - match self { - Entry::InKbucketConnected(entry) => Some(entry.value()), - Entry::InKbucketConnectedPending(entry) => Some(entry.value()), - Entry::InKbucketDisconnected(entry) => Some(entry.value()), - Entry::InKbucketDisconnectedPending(entry) => Some(entry.value()), - Entry::NotInKbucket(_entry) => None, - Entry::SelfEntry => None, - } - } - - /// Returns the value associated to the entry in the bucket. - pub fn value_not_pending(&mut self) -> Option<&mut TVal> { - match self { - Entry::InKbucketConnected(entry) => Some(entry.value()), - Entry::InKbucketConnectedPending(_entry) => None, - Entry::InKbucketDisconnected(entry) => Some(entry.value()), - Entry::InKbucketDisconnectedPending(_entry) => None, - Entry::NotInKbucket(_entry) => None, - Entry::SelfEntry => None, - } - } -} - -/// Represents an entry in a k-bucket. -pub struct EntryInKbucketConn<'a, TPeerId, TVal> { - parent: &'a mut KBucketsTable, - peer_id: &'a Key, -} - -impl<'a, TPeerId, TVal> EntryInKbucketConn<'a, TPeerId, TVal> -where - TPeerId: Clone, -{ - /// Returns the value associated to the entry in the bucket. 
- pub fn value(&mut self) -> &mut TVal { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketConn if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - let peer_id = self.peer_id; - &mut table.nodes.iter_mut() - .find(move |p| p.id == *peer_id) - .expect("We can only build a EntryInKbucketConn if we know that the peer is in its \ - bucket; QED") - .value - } - - /// Reports that we are now disconnected from the given node. - /// - /// This moves the node down in its bucket. There are two possible outcomes: - /// - /// - Either we had a pending node which replaces the current node. `Replaced` is returned. - /// - Or we had no pending node, and the current node is kept. `Kept` is returned. - /// - pub fn set_disconnected(self) -> SetDisconnectedOutcome<'a, TPeerId, TVal> { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketConn if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - let peer_id = self.peer_id; - let pos = table.nodes.iter().position(move |elem| elem.id == *peer_id) - .expect("we can only build a EntryInKbucketConn if the node is in its bucket; QED"); - debug_assert!(table.first_connected_pos <= pos); - - // We replace it with the pending node, if any. - if let Some(pending) = table.pending_node.take() { - if pending.connected { - let removed = table.nodes.remove(pos); - let ret = SetDisconnectedOutcome::Replaced { - replacement: pending.node.id.clone(), - old_val: removed.value, - }; - table.nodes.insert(table.first_connected_pos, pending.node); - return ret; - } else { - table.pending_node = Some(pending); - } - } - - // Move the node in the bucket. - if pos != table.first_connected_pos { - let elem = table.nodes.remove(pos); - table.nodes.insert(table.first_connected_pos, elem); - } - table.first_connected_pos += 1; - - // And return a EntryInKbucketDisc. 
- debug_assert!(table.nodes.iter() - .position(move |e| e.id == *peer_id) - .map(|p| p < table.first_connected_pos) - .unwrap_or(false)); - - SetDisconnectedOutcome::Kept(EntryInKbucketDisc { - parent: self.parent, - peer_id: self.peer_id, }) } -} -/// Outcome of calling `set_disconnected`. -#[must_use] -pub enum SetDisconnectedOutcome<'a, TPeerId, TVal> { - /// Node is kept in the bucket. - Kept(EntryInKbucketDisc<'a, TPeerId, TVal>), - /// Node is pushed out of the bucket. - Replaced { - /// Node that replaced the node. - // TODO: could be a EntryInKbucketConn, but we have borrow issues with the new peer id - replacement: Key, - /// Value os the node that has been pushed out. - old_val: TVal, - }, -} - -/// Represents an entry waiting for a slot to be available in its k-bucket. -pub struct EntryInKbucketConnPending<'a, TPeerId, TVal> { - parent: &'a mut KBucketsTable, - peer_id: &'a Key, -} - -impl<'a, TPeerId, TVal> EntryInKbucketConnPending<'a, TPeerId, TVal> -where - TPeerId: Clone, -{ - /// Returns the value associated to the entry in the bucket. - pub fn value(&mut self) -> &mut TVal { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketConnPending if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - assert!(table.pending_node.as_ref().map(|n| &n.node.id) == Some(self.peer_id)); - &mut table.pending_node - .as_mut() - .expect("we can only build a EntryInKbucketConnPending if the node is pending; QED") - .node.value - } - - /// Reports that we are now disconnected from the given node. 
- pub fn set_disconnected(self) -> EntryInKbucketDiscPending<'a, TPeerId, TVal> { - { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketConnPending if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - let mut pending = table.pending_node.as_mut() - .expect("we can only build a EntryInKbucketConnPending if there's a pending node; QED"); - debug_assert!(pending.connected); - pending.connected = false; - } - - EntryInKbucketDiscPending { - parent: self.parent, - peer_id: self.peer_id, - } - } -} - -/// Represents an entry waiting for a slot to be available in its k-bucket. -pub struct EntryInKbucketDiscPending<'a, TPeerId, TVal> { - parent: &'a mut KBucketsTable, - peer_id: &'a Key, -} - -impl<'a, TPeerId, TVal> EntryInKbucketDiscPending<'a, TPeerId, TVal> -where - TPeerId: Clone, -{ - /// Returns the value associated to the entry in the bucket. - pub fn value(&mut self) -> &mut TVal { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketDiscPending if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - assert!(table.pending_node.as_ref().map(|n| &n.node.id) == Some(self.peer_id)); - &mut table.pending_node - .as_mut() - .expect("we can only build a EntryInKbucketDiscPending if the node is pending; QED") - .node.value - } - - /// Reports that we are now connected to the given node. 
- pub fn set_connected(self) -> EntryInKbucketConnPending<'a, TPeerId, TVal> { - { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketDiscPending if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - let mut pending = table.pending_node.as_mut() - .expect("we can only build a EntryInKbucketDiscPending if there's a pending node; QED"); - debug_assert!(!pending.connected); - pending.connected = true; - } - - EntryInKbucketConnPending { - parent: self.parent, - peer_id: self.peer_id, - } - } -} - -/// Represents an entry in a k-bucket. -pub struct EntryInKbucketDisc<'a, TPeerId, TVal> { - parent: &'a mut KBucketsTable, - peer_id: &'a Key, -} - -impl<'a, TPeerId, TVal> EntryInKbucketDisc<'a, TPeerId, TVal> -where - TPeerId: Clone, -{ - /// Returns the value associated to the entry in the bucket. - pub fn value(&mut self) -> &mut TVal { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketDisc if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - let peer_id = self.peer_id; - &mut table.nodes.iter_mut() - .find(move |p| p.id == *peer_id) - .expect("We can only build a EntryInKbucketDisc if we know that the peer is in its \ - bucket; QED") - .value - } - - /// Sets the node as connected. This moves the entry in the bucket. - pub fn set_connected(self) -> EntryInKbucketConn<'a, TPeerId, TVal> { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketDisc if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - let pos = { - let peer_id = self.peer_id; - table.nodes.iter().position(move |p| p.id == *peer_id) - .expect("We can only build a EntryInKbucketDisc if we know that the peer is in \ - its bucket; QED") - }; - - // If we are the youngest node, we are now connected, which means that we have to drop the - // pending node. 
- // Note that it is theoretically possible that the replacement should have occurred between - // the moment when we build the `EntryInKbucketConn` and the moment when we call - // `set_connected`, but we don't take that into account. - if pos == 0 { - table.pending_node = None; - } - - debug_assert!(pos < table.first_connected_pos); - table.first_connected_pos -= 1; - if pos != table.first_connected_pos { - let entry = table.nodes.remove(pos); - table.nodes.insert(table.first_connected_pos, entry); - } - - // There shouldn't be a pending node if all slots are full of connected nodes. - debug_assert!(!(table.first_connected_pos == 0 && table.pending_node.is_some())); - - EntryInKbucketConn { - parent: self.parent, - peer_id: self.peer_id, - } - } -} - -/// Represents an entry not in any k-bucket. -pub struct EntryNotInKbucket<'a, TPeerId, TVal> { - parent: &'a mut KBucketsTable, - peer_id: &'a Key, -} - -impl<'a, TPeerId, TVal> EntryNotInKbucket<'a, TPeerId, TVal> -where - TPeerId: Clone, -{ - /// Inserts the node as connected, if possible. - pub fn insert_connected(self, value: TVal) -> InsertOutcome { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryNotInKbucket if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - if table.nodes.is_full() { - if table.first_connected_pos == 0 || table.pending_node.is_some() { - InsertOutcome::Full - } else { - table.pending_node = Some(PendingNode { - node: Node { id: self.peer_id.clone(), value }, - replace: Instant::now() + self.parent.unresponsive_timeout, - connected: true, - }); - InsertOutcome::Pending { - to_ping: table.nodes[0].id.clone() - } - } - } else { - table.nodes.insert(table.first_connected_pos, Node { - id: self.peer_id.clone(), - value, - }); - InsertOutcome::Inserted - } - } - - /// Inserts the node as disconnected, if possible. + /// Returns a by-reference iterator over all buckets. 
/// - /// > **Note**: This function will never return `Pending`. If the bucket is full, we simply - /// > do nothing. - pub fn insert_disconnected(self, value: TVal) -> InsertOutcome { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryNotInKbucket if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; + /// The buckets are ordered by proximity to the `local_key`, i.e. the first + /// bucket is the closest bucket (containing at most one key). + pub fn buckets<'a>(&'a mut self) -> impl Iterator> + 'a { + let applied_pending = &mut self.applied_pending; + self.buckets.iter_mut().map(move |b| { + if let Some(applied) = b.apply_pending() { + applied_pending.push_back(applied) + } + KBucketRef(b) + }) + } - if table.nodes.is_full() { - InsertOutcome::Full - } else { - table.nodes.insert(table.first_connected_pos, Node { - id: self.peer_id.clone(), - value, - }); - table.first_connected_pos += 1; - InsertOutcome::Inserted + /// Consumes the next applied pending entry, if any. + /// + /// When an entry is attempted to be inserted and the respective bucket is full, + /// it may be recorded as pending insertion after a timeout, see [`InsertResult::Pending`]. + /// + /// If the oldest currently disconnected entry in the respective bucket does not change + /// its status until the timeout of the pending entry expires, it is evicted and + /// the pending entry inserted instead. These insertions of pending entries + /// happen lazily, whenever the `KBucketsTable` is accessed, and the corresponding + /// buckets are updated accordingly. The fact that a pending entry was applied is + /// recorded in the `KBucketsTable` in the form of `AppliedPending` results, which must be + /// consumed by calling this function. + pub fn take_applied_pending(&mut self) -> Option> { + self.applied_pending.pop_front() + } + + /// Returns an iterator over the keys closest to `target`, ordered by + /// increasing distance. 
+ pub fn closest_keys<'a, T>(&'a mut self, target: &'a Key) + -> impl Iterator> + 'a + where + T: Clone + { + let distance = self.local_key.distance(target); + ClosestIter { + target, + iter: None, + table: self, + buckets_iter: ClosestBucketsIter::new(distance), + fmap: |b: &KBucket<_, _>| -> ArrayVec<_> { + b.iter().map(|(n,_)| n.key.clone()).collect() + } + } + } + + /// Returns an iterator over the nodes closest to the `target` key, ordered by + /// increasing distance. + pub fn closest<'a, T>(&'a mut self, target: &'a Key) + -> impl Iterator> + 'a + where + T: Clone, + TVal: Clone + { + let distance = self.local_key.distance(target); + ClosestIter { + target, + iter: None, + table: self, + buckets_iter: ClosestBucketsIter::new(distance), + fmap: |b: &KBucket<_, TVal>| -> ArrayVec<_> { + b.iter().map(|(n, status)| EntryView { + node: n.clone(), + status + }).collect() + } } } } -/// Outcome of calling `insert`. -#[must_use] -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum InsertOutcome { - /// The entry has been successfully inserted. - Inserted, - /// The entry has been inserted as a pending node. - Pending { - /// We have to try connect to the returned node. - to_ping: Key, - }, - /// The entry was not inserted because the bucket was full of connected nodes. - Full, +/// An iterator over (some projection of) the closest entries in a +/// `KBucketsTable` w.r.t. some target `Key`. +struct ClosestIter<'a, TTarget, TPeerId, TVal, TMap, TOut> { + /// A reference to the target key whose distance to the local key determines + /// the order in which the buckets are traversed. The resulting + /// array from projecting the entries of each bucket using `fmap` is + /// sorted according to the distance to the target. + target: &'a Key, + /// A reference to all buckets of the `KBucketsTable`. + table: &'a mut KBucketsTable, + /// The iterator over the bucket indices in the order determined by the + /// distance of the local key to the target. 
+ buckets_iter: ClosestBucketsIter, + /// The iterator over the entries in the currently traversed bucket. + iter: Option>, + /// The projection function / mapping applied on each bucket as + /// it is encountered, producing the next `iter`ator. + fmap: TMap } -/// Iterator giving access to a bucket. -pub struct BucketsIter<'a, TPeerId, TVal>(SliceIterMut<'a, KBucket>, Duration); +/// An iterator over the bucket indices, in the order determined by the `Distance` of +/// a target from the `local_key`, such that the entries in the buckets are incrementally +/// further away from the target, starting with the bucket covering the target. +struct ClosestBucketsIter { + /// The distance to the `local_key`. + distance: Distance, + /// The current state of the iterator. + state: ClosestBucketsIterState +} -impl<'a, TPeerId, TVal> Iterator for BucketsIter<'a, TPeerId, TVal> { - type Item = Bucket<'a, TPeerId, TVal>; +/// Operating states of a `ClosestBucketsIter`. +enum ClosestBucketsIterState { + /// The starting state of the iterator yields the first bucket index and + /// then transitions to `ZoomIn`. + Start(BucketIndex), + /// The iterator "zooms in" to yield the next bucket containing nodes that + /// are incrementally closer to the local node but further from the `target`. + /// These buckets are identified by a `1` in the corresponding bit position + /// of the distance bit string. When bucket `0` is reached, the iterator + /// transitions to `ZoomOut`. + ZoomIn(BucketIndex), + /// Once bucket `0` has been reached, the iterator starts "zooming out" + /// to buckets containing nodes that are incrementally further away from + /// both the local key and the target. These are identified by a `0` in + /// the corresponding bit position of the distance bit string. When bucket + /// `255` is reached, the iterator transitions to state `Done`. + ZoomOut(BucketIndex), + /// The iterator is in this state once it has visited all buckets. 
+ Done +} + +impl ClosestBucketsIter { + fn new(distance: Distance) -> Self { + let state = match BucketIndex::new(&distance) { + Some(i) => ClosestBucketsIterState::Start(i), + None => ClosestBucketsIterState::Done + }; + Self { distance, state } + } + + fn next_in(&self, i: BucketIndex) -> Option { + (0 .. i.get()).rev().find_map(|i| + if self.distance.0.bit(i) { + Some(BucketIndex(i)) + } else { + None + }) + } + + fn next_out(&self, i: BucketIndex) -> Option { + (i.get() + 1 .. NUM_BUCKETS).find_map(|i| + if !self.distance.0.bit(i) { + Some(BucketIndex(i)) + } else { + None + }) + } +} + +impl Iterator for ClosestBucketsIter { + type Item = BucketIndex; fn next(&mut self) -> Option { - self.0.next().map(|bucket| { - Bucket(bucket) - }) - } - - fn size_hint(&self) -> (usize, Option) { - self.0.size_hint() + match self.state { + ClosestBucketsIterState::Start(i) => { + self.state = ClosestBucketsIterState::ZoomIn(i); + Some(i) + } + ClosestBucketsIterState::ZoomIn(i) => + if let Some(i) = self.next_in(i) { + self.state = ClosestBucketsIterState::ZoomIn(i); + Some(i) + } else { + let i = BucketIndex(0); + self.state = ClosestBucketsIterState::ZoomOut(i); + Some(i) + } + ClosestBucketsIterState::ZoomOut(i) => + if let Some(i) = self.next_out(i) { + self.state = ClosestBucketsIterState::ZoomOut(i); + Some(i) + } else { + self.state = ClosestBucketsIterState::Done; + None + } + ClosestBucketsIterState::Done => None + } } } -impl<'a, TPeerId, TVal> ExactSizeIterator for BucketsIter<'a, TPeerId, TVal> {} +impl Iterator +for ClosestIter<'_, TTarget, TPeerId, TVal, TMap, TOut> +where + TPeerId: Clone, + TMap: Fn(&KBucket) -> ArrayVec<[TOut; MAX_NODES_PER_BUCKET]>, + TOut: AsRef> +{ + type Item = TOut; -/// Access to a bucket. 
-pub struct Bucket<'a, TPeerId, TVal>(&'a mut KBucket); + fn next(&mut self) -> Option { + loop { + match &mut self.iter { + Some(iter) => match iter.next() { + Some(k) => return Some(k), + None => self.iter = None + } + None => { + if let Some(i) = self.buckets_iter.next() { + let bucket = &mut self.table.buckets[i.get()]; + if let Some(applied) = bucket.apply_pending() { + self.table.applied_pending.push_back(applied) + } + let mut v = (self.fmap)(bucket); + v.sort_by(|a, b| + self.target.distance(a.as_ref()) + .cmp(&self.target.distance(b.as_ref()))); + self.iter = Some(v.into_iter()); + } else { + return None + } + } + } + } + } +} -impl<'a, TPeerId, TVal> Bucket<'a, TPeerId, TVal> { - /// Returns the number of entries in that bucket. - /// - /// > **Note**: Keep in mind that this operation can be racy. If `update()` is called on the - /// > table while this function is running, the `update()` may or may not be taken - /// > into account. +/// A reference to a bucket in a `KBucketsTable`. +pub struct KBucketRef<'a, TPeerId, TVal>(&'a mut KBucket); + +impl KBucketRef<'_, TPeerId, TVal> +where + TPeerId: Clone +{ + /// Returns the number of entries in the bucket. pub fn num_entries(&self) -> usize { - self.0.nodes.len() + self.0.num_entries() } - /// Returns true if this bucket has a pending node. + /// Returns true if the bucket has a pending node. 
pub fn has_pending(&self) -> bool { - if let Some(ref node) = self.0.pending_node { - node.replace > Instant::now() - } else { - false - } + self.0.pending().map_or(false, |n| !n.is_ready()) } } #[cfg(test)] mod tests { use super::*; - use quickcheck::*; use libp2p_core::PeerId; - use crate::kbucket::{Entry, InsertOutcome, KBucketsTable, MAX_NODES_PER_BUCKET}; - use std::time::Duration; - - impl Arbitrary for Key { - fn arbitrary(_: &mut G) -> Key { - Key::from(PeerId::random()) - } - } - - #[test] - fn identity() { - fn prop(a: Key) -> bool { - a.distance(&a) == Distance::default() - } - quickcheck(prop as fn(_) -> _) - } - - #[test] - fn symmetry() { - fn prop(a: Key, b: Key) -> bool { - a.distance(&b) == b.distance(&a) - } - quickcheck(prop as fn(_,_) -> _) - } - - #[test] - fn triangle_inequality() { - fn prop(a: Key, b: Key, c: Key) -> TestResult { - let ab = a.distance(&b); - let bc = b.distance(&c); - let (ab_plus_bc, overflow) = ab.0.overflowing_add(bc.0); - if overflow { - TestResult::discard() - } else { - TestResult::from_bool(a.distance(&c) <= Distance(ab_plus_bc)) - } - } - quickcheck(prop as fn(_,_,_) -> _) - } - - #[test] - fn unidirectionality() { - fn prop(a: Key, b: Key) -> bool { - let d = a.distance(&b); - (0..100).all(|_| { - let c = Key::from(PeerId::random()); - a.distance(&c) != d || b == c - }) - } - quickcheck(prop as fn(_,_) -> _) - } - #[test] fn basic_closest() { - let my_key = Key::from(PeerId::random()); + let local_key = Key::from(PeerId::random()); let other_id = Key::from(PeerId::random()); - let mut table = KBucketsTable::<_, ()>::new(my_key, Duration::from_secs(5)); - if let Entry::NotInKbucket(entry) = table.entry(&other_id) { - match entry.insert_connected(()) { - InsertOutcome::Inserted => (), + let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5)); + if let Entry::Absent(entry) = table.entry(&other_id) { + match entry.insert((), NodeStatus::Connected) { + InsertResult::Inserted => (), _ => panic!() } 
} else { panic!() } - let res = table.find_closest(&other_id).collect::>(); + let res = table.closest_keys(&other_id).collect::>(); assert_eq!(res.len(), 1); assert_eq!(res[0], other_id); } #[test] fn update_local_id_fails() { - let my_key = Key::from(PeerId::random()); - - let mut table = KBucketsTable::<_, ()>::new(my_key.clone(), Duration::from_secs(5)); - match table.entry(&my_key) { + let local_key = Key::from(PeerId::random()); + let mut table = KBucketsTable::<_, ()>::new(local_key.clone(), Duration::from_secs(5)); + match table.entry(&local_key) { Entry::SelfEntry => (), _ => panic!(), } } #[test] - fn full_kbucket() { - let my_key = Key::from(PeerId::random()); - - let mut table = KBucketsTable::<_, ()>::new(my_key.clone(), Duration::from_secs(5)); - - // Step 1: Fill the most distant bucket, i.e. bucket index `NUM_BUCKETS - 1`, - // with "disconnected" peers. - - // Prepare `MAX_NODES_PER_BUCKET` keys to fill the bucket, plus 2 - // additional keys which will be used to test the behavior on a full - // bucket. - assert!(MAX_NODES_PER_BUCKET <= 251); // Test doesn't work otherwise. - let mut fill_ids = (0..MAX_NODES_PER_BUCKET + 3) - .map(|n| { - let mut id = my_key.clone(); - // Flip the first bit so that we get in the most distant bucket. - id.hash[0] ^= 0x80; - // Each ID gets a unique low-order byte (i.e. counter). - id.hash[31] = id.hash[31].wrapping_add(n as u8); - id - }) - .collect::>(); - - let first_node = fill_ids[0].clone(); - let second_node = fill_ids[1].clone(); - - // Fill the bucket, consuming all but the last 3 test keys. 
- for (num, id) in fill_ids.drain(..MAX_NODES_PER_BUCKET).enumerate() { - if let Entry::NotInKbucket(entry) = table.entry(&id) { - match entry.insert_disconnected(()) { - InsertOutcome::Inserted => (), - _ => panic!() + fn closest() { + let local_key = Key::from(PeerId::random()); + let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5)); + let mut count = 0; + loop { + if count == 100 { break; } + let key = Key::from(PeerId::random()); + if let Entry::Absent(e) = table.entry(&key) { + match e.insert((), NodeStatus::Connected) { + InsertResult::Inserted => count += 1, + _ => continue, } } else { - panic!() + panic!("entry exists") } - assert_eq!(table.buckets().nth(255).unwrap().num_entries(), num + 1); - } - assert_eq!( - table.buckets().nth(255).unwrap().num_entries(), - MAX_NODES_PER_BUCKET - ); - assert!(!table.buckets().nth(255).unwrap().has_pending()); - - // Step 2: Insert another key on the full bucket. It must be marked as - // pending and the first (i.e. "least recently used") entry scheduled - // for replacement. - - if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) { - match entry.insert_connected(()) { - InsertOutcome::Pending { ref to_ping } if *to_ping == first_node => (), - _ => panic!() - } - } else { - panic!() - } - assert_eq!( - table.buckets().nth(255).unwrap().num_entries(), - MAX_NODES_PER_BUCKET - ); - assert!(table.buckets().nth(255).unwrap().has_pending()); - // Trying to insert yet another key is rejected. - if let Entry::NotInKbucket(entry) = table.entry(&Key::from(fill_ids.remove(0))) { - match entry.insert_connected(()) { - InsertOutcome::Full => (), - _ => panic!() - } - } else { - panic!() } - // Step 3: Make the pending nodes eligible for replacing existing nodes. - // The pending node must be consumed and replace the first (i.e. "least - // recently used") node. 
+ let mut expected_keys: Vec<_> = table.buckets + .iter() + .flat_map(|t| t.iter().map(|(n,_)| n.key.clone())) + .collect(); - let elapsed = Instant::now() - Duration::from_secs(1); - table.tables[255].pending_node.as_mut().map(|n| n.replace = elapsed); - assert!(!table.buckets().nth(255).unwrap().has_pending()); - if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) { - match entry.insert_connected(()) { - InsertOutcome::Pending { ref to_ping } if *to_ping == second_node => (), - _ => panic!() - } - } else { - panic!() + for _ in 0 .. 10 { + let target_key = Key::from(PeerId::random()); + let keys = table.closest_keys(&target_key).collect::>(); + // The list of keys is expected to match the result of a full-table scan. + expected_keys.sort_by_key(|k| k.distance(&target_key)); + assert_eq!(keys, expected_keys); } } + + #[test] + fn applied_pending() { + let local_key = Key::from(PeerId::random()); + let mut table = KBucketsTable::<_, ()>::new(local_key.clone(), Duration::from_millis(1)); + let expected_applied; + let full_bucket_index; + loop { + let key = Key::from(PeerId::random()); + if let Entry::Absent(e) = table.entry(&key) { + match e.insert((), NodeStatus::Disconnected) { + InsertResult::Full => { + if let Entry::Absent(e) = table.entry(&key) { + match e.insert((), NodeStatus::Connected) { + InsertResult::Pending { disconnected } => { + expected_applied = AppliedPending { + inserted: key.clone(), + evicted: Some(Node { key: disconnected, value: () }) + }; + full_bucket_index = BucketIndex::new(&key.distance(&local_key)); + break + }, + _ => panic!() + } + } else { + panic!() + } + }, + _ => continue, + } + } else { + panic!("entry exists") + } + } + + // Expire the timeout for the pending entry on the full bucket. + let full_bucket = &mut table.buckets[full_bucket_index.unwrap().get()]; + let elapsed = Instant::now() - Duration::from_secs(1); + full_bucket.pending_mut().unwrap().set_ready_at(elapsed); + + match 
table.entry(&expected_applied.inserted) { + Entry::Present(_, NodeStatus::Connected) => {} + x => panic!("Unexpected entry: {:?}", x) + } + + match table.entry(&expected_applied.evicted.as_ref().unwrap().key) { + Entry::Absent(_) => {} + x => panic!("Unexpected entry: {:?}", x) + } + + assert_eq!(Some(expected_applied), table.take_applied_pending()); + assert_eq!(None, table.take_applied_pending()); + } } diff --git a/protocols/kad/src/kbucket/bucket.rs b/protocols/kad/src/kbucket/bucket.rs new file mode 100644 index 00000000..2133416e --- /dev/null +++ b/protocols/kad/src/kbucket/bucket.rs @@ -0,0 +1,552 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! The internal API for a single `KBucket` in a `KBucketsTable`. +//! +//! > **Note**: Uniqueness of entries w.r.t. a `Key` in a `KBucket` is not +//! > checked in this module. This is an invariant that must hold across all +//! 
> buckets in a `KBucketsTable` and hence is enforced by the public API +//! > of the `KBucketsTable` and in particular the public `Entry` API. + +use super::*; + +/// Maximum number of nodes in a bucket, i.e. the (fixed) `k` parameter. +pub const MAX_NODES_PER_BUCKET: usize = 20; + +/// A `PendingNode` is a `Node` that is pending insertion into a `KBucket`. +#[derive(Debug, Clone)] +pub struct PendingNode { + /// The pending node to insert. + node: Node, + + /// The status of the pending node. + status: NodeStatus, + + /// The instant at which the pending node is eligible for insertion into a bucket. + replace: Instant, +} + +/// The status of a node in a bucket. +/// +/// The status of a node in a bucket together with the time of the +/// last status change determines the position of the node in a +/// bucket. +#[derive(PartialEq, Eq, Debug, Copy, Clone)] +pub enum NodeStatus { + /// The node is considered connected. + Connected, + /// The node is considered disconnected. + Disconnected +} + +impl PendingNode { + pub fn key(&self) -> &Key { + &self.node.key + } + + pub fn status(&self) -> NodeStatus { + self.status + } + + pub fn value_mut(&mut self) -> &mut TVal { + &mut self.node.value + } + + pub fn is_ready(&self) -> bool { + Instant::now() >= self.replace + } + + pub fn set_ready_at(&mut self, t: Instant) { + self.replace = t; + } +} + +/// A `Node` in a bucket, representing a peer participating +/// in the Kademlia DHT together with an associated value (e.g. contact +/// information). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Node { + /// The key of the node, identifying the peer. + pub key: Key, + /// The associated value. + pub value: TVal, +} + +/// The position of a node in a `KBucket`, i.e. a non-negative integer +/// in the range `[0, MAX_NODES_PER_BUCKET)`. 
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct Position(usize); + +/// A `KBucket` is a list of up to `MAX_NODES_PER_BUCKET` `Key`s and associated values, +/// ordered from least-recently connected to most-recently connected. +#[derive(Debug, Clone)] +pub struct KBucket { + /// The nodes contained in the bucket. + nodes: ArrayVec<[Node; MAX_NODES_PER_BUCKET]>, + + /// The position (index) in `nodes` that marks the first connected node. + /// + /// Since the entries in `nodes` are ordered from least-recently connected to + /// most-recently connected, all entries above this index are also considered + /// connected, i.e. the range `[0, first_connected_pos)` marks the sub-list of entries + /// that are considered disconnected and the range + /// `[first_connected_pos, MAX_NODES_PER_BUCKET)` marks the sub-list of entries that are + /// considered connected. + /// + /// `None` indicates that there are no connected entries in the bucket, i.e. + /// the bucket is either empty, or contains only entries for peers that are + /// considered disconnected. + first_connected_pos: Option, + + /// A node that is pending to be inserted into a full bucket, should the + /// least-recently connected (and currently disconnected) node not be + /// marked as connected within `pending_timeout`. + pending: Option>, + + /// The timeout window before a new pending node is eligible for insertion, + /// if the least-recently connected node is not updated as being connected + /// in the meantime. + pending_timeout: Duration +} + +/// The result of inserting an entry into a bucket. +#[must_use] +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum InsertResult { + /// The entry has been successfully inserted. + Inserted, + /// The entry is pending insertion because the relevant bucket is currently full. 
+ /// The entry is inserted after a timeout elapsed, if the status of the + /// least-recently connected (and currently disconnected) node in the bucket + /// is not updated before the timeout expires. + Pending { + /// The key of the least-recently connected entry that is currently considered + /// disconnected and whose corresponding peer should be checked for connectivity + /// in order to prevent it from being evicted. If connectivity to the peer is + /// re-established, the corresponding entry should be updated with + /// [`NodeStatus::Connected`]. + disconnected: Key + }, + /// The entry was not inserted because the relevant bucket is full. + Full +} + +/// The result of applying a pending node to a bucket, possibly +/// replacing an existing node. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AppliedPending { + /// The key of the inserted pending node. + pub inserted: Key, + /// The node that has been evicted from the bucket to make room for the + /// pending node, if any. + pub evicted: Option> +} + +impl KBucket +where + TPeerId: Clone +{ + /// Creates a new `KBucket` with the given timeout for pending entries. + pub fn new(pending_timeout: Duration) -> Self { + KBucket { + nodes: ArrayVec::new(), + first_connected_pos: None, + pending: None, + pending_timeout, + } + } + + /// Returns a reference to the pending node of the bucket, if there is any. + pub fn pending(&self) -> Option<&PendingNode> { + self.pending.as_ref() + } + + /// Returns a mutable reference to the pending node of the bucket, if there is any. + pub fn pending_mut(&mut self) -> Option<&mut PendingNode> { + self.pending.as_mut() + } + + /// Returns a reference to the pending node of the bucket, if there is any + /// with a matching key. + pub fn as_pending(&self, key: &Key) -> Option<&PendingNode> { + self.pending().filter(|p| &p.node.key == key) + } + + /// Returns a reference to a node in the bucket. 
+ pub fn get(&self, key: &Key) -> Option<&Node> { + self.position(key).map(|p| &self.nodes[p.0]) + } + + /// Returns an iterator over the nodes in the bucket, together with their status. + pub fn iter(&self) -> impl Iterator, NodeStatus)> { + self.nodes.iter().enumerate().map(move |(p, n)| (n, self.status(Position(p)))) + } + + /// Inserts the pending node into the bucket, if its timeout has elapsed, + /// replacing the least-recently connected node. + /// + /// If a pending node has been inserted, its key is returned together with + /// the node that was replaced. `None` indicates that the nodes in the + /// bucket remained unchanged. + pub fn apply_pending(&mut self) -> Option> { + if let Some(pending) = self.pending.take() { + if pending.replace <= Instant::now() { + if self.nodes.is_full() { + if self.status(Position(0)) == NodeStatus::Connected { + // The bucket is full with connected nodes. Drop the pending node. + return None + } + // The pending node will be inserted. + let inserted = pending.node.key.clone(); + // A connected pending node goes at the end of the list for + // the connected peers, removing the least-recently connected. + if pending.status == NodeStatus::Connected { + let evicted = Some(self.nodes.remove(0)); + self.first_connected_pos = self.first_connected_pos + .map_or_else( + | | Some(self.nodes.len()), + |p| p.checked_sub(1)); + self.nodes.push(pending.node); + return Some(AppliedPending { inserted, evicted }) + } + // A disconnected pending node goes at the end of the list + // for the disconnected peers. + else if let Some(p) = self.first_connected_pos { + if let Some(insert_pos) = p.checked_sub(1) { + let evicted = Some(self.nodes.remove(0)); + self.nodes.insert(insert_pos, pending.node); + return Some(AppliedPending { inserted, evicted }) + } + } else { + // All nodes are disconnected. Insert the new node as the most + // recently disconnected, removing the least-recently disconnected. 
+ let evicted = Some(self.nodes.remove(0)); + self.nodes.push(pending.node); + return Some(AppliedPending { inserted, evicted }) + } + } else { + // There is room in the bucket, so just insert the pending node. + let inserted = pending.node.key.clone(); + match self.insert(pending.node, pending.status) { + InsertResult::Inserted => + return Some(AppliedPending { inserted, evicted: None }), + _ => unreachable!("Bucket is not full.") + } + } + } else { + self.pending = Some(pending); + } + } + + return None + } + + /// Updates the status of the pending node, if any. + pub fn update_pending(&mut self, status: NodeStatus) { + if let Some(pending) = &mut self.pending { + pending.status = status + } + } + + /// Updates the status of the node referred to by the given key, if it is + /// in the bucket. + pub fn update(&mut self, key: &Key, status: NodeStatus) { + if let Some(pos) = self.position(key) { + let node = self.nodes.remove(pos.0); + if pos == Position(0) && status == NodeStatus::Connected { + self.pending = None + } + match self.insert(node, status) { + InsertResult::Inserted => {}, + _ => unreachable!("The node is removed before being (re)inserted.") + } + } + } + + /// Inserts a new node into the bucket with the given status. + /// + /// The status of the node to insert determines the result as follows: + /// + /// * `NodeStatus::Connected`: If the bucket is full and either all nodes are connected + /// or there is already a pending node, insertion fails with `InsertResult::Full`. + /// If the bucket is full but at least one node is disconnected and there is no pending + /// node, the new node is inserted as pending, yielding `InsertResult::Pending`. + /// Otherwise the bucket has free slots and the new node is added to the end of the + /// bucket as the most-recently connected node. + /// + /// * `NodeStatus::Disconnected`: If the bucket is full, insertion fails with + /// `InsertResult::Full`. 
Otherwise the bucket has free slots and the new node + /// is inserted at the position preceding the first connected node, + /// i.e. as the most-recently disconnected node. If there are no connected nodes, + /// the new node is added as the last element of the bucket. + /// + pub fn insert(&mut self, node: Node, status: NodeStatus) -> InsertResult { + match status { + NodeStatus::Connected => { + if self.nodes.is_full() { + if self.first_connected_pos == Some(0) || self.pending.is_some() { + return InsertResult::Full + } else { + self.pending = Some(PendingNode { + node, + status: NodeStatus::Connected, + replace: Instant::now() + self.pending_timeout, + }); + return InsertResult::Pending { + disconnected: self.nodes[0].key.clone() + } + } + } + let pos = self.nodes.len(); + self.first_connected_pos = self.first_connected_pos.or(Some(pos)); + self.nodes.push(node); + InsertResult::Inserted + } + NodeStatus::Disconnected => { + if self.nodes.is_full() { + return InsertResult::Full + } + if let Some(ref mut first_connected_pos) = self.first_connected_pos { + self.nodes.insert(*first_connected_pos, node); + *first_connected_pos += 1; + } else { + self.nodes.push(node); + } + InsertResult::Inserted + } + } + } + + /// Returns the status of the node at the given position. + pub fn status(&self, pos: Position) -> NodeStatus { + if self.first_connected_pos.map_or(false, |i| pos.0 >= i) { + NodeStatus::Connected + } else { + NodeStatus::Disconnected + } + } + + /// Checks whether the given position refers to a connected node. + pub fn is_connected(&self, pos: Position) -> bool { + self.status(pos) == NodeStatus::Connected + } + + /// Gets the number of entries currently in the bucket. + pub fn num_entries(&self) -> usize { + self.nodes.len() + } + + /// Gets the number of entries in the bucket that are considered connected. 
+ pub fn num_connected(&self) -> usize { + self.first_connected_pos.map_or(0, |i| self.nodes.len() - i) + } + + /// Gets the number of entries in the bucket that are considered disconnected. + pub fn num_disconnected(&self) -> usize { + self.nodes.len() - self.num_connected() + } + + /// Gets the position of a node in the bucket. + pub fn position(&self, key: &Key) -> Option { + self.nodes.iter().position(|p| &p.key == key).map(Position) + } + + /// Gets a mutable reference to the node identified by the given key. + /// + /// Returns `None` if the given key does not refer to a node in the + /// bucket. + pub fn get_mut(&mut self, key: &Key) -> Option<&mut Node> { + self.nodes.iter_mut().find(move |p| &p.key == key) + } +} + +#[cfg(test)] +mod tests { + use libp2p_core::PeerId; + use rand::Rng; + use std::collections::VecDeque; + use super::*; + use quickcheck::*; + + impl Arbitrary for NodeStatus { + fn arbitrary(g: &mut G) -> NodeStatus { + if g.gen() { + NodeStatus::Connected + } else { + NodeStatus::Disconnected + } + } + } + + fn fill_bucket(bucket: &mut KBucket, status: NodeStatus) { + for i in 0 .. MAX_NODES_PER_BUCKET - bucket.num_entries() { + let key = Key::new(PeerId::random()); + let node = Node { key, value: () }; + assert_eq!(InsertResult::Inserted, bucket.insert(node, status)); + assert_eq!(bucket.num_entries(), i + 1); + } + assert!(bucket.pending().is_none()); + } + + #[test] + fn ordering() { + fn prop(status: Vec) -> bool { + let mut bucket = KBucket::::new(Duration::from_secs(1)); + + // The expected lists of connected and disconnected nodes. + let mut connected = VecDeque::new(); + let mut disconnected = VecDeque::new(); + + // Fill the bucket, thereby populating the expected lists in insertion order. 
+ for status in status { + let key = Key::new(PeerId::random()); + let node = Node { key: key.clone(), value: () }; + let full = bucket.num_entries() == MAX_NODES_PER_BUCKET; + match bucket.insert(node, status) { + InsertResult::Inserted => { + let vec = match status { + NodeStatus::Connected => &mut connected, + NodeStatus::Disconnected => &mut disconnected + }; + if full { + vec.pop_front(); + } + vec.push_back((status, key.clone())); + } + _ => {} + } + } + + // Get all nodes from the bucket, together with their status. + let mut nodes = bucket.iter() + .map(|(n, s)| (s, n.key.clone())) + .collect::>(); + + // Split the list of nodes at the first connected node. + let first_connected_pos = nodes.iter().position(|(s,_)| *s == NodeStatus::Connected); + assert_eq!(bucket.first_connected_pos, first_connected_pos); + let tail = first_connected_pos.map_or(Vec::new(), |p| nodes.split_off(p)); + + // All nodes before the first connected node must be disconnected and + // in insertion order. Similarly, all remaining nodes must be connected + // and in insertion order. + nodes == Vec::from(disconnected) + && + tail == Vec::from(connected) + } + + quickcheck(prop as fn(_) -> _); + } + + #[test] + fn full_bucket() { + let mut bucket = KBucket::::new(Duration::from_secs(1)); + + // Fill the bucket with disconnected nodes. + fill_bucket(&mut bucket, NodeStatus::Disconnected); + + // Trying to insert another disconnected node fails. + let key = Key::new(PeerId::random()); + let node = Node { key, value: () }; + match bucket.insert(node, NodeStatus::Disconnected) { + InsertResult::Full => {}, + x => panic!("{:?}", x) + } + + // One-by-one fill the bucket with connected nodes, replacing the disconnected ones. + for i in 0 .. 
MAX_NODES_PER_BUCKET { + let (first, first_status) = bucket.iter().next().unwrap(); + let first_disconnected = first.clone(); + assert_eq!(first_status, NodeStatus::Disconnected); + + // Add a connected node, which is expected to be pending, scheduled to + // replace the first (i.e. least-recently connected) node. + let key = Key::new(PeerId::random()); + let node = Node { key: key.clone(), value: () }; + match bucket.insert(node.clone(), NodeStatus::Connected) { + InsertResult::Pending { disconnected } => + assert_eq!(disconnected, first_disconnected.key), + x => panic!("{:?}", x) + } + + // Trying to insert another connected node fails. + match bucket.insert(node.clone(), NodeStatus::Connected) { + InsertResult::Full => {}, + x => panic!("{:?}", x) + } + + assert!(bucket.pending().is_some()); + + // Apply the pending node. + let pending = bucket.pending_mut().expect("No pending node."); + pending.set_ready_at(Instant::now() - Duration::from_secs(1)); + let result = bucket.apply_pending(); + assert_eq!(result, Some(AppliedPending { + inserted: key.clone(), + evicted: Some(first_disconnected) + })); + assert_eq!(Some((&node, NodeStatus::Connected)), bucket.iter().last()); + assert!(bucket.pending().is_none()); + assert_eq!(Some(MAX_NODES_PER_BUCKET - (i + 1)), bucket.first_connected_pos); + } + + assert!(bucket.pending().is_none()); + assert_eq!(MAX_NODES_PER_BUCKET, bucket.num_entries()); + + // Trying to insert another connected node fails. + let key = Key::new(PeerId::random()); + let node = Node { key, value: () }; + match bucket.insert(node, NodeStatus::Connected) { + InsertResult::Full => {}, + x => panic!("{:?}", x) + } + } + + #[test] + fn full_bucket_discard_pending() { + let mut bucket = KBucket::::new(Duration::from_secs(1)); + fill_bucket(&mut bucket, NodeStatus::Disconnected); + let (first, _) = bucket.iter().next().unwrap(); + let first_disconnected = first.clone(); + + // Add a connected pending node. 
+ let key = Key::new(PeerId::random()); + let node = Node { key: key.clone(), value: () }; + if let InsertResult::Pending { disconnected } = bucket.insert(node, NodeStatus::Connected) { + assert_eq!(&disconnected, &first_disconnected.key); + } else { + panic!() + } + assert!(bucket.pending().is_some()); + + // Update the status of the first disconnected node to be connected. + bucket.update(&first_disconnected.key, NodeStatus::Connected); + + // The pending node has been discarded. + assert!(bucket.pending().is_none()); + assert!(bucket.iter().all(|(n,_)| &n.key != &key)); + + // The initially disconnected node is now the most-recently connected. + assert_eq!(Some((&first_disconnected, NodeStatus::Connected)), bucket.iter().last()); + assert_eq!(bucket.position(&first_disconnected.key).map(|p| p.0), bucket.first_connected_pos); + assert_eq!(1, bucket.num_connected()); + assert_eq!(MAX_NODES_PER_BUCKET - 1, bucket.num_disconnected()); + } +} diff --git a/protocols/kad/src/kbucket/entry.rs b/protocols/kad/src/kbucket/entry.rs new file mode 100644 index 00000000..7065490b --- /dev/null +++ b/protocols/kad/src/kbucket/entry.rs @@ -0,0 +1,254 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! The `Entry` API for querying and modifying the entries of a `KBucketsTable` +//! representing the nodes participating in the Kademlia DHT. + +pub use super::bucket::{Node, NodeStatus, InsertResult, AppliedPending, MAX_NODES_PER_BUCKET}; +pub use super::key::*; + +use super::*; + +/// An immutable by-reference view of a bucket entry. +pub struct EntryRefView<'a, TPeerId, TVal> { + /// The node represented by the entry. + pub node: NodeRefView<'a, TPeerId, TVal>, + /// The status of the node identified by the key. + pub status: NodeStatus +} + +/// An immutable by-reference view of a `Node`. +pub struct NodeRefView<'a, TPeerId, TVal> { + pub key: &'a Key, + pub value: &'a TVal +} + +impl EntryRefView<'_, TPeerId, TVal> { + pub fn to_owned(&self) -> EntryView + where + TPeerId: Clone, + TVal: Clone + { + EntryView { + node: Node { + key: self.node.key.clone(), + value: self.node.value.clone() + }, + status: self.status + } + } +} + +/// A cloned, immutable view of an entry that is either present in a bucket +/// or pending insertion. +#[derive(Clone, Debug)] +pub struct EntryView { + /// The node represented by the entry. + pub node: Node, + /// The status of the node. + pub status: NodeStatus +} + +impl AsRef> for EntryView { + fn as_ref(&self) -> &Key { + &self.node.key + } +} + +/// A reference into a single entry of a `KBucketsTable`. +#[derive(Debug)] +pub enum Entry<'a, TPeerId, TVal> { + /// The entry is present in a bucket. 
+ Present(PresentEntry<'a, TPeerId, TVal>, NodeStatus), + /// The entry is pending insertion in a bucket. + Pending(PendingEntry<'a, TPeerId, TVal>, NodeStatus), + /// The entry is absent and may be inserted. + Absent(AbsentEntry<'a, TPeerId, TVal>), + /// The entry represents the local node. + SelfEntry, +} + +/// The internal representation of the different states of an `Entry`, +/// referencing the associated key and bucket. +#[derive(Debug)] +struct EntryRef<'a, TPeerId, TVal> { + bucket: &'a mut KBucket, + key: &'a Key, +} + +impl<'a, TPeerId, TVal> Entry<'a, TPeerId, TVal> +where + TPeerId: Clone, +{ + /// Creates a new `Entry` for a `Key`, encapsulating access to a bucket. + pub(super) fn new(bucket: &'a mut KBucket, key: &'a Key) -> Self { + if let Some(pos) = bucket.position(key) { + let status = bucket.status(pos); + Entry::Present(PresentEntry::new(bucket, key), status) + } else if let Some(pending) = bucket.as_pending(key) { + let status = pending.status(); + Entry::Pending(PendingEntry::new(bucket, key), status) + } else { + Entry::Absent(AbsentEntry::new(bucket, key)) + } + } + + /// Creates an immutable by-reference view of the entry. + /// + /// Returns `None` if the entry is neither present in a bucket nor + /// pending insertion into a bucket. + pub fn view(&'a mut self) -> Option> { + match self { + Entry::Present(entry, status) => Some(EntryRefView { + node: NodeRefView { + key: entry.0.key, + value: entry.value() + }, + status: *status + }), + Entry::Pending(entry, status) => Some(EntryRefView { + node: NodeRefView { + key: entry.0.key, + value: entry.value() + }, + status: *status + }), + _ => None + } + } + + /// Returns the key of the entry. + /// + /// Returns `None` if the `Key` used to construct this `Entry` is not a valid + /// key for an entry in a bucket, which is the case for the `local_key` of + /// the `KBucketsTable` referring to the local node. 
+ pub fn key(&self) -> Option<&Key> { + match self { + Entry::Present(entry, _) => Some(entry.key()), + Entry::Pending(entry, _) => Some(entry.key()), + Entry::Absent(entry) => Some(entry.key()), + Entry::SelfEntry => None, + } + } + + /// Returns the value associated with the entry. + /// + /// Returns `None` if the entry is absent from any bucket or refers to the + /// local node. + pub fn value(&mut self) -> Option<&mut TVal> { + match self { + Entry::Present(entry, _) => Some(entry.value()), + Entry::Pending(entry, _) => Some(entry.value()), + Entry::Absent(_) => None, + Entry::SelfEntry => None, + } + } +} + +/// An entry present in a bucket. +#[derive(Debug)] +pub struct PresentEntry<'a, TPeerId, TVal>(EntryRef<'a, TPeerId, TVal>); + +impl<'a, TPeerId, TVal> PresentEntry<'a, TPeerId, TVal> +where + TPeerId: Clone, +{ + fn new(bucket: &'a mut KBucket, key: &'a Key) -> Self { + PresentEntry(EntryRef { bucket, key }) + } + + /// Returns the key of the entry. + pub fn key(&self) -> &Key { + self.0.key + } + + /// Returns the value associated with the key. + pub fn value(&mut self) -> &mut TVal { + &mut self.0.bucket + .get_mut(self.0.key) + .expect("We can only build a ConnectedEntry if the entry is in the bucket; QED") + .value + } + + /// Updates the status of the entry to the given `status`. + pub fn update(self, status: NodeStatus) -> Self { + self.0.bucket.update(self.0.key, status); + Self::new(self.0.bucket, self.0.key) + } +} + +/// An entry waiting for a slot to be available in a bucket. +#[derive(Debug)] +pub struct PendingEntry<'a, TPeerId, TVal>(EntryRef<'a, TPeerId, TVal>); + +impl<'a, TPeerId, TVal> PendingEntry<'a, TPeerId, TVal> +where + TPeerId: Clone, +{ + fn new(bucket: &'a mut KBucket, key: &'a Key) -> Self { + PendingEntry(EntryRef { bucket, key }) + } + + /// Returns the key of the entry. + pub fn key(&self) -> &Key { + self.0.key + } + + /// Returns the value associated with the key. 
+ pub fn value(&mut self) -> &mut TVal { + self.0.bucket + .pending_mut() + .expect("We can only build a ConnectedPendingEntry if the entry is pending; QED") + .value_mut() + } + + /// Updates the status of the pending entry. + pub fn update(self, status: NodeStatus) -> PendingEntry<'a, TPeerId, TVal> { + self.0.bucket.update_pending(status); + PendingEntry::new(self.0.bucket, self.0.key) + } +} + +/// An entry that is not present in any bucket. +#[derive(Debug)] +pub struct AbsentEntry<'a, TPeerId, TVal>(EntryRef<'a, TPeerId, TVal>); + +impl<'a, TPeerId, TVal> AbsentEntry<'a, TPeerId, TVal> +where + TPeerId: Clone, +{ + fn new(bucket: &'a mut KBucket, key: &'a Key) -> Self { + AbsentEntry(EntryRef { bucket, key }) + } + + /// Returns the key of the entry. + pub fn key(&self) -> &Key { + self.0.key + } + + /// Attempts to insert the entry into a bucket. + pub fn insert(self, value: TVal, status: NodeStatus) -> InsertResult { + self.0.bucket.insert(Node { + key: self.0.key.clone(), + value + }, status) + } +} + diff --git a/protocols/kad/src/kbucket/key.rs b/protocols/kad/src/kbucket/key.rs new file mode 100644 index 00000000..6c9ae934 --- /dev/null +++ b/protocols/kad/src/kbucket/key.rs @@ -0,0 +1,156 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use bigint::U256; +use libp2p_core::PeerId; +use multihash::Multihash; +use sha2::{Digest, Sha256, digest::generic_array::{GenericArray, typenum::U32}}; + +/// A `Key` is a cryptographic hash, identifying both the nodes participating in +/// the Kademlia DHT, as well as records stored in the DHT. +/// +/// The set of all `Key`s defines the Kademlia keyspace. +/// +/// `Key`s have an XOR metric as defined in the Kademlia paper, i.e. the bitwise XOR of +/// the hash digests, interpreted as an integer. See [`Key::distance`]. +/// +/// A `Key` preserves the preimage of type `T` of the hash function. See [`Key::preimage`]. +#[derive(Clone, Debug)] +pub struct Key { + preimage: T, + hash: GenericArray, +} + +impl PartialEq for Key { + fn eq(&self, other: &Key) -> bool { + self.hash == other.hash + } +} + +impl Eq for Key {} + +impl AsRef> for Key { + fn as_ref(&self) -> &Key { + self + } +} + +impl Key { + /// Construct a new `Key` by hashing the bytes of the given `preimage`. + /// + /// The preimage of type `T` is preserved. See [`Key::preimage`] and + /// [`Key::into_preimage`]. + pub fn new(preimage: T) -> Key + where + T: AsRef<[u8]> + { + let hash = Sha256::digest(preimage.as_ref()); + Key { preimage, hash } + } + + /// Borrows the preimage of the key. + pub fn preimage(&self) -> &T { + &self.preimage + } + + /// Converts the key into its preimage. + pub fn into_preimage(self) -> T { + self.preimage + } + + /// Computes the distance of the keys according to the XOR metric. 
+ pub fn distance(&self, other: &Key) -> Distance { + let a = U256::from(self.hash.as_ref()); + let b = U256::from(other.hash.as_ref()); + Distance(a ^ b) + } +} + +impl From for Key { + fn from(h: Multihash) -> Self { + let k = Key::new(h.clone().into_bytes()); + Key { preimage: h, hash: k.hash } + } +} + +impl From for Key { + fn from(peer_id: PeerId) -> Self { + Key::new(peer_id) + } +} + +/// A distance between two `Key`s. +#[derive(Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord, Debug)] +pub struct Distance(pub(super) bigint::U256); + +#[cfg(test)] +mod tests { + use super::*; + use quickcheck::*; + + impl Arbitrary for Key { + fn arbitrary(_: &mut G) -> Key { + Key::from(PeerId::random()) + } + } + + #[test] + fn identity() { + fn prop(a: Key) -> bool { + a.distance(&a) == Distance::default() + } + quickcheck(prop as fn(_) -> _) + } + + #[test] + fn symmetry() { + fn prop(a: Key, b: Key) -> bool { + a.distance(&b) == b.distance(&a) + } + quickcheck(prop as fn(_,_) -> _) + } + + #[test] + fn triangle_inequality() { + fn prop(a: Key, b: Key, c: Key) -> TestResult { + let ab = a.distance(&b); + let bc = b.distance(&c); + let (ab_plus_bc, overflow) = ab.0.overflowing_add(bc.0); + if overflow { + TestResult::discard() + } else { + TestResult::from_bool(a.distance(&c) <= Distance(ab_plus_bc)) + } + } + quickcheck(prop as fn(_,_,_) -> _) + } + + #[test] + fn unidirectionality() { + fn prop(a: Key, b: Key) -> bool { + let d = a.distance(&b); + (0 .. 100).all(|_| { + let c = Key::from(PeerId::random()); + a.distance(&c) != d || b == c + }) + } + quickcheck(prop as fn(_,_) -> _) + } +} diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index 3d8ef2a7..0c74069d 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! Kademlia protocol. Allows peer discovery, records store and records fetch. +//! 
Implementation of the Kademlia protocol for libp2p. // TODO: we allow dead_code for now because this library contains a lot of unused code that will // be useful later for record store diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 94e2eed6..5983b8ac 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -25,6 +25,10 @@ //! The upgrade's output is a `Sink + Stream` of messages. The `Stream` component is used //! to poll the underlying transport for incoming messages, and the `Sink` component //! is used to send messages to remote peers. +//! +//! [`KademliaProtocolConfig`]: protocol::KademliaProtocolConfig +//! [`KadRequestMsg`]: protocol::KadRequestMsg +//! [`KadResponseMsg`]: protocol::KadResponseMsg use bytes::BytesMut; use codec::UviBytes; diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index 01c34f0e..99156f54 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -350,7 +350,7 @@ where } } - /// Consumes the query and returns the targe tand known closest peers. + /// Consumes the query and returns the target and known closest peers. /// /// > **Note**: This can be called at any time, but you normally only do that once the query /// > is finished.