diff --git a/examples/ipfs-kad.rs b/examples/ipfs-kad.rs index 79b3c2f0..afcd1b22 100644 --- a/examples/ipfs-kad.rs +++ b/examples/ipfs-kad.rs @@ -44,16 +44,16 @@ fn main() { // to insert our local node in the DHT. However here we use `without_init` because this // example is very ephemeral and we don't want to pollute the DHT. In a real world // application, you want to use `new` instead. - let mut behaviour = libp2p::kad::Kademlia::without_init(local_peer_id.clone()); - behaviour.add_connected_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap(), "/ip4/104.131.131.82/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap()); - behaviour.add_connected_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap()); + let mut behaviour = libp2p::kad::Kademlia::new(local_peer_id.clone()); + 
behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap(), "/ip4/104.131.131.82/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap()); libp2p::core::Swarm::new(transport, behaviour, local_peer_id) }; diff --git a/protocols/kad/src/addresses.rs b/protocols/kad/src/addresses.rs index 4efb1423..3a1d6d57 100644 --- a/protocols/kad/src/addresses.rs +++ b/protocols/kad/src/addresses.rs @@ -36,11 +36,16 @@ impl Addresses { } } - /// Returns the list of addresses. + /// Returns an iterator over the list of addresses. pub fn iter(&self) -> impl Iterator { self.addrs.iter() } + /// Converts the addresses into a `Vec`. + pub fn into_vec(self) -> Vec { + self.addrs.into_vec() + } + /// Returns true if the list of addresses is empty. 
pub fn is_empty(&self) -> bool { self.addrs.is_empty() diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 2fe8d4e0..2ca90e3c 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -20,7 +20,7 @@ use crate::addresses::Addresses; use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn}; -use crate::kbucket::{self, KBucketsTable}; +use crate::kbucket::{self, KBucketsTable, NodeStatus}; use crate::protocol::{KadConnectionType, KadPeer}; use crate::query::{QueryConfig, QueryState, QueryStatePollOut}; use fnv::{FnvHashMap, FnvHashSet}; @@ -74,13 +74,14 @@ pub struct Kademlia { /// perform in parallel. parallelism: usize, - /// `k` in the Kademlia reference papers. Number of results in a find node query. + /// The number of results to return from a query. Defaults to the maximum number + /// of entries in a single k-bucket, i.e. the `k` parameter. num_results: usize, - /// Timeout for each individual RPC query. + /// Timeout for a single RPC. rpc_timeout: Duration, - /// Events to return when polling. + /// Queued events to return when the behaviour is being polled. queued_events: SmallVec<[NetworkBehaviourAction, KademliaOut>; 32]>, /// List of providers to add to the topology as soon as we are in `poll()`. @@ -200,42 +201,37 @@ impl Kademlia { Self::new_inner(local_peer_id) } - /// Adds a known address for the given `PeerId`. We are connected to this address. - // TODO: report if the address was inserted? also, semantics unclear - pub fn add_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) { - self.add_address(peer_id, address, true) - } - - /// Adds a known address for the given `PeerId`. We are not connected or don't know whether we - /// are connected to this address. - // TODO: report if the address was inserted? 
also, semantics unclear - pub fn add_not_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) { - self.add_address(peer_id, address, false) - } - - /// Underlying implementation for `add_connected_address` and `add_not_connected_address`. - fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, _connected: bool) { + /// Adds a known address of a peer participating in the Kademlia DHT to the + /// routing table. + /// + /// This allows prepopulating the Kademlia routing table with known + /// addresses, so that they can be used immediately in following DHT + /// operations involving one of these peers, without having to dial + /// them upfront. + pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) { let key = kbucket::Key::new(peer_id.clone()); match self.kbuckets.entry(&key) { - kbucket::Entry::InKbucketConnected(mut entry) => entry.value().insert(address), - kbucket::Entry::InKbucketConnectedPending(mut entry) => entry.value().insert(address), - kbucket::Entry::InKbucketDisconnected(mut entry) => entry.value().insert(address), - kbucket::Entry::InKbucketDisconnectedPending(mut entry) => entry.value().insert(address), - kbucket::Entry::NotInKbucket(entry) => { + kbucket::Entry::Present(mut entry, _) => { + entry.value().insert(address); + } + kbucket::Entry::Pending(mut entry, _) => { + entry.value().insert(address); + } + kbucket::Entry::Absent(entry) => { let mut addresses = Addresses::new(); addresses.insert(address); - match entry.insert_disconnected(addresses) { - kbucket::InsertOutcome::Inserted => { + match entry.insert(addresses, NodeStatus::Disconnected) { + kbucket::InsertResult::Inserted => { let event = KademliaOut::KBucketAdded { peer_id: peer_id.clone(), replaced: None, }; self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event)); }, - kbucket::InsertOutcome::Full => (), - kbucket::InsertOutcome::Pending { to_ping } => { + kbucket::InsertResult::Full => (), + kbucket::InsertResult::Pending { 
disconnected } => { self.queued_events.push(NetworkBehaviourAction::DialPeer { - peer_id: to_ping.into_preimage(), + peer_id: disconnected.into_preimage(), }) }, } @@ -261,16 +257,17 @@ impl Kademlia { providing_keys: FnvHashSet::default(), refresh_add_providers: Interval::new_interval(Duration::from_secs(60)).fuse(), // TODO: constant parallelism, - num_results: 20, + num_results: kbucket::MAX_NODES_PER_BUCKET, rpc_timeout: Duration::from_secs(8), add_provider: SmallVec::new(), marker: PhantomData, } } - /// Returns an iterator to all the peer IDs in the bucket, without the pending nodes. - pub fn kbuckets_entries(&self) -> impl Iterator { - self.kbuckets.entries_not_pending().map(|(key, _)| key.preimage()) + /// Returns an iterator over all peer IDs of nodes currently contained in a bucket + /// of the Kademlia routing table. + pub fn kbuckets_entries(&mut self) -> impl Iterator { + self.kbuckets.iter().map(|entry| entry.node.key.preimage()) } /// Starts an iterative `FIND_NODE` request. @@ -334,8 +331,10 @@ impl Kademlia { untrusted_addresses: Default::default(), }; + let target_key = kbucket::Key::new(target.clone()); + let known_closest_peers = self.kbuckets - .find_closest(&kbucket::Key::new(target.clone())) + .closest_keys(&target_key) .take(self.num_results); self.active_queries.insert( @@ -376,6 +375,83 @@ impl Kademlia { query.inject_rpc_result(source, others_iter.cloned().map(|kp| kp.node_id)) } } + + /// Finds the closest peers to a `target` in the context of a request by + /// the `source` peer, such that the `source` peer is never included in the + /// result. + fn find_closest(&mut self, target: &kbucket::Key, source: &PeerId) -> Vec { + self.kbuckets + .closest(target) + .filter(|e| e.node.key.preimage() != source) + .take(self.num_results) + .map(KadPeer::from) + .collect() + } + + /// Collects all peers who are known to be providers of the value for a given `Multihash`. 
+ fn provider_peers(&mut self, key: &Multihash, source: &PeerId) -> Vec { + let kbuckets = &mut self.kbuckets; + self.values_providers + .get(key) + .into_iter() + .flat_map(|peers| peers) + .filter_map(move |p| + if p != source { + let key = kbucket::Key::new(p.clone()); + kbuckets.entry(&key).view().map(|e| KadPeer::from(e.to_owned())) + } else { + None + }) + .collect() + } + + /// Update the connection status of a peer in the Kademlia routing table. + fn connection_updated(&mut self, peer: PeerId, address: Option, new_status: NodeStatus) { + let key = kbucket::Key::new(peer.clone()); + match self.kbuckets.entry(&key) { + kbucket::Entry::Present(mut entry, old_status) => { + if let Some(address) = address { + entry.value().insert(address); + } + if old_status != new_status { + entry.update(new_status); + } + }, + + kbucket::Entry::Pending(mut entry, old_status) => { + if let Some(address) = address { + entry.value().insert(address); + } + if old_status != new_status { + entry.update(new_status); + } + }, + + kbucket::Entry::Absent(entry) => if new_status == NodeStatus::Connected { + let mut addresses = Addresses::new(); + if let Some(address) = address { + addresses.insert(address); + } + match entry.insert(addresses, new_status) { + kbucket::InsertResult::Inserted => { + let event = KademliaOut::KBucketAdded { + peer_id: peer.clone(), + replaced: None, + }; + self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event)); + }, + kbucket::InsertResult::Full => (), + kbucket::InsertResult::Pending { disconnected } => { + debug_assert!(!self.connected_peers.contains(disconnected.preimage())); + self.queued_events.push(NetworkBehaviourAction::DialPeer { + peer_id: disconnected.into_preimage(), + }) + }, + } + }, + _ => {} + } + } } impl NetworkBehaviour for Kademlia @@ -396,11 +472,13 @@ where fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { // We should order addresses from decreasing likelyhood of connectivity, so start with // the addresses of 
that peer in the k-buckets. - let mut out_list = self.kbuckets - .entry(&kbucket::Key::new(peer_id.clone())) - .value_not_pending() - .map(|l| l.iter().cloned().collect::>()) - .unwrap_or_else(Vec::new); + let key = kbucket::Key::new(peer_id.clone()); + let mut out_list = + if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) { + entry.value().iter().cloned().collect::>() + } else { + Vec::new() + }; // We add to that a temporary list of addresses from the ongoing queries. for query in self.active_queries.values() { @@ -428,57 +506,7 @@ where ConnectedPoint::Listener { .. } => None, }; - let key = kbucket::Key::new(id.clone()); - - match self.kbuckets.entry(&key) { - kbucket::Entry::InKbucketConnected(_) => { - unreachable!("Kbuckets are always kept in sync with the connection state; QED") - }, - kbucket::Entry::InKbucketConnectedPending(_) => { - unreachable!("Kbuckets are always kept in sync with the connection state; QED") - }, - - kbucket::Entry::InKbucketDisconnected(mut entry) => { - if let Some(address) = address { - entry.value().insert(address); - } - entry.set_connected(); - }, - - kbucket::Entry::InKbucketDisconnectedPending(mut entry) => { - if let Some(address) = address { - entry.value().insert(address); - } - entry.set_connected(); - }, - - kbucket::Entry::NotInKbucket(entry) => { - let mut addresses = Addresses::new(); - if let Some(address) = address { - addresses.insert(address); - } - match entry.insert_connected(addresses) { - kbucket::InsertOutcome::Inserted => { - let event = KademliaOut::KBucketAdded { - peer_id: id.clone(), - replaced: None, - }; - self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event)); - }, - kbucket::InsertOutcome::Full => (), - kbucket::InsertOutcome::Pending { to_ping } => { - self.queued_events.push(NetworkBehaviourAction::DialPeer { - peer_id: to_ping.into_preimage(), - }) - }, - } - }, - - kbucket::Entry::SelfEntry => { - unreachable!("Guaranteed to never receive disconnected even 
for self; QED") - }, - } - + self.connection_updated(id.clone(), address, NodeStatus::Connected); self.connected_peers.insert(id); } @@ -486,10 +514,10 @@ where if let Some(peer_id) = peer_id { let key = kbucket::Key::new(peer_id.clone()); - if let Some(list) = self.kbuckets.entry(&key).value() { + if let Some(addrs) = self.kbuckets.entry(&key).value() { // TODO: don't remove the address if the error is that we are already connected // to this peer - list.remove(addr); + addrs.remove(addr); } for query in self.active_queries.values_mut() { @@ -507,40 +535,11 @@ where } fn inject_disconnected(&mut self, id: &PeerId, _old_endpoint: ConnectedPoint) { - let was_in = self.connected_peers.remove(id); - debug_assert!(was_in); - for query in self.active_queries.values_mut() { query.inject_rpc_error(id); } - - match self.kbuckets.entry(&kbucket::Key::new(id.clone())) { - kbucket::Entry::InKbucketConnected(entry) => { - match entry.set_disconnected() { - kbucket::SetDisconnectedOutcome::Kept(_) => {}, - kbucket::SetDisconnectedOutcome::Replaced { replacement, .. 
} => { - let event = KademliaOut::KBucketAdded { - peer_id: replacement.into_preimage(), - replaced: Some(id.clone()), - }; - self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event)); - }, - } - }, - kbucket::Entry::InKbucketConnectedPending(entry) => { - entry.set_disconnected(); - }, - kbucket::Entry::InKbucketDisconnected(_) => { - unreachable!("Kbuckets are always kept in sync with the connection state; QED") - }, - kbucket::Entry::InKbucketDisconnectedPending(_) => { - unreachable!("Kbuckets are always kept in sync with the connection state; QED") - }, - kbucket::Entry::NotInKbucket(_) => {}, - kbucket::Entry::SelfEntry => { - unreachable!("Guaranteed to never receive disconnected even for self; QED") - }, - } + self.connection_updated(id.clone(), None, NodeStatus::Disconnected); + self.connected_peers.remove(id); } fn inject_replaced(&mut self, peer_id: PeerId, _old: ConnectedPoint, new_endpoint: ConnectedPoint) { @@ -554,9 +553,9 @@ where } } - if let Some(list) = self.kbuckets.entry(&kbucket::Key::new(peer_id)).value() { + if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::new(peer_id)).value() { if let ConnectedPoint::Dialer { address } = new_endpoint { - list.insert(address); + addrs.insert(address); } } } @@ -564,13 +563,7 @@ where fn inject_node_event(&mut self, source: PeerId, event: KademliaHandlerEvent) { match event { KademliaHandlerEvent::FindNodeReq { key, request_id } => { - let closer_peers = self.kbuckets - .find_closest(&kbucket::Key::new(key)) - .filter(|p| p.preimage() != &source) - .take(self.num_results) - .map(|key| build_kad_peer(&key, &mut self.kbuckets)) - .collect(); - + let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); self.queued_events.push(NetworkBehaviourAction::SendEvent { peer_id: source, event: KademliaHandlerIn::FindNodeRes { @@ -586,23 +579,8 @@ where self.discovered(&user_data, &source, closer_peers.iter()); } KademliaHandlerEvent::GetProvidersReq { key, request_id } => { - let 
provider_peers = { - let kbuckets = &mut self.kbuckets; - self.values_providers - .get(&key) - .into_iter() - .flat_map(|peers| peers) - .filter(|p| *p != &source) - .map(move |peer_id| build_kad_peer(&kbucket::Key::new(peer_id.clone()), kbuckets)) - .collect() - }; - - let closer_peers = self.kbuckets - .find_closest(&kbucket::Key::from(key)) - .take(self.num_results) - .map(|key| build_kad_peer(&key, &mut self.kbuckets)) - .collect(); - + let provider_peers = self.provider_peers(&key, &source); + let closer_peers = self.find_closest(&kbucket::Key::from(key), &source); self.queued_events.push(NetworkBehaviourAction::SendEvent { peer_id: source, event: KademliaHandlerIn::GetProvidersRes { @@ -660,7 +638,7 @@ where // Flush the changes to the topology that we want to make. for (key, provider) in self.add_provider.drain() { // Don't add ourselves to the providers. - if provider == *self.kbuckets.local_key().preimage() { + if &provider == self.kbuckets.local_key().preimage() { continue; } let providers = self.values_providers.entry(key).or_insert_with(Default::default); @@ -683,12 +661,21 @@ where } loop { - // Handle events queued by other parts of this struct + // Drain queued events first. if !self.queued_events.is_empty() { return Async::Ready(self.queued_events.remove(0)); } self.queued_events.shrink_to_fit(); + // Drain applied pending entries from the routing table. + if let Some(entry) = self.kbuckets.take_applied_pending() { + let event = KademliaOut::KBucketAdded { + peer_id: entry.inserted.into_preimage(), + replaced: entry.evicted.map(|n| n.key.into_preimage()) + }; + return Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) + } + // If iterating finds a query that is finished, stores it here and stops looping. 
let mut finished_query = None; @@ -751,16 +738,18 @@ where break Async::Ready(NetworkBehaviourAction::GenerateEvent(event)); }, QueryInfoInner::AddProvider { target } => { - let local_key = kbucket::Key::new(parameters.local_peer_id().clone()); for closest in closer_peers { let event = NetworkBehaviourAction::SendEvent { peer_id: closest, event: KademliaHandlerIn::AddProvider { key: target.clone(), - provider_peer: build_kad_peer(&local_key, &mut self.kbuckets), + provider_peer: KadPeer { + node_id: parameters.local_peer_id().clone(), + multiaddrs: parameters.external_addresses().cloned().collect(), + connection_ty: KadConnectionType::Connected, + } }, }; - self.queued_events.push(event); } }, @@ -815,26 +804,16 @@ pub enum KademliaOut { }, } -/// Builds a `KadPeer` struct corresponding to the given `PeerId`. -/// The `PeerId` cannot be the same as the local one. -/// -/// > **Note**: This is just a convenience function that doesn't do anything note-worthy. -fn build_kad_peer( - key: &kbucket::Key, - kbuckets: &mut KBucketsTable -) -> KadPeer { - let (multiaddrs, connection_ty) = match kbuckets.entry(key) { - kbucket::Entry::NotInKbucket(_) => (Vec::new(), KadConnectionType::NotConnected), // TODO: pending connection? 
- kbucket::Entry::InKbucketConnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected), - kbucket::Entry::InKbucketDisconnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected), - kbucket::Entry::InKbucketConnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected), - kbucket::Entry::InKbucketDisconnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected), - kbucket::Entry::SelfEntry => panic!("build_kad_peer expects not to be called with the kbucket::Key of the local ID"), - }; - - KadPeer { - node_id: key.preimage().clone(), - multiaddrs, - connection_ty, +impl From> for KadPeer { + fn from(e: kbucket::EntryView) -> KadPeer { + KadPeer { + node_id: e.node.key.into_preimage(), + multiaddrs: e.node.value.into_vec(), + connection_ty: match e.status { + NodeStatus::Connected => KadConnectionType::Connected, + NodeStatus::Disconnected => KadConnectionType::NotConnected + } + } } } + diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 72d2cb0f..4747e889 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -97,7 +97,7 @@ fn query_iter() { // Connect each swarm in the list to its predecessor in the list. for (i, (swarm, peer)) in &mut swarms.iter_mut().skip(1).zip(swarm_ids.clone()).enumerate() { - swarm.add_not_connected_address(&peer, Protocol::Memory(port_base + i as u64).into()) + swarm.add_address(&peer, Protocol::Memory(port_base + i as u64).into()) } // Ask the last peer in the list to search a random peer. The search should @@ -150,7 +150,7 @@ fn unresponsive_not_returned_direct() { // Add fake addresses. for _ in 0 .. 10 { - swarms[0].add_not_connected_address(&PeerId::random(), Protocol::Udp(10u16).into()); + swarms[0].add_address(&PeerId::random(), Protocol::Udp(10u16).into()); } // Ask first to search a random value. 
@@ -189,14 +189,14 @@ fn unresponsive_not_returned_indirect() { // Add fake addresses to first. let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone(); for _ in 0 .. 10 { - swarms[0].add_not_connected_address( + swarms[0].add_address( &PeerId::random(), multiaddr![Udp(10u16)] ); } // Connect second to first. - swarms[1].add_not_connected_address(&first_peer_id, Protocol::Memory(port_base).into()); + swarms[1].add_address(&first_peer_id, Protocol::Memory(port_base).into()); // Ask second to search a random value. let search_target = PeerId::random(); diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs index 5d6c8dd6..4d84f3b4 100644 --- a/protocols/kad/src/kbucket.rs +++ b/protocols/kad/src/kbucket.rs @@ -18,164 +18,121 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! A k-buckets table allows one to store a value identified by keys, ordered by their distance -//! to a reference key passed to the constructor. +//! Implementation of a Kademlia routing table as used by a single peer +//! participating in a Kademlia DHT. //! -//! If the local ID has `N` bits, then the k-buckets table contains `N` *buckets* each containing -//! a constant number of entries. Storing a key in the k-buckets table adds it to the bucket -//! corresponding to its distance with the reference key. +//! The entry point for the API of this module is a [`KBucketsTable`]. +//! +//! ## Pending Insertions +//! +//! When the bucket associated with the `Key` of an inserted entry is full +//! but contains disconnected nodes, it accepts a [`PendingEntry`]. +//! Pending entries are inserted lazily when their timeout is found to be expired +//! upon querying the `KBucketsTable`. When that happens, the `KBucketsTable` records +//! an [`AppliedPending`] result which must be consumed by calling [`take_applied_pending`] +//! regularly and / or after performing lookup operations like [`entry`] and [`closest`]. +//! +//! 
[`entry`]: kbucket::KBucketsTable::entry +//! [`closest`]: kbucket::KBucketsTable::closest +//! [`AppliedPending`]: kbucket::AppliedPending +//! [`KBucketsTable`]: kbucket::KBucketsTable +//! [`take_applied_pending`]: kbucket::KBucketsTable::take_applied_pending +//! [`PendingEntry`]: kbucket::PendingEntry -use arrayvec::ArrayVec; -use bigint::U256; -use libp2p_core::PeerId; -use multihash::Multihash; -use sha2::{Digest, Sha256, digest::generic_array::{GenericArray, typenum::U32}}; -use std::slice::IterMut as SliceIterMut; +// [Implementation Notes] +// +// 1. Routing Table Layout +// +// The routing table is currently implemented as a fixed-size "array" of +// buckets, ordered by increasing distance relative to a local key +// that identifies the local peer. This is an often-used, simplified +// implementation that approximates the properties of the b-tree (or prefix tree) +// implementation described in the full paper [0], whereby buckets are split on-demand. +// This should be treated as an implementation detail, however, so that the +// implementation may change in the future without breaking the API. +// +// 2. Replacement Cache +// +// In this implementation, the "replacement cache" for unresponsive peers +// consists of a single entry per bucket. Furthermore, this implementation is +// currently tailored to connection-oriented transports, meaning that the +// "LRU"-based ordering of entries in a bucket is actually based on the last reported +// connection status of the corresponding peers, from least-recently (dis)connected to +// most-recently (dis)connected, and controlled through the `Entry` API. As a result, +// the nodes in the buckets are not reordered as a result of RPC activity, but only as a +// result of nodes being marked as connected or disconnected. In particular, +// if a bucket is full and contains only entries for peers that are considered +// connected, no pending entry is accepted. See the `bucket` submodule for +// further details. 
+// +// [0]: https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf + +mod bucket; +mod entry; +mod key; + +pub use entry::*; + +use arrayvec::{self, ArrayVec}; +use bucket::KBucket; +use std::collections::VecDeque; use std::time::{Duration, Instant}; -use std::vec::IntoIter as VecIntoIter; /// Maximum number of k-buckets. const NUM_BUCKETS: usize = 256; -/// Maximum number of nodes in a bucket, i.e. `k`. -const MAX_NODES_PER_BUCKET: usize = 20; -/// A table of `KBucket`s, i.e. a Kademlia routing table. +/// A `KBucketsTable` represents a Kademlia routing table. #[derive(Debug, Clone)] pub struct KBucketsTable { - /// Peer ID of the local node. + /// The key identifying the local peer that owns the routing table. local_key: Key, - /// The actual tables that store peers or values. - tables: Vec>, - /// The timeout when trying to reach the youngest node after which we consider it unresponsive. - unresponsive_timeout: Duration, + /// The buckets comprising the routing table. + buckets: Vec>, + /// The list of evicted entries that have been replaced with pending + /// entries since the last call to [`KBucketsTable::take_applied_pending`]. + applied_pending: VecDeque> } -/// A `Key` is a cryptographic hash, stored with an associated value in a `KBucket` -/// of a `KBucketsTable`. -/// -/// `Key`s have an XOR metric as defined in the Kademlia paper, i.e. the bitwise XOR of -/// the hash digests, interpreted as an integer. See [`Key::distance`]. -/// -/// A `Key` preserves the preimage of type `T` of the hash function. See [`Key::preimage`]. -#[derive(Clone, Debug)] -pub struct Key { - preimage: T, - hash: GenericArray, -} +/// A (type-safe) index into a `KBucketsTable`, i.e. a non-negative integer in the +/// interval `[0, NUM_BUCKETS)`. 
+#[derive(Copy, Clone)] +struct BucketIndex(usize); -impl PartialEq for Key { - fn eq(&self, other: &Key) -> bool { - self.hash == other.hash - } -} - -impl Eq for Key {} - -impl Key { - /// Construct a new `Key` by hashing the bytes of the given `preimage`. +impl BucketIndex { + /// Creates a new `BucketIndex` for a `Distance`. /// - /// The preimage of type `T` is preserved. See [`Key::preimage`] and - /// [`Key::into_preimage`]. - pub fn new(preimage: T) -> Key - where - T: AsRef<[u8]> - { - let hash = Sha256::digest(preimage.as_ref()); - Key { preimage, hash } + /// The given distance is interpreted as the distance from a `local_key` of + /// a `KBucketsTable`. If the distance is zero, `None` is returned, in + /// recognition of the fact that the only key with distance `0` to a + /// `local_key` is the `local_key` itself, which does not belong in any + /// bucket. + fn new(d: &Distance) -> Option { + (NUM_BUCKETS - d.0.leading_zeros() as usize) + .checked_sub(1) + .map(BucketIndex) } - /// Borrows the preimage of the key. - pub fn preimage(&self) -> &T { - &self.preimage + /// Gets the index value as an unsigned integer. + fn get(&self) -> usize { + self.0 } - - /// Converts the key into its preimage. - pub fn into_preimage(self) -> T { - self.preimage - } - - /// Computes the distance of the keys according to the XOR metric. - pub fn distance(&self, other: &Key) -> Distance { - let a = U256::from(self.hash.as_ref()); - let b = U256::from(other.hash.as_ref()); - Distance(a ^ b) - } -} - -impl From for Key { - fn from(h: Multihash) -> Self { - let k = Key::new(h.clone().into_bytes()); - Key { preimage: h, hash: k.hash } - } -} - -impl From for Key { - fn from(peer_id: PeerId) -> Self { - Key::new(peer_id) - } -} - -/// A distance between two `Key`s. 
-#[derive(Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord, Debug)] -pub struct Distance(bigint::U256); - -/// A `KBucket` is a list of up to `MAX_NODES_PER_BUCKET` `Key`s and associated values, -/// ordered from least recently used to most recently used. -#[derive(Debug, Clone)] -struct KBucket { - /// Nodes are always ordered from oldest to newest. The nodes we are connected to are always - /// all on top (ie. have higher indices) of the nodes we are not connected to. - nodes: ArrayVec<[Node; MAX_NODES_PER_BUCKET]>, - - /// Index in `nodes` over which all nodes are connected. Must always be <= to the length - /// of `nodes`. - first_connected_pos: usize, - - /// Node received when the bucket was full. Will be added to the list if the youngest node - /// doesn't respond in time to our reach attempt. - pending_node: Option>, -} - -/// State of the pending node. -#[derive(Debug, Clone)] -struct PendingNode { - /// Node to insert. - node: Node, - - /// If true, we are connected to the pending node. - connected: bool, - - /// When the pending node will replace an existing node, provided that the youngest node - /// doesn't become responsive before. - replace: Instant, -} - -/// A single node in a `KBucket`. -#[derive(Debug, Clone)] -struct Node { - /// Id of the node. - id: Key, - /// Value associated to it. - value: TVal, } impl KBucketsTable where TPeerId: Clone, { - /// Builds a new routing table whose keys are distributed over `KBucket`s as - /// per the relative distance to `local_key`. - pub fn new(local_key: Key, unresponsive_timeout: Duration) -> Self { + /// Creates a new, empty Kademlia routing table with entries partitioned + /// into buckets as per the Kademlia protocol. + /// + /// The given `pending_timeout` specifies the duration after creation of + /// a [`PendingEntry`] after which it becomes eligible for insertion into + /// a full bucket, replacing the least-recently (dis)connected node. 
+ pub fn new(local_key: Key, pending_timeout: Duration) -> Self { KBucketsTable { local_key, - tables: (0..NUM_BUCKETS) - .map(|_| KBucket { - nodes: ArrayVec::new(), - first_connected_pos: 0, - pending_node: None, - }) - .collect(), - unresponsive_timeout, + buckets: (0 .. NUM_BUCKETS).map(|_| KBucket::new(pending_timeout)).collect(), + applied_pending: VecDeque::new() } } @@ -184,720 +141,395 @@ where &self.local_key } - /// Returns the id of the bucket that should contain the peer with the given ID. - /// - /// Returns `None` if out of range, which happens if `id` is the same as the local peer id. - fn bucket_num(&self, key: &Key) -> Option { - (NUM_BUCKETS - self.local_key.distance(key).0.leading_zeros() as usize).checked_sub(1) + /// Returns an `Entry` for the given key, representing the state of the entry + /// in the routing table. + pub fn entry<'a>(&'a mut self, key: &'a Key) -> Entry<'a, TPeerId, TVal> { + let index = BucketIndex::new(&self.local_key.distance(key)); + if let Some(i) = index { + let bucket = &mut self.buckets[i.get()]; + if let Some(applied) = bucket.apply_pending() { + self.applied_pending.push_back(applied) + } + Entry::new(bucket, key) + } else { + Entry::SelfEntry + } } - /// Returns an object containing the state of the given entry. - pub fn entry<'a>(&'a mut self, peer_id: &'a Key) -> Entry<'a, TPeerId, TVal> { - let bucket_num = if let Some(num) = self.bucket_num(peer_id) { - num - } else { - return Entry::SelfEntry; - }; - - // Update the pending node state. - // TODO: must be reported to the user somehow, in a non-annoying API - if let Some(pending) = self.tables[bucket_num].pending_node.take() { - if pending.replace < Instant::now() { - let table = &mut self.tables[bucket_num]; - let first_connected_pos = &mut table.first_connected_pos; - // If all the nodes in the bucket are connected, then there shouldn't be any - // pending node. 
- debug_assert!(*first_connected_pos >= 1); - table.nodes.remove(0); - if pending.connected { - *first_connected_pos -= 1; - table.nodes.insert(*first_connected_pos, pending.node); - } else { - table.nodes.insert(*first_connected_pos - 1, pending.node); + /// Returns an iterator over all the entries in the routing table. + pub fn iter<'a>(&'a mut self) -> impl Iterator> { + let applied_pending = &mut self.applied_pending; + self.buckets.iter_mut().flat_map(move |table| { + if let Some(applied) = table.apply_pending() { + applied_pending.push_back(applied) + } + let table = &*table; + table.iter().map(move |(n, status)| { + EntryRefView { + node: NodeRefView { + key: &n.key, + value: &n.value + }, + status } - } else { - self.tables[bucket_num].pending_node = Some(pending); - } - } - - // Try to find the node in the bucket. - if let Some(pos) = self.tables[bucket_num].nodes.iter().position(|p| p.id == *peer_id) { - if pos >= self.tables[bucket_num].first_connected_pos { - Entry::InKbucketConnected(EntryInKbucketConn { - parent: self, - peer_id, - }) - - } else { - Entry::InKbucketDisconnected(EntryInKbucketDisc { - parent: self, - peer_id, - }) - } - - } else if self.tables[bucket_num].pending_node.as_ref().map(|p| p.node.id == *peer_id).unwrap_or(false) { - // Node is pending. - if self.tables[bucket_num].pending_node.as_ref().map(|p| p.connected).unwrap_or(false) { - Entry::InKbucketConnectedPending(EntryInKbucketConnPending { - parent: self, - peer_id, - }) - } else { - Entry::InKbucketDisconnectedPending(EntryInKbucketDiscPending { - parent: self, - peer_id, - }) - } - - } else { - Entry::NotInKbucket(EntryNotInKbucket { - parent: self, - peer_id, }) - } - } - - /// Returns an iterator to all the peer IDs in the bucket, without the pending nodes. 
- pub fn entries_not_pending(&self) -> impl Iterator, &TVal)> { - self.tables - .iter() - .flat_map(|table| table.nodes.iter()) - .map(|node| (&node.id, &node.value)) - } - - /// Returns an iterator to all the buckets of this table. - /// - /// Ordered by proximity to the local node. Closest bucket (with max. one node in it) comes - /// first. - pub fn buckets(&mut self) -> BucketsIter<'_, TPeerId, TVal> { - BucketsIter(self.tables.iter_mut(), self.unresponsive_timeout) - } - - /// Finds the keys closest to `key`, ordered by distance. - /// - /// Pending nodes are ignored. - pub fn find_closest(&mut self, key: &Key) -> VecIntoIter> { - // TODO: optimize - let mut out = Vec::new(); - for table in self.tables.iter_mut() { - for node in table.nodes.iter() { - out.push(node.id.clone()); - } - - // TODO: this code that handles the pending_node should normally be shared with - // the one in `entry()`; however right now there's no mechanism to notify the - // user when a pending node has been inserted in the table, and thus we need to - // rework this pending node handling code anyway; when that is being done, we - // should rewrite this code properly - if let Some(ref pending) = table.pending_node { - if pending.replace <= Instant::now() && pending.connected { - out.pop(); - out.push(pending.node.id.clone()); - } - } - } - out.sort_by(|a, b| key.distance(a).cmp(&key.distance(b))); - out.into_iter() - } -} - -/// Represents an entry or a potential entry in the k-buckets. -pub enum Entry<'a, TPeerId, TVal> { - /// Entry in a k-bucket that we're connected to. - InKbucketConnected(EntryInKbucketConn<'a, TPeerId, TVal>), - /// Entry pending waiting for a free slot to enter a k-bucket. We're connected to it. - InKbucketConnectedPending(EntryInKbucketConnPending<'a, TPeerId, TVal>), - /// Entry in a k-bucket but that we're not connected to. - InKbucketDisconnected(EntryInKbucketDisc<'a, TPeerId, TVal>), - /// Entry pending waiting for a free slot to enter a k-bucket. 
We're not connected to it. - InKbucketDisconnectedPending(EntryInKbucketDiscPending<'a, TPeerId, TVal>), - /// Entry is not present in any k-bucket. - NotInKbucket(EntryNotInKbucket<'a, TPeerId, TVal>), - /// Entry is the local peer ID. - SelfEntry, -} - -impl<'a, TPeerId, TVal> Entry<'a, TPeerId, TVal> -where - TPeerId: Clone, -{ - /// Returns the value associated to the entry in the bucket, including if the node is pending. - pub fn value(&mut self) -> Option<&mut TVal> { - match self { - Entry::InKbucketConnected(entry) => Some(entry.value()), - Entry::InKbucketConnectedPending(entry) => Some(entry.value()), - Entry::InKbucketDisconnected(entry) => Some(entry.value()), - Entry::InKbucketDisconnectedPending(entry) => Some(entry.value()), - Entry::NotInKbucket(_entry) => None, - Entry::SelfEntry => None, - } - } - - /// Returns the value associated to the entry in the bucket. - pub fn value_not_pending(&mut self) -> Option<&mut TVal> { - match self { - Entry::InKbucketConnected(entry) => Some(entry.value()), - Entry::InKbucketConnectedPending(_entry) => None, - Entry::InKbucketDisconnected(entry) => Some(entry.value()), - Entry::InKbucketDisconnectedPending(_entry) => None, - Entry::NotInKbucket(_entry) => None, - Entry::SelfEntry => None, - } - } -} - -/// Represents an entry in a k-bucket. -pub struct EntryInKbucketConn<'a, TPeerId, TVal> { - parent: &'a mut KBucketsTable, - peer_id: &'a Key, -} - -impl<'a, TPeerId, TVal> EntryInKbucketConn<'a, TPeerId, TVal> -where - TPeerId: Clone, -{ - /// Returns the value associated to the entry in the bucket. 
- pub fn value(&mut self) -> &mut TVal { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketConn if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - let peer_id = self.peer_id; - &mut table.nodes.iter_mut() - .find(move |p| p.id == *peer_id) - .expect("We can only build a EntryInKbucketConn if we know that the peer is in its \ - bucket; QED") - .value - } - - /// Reports that we are now disconnected from the given node. - /// - /// This moves the node down in its bucket. There are two possible outcomes: - /// - /// - Either we had a pending node which replaces the current node. `Replaced` is returned. - /// - Or we had no pending node, and the current node is kept. `Kept` is returned. - /// - pub fn set_disconnected(self) -> SetDisconnectedOutcome<'a, TPeerId, TVal> { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketConn if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - let peer_id = self.peer_id; - let pos = table.nodes.iter().position(move |elem| elem.id == *peer_id) - .expect("we can only build a EntryInKbucketConn if the node is in its bucket; QED"); - debug_assert!(table.first_connected_pos <= pos); - - // We replace it with the pending node, if any. - if let Some(pending) = table.pending_node.take() { - if pending.connected { - let removed = table.nodes.remove(pos); - let ret = SetDisconnectedOutcome::Replaced { - replacement: pending.node.id.clone(), - old_val: removed.value, - }; - table.nodes.insert(table.first_connected_pos, pending.node); - return ret; - } else { - table.pending_node = Some(pending); - } - } - - // Move the node in the bucket. - if pos != table.first_connected_pos { - let elem = table.nodes.remove(pos); - table.nodes.insert(table.first_connected_pos, elem); - } - table.first_connected_pos += 1; - - // And return a EntryInKbucketDisc. 
- debug_assert!(table.nodes.iter() - .position(move |e| e.id == *peer_id) - .map(|p| p < table.first_connected_pos) - .unwrap_or(false)); - - SetDisconnectedOutcome::Kept(EntryInKbucketDisc { - parent: self.parent, - peer_id: self.peer_id, }) } -} -/// Outcome of calling `set_disconnected`. -#[must_use] -pub enum SetDisconnectedOutcome<'a, TPeerId, TVal> { - /// Node is kept in the bucket. - Kept(EntryInKbucketDisc<'a, TPeerId, TVal>), - /// Node is pushed out of the bucket. - Replaced { - /// Node that replaced the node. - // TODO: could be a EntryInKbucketConn, but we have borrow issues with the new peer id - replacement: Key, - /// Value os the node that has been pushed out. - old_val: TVal, - }, -} - -/// Represents an entry waiting for a slot to be available in its k-bucket. -pub struct EntryInKbucketConnPending<'a, TPeerId, TVal> { - parent: &'a mut KBucketsTable, - peer_id: &'a Key, -} - -impl<'a, TPeerId, TVal> EntryInKbucketConnPending<'a, TPeerId, TVal> -where - TPeerId: Clone, -{ - /// Returns the value associated to the entry in the bucket. - pub fn value(&mut self) -> &mut TVal { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketConnPending if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - assert!(table.pending_node.as_ref().map(|n| &n.node.id) == Some(self.peer_id)); - &mut table.pending_node - .as_mut() - .expect("we can only build a EntryInKbucketConnPending if the node is pending; QED") - .node.value - } - - /// Reports that we are now disconnected from the given node. 
- pub fn set_disconnected(self) -> EntryInKbucketDiscPending<'a, TPeerId, TVal> { - { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketConnPending if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - let mut pending = table.pending_node.as_mut() - .expect("we can only build a EntryInKbucketConnPending if there's a pending node; QED"); - debug_assert!(pending.connected); - pending.connected = false; - } - - EntryInKbucketDiscPending { - parent: self.parent, - peer_id: self.peer_id, - } - } -} - -/// Represents an entry waiting for a slot to be available in its k-bucket. -pub struct EntryInKbucketDiscPending<'a, TPeerId, TVal> { - parent: &'a mut KBucketsTable, - peer_id: &'a Key, -} - -impl<'a, TPeerId, TVal> EntryInKbucketDiscPending<'a, TPeerId, TVal> -where - TPeerId: Clone, -{ - /// Returns the value associated to the entry in the bucket. - pub fn value(&mut self) -> &mut TVal { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketDiscPending if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - assert!(table.pending_node.as_ref().map(|n| &n.node.id) == Some(self.peer_id)); - &mut table.pending_node - .as_mut() - .expect("we can only build a EntryInKbucketDiscPending if the node is pending; QED") - .node.value - } - - /// Reports that we are now connected to the given node. 
- pub fn set_connected(self) -> EntryInKbucketConnPending<'a, TPeerId, TVal> { - { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketDiscPending if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - let mut pending = table.pending_node.as_mut() - .expect("we can only build a EntryInKbucketDiscPending if there's a pending node; QED"); - debug_assert!(!pending.connected); - pending.connected = true; - } - - EntryInKbucketConnPending { - parent: self.parent, - peer_id: self.peer_id, - } - } -} - -/// Represents an entry in a k-bucket. -pub struct EntryInKbucketDisc<'a, TPeerId, TVal> { - parent: &'a mut KBucketsTable, - peer_id: &'a Key, -} - -impl<'a, TPeerId, TVal> EntryInKbucketDisc<'a, TPeerId, TVal> -where - TPeerId: Clone, -{ - /// Returns the value associated to the entry in the bucket. - pub fn value(&mut self) -> &mut TVal { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketDisc if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - let peer_id = self.peer_id; - &mut table.nodes.iter_mut() - .find(move |p| p.id == *peer_id) - .expect("We can only build a EntryInKbucketDisc if we know that the peer is in its \ - bucket; QED") - .value - } - - /// Sets the node as connected. This moves the entry in the bucket. - pub fn set_connected(self) -> EntryInKbucketConn<'a, TPeerId, TVal> { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryInKbucketDisc if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - let pos = { - let peer_id = self.peer_id; - table.nodes.iter().position(move |p| p.id == *peer_id) - .expect("We can only build a EntryInKbucketDisc if we know that the peer is in \ - its bucket; QED") - }; - - // If we are the youngest node, we are now connected, which means that we have to drop the - // pending node. 
- // Note that it is theoretically possible that the replacement should have occurred between - // the moment when we build the `EntryInKbucketConn` and the moment when we call - // `set_connected`, but we don't take that into account. - if pos == 0 { - table.pending_node = None; - } - - debug_assert!(pos < table.first_connected_pos); - table.first_connected_pos -= 1; - if pos != table.first_connected_pos { - let entry = table.nodes.remove(pos); - table.nodes.insert(table.first_connected_pos, entry); - } - - // There shouldn't be a pending node if all slots are full of connected nodes. - debug_assert!(!(table.first_connected_pos == 0 && table.pending_node.is_some())); - - EntryInKbucketConn { - parent: self.parent, - peer_id: self.peer_id, - } - } -} - -/// Represents an entry not in any k-bucket. -pub struct EntryNotInKbucket<'a, TPeerId, TVal> { - parent: &'a mut KBucketsTable, - peer_id: &'a Key, -} - -impl<'a, TPeerId, TVal> EntryNotInKbucket<'a, TPeerId, TVal> -where - TPeerId: Clone, -{ - /// Inserts the node as connected, if possible. - pub fn insert_connected(self, value: TVal) -> InsertOutcome { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryNotInKbucket if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; - - if table.nodes.is_full() { - if table.first_connected_pos == 0 || table.pending_node.is_some() { - InsertOutcome::Full - } else { - table.pending_node = Some(PendingNode { - node: Node { id: self.peer_id.clone(), value }, - replace: Instant::now() + self.parent.unresponsive_timeout, - connected: true, - }); - InsertOutcome::Pending { - to_ping: table.nodes[0].id.clone() - } - } - } else { - table.nodes.insert(table.first_connected_pos, Node { - id: self.peer_id.clone(), - value, - }); - InsertOutcome::Inserted - } - } - - /// Inserts the node as disconnected, if possible. + /// Returns a by-reference iterator over all buckets. 
/// - /// > **Note**: This function will never return `Pending`. If the bucket is full, we simply - /// > do nothing. - pub fn insert_disconnected(self, value: TVal) -> InsertOutcome { - let table = { - let num = self.parent.bucket_num(&self.peer_id) - .expect("we can only build a EntryNotInKbucket if we know of a bucket; QED"); - &mut self.parent.tables[num] - }; + /// The buckets are ordered by proximity to the `local_key`, i.e. the first + /// bucket is the closest bucket (containing at most one key). + pub fn buckets<'a>(&'a mut self) -> impl Iterator> + 'a { + let applied_pending = &mut self.applied_pending; + self.buckets.iter_mut().map(move |b| { + if let Some(applied) = b.apply_pending() { + applied_pending.push_back(applied) + } + KBucketRef(b) + }) + } - if table.nodes.is_full() { - InsertOutcome::Full - } else { - table.nodes.insert(table.first_connected_pos, Node { - id: self.peer_id.clone(), - value, - }); - table.first_connected_pos += 1; - InsertOutcome::Inserted + /// Consumes the next applied pending entry, if any. + /// + /// When an entry is attempted to be inserted and the respective bucket is full, + /// it may be recorded as pending insertion after a timeout, see [`InsertResult::Pending`]. + /// + /// If the oldest currently disconnected entry in the respective bucket does not change + /// its status until the timeout of the pending entry expires, it is evicted and + /// the pending entry is inserted instead. These insertions of pending entries + /// happen lazily, whenever the `KBucketsTable` is accessed, and the corresponding + /// buckets are updated accordingly. The fact that a pending entry was applied is + /// recorded in the `KBucketsTable` in the form of `AppliedPending` results, which must be + /// consumed by calling this function. + pub fn take_applied_pending(&mut self) -> Option> { + self.applied_pending.pop_front() + } + + /// Returns an iterator over the keys closest to `target`, ordered by + /// increasing distance.
+ pub fn closest_keys<'a, T>(&'a mut self, target: &'a Key) + -> impl Iterator> + 'a + where + T: Clone + { + let distance = self.local_key.distance(target); + ClosestIter { + target, + iter: None, + table: self, + buckets_iter: ClosestBucketsIter::new(distance), + fmap: |b: &KBucket<_, _>| -> ArrayVec<_> { + b.iter().map(|(n,_)| n.key.clone()).collect() + } + } + } + + /// Returns an iterator over the nodes closest to the `target` key, ordered by + /// increasing distance. + pub fn closest<'a, T>(&'a mut self, target: &'a Key) + -> impl Iterator> + 'a + where + T: Clone, + TVal: Clone + { + let distance = self.local_key.distance(target); + ClosestIter { + target, + iter: None, + table: self, + buckets_iter: ClosestBucketsIter::new(distance), + fmap: |b: &KBucket<_, TVal>| -> ArrayVec<_> { + b.iter().map(|(n, status)| EntryView { + node: n.clone(), + status + }).collect() + } } } } -/// Outcome of calling `insert`. -#[must_use] -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum InsertOutcome { - /// The entry has been successfully inserted. - Inserted, - /// The entry has been inserted as a pending node. - Pending { - /// We have to try connect to the returned node. - to_ping: Key, - }, - /// The entry was not inserted because the bucket was full of connected nodes. - Full, +/// An iterator over (some projection of) the closest entries in a +/// `KBucketsTable` w.r.t. some target `Key`. +struct ClosestIter<'a, TTarget, TPeerId, TVal, TMap, TOut> { + /// A reference to the target key whose distance to the local key determines + /// the order in which the buckets are traversed. The resulting + /// array from projecting the entries of each bucket using `fmap` is + /// sorted according to the distance to the target. + target: &'a Key, + /// A reference to all buckets of the `KBucketsTable`. + table: &'a mut KBucketsTable, + /// The iterator over the bucket indices in the order determined by the + /// distance of the local key to the target. 
+ buckets_iter: ClosestBucketsIter, + /// The iterator over the entries in the currently traversed bucket. + iter: Option>, + /// The projection function / mapping applied on each bucket as + /// it is encountered, producing the next `iter`ator. + fmap: TMap } -/// Iterator giving access to a bucket. -pub struct BucketsIter<'a, TPeerId, TVal>(SliceIterMut<'a, KBucket>, Duration); +/// An iterator over the bucket indices, in the order determined by the `Distance` of +/// a target from the `local_key`, such that the entries in the buckets are incrementally +/// further away from the target, starting with the bucket covering the target. +struct ClosestBucketsIter { + /// The distance to the `local_key`. + distance: Distance, + /// The current state of the iterator. + state: ClosestBucketsIterState +} -impl<'a, TPeerId, TVal> Iterator for BucketsIter<'a, TPeerId, TVal> { - type Item = Bucket<'a, TPeerId, TVal>; +/// Operating states of a `ClosestBucketsIter`. +enum ClosestBucketsIterState { + /// The starting state of the iterator yields the first bucket index and + /// then transitions to `ZoomIn`. + Start(BucketIndex), + /// The iterator "zooms in" to yield the next bucket containing nodes that + /// are incrementally closer to the local node but further from the `target`. + /// These buckets are identified by a `1` in the corresponding bit position + /// of the distance bit string. When bucket `0` is reached, the iterator + /// transitions to `ZoomOut`. + ZoomIn(BucketIndex), + /// Once bucket `0` has been reached, the iterator starts "zooming out" + /// to buckets containing nodes that are incrementally further away from + /// both the local key and the target. These are identified by a `0` in + /// the corresponding bit position of the distance bit string. When bucket + /// `255` is reached, the iterator transitions to state `Done`. + ZoomOut(BucketIndex), + /// The iterator is in this state once it has visited all buckets.
+ Done +} + +impl ClosestBucketsIter { + fn new(distance: Distance) -> Self { + let state = match BucketIndex::new(&distance) { + Some(i) => ClosestBucketsIterState::Start(i), + None => ClosestBucketsIterState::Done + }; + Self { distance, state } + } + + fn next_in(&self, i: BucketIndex) -> Option { + (0 .. i.get()).rev().find_map(|i| + if self.distance.0.bit(i) { + Some(BucketIndex(i)) + } else { + None + }) + } + + fn next_out(&self, i: BucketIndex) -> Option { + (i.get() + 1 .. NUM_BUCKETS).find_map(|i| + if !self.distance.0.bit(i) { + Some(BucketIndex(i)) + } else { + None + }) + } +} + +impl Iterator for ClosestBucketsIter { + type Item = BucketIndex; fn next(&mut self) -> Option { - self.0.next().map(|bucket| { - Bucket(bucket) - }) - } - - fn size_hint(&self) -> (usize, Option) { - self.0.size_hint() + match self.state { + ClosestBucketsIterState::Start(i) => { + self.state = ClosestBucketsIterState::ZoomIn(i); + Some(i) + } + ClosestBucketsIterState::ZoomIn(i) => + if let Some(i) = self.next_in(i) { + self.state = ClosestBucketsIterState::ZoomIn(i); + Some(i) + } else { + let i = BucketIndex(0); + self.state = ClosestBucketsIterState::ZoomOut(i); + Some(i) + } + ClosestBucketsIterState::ZoomOut(i) => + if let Some(i) = self.next_out(i) { + self.state = ClosestBucketsIterState::ZoomOut(i); + Some(i) + } else { + self.state = ClosestBucketsIterState::Done; + None + } + ClosestBucketsIterState::Done => None + } } } -impl<'a, TPeerId, TVal> ExactSizeIterator for BucketsIter<'a, TPeerId, TVal> {} +impl Iterator +for ClosestIter<'_, TTarget, TPeerId, TVal, TMap, TOut> +where + TPeerId: Clone, + TMap: Fn(&KBucket) -> ArrayVec<[TOut; MAX_NODES_PER_BUCKET]>, + TOut: AsRef> +{ + type Item = TOut; -/// Access to a bucket. 
-pub struct Bucket<'a, TPeerId, TVal>(&'a mut KBucket); + fn next(&mut self) -> Option { + loop { + match &mut self.iter { + Some(iter) => match iter.next() { + Some(k) => return Some(k), + None => self.iter = None + } + None => { + if let Some(i) = self.buckets_iter.next() { + let bucket = &mut self.table.buckets[i.get()]; + if let Some(applied) = bucket.apply_pending() { + self.table.applied_pending.push_back(applied) + } + let mut v = (self.fmap)(bucket); + v.sort_by(|a, b| + self.target.distance(a.as_ref()) + .cmp(&self.target.distance(b.as_ref()))); + self.iter = Some(v.into_iter()); + } else { + return None + } + } + } + } + } +} -impl<'a, TPeerId, TVal> Bucket<'a, TPeerId, TVal> { - /// Returns the number of entries in that bucket. - /// - /// > **Note**: Keep in mind that this operation can be racy. If `update()` is called on the - /// > table while this function is running, the `update()` may or may not be taken - /// > into account. +/// A reference to a bucket in a `KBucketsTable`. +pub struct KBucketRef<'a, TPeerId, TVal>(&'a mut KBucket); + +impl KBucketRef<'_, TPeerId, TVal> +where + TPeerId: Clone +{ + /// Returns the number of entries in the bucket. pub fn num_entries(&self) -> usize { - self.0.nodes.len() + self.0.num_entries() } - /// Returns true if this bucket has a pending node. + /// Returns true if the bucket has a pending node. 
pub fn has_pending(&self) -> bool { - if let Some(ref node) = self.0.pending_node { - node.replace > Instant::now() - } else { - false - } + self.0.pending().map_or(false, |n| !n.is_ready()) } } #[cfg(test)] mod tests { use super::*; - use quickcheck::*; use libp2p_core::PeerId; - use crate::kbucket::{Entry, InsertOutcome, KBucketsTable, MAX_NODES_PER_BUCKET}; - use std::time::Duration; - - impl Arbitrary for Key { - fn arbitrary(_: &mut G) -> Key { - Key::from(PeerId::random()) - } - } - - #[test] - fn identity() { - fn prop(a: Key) -> bool { - a.distance(&a) == Distance::default() - } - quickcheck(prop as fn(_) -> _) - } - - #[test] - fn symmetry() { - fn prop(a: Key, b: Key) -> bool { - a.distance(&b) == b.distance(&a) - } - quickcheck(prop as fn(_,_) -> _) - } - - #[test] - fn triangle_inequality() { - fn prop(a: Key, b: Key, c: Key) -> TestResult { - let ab = a.distance(&b); - let bc = b.distance(&c); - let (ab_plus_bc, overflow) = ab.0.overflowing_add(bc.0); - if overflow { - TestResult::discard() - } else { - TestResult::from_bool(a.distance(&c) <= Distance(ab_plus_bc)) - } - } - quickcheck(prop as fn(_,_,_) -> _) - } - - #[test] - fn unidirectionality() { - fn prop(a: Key, b: Key) -> bool { - let d = a.distance(&b); - (0..100).all(|_| { - let c = Key::from(PeerId::random()); - a.distance(&c) != d || b == c - }) - } - quickcheck(prop as fn(_,_) -> _) - } - #[test] fn basic_closest() { - let my_key = Key::from(PeerId::random()); + let local_key = Key::from(PeerId::random()); let other_id = Key::from(PeerId::random()); - let mut table = KBucketsTable::<_, ()>::new(my_key, Duration::from_secs(5)); - if let Entry::NotInKbucket(entry) = table.entry(&other_id) { - match entry.insert_connected(()) { - InsertOutcome::Inserted => (), + let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5)); + if let Entry::Absent(entry) = table.entry(&other_id) { + match entry.insert((), NodeStatus::Connected) { + InsertResult::Inserted => (), _ => panic!() } 
} else { panic!() } - let res = table.find_closest(&other_id).collect::>(); + let res = table.closest_keys(&other_id).collect::>(); assert_eq!(res.len(), 1); assert_eq!(res[0], other_id); } #[test] fn update_local_id_fails() { - let my_key = Key::from(PeerId::random()); - - let mut table = KBucketsTable::<_, ()>::new(my_key.clone(), Duration::from_secs(5)); - match table.entry(&my_key) { + let local_key = Key::from(PeerId::random()); + let mut table = KBucketsTable::<_, ()>::new(local_key.clone(), Duration::from_secs(5)); + match table.entry(&local_key) { Entry::SelfEntry => (), _ => panic!(), } } #[test] - fn full_kbucket() { - let my_key = Key::from(PeerId::random()); - - let mut table = KBucketsTable::<_, ()>::new(my_key.clone(), Duration::from_secs(5)); - - // Step 1: Fill the most distant bucket, i.e. bucket index `NUM_BUCKETS - 1`, - // with "disconnected" peers. - - // Prepare `MAX_NODES_PER_BUCKET` keys to fill the bucket, plus 2 - // additional keys which will be used to test the behavior on a full - // bucket. - assert!(MAX_NODES_PER_BUCKET <= 251); // Test doesn't work otherwise. - let mut fill_ids = (0..MAX_NODES_PER_BUCKET + 3) - .map(|n| { - let mut id = my_key.clone(); - // Flip the first bit so that we get in the most distant bucket. - id.hash[0] ^= 0x80; - // Each ID gets a unique low-order byte (i.e. counter). - id.hash[31] = id.hash[31].wrapping_add(n as u8); - id - }) - .collect::>(); - - let first_node = fill_ids[0].clone(); - let second_node = fill_ids[1].clone(); - - // Fill the bucket, consuming all but the last 3 test keys. 
- for (num, id) in fill_ids.drain(..MAX_NODES_PER_BUCKET).enumerate() { - if let Entry::NotInKbucket(entry) = table.entry(&id) { - match entry.insert_disconnected(()) { - InsertOutcome::Inserted => (), - _ => panic!() + fn closest() { + let local_key = Key::from(PeerId::random()); + let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5)); + let mut count = 0; + loop { + if count == 100 { break; } + let key = Key::from(PeerId::random()); + if let Entry::Absent(e) = table.entry(&key) { + match e.insert((), NodeStatus::Connected) { + InsertResult::Inserted => count += 1, + _ => continue, } } else { - panic!() + panic!("entry exists") } - assert_eq!(table.buckets().nth(255).unwrap().num_entries(), num + 1); - } - assert_eq!( - table.buckets().nth(255).unwrap().num_entries(), - MAX_NODES_PER_BUCKET - ); - assert!(!table.buckets().nth(255).unwrap().has_pending()); - - // Step 2: Insert another key on the full bucket. It must be marked as - // pending and the first (i.e. "least recently used") entry scheduled - // for replacement. - - if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) { - match entry.insert_connected(()) { - InsertOutcome::Pending { ref to_ping } if *to_ping == first_node => (), - _ => panic!() - } - } else { - panic!() - } - assert_eq!( - table.buckets().nth(255).unwrap().num_entries(), - MAX_NODES_PER_BUCKET - ); - assert!(table.buckets().nth(255).unwrap().has_pending()); - // Trying to insert yet another key is rejected. - if let Entry::NotInKbucket(entry) = table.entry(&Key::from(fill_ids.remove(0))) { - match entry.insert_connected(()) { - InsertOutcome::Full => (), - _ => panic!() - } - } else { - panic!() } - // Step 3: Make the pending nodes eligible for replacing existing nodes. - // The pending node must be consumed and replace the first (i.e. "least - // recently used") node. 
+ let mut expected_keys: Vec<_> = table.buckets + .iter() + .flat_map(|t| t.iter().map(|(n,_)| n.key.clone())) + .collect(); - let elapsed = Instant::now() - Duration::from_secs(1); - table.tables[255].pending_node.as_mut().map(|n| n.replace = elapsed); - assert!(!table.buckets().nth(255).unwrap().has_pending()); - if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) { - match entry.insert_connected(()) { - InsertOutcome::Pending { ref to_ping } if *to_ping == second_node => (), - _ => panic!() - } - } else { - panic!() + for _ in 0 .. 10 { + let target_key = Key::from(PeerId::random()); + let keys = table.closest_keys(&target_key).collect::>(); + // The list of keys is expected to match the result of a full-table scan. + expected_keys.sort_by_key(|k| k.distance(&target_key)); + assert_eq!(keys, expected_keys); } } + + #[test] + fn applied_pending() { + let local_key = Key::from(PeerId::random()); + let mut table = KBucketsTable::<_, ()>::new(local_key.clone(), Duration::from_millis(1)); + let expected_applied; + let full_bucket_index; + loop { + let key = Key::from(PeerId::random()); + if let Entry::Absent(e) = table.entry(&key) { + match e.insert((), NodeStatus::Disconnected) { + InsertResult::Full => { + if let Entry::Absent(e) = table.entry(&key) { + match e.insert((), NodeStatus::Connected) { + InsertResult::Pending { disconnected } => { + expected_applied = AppliedPending { + inserted: key.clone(), + evicted: Some(Node { key: disconnected, value: () }) + }; + full_bucket_index = BucketIndex::new(&key.distance(&local_key)); + break + }, + _ => panic!() + } + } else { + panic!() + } + }, + _ => continue, + } + } else { + panic!("entry exists") + } + } + + // Expire the timeout for the pending entry on the full bucket. + let full_bucket = &mut table.buckets[full_bucket_index.unwrap().get()]; + let elapsed = Instant::now() - Duration::from_secs(1); + full_bucket.pending_mut().unwrap().set_ready_at(elapsed); + + match
table.entry(&expected_applied.inserted) { + Entry::Present(_, NodeStatus::Connected) => {} + x => panic!("Unexpected entry: {:?}", x) + } + + match table.entry(&expected_applied.evicted.as_ref().unwrap().key) { + Entry::Absent(_) => {} + x => panic!("Unexpected entry: {:?}", x) + } + + assert_eq!(Some(expected_applied), table.take_applied_pending()); + assert_eq!(None, table.take_applied_pending()); + } } diff --git a/protocols/kad/src/kbucket/bucket.rs b/protocols/kad/src/kbucket/bucket.rs new file mode 100644 index 00000000..2133416e --- /dev/null +++ b/protocols/kad/src/kbucket/bucket.rs @@ -0,0 +1,552 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! The internal API for a single `KBucket` in a `KBucketsTable`. +//! +//! > **Note**: Uniqueness of entries w.r.t. a `Key` in a `KBucket` is not +//! > checked in this module. This is an invariant that must hold across all +//! 
> buckets in a `KBucketsTable` and hence is enforced by the public API +//! > of the `KBucketsTable` and in particular the public `Entry` API. + +use super::*; + +/// Maximum number of nodes in a bucket, i.e. the (fixed) `k` parameter. +pub const MAX_NODES_PER_BUCKET: usize = 20; + +/// A `PendingNode` is a `Node` that is pending insertion into a `KBucket`. +#[derive(Debug, Clone)] +pub struct PendingNode { + /// The pending node to insert. + node: Node, + + /// The status of the pending node. + status: NodeStatus, + + /// The instant at which the pending node is eligible for insertion into a bucket. + replace: Instant, +} + +/// The status of a node in a bucket. +/// +/// The status of a node in a bucket together with the time of the +/// last status change determines the position of the node in a +/// bucket. +#[derive(PartialEq, Eq, Debug, Copy, Clone)] +pub enum NodeStatus { + /// The node is considered connected. + Connected, + /// The node is considered disconnected. + Disconnected +} + +impl PendingNode { + pub fn key(&self) -> &Key { + &self.node.key + } + + pub fn status(&self) -> NodeStatus { + self.status + } + + pub fn value_mut(&mut self) -> &mut TVal { + &mut self.node.value + } + + pub fn is_ready(&self) -> bool { + Instant::now() >= self.replace + } + + pub fn set_ready_at(&mut self, t: Instant) { + self.replace = t; + } +} + +/// A `Node` in a bucket, representing a peer participating +/// in the Kademlia DHT together with an associated value (e.g. contact +/// information). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Node { + /// The key of the node, identifying the peer. + pub key: Key, + /// The associated value. + pub value: TVal, +} + +/// The position of a node in a `KBucket`, i.e. a non-negative integer +/// in the range `[0, MAX_NODES_PER_BUCKET)`. 
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct Position(usize); + +/// A `KBucket` is a list of up to `MAX_NODES_PER_BUCKET` `Key`s and associated values, +/// ordered from least-recently connected to most-recently connected. +#[derive(Debug, Clone)] +pub struct KBucket { + /// The nodes contained in the bucket. + nodes: ArrayVec<[Node; MAX_NODES_PER_BUCKET]>, + + /// The position (index) in `nodes` that marks the first connected node. + /// + /// Since the entries in `nodes` are ordered from least-recently connected to + /// most-recently connected, all entries above this index are also considered + /// connected, i.e. the range `[0, first_connected_pos)` marks the sub-list of entries + /// that are considered disconnected and the range + /// `[first_connected_pos, MAX_NODES_PER_BUCKET)` marks the sub-list of entries that are + /// considered connected. + /// + /// `None` indicates that there are no connected entries in the bucket, i.e. + /// the bucket is either empty, or contains only entries for peers that are + /// considered disconnected. + first_connected_pos: Option, + + /// A node that is pending to be inserted into a full bucket, should the + /// least-recently connected (and currently disconnected) node not be + /// marked as connected within `pending_timeout`. + pending: Option>, + + /// The timeout window before a new pending node is eligible for insertion, + /// if the least-recently connected node is not updated as being connected + /// in the meantime. + pending_timeout: Duration +} + +/// The result of inserting an entry into a bucket. +#[must_use] +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum InsertResult { + /// The entry has been successfully inserted. + Inserted, + /// The entry is pending insertion because the relevant bucket is currently full.
+ /// The entry is inserted after a timeout elapsed, if the status of the + /// least-recently connected (and currently disconnected) node in the bucket + /// is not updated before the timeout expires. + Pending { + /// The key of the least-recently connected entry that is currently considered + /// disconnected and whose corresponding peer should be checked for connectivity + /// in order to prevent it from being evicted. If connectivity to the peer is + /// re-established, the corresponding entry should be updated with + /// [`NodeStatus::Connected`]. + disconnected: Key + }, + /// The entry was not inserted because the relevant bucket is full. + Full +} + +/// The result of applying a pending node to a bucket, possibly +/// replacing an existing node. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AppliedPending { + /// The key of the inserted pending node. + pub inserted: Key, + /// The node that has been evicted from the bucket to make room for the + /// pending node, if any. + pub evicted: Option> +} + +impl KBucket +where + TPeerId: Clone +{ + /// Creates a new `KBucket` with the given timeout for pending entries. + pub fn new(pending_timeout: Duration) -> Self { + KBucket { + nodes: ArrayVec::new(), + first_connected_pos: None, + pending: None, + pending_timeout, + } + } + + /// Returns a reference to the pending node of the bucket, if there is any. + pub fn pending(&self) -> Option<&PendingNode> { + self.pending.as_ref() + } + + /// Returns a mutable reference to the pending node of the bucket, if there is any. + pub fn pending_mut(&mut self) -> Option<&mut PendingNode> { + self.pending.as_mut() + } + + /// Returns a reference to the pending node of the bucket, if there is any + /// with a matching key. + pub fn as_pending(&self, key: &Key) -> Option<&PendingNode> { + self.pending().filter(|p| &p.node.key == key) + } + + /// Returns a reference to a node in the bucket. 
+ pub fn get(&self, key: &Key) -> Option<&Node> { + self.position(key).map(|p| &self.nodes[p.0]) + } + + /// Returns an iterator over the nodes in the bucket, together with their status. + pub fn iter(&self) -> impl Iterator, NodeStatus)> { + self.nodes.iter().enumerate().map(move |(p, n)| (n, self.status(Position(p)))) + } + + /// Inserts the pending node into the bucket, if its timeout has elapsed, + /// replacing the least-recently connected node. + /// + /// If a pending node has been inserted, its key is returned together with + /// the node that was replaced. `None` indicates that the nodes in the + /// bucket remained unchanged. + pub fn apply_pending(&mut self) -> Option> { + if let Some(pending) = self.pending.take() { + if pending.replace <= Instant::now() { + if self.nodes.is_full() { + if self.status(Position(0)) == NodeStatus::Connected { + // The bucket is full with connected nodes. Drop the pending node. + return None + } + // The pending node will be inserted. + let inserted = pending.node.key.clone(); + // A connected pending node goes at the end of the list for + // the connected peers, removing the least-recently connected. + if pending.status == NodeStatus::Connected { + let evicted = Some(self.nodes.remove(0)); + self.first_connected_pos = self.first_connected_pos + .map_or_else( + | | Some(self.nodes.len()), + |p| p.checked_sub(1)); + self.nodes.push(pending.node); + return Some(AppliedPending { inserted, evicted }) + } + // A disconnected pending node goes at the end of the list + // for the disconnected peers. + else if let Some(p) = self.first_connected_pos { + if let Some(insert_pos) = p.checked_sub(1) { + let evicted = Some(self.nodes.remove(0)); + self.nodes.insert(insert_pos, pending.node); + return Some(AppliedPending { inserted, evicted }) + } + } else { + // All nodes are disconnected. Insert the new node as the most + // recently disconnected, removing the least-recently disconnected. 
+ let evicted = Some(self.nodes.remove(0)); + self.nodes.push(pending.node); + return Some(AppliedPending { inserted, evicted }) + } + } else { + // There is room in the bucket, so just insert the pending node. + let inserted = pending.node.key.clone(); + match self.insert(pending.node, pending.status) { + InsertResult::Inserted => + return Some(AppliedPending { inserted, evicted: None }), + _ => unreachable!("Bucket is not full.") + } + } + } else { + self.pending = Some(pending); + } + } + + return None + } + + /// Updates the status of the pending node, if any. + pub fn update_pending(&mut self, status: NodeStatus) { + if let Some(pending) = &mut self.pending { + pending.status = status + } + } + + /// Updates the status of the node referred to by the given key, if it is + /// in the bucket. + /// + /// The node is removed from its current position and re-inserted with the + /// new status, which places it at the end of either the sub-list of + /// disconnected nodes or the sub-list of connected nodes. If the key is + /// not in the bucket, nothing happens. + pub fn update(&mut self, key: &Key, status: NodeStatus) { + if let Some(pos) = self.position(key) { + // Capture the current status before the removal shifts the positions. + let old_status = self.status(pos); + let node = self.nodes.remove(pos.0); + // Keep `first_connected_pos` consistent with the shortened list. + match old_status { + NodeStatus::Connected => + if self.first_connected_pos == Some(self.nodes.len()) { + // The removed node was the only connected node. + self.first_connected_pos = None + } + NodeStatus::Disconnected => + if let Some(ref mut p) = self.first_connected_pos { + // The connected sub-list moved one slot towards the front. + *p -= 1 + } + } + // If the least-recently connected node is marked as connected again, + // the pending node has nothing left to replace and is dropped. + if pos == Position(0) && status == NodeStatus::Connected { + self.pending = None + } + match self.insert(node, status) { + InsertResult::Inserted => {}, + _ => unreachable!("The node is removed before being (re)inserted.") + } + } + } + + /// Inserts a new node into the bucket with the given status. + /// + /// The status of the node to insert determines the result as follows: + /// + /// * `NodeStatus::Connected`: If the bucket is full and either all nodes are connected + /// or there is already a pending node, insertion fails with `InsertResult::Full`. + /// If the bucket is full but at least one node is disconnected and there is no pending + /// node, the new node is inserted as pending, yielding `InsertResult::Pending`. + /// Otherwise the bucket has free slots and the new node is added to the end of the + /// bucket as the most-recently connected node. + /// + /// * `NodeStatus::Disconnected`: If the bucket is full, insertion fails with + /// `InsertResult::Full`.
Otherwise the bucket has free slots and the new node + /// is inserted at the position preceding the first connected node, + /// i.e. as the most-recently disconnected node. If there are no connected nodes, + /// the new node is added as the last element of the bucket. + /// + pub fn insert(&mut self, node: Node, status: NodeStatus) -> InsertResult { + match status { + NodeStatus::Connected => { + if self.nodes.is_full() { + if self.first_connected_pos == Some(0) || self.pending.is_some() { + return InsertResult::Full + } else { + self.pending = Some(PendingNode { + node, + status: NodeStatus::Connected, + replace: Instant::now() + self.pending_timeout, + }); + return InsertResult::Pending { + disconnected: self.nodes[0].key.clone() + } + } + } + let pos = self.nodes.len(); + self.first_connected_pos = self.first_connected_pos.or(Some(pos)); + self.nodes.push(node); + InsertResult::Inserted + } + NodeStatus::Disconnected => { + if self.nodes.is_full() { + return InsertResult::Full + } + if let Some(ref mut first_connected_pos) = self.first_connected_pos { + self.nodes.insert(*first_connected_pos, node); + *first_connected_pos += 1; + } else { + self.nodes.push(node); + } + InsertResult::Inserted + } + } + } + + /// Returns the status of the node at the given position. + pub fn status(&self, pos: Position) -> NodeStatus { + if self.first_connected_pos.map_or(false, |i| pos.0 >= i) { + NodeStatus::Connected + } else { + NodeStatus::Disconnected + } + } + + /// Checks whether the given position refers to a connected node. + pub fn is_connected(&self, pos: Position) -> bool { + self.status(pos) == NodeStatus::Connected + } + + /// Gets the number of entries currently in the bucket. + pub fn num_entries(&self) -> usize { + self.nodes.len() + } + + /// Gets the number of entries in the bucket that are considered connected. 
+ pub fn num_connected(&self) -> usize { + self.first_connected_pos.map_or(0, |i| self.nodes.len() - i) + } + + /// Gets the number of entries in the bucket that are considered disconnected. + pub fn num_disconnected(&self) -> usize { + self.nodes.len() - self.num_connected() + } + + /// Gets the position of a node in the bucket. + pub fn position(&self, key: &Key) -> Option { + self.nodes.iter().position(|p| &p.key == key).map(Position) + } + + /// Gets a mutable reference to the node identified by the given key. + /// + /// Returns `None` if the given key does not refer to a node in the + /// bucket. + pub fn get_mut(&mut self, key: &Key) -> Option<&mut Node> { + self.nodes.iter_mut().find(move |p| &p.key == key) + } +} + +#[cfg(test)] +mod tests { + use libp2p_core::PeerId; + use rand::Rng; + use std::collections::VecDeque; + use super::*; + use quickcheck::*; + + impl Arbitrary for NodeStatus { + fn arbitrary(g: &mut G) -> NodeStatus { + if g.gen() { + NodeStatus::Connected + } else { + NodeStatus::Disconnected + } + } + } + + fn fill_bucket(bucket: &mut KBucket, status: NodeStatus) { + for i in 0 .. MAX_NODES_PER_BUCKET - bucket.num_entries() { + let key = Key::new(PeerId::random()); + let node = Node { key, value: () }; + assert_eq!(InsertResult::Inserted, bucket.insert(node, status)); + assert_eq!(bucket.num_entries(), i + 1); + } + assert!(bucket.pending().is_none()); + } + + #[test] + fn ordering() { + fn prop(status: Vec) -> bool { + let mut bucket = KBucket::::new(Duration::from_secs(1)); + + // The expected lists of connected and disconnected nodes. + let mut connected = VecDeque::new(); + let mut disconnected = VecDeque::new(); + + // Fill the bucket, thereby populating the expected lists in insertion order.
+ for status in status { + let key = Key::new(PeerId::random()); + let node = Node { key: key.clone(), value: () }; + let full = bucket.num_entries() == MAX_NODES_PER_BUCKET; + match bucket.insert(node, status) { + InsertResult::Inserted => { + let vec = match status { + NodeStatus::Connected => &mut connected, + NodeStatus::Disconnected => &mut disconnected + }; + if full { + vec.pop_front(); + } + vec.push_back((status, key.clone())); + } + _ => {} + } + } + + // Get all nodes from the bucket, together with their status. + let mut nodes = bucket.iter() + .map(|(n, s)| (s, n.key.clone())) + .collect::>(); + + // Split the list of nodes at the first connected node. + let first_connected_pos = nodes.iter().position(|(s,_)| *s == NodeStatus::Connected); + assert_eq!(bucket.first_connected_pos, first_connected_pos); + let tail = first_connected_pos.map_or(Vec::new(), |p| nodes.split_off(p)); + + // All nodes before the first connected node must be disconnected and + // in insertion order. Similarly, all remaining nodes must be connected + // and in insertion order. + nodes == Vec::from(disconnected) + && + tail == Vec::from(connected) + } + + quickcheck(prop as fn(_) -> _); + } + + #[test] + fn full_bucket() { + let mut bucket = KBucket::::new(Duration::from_secs(1)); + + // Fill the bucket with disconnected nodes. + fill_bucket(&mut bucket, NodeStatus::Disconnected); + + // Trying to insert another disconnected node fails. + let key = Key::new(PeerId::random()); + let node = Node { key, value: () }; + match bucket.insert(node, NodeStatus::Disconnected) { + InsertResult::Full => {}, + x => panic!("{:?}", x) + } + + // One-by-one fill the bucket with connected nodes, replacing the disconnected ones. + for i in 0 .. 
MAX_NODES_PER_BUCKET { + let (first, first_status) = bucket.iter().next().unwrap(); + let first_disconnected = first.clone(); + assert_eq!(first_status, NodeStatus::Disconnected); + + // Add a connected node, which is expected to be pending, scheduled to + // replace the first (i.e. least-recently connected) node. + let key = Key::new(PeerId::random()); + let node = Node { key: key.clone(), value: () }; + match bucket.insert(node.clone(), NodeStatus::Connected) { + InsertResult::Pending { disconnected } => + assert_eq!(disconnected, first_disconnected.key), + x => panic!("{:?}", x) + } + + // Trying to insert another connected node fails. + match bucket.insert(node.clone(), NodeStatus::Connected) { + InsertResult::Full => {}, + x => panic!("{:?}", x) + } + + assert!(bucket.pending().is_some()); + + // Apply the pending node. + let pending = bucket.pending_mut().expect("No pending node."); + pending.set_ready_at(Instant::now() - Duration::from_secs(1)); + let result = bucket.apply_pending(); + assert_eq!(result, Some(AppliedPending { + inserted: key.clone(), + evicted: Some(first_disconnected) + })); + assert_eq!(Some((&node, NodeStatus::Connected)), bucket.iter().last()); + assert!(bucket.pending().is_none()); + assert_eq!(Some(MAX_NODES_PER_BUCKET - (i + 1)), bucket.first_connected_pos); + } + + assert!(bucket.pending().is_none()); + assert_eq!(MAX_NODES_PER_BUCKET, bucket.num_entries()); + + // Trying to insert another connected node fails. + let key = Key::new(PeerId::random()); + let node = Node { key, value: () }; + match bucket.insert(node, NodeStatus::Connected) { + InsertResult::Full => {}, + x => panic!("{:?}", x) + } + } + + #[test] + fn full_bucket_discard_pending() { + let mut bucket = KBucket::::new(Duration::from_secs(1)); + fill_bucket(&mut bucket, NodeStatus::Disconnected); + let (first, _) = bucket.iter().next().unwrap(); + let first_disconnected = first.clone(); + + // Add a connected pending node. 
+ let key = Key::new(PeerId::random()); + let node = Node { key: key.clone(), value: () }; + if let InsertResult::Pending { disconnected } = bucket.insert(node, NodeStatus::Connected) { + assert_eq!(&disconnected, &first_disconnected.key); + } else { + panic!() + } + assert!(bucket.pending().is_some()); + + // Update the status of the first disconnected node to be connected. + bucket.update(&first_disconnected.key, NodeStatus::Connected); + + // The pending node has been discarded. + assert!(bucket.pending().is_none()); + assert!(bucket.iter().all(|(n,_)| &n.key != &key)); + + // The initially disconnected node is now the most-recently connected. + assert_eq!(Some((&first_disconnected, NodeStatus::Connected)), bucket.iter().last()); + assert_eq!(bucket.position(&first_disconnected.key).map(|p| p.0), bucket.first_connected_pos); + assert_eq!(1, bucket.num_connected()); + assert_eq!(MAX_NODES_PER_BUCKET - 1, bucket.num_disconnected()); + } +} diff --git a/protocols/kad/src/kbucket/entry.rs b/protocols/kad/src/kbucket/entry.rs new file mode 100644 index 00000000..7065490b --- /dev/null +++ b/protocols/kad/src/kbucket/entry.rs @@ -0,0 +1,254 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! The `Entry` API for querying and modifying the entries of a `KBucketsTable` +//! representing the nodes participating in the Kademlia DHT. + +pub use super::bucket::{Node, NodeStatus, InsertResult, AppliedPending, MAX_NODES_PER_BUCKET}; +pub use super::key::*; + +use super::*; + +/// An immutable by-reference view of a bucket entry. +pub struct EntryRefView<'a, TPeerId, TVal> { + /// The node represented by the entry. + pub node: NodeRefView<'a, TPeerId, TVal>, + /// The status of the node identified by the key. + pub status: NodeStatus +} + +/// An immutable by-reference view of a `Node`. +pub struct NodeRefView<'a, TPeerId, TVal> { + pub key: &'a Key, + pub value: &'a TVal +} + +impl EntryRefView<'_, TPeerId, TVal> { + pub fn to_owned(&self) -> EntryView + where + TPeerId: Clone, + TVal: Clone + { + EntryView { + node: Node { + key: self.node.key.clone(), + value: self.node.value.clone() + }, + status: self.status + } + } +} + +/// A cloned, immutable view of an entry that is either present in a bucket +/// or pending insertion. +#[derive(Clone, Debug)] +pub struct EntryView { + /// The node represented by the entry. + pub node: Node, + /// The status of the node. + pub status: NodeStatus +} + +impl AsRef> for EntryView { + fn as_ref(&self) -> &Key { + &self.node.key + } +} + +/// A reference into a single entry of a `KBucketsTable`. +#[derive(Debug)] +pub enum Entry<'a, TPeerId, TVal> { + /// The entry is present in a bucket.
+ Present(PresentEntry<'a, TPeerId, TVal>, NodeStatus), + /// The entry is pending insertion in a bucket. + Pending(PendingEntry<'a, TPeerId, TVal>, NodeStatus), + /// The entry is absent and may be inserted. + Absent(AbsentEntry<'a, TPeerId, TVal>), + /// The entry represents the local node. + SelfEntry, +} + +/// The internal representation of the different states of an `Entry`, +/// referencing the associated key and bucket. +#[derive(Debug)] +struct EntryRef<'a, TPeerId, TVal> { + bucket: &'a mut KBucket, + key: &'a Key, +} + +impl<'a, TPeerId, TVal> Entry<'a, TPeerId, TVal> +where + TPeerId: Clone, +{ + /// Creates a new `Entry` for a `Key`, encapsulating access to a bucket. + pub(super) fn new(bucket: &'a mut KBucket, key: &'a Key) -> Self { + if let Some(pos) = bucket.position(key) { + let status = bucket.status(pos); + Entry::Present(PresentEntry::new(bucket, key), status) + } else if let Some(pending) = bucket.as_pending(key) { + let status = pending.status(); + Entry::Pending(PendingEntry::new(bucket, key), status) + } else { + Entry::Absent(AbsentEntry::new(bucket, key)) + } + } + + /// Creates an immutable by-reference view of the entry. + /// + /// Returns `None` if the entry is neither present in a bucket nor + /// pending insertion into a bucket. + pub fn view(&'a mut self) -> Option> { + match self { + Entry::Present(entry, status) => Some(EntryRefView { + node: NodeRefView { + key: entry.0.key, + value: entry.value() + }, + status: *status + }), + Entry::Pending(entry, status) => Some(EntryRefView { + node: NodeRefView { + key: entry.0.key, + value: entry.value() + }, + status: *status + }), + _ => None + } + } + + /// Returns the key of the entry. + /// + /// Returns `None` if the `Key` used to construct this `Entry` is not a valid + /// key for an entry in a bucket, which is the case for the `local_key` of + /// the `KBucketsTable` referring to the local node. 
+ pub fn key(&self) -> Option<&Key> { + match self { + Entry::Present(entry, _) => Some(entry.key()), + Entry::Pending(entry, _) => Some(entry.key()), + Entry::Absent(entry) => Some(entry.key()), + Entry::SelfEntry => None, + } + } + + /// Returns the value associated with the entry. + /// + /// Returns `None` if the entry is absent from any bucket or refers to the + /// local node. + pub fn value(&mut self) -> Option<&mut TVal> { + match self { + Entry::Present(entry, _) => Some(entry.value()), + Entry::Pending(entry, _) => Some(entry.value()), + Entry::Absent(_) => None, + Entry::SelfEntry => None, + } + } +} + +/// An entry present in a bucket. +#[derive(Debug)] +pub struct PresentEntry<'a, TPeerId, TVal>(EntryRef<'a, TPeerId, TVal>); + +impl<'a, TPeerId, TVal> PresentEntry<'a, TPeerId, TVal> +where + TPeerId: Clone, +{ + fn new(bucket: &'a mut KBucket, key: &'a Key) -> Self { + PresentEntry(EntryRef { bucket, key }) + } + + /// Returns the key of the entry. + pub fn key(&self) -> &Key { + self.0.key + } + + /// Returns the value associated with the key. + pub fn value(&mut self) -> &mut TVal { + &mut self.0.bucket + .get_mut(self.0.key) + .expect("We can only build a ConnectedEntry if the entry is in the bucket; QED") + .value + } + + /// Updates the status of the entry to the given `NodeStatus`. + pub fn update(self, status: NodeStatus) -> Self { + self.0.bucket.update(self.0.key, status); + Self::new(self.0.bucket, self.0.key) + } +} + +/// An entry waiting for a slot to be available in a bucket. +#[derive(Debug)] +pub struct PendingEntry<'a, TPeerId, TVal>(EntryRef<'a, TPeerId, TVal>); + +impl<'a, TPeerId, TVal> PendingEntry<'a, TPeerId, TVal> +where + TPeerId: Clone, +{ + fn new(bucket: &'a mut KBucket, key: &'a Key) -> Self { + PendingEntry(EntryRef { bucket, key }) + } + + /// Returns the key of the entry. + pub fn key(&self) -> &Key { + self.0.key + } + + /// Returns the value associated with the key.
+ pub fn value(&mut self) -> &mut TVal { + self.0.bucket + .pending_mut() + .expect("We can only build a ConnectedPendingEntry if the entry is pending; QED") + .value_mut() + } + + /// Updates the status of the pending entry. + pub fn update(self, status: NodeStatus) -> PendingEntry<'a, TPeerId, TVal> { + self.0.bucket.update_pending(status); + PendingEntry::new(self.0.bucket, self.0.key) + } +} + +/// An entry that is not present in any bucket. +#[derive(Debug)] +pub struct AbsentEntry<'a, TPeerId, TVal>(EntryRef<'a, TPeerId, TVal>); + +impl<'a, TPeerId, TVal> AbsentEntry<'a, TPeerId, TVal> +where + TPeerId: Clone, +{ + fn new(bucket: &'a mut KBucket, key: &'a Key) -> Self { + AbsentEntry(EntryRef { bucket, key }) + } + + /// Returns the key of the entry. + pub fn key(&self) -> &Key { + self.0.key + } + + /// Attempts to insert the entry into a bucket. + pub fn insert(self, value: TVal, status: NodeStatus) -> InsertResult { + self.0.bucket.insert(Node { + key: self.0.key.clone(), + value + }, status) + } +} + diff --git a/protocols/kad/src/kbucket/key.rs b/protocols/kad/src/kbucket/key.rs new file mode 100644 index 00000000..6c9ae934 --- /dev/null +++ b/protocols/kad/src/kbucket/key.rs @@ -0,0 +1,156 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use bigint::U256; +use libp2p_core::PeerId; +use multihash::Multihash; +use sha2::{Digest, Sha256, digest::generic_array::{GenericArray, typenum::U32}}; + +/// A `Key` is a cryptographic hash, identifying both the nodes participating in +/// the Kademlia DHT, as well as records stored in the DHT. +/// +/// The set of all `Key`s defines the Kademlia keyspace. +/// +/// `Key`s have an XOR metric as defined in the Kademlia paper, i.e. the bitwise XOR of +/// the hash digests, interpreted as an integer. See [`Key::distance`]. +/// +/// A `Key` preserves the preimage of type `T` of the hash function. See [`Key::preimage`]. +#[derive(Clone, Debug)] +pub struct Key { + preimage: T, + hash: GenericArray, +} + +impl PartialEq for Key { + fn eq(&self, other: &Key) -> bool { + self.hash == other.hash + } +} + +impl Eq for Key {} + +impl AsRef> for Key { + fn as_ref(&self) -> &Key { + self + } +} + +impl Key { + /// Construct a new `Key` by hashing the bytes of the given `preimage`. + /// + /// The preimage of type `T` is preserved. See [`Key::preimage`] and + /// [`Key::into_preimage`]. + pub fn new(preimage: T) -> Key + where + T: AsRef<[u8]> + { + let hash = Sha256::digest(preimage.as_ref()); + Key { preimage, hash } + } + + /// Borrows the preimage of the key. + pub fn preimage(&self) -> &T { + &self.preimage + } + + /// Converts the key into its preimage. + pub fn into_preimage(self) -> T { + self.preimage + } + + /// Computes the distance of the keys according to the XOR metric. 
+ pub fn distance(&self, other: &Key) -> Distance { + let a = U256::from(self.hash.as_ref()); + let b = U256::from(other.hash.as_ref()); + Distance(a ^ b) + } +} + +impl From for Key { + fn from(h: Multihash) -> Self { + let k = Key::new(h.clone().into_bytes()); + Key { preimage: h, hash: k.hash } + } +} + +impl From for Key { + fn from(peer_id: PeerId) -> Self { + Key::new(peer_id) + } +} + +/// A distance between two `Key`s. +#[derive(Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord, Debug)] +pub struct Distance(pub(super) bigint::U256); + +#[cfg(test)] +mod tests { + use super::*; + use quickcheck::*; + + impl Arbitrary for Key { + fn arbitrary(_: &mut G) -> Key { + Key::from(PeerId::random()) + } + } + + #[test] + fn identity() { + fn prop(a: Key) -> bool { + a.distance(&a) == Distance::default() + } + quickcheck(prop as fn(_) -> _) + } + + #[test] + fn symmetry() { + fn prop(a: Key, b: Key) -> bool { + a.distance(&b) == b.distance(&a) + } + quickcheck(prop as fn(_,_) -> _) + } + + #[test] + fn triangle_inequality() { + fn prop(a: Key, b: Key, c: Key) -> TestResult { + let ab = a.distance(&b); + let bc = b.distance(&c); + let (ab_plus_bc, overflow) = ab.0.overflowing_add(bc.0); + if overflow { + TestResult::discard() + } else { + TestResult::from_bool(a.distance(&c) <= Distance(ab_plus_bc)) + } + } + quickcheck(prop as fn(_,_,_) -> _) + } + + #[test] + fn unidirectionality() { + fn prop(a: Key, b: Key) -> bool { + let d = a.distance(&b); + (0 .. 100).all(|_| { + let c = Key::from(PeerId::random()); + a.distance(&c) != d || b == c + }) + } + quickcheck(prop as fn(_,_) -> _) + } +} diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index 3d8ef2a7..0c74069d 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! Kademlia protocol. Allows peer discovery, records store and records fetch. +//! 
Implementation of the Kademlia protocol for libp2p. // TODO: we allow dead_code for now because this library contains a lot of unused code that will // be useful later for record store diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 94e2eed6..5983b8ac 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -25,6 +25,10 @@ //! The upgrade's output is a `Sink + Stream` of messages. The `Stream` component is used //! to poll the underlying transport for incoming messages, and the `Sink` component //! is used to send messages to remote peers. +//! +//! [`KademliaProtocolConfig`]: protocol::KademliaProtocolConfig +//! [`KadRequestMsg`]: protocol::KadRequestMsg +//! [`KadResponseMsg`]: protocol::KadResponseMsg use bytes::BytesMut; use codec::UviBytes; diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index 01c34f0e..99156f54 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -350,7 +350,7 @@ where } } - /// Consumes the query and returns the targe tand known closest peers. + /// Consumes the query and returns the target and known closest peers. /// /// > **Note**: This can be called at any time, but you normally only do that once the query /// > is finished.