From 36c8e9e3f1ee2d9f44bc4abd97391418e10df211 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 2 Jan 2019 13:33:44 +0100 Subject: [PATCH] Some improvements to k-buckets (#791) * Rework the KBucketsPeerId trait * Remove interior mutability from k-buckets * Reexport the kbuckets module * Fix tests * Apply suggestions from code review Co-Authored-By: tomaka --- protocols/kad/src/kbucket.rs | 130 +++++++++++++--------------------- protocols/kad/src/lib.rs | 2 +- protocols/kad/src/topology.rs | 2 +- 3 files changed, 50 insertions(+), 84 deletions(-) diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs index 59a1119a..191eac2b 100644 --- a/protocols/kad/src/kbucket.rs +++ b/protocols/kad/src/kbucket.rs @@ -30,65 +30,53 @@ use arrayvec::ArrayVec; use bigint::U512; use multihash::Multihash; -use parking_lot::{Mutex, MutexGuard}; use std::mem; -use std::slice::Iter as SliceIter; +use std::slice::IterMut as SliceIterMut; use std::time::{Duration, Instant}; use std::vec::IntoIter as VecIntoIter; /// Maximum number of nodes in a bucket. pub const MAX_NODES_PER_BUCKET: usize = 20; -/// Table of k-buckets with interior mutability. -#[derive(Debug)] +/// Table of k-buckets. +#[derive(Debug, Clone)] pub struct KBucketsTable { + /// Peer ID of the local node. my_id: Id, - tables: Vec>>, - // The timeout when pinging the first node after which we consider that it no longer responds. + /// The actual tables that store peers or values. + tables: Vec>, + // The timeout when pinging the first node after which we consider it unresponsive. ping_timeout: Duration, } -impl Clone for KBucketsTable -where - Id: Clone, - Val: Clone, -{ - #[inline] - fn clone(&self) -> Self { - KBucketsTable { - my_id: self.my_id.clone(), - tables: self - .tables - .iter() - .map(|t| t.lock().clone()) - .map(Mutex::new) - .collect(), - ping_timeout: self.ping_timeout.clone(), - } - } -} - +/// An individual table that stores peers or values. #[derive(Debug, Clone)] struct KBucket { - // Nodes are always ordered from oldest to newest. - // Note that we will very often move elements to the end of this. No benchmarking has been - // performed, but it is very likely that a `ArrayVec` is the most performant data structure. + /// Nodes are always ordered from oldest to newest. + /// Note that we will very often move elements to the end of this. No benchmarking has been + /// performed, but it is very likely that a `ArrayVec` is the most performant data structure. nodes: ArrayVec<[Node; MAX_NODES_PER_BUCKET]>, - // Node received when the bucket was full. Will be added to the list if the first node doesn't - // respond in time to our ping. The second element is the time when the pending node was added. - // If it is too much in the past, then we drop the first node and add the pending node to the - // end of the list. + /// Node received when the bucket was full. Will be added to the list if the first node doesn't + /// respond in time to our ping. The second element is the time when the pending node was added. + /// If it is too old we drop the first node and add the pending node to the + /// end of the list. pending_node: Option<(Node, Instant)>, - // Last time this bucket was updated. + /// Last time this bucket was updated. last_update: Instant, } +#[derive(Debug, Clone)] +struct Node { + id: Id, + value: Val, +} + impl KBucket { - // Puts the kbucket into a coherent state. - // If a node is pending and the timeout has expired, removes the first element of `nodes` - // and pushes back the node in `pending_node`. + /// Puts the kbucket into a coherent state. + /// If a node is pending and the timeout has expired, removes the first element of `nodes` + /// and puts the node back in `pending_node`. fn flush(&mut self, timeout: Duration) { if let Some((pending_node, instant)) = self.pending_node.take() { if instant.elapsed() >= timeout { @@ -101,50 +89,32 @@ impl KBucket { } } -#[derive(Debug, Clone)] -struct Node { - id: Id, - value: Val, -} - /// Trait that must be implemented on types that can be used as an identifier in a k-bucket. pub trait KBucketsPeerId: Eq + Clone { - /// Distance between two peer IDs. - type Distance: Ord; - - /// Computes the XOR of this value and another one. - fn distance_with(&self, other: &Self) -> Self::Distance; + /// Computes the XOR of this value and another one. The lower the closer. + fn distance_with(&self, other: &Self) -> u32; /// Returns then number of bits that are necessary to store the distance between peer IDs. /// Used for pre-allocations. /// /// > **Note**: Returning 0 would lead to a panic. - fn num_bits() -> usize; - - /// Returns the number of leading zeroes of the distance between peer IDs. - fn leading_zeros(Self::Distance) -> u32; + fn max_distance() -> usize; } impl KBucketsPeerId for Multihash { - type Distance = U512; - #[inline] - fn num_bits() -> usize { - 512 - } - - #[inline] - fn distance_with(&self, other: &Self) -> Self::Distance { + fn distance_with(&self, other: &Self) -> u32 { // Note that we don't compare the hash functions because there's no chance of collision // of the same value hashed with two different hash functions. let my_hash = U512::from(self.digest()); let other_hash = U512::from(other.digest()); - my_hash ^ other_hash + let xor = my_hash ^ other_hash; + xor.leading_zeros() } #[inline] - fn leading_zeros(distance: Self::Distance) -> u32 { - distance.leading_zeros() + fn max_distance() -> usize { + 512 } } @@ -156,13 +126,12 @@ where pub fn new(my_id: Id, ping_timeout: Duration) -> Self { KBucketsTable { my_id: my_id, - tables: (0..Id::num_bits()) + tables: (0..Id::max_distance()) .map(|_| KBucket { nodes: ArrayVec::new(), pending_node: None, last_update: Instant::now(), }) - .map(Mutex::new) .collect(), ping_timeout: ping_timeout, } @@ -173,7 +142,7 @@ where // Returns `None` if out of range, which happens if `id` is the same as the local peer id. #[inline] fn bucket_num(&self, id: &Id) -> Option { - (Id::num_bits() - 1).checked_sub(Id::leading_zeros(self.my_id.distance_with(id)) as usize) + (Id::max_distance() - 1).checked_sub(self.my_id.distance_with(id) as usize) } /// Returns an iterator to all the buckets of this table. @@ -181,8 +150,8 @@ where /// Ordered by proximity to the local node. Closest bucket (with max. one node in it) comes /// first. #[inline] - pub fn buckets(&self) -> BucketsIter { - BucketsIter(self.tables.iter(), self.ping_timeout) + pub fn buckets(&mut self) -> BucketsIter { + BucketsIter(self.tables.iter_mut(), self.ping_timeout) } /// Returns the ID of the local node. @@ -192,14 +161,13 @@ where } /// Finds the `num` nodes closest to `id`, ordered by distance. - pub fn find_closest(&self, id: &Id) -> VecIntoIter + pub fn find_closest(&mut self, id: &Id) -> VecIntoIter where Id: Clone, { // TODO: optimize let mut out = Vec::new(); - for table in self.tables.iter() { - let mut table = table.lock(); + for table in self.tables.iter_mut() { table.flush(self.ping_timeout); if table.last_update.elapsed() > self.ping_timeout { continue; // ignore bucket with expired nodes @@ -213,7 +181,7 @@ where } /// Same as `find_closest`, but includes the local peer as well. - pub fn find_closest_with_self(&self, id: &Id) -> VecIntoIter + pub fn find_closest_with_self(&mut self, id: &Id) -> VecIntoIter where Id: Clone, { @@ -234,13 +202,12 @@ where /// Marks the node as "most recent" in its bucket and modifies the value associated to it. /// This function should be called whenever we receive a communication from a node. - pub fn update(&self, id: Id, value: Val) -> UpdateOutcome { + pub fn update(&mut self, id: Id, value: Val) -> UpdateOutcome { let table = match self.bucket_num(&id) { - Some(n) => &self.tables[n], + Some(n) => &mut self.tables[n], None => return UpdateOutcome::FailSelfUpdate, }; - let mut table = table.lock(); table.flush(self.ping_timeout); if let Some(pos) = table.nodes.iter().position(|n| n.id == id) { @@ -302,7 +269,7 @@ pub enum UpdateOutcome { } /// Iterator giving access to a bucket. -pub struct BucketsIter<'a, Id: 'a, Val: 'a>(SliceIter<'a, Mutex>>, Duration); +pub struct BucketsIter<'a, Id: 'a, Val: 'a>(SliceIterMut<'a, KBucket>, Duration); impl<'a, Id: 'a, Val: 'a> Iterator for BucketsIter<'a, Id, Val> { type Item = Bucket<'a, Id, Val>; @@ -310,7 +277,6 @@ impl<'a, Id: 'a, Val: 'a> Iterator for BucketsIter<'a, Id, Val> { #[inline] fn next(&mut self) -> Option { self.0.next().map(|bucket| { - let mut bucket = bucket.lock(); bucket.flush(self.1); Bucket(bucket) }) @@ -325,7 +291,7 @@ impl<'a, Id: 'a, Val: 'a> Iterator for BucketsIter<'a, Id, Val> { impl<'a, Id: 'a, Val: 'a> ExactSizeIterator for BucketsIter<'a, Id, Val> {} /// Access to a bucket. -pub struct Bucket<'a, Id: 'a, Val: 'a>(MutexGuard<'a, KBucket>); +pub struct Bucket<'a, Id: 'a, Val: 'a>(&'a mut KBucket); impl<'a, Id: 'a, Val: 'a> Bucket<'a, Id, Val> { /// Returns the number of entries in that bucket. @@ -384,7 +350,7 @@ mod tests { )) }; - let table = KBucketsTable::new(my_id, Duration::from_secs(5)); + let mut table = KBucketsTable::new(my_id, Duration::from_secs(5)); let _ = table.update(other_id.clone(), ()); let res = table.find_closest(&other_id).collect::>(); @@ -401,7 +367,7 @@ mod tests { Multihash::from_bytes(bytes).unwrap() }; - let table = KBucketsTable::new(my_id.clone(), Duration::from_secs(5)); + let mut table = KBucketsTable::new(my_id.clone(), Duration::from_secs(5)); match table.update(my_id, ()) { UpdateOutcome::FailSelfUpdate => (), _ => panic!(), @@ -427,7 +393,7 @@ mod tests { }) .collect::>(); - let table = KBucketsTable::new(my_id, Duration::from_secs(5)); + let mut table = KBucketsTable::new(my_id, Duration::from_secs(5)); let before_update = table.buckets().map(|b| b.last_update()).collect::>(); thread::sleep(Duration::from_secs(2)); @@ -468,7 +434,7 @@ mod tests { let first_node = fill_ids[0].clone(); let second_node = fill_ids[1].clone(); - let table = KBucketsTable::new(my_id.clone(), Duration::from_secs(1)); + let mut table = KBucketsTable::new(my_id.clone(), Duration::from_secs(1)); for (num, id) in fill_ids.drain(..MAX_NODES_PER_BUCKET).enumerate() { assert_eq!(table.update(id, ()), UpdateOutcome::Added); diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index 0ab72964..163a9296 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -88,10 +88,10 @@ pub use self::protocol::KadConnectionType; pub use self::topology::KademliaTopology; pub mod handler; +pub mod kbucket; pub mod protocol; mod behaviour; -mod kbucket; mod protobuf_structs; mod query; mod topology; diff --git a/protocols/kad/src/topology.rs b/protocols/kad/src/topology.rs index 9298b9e3..7c4b299a 100644 --- a/protocols/kad/src/topology.rs +++ b/protocols/kad/src/topology.rs @@ -34,7 +34,7 @@ pub trait KademliaTopology: Topology { /// Adds an address discovered through Kademlia to the topology. /// - /// > **Note**: Keep in mind that `peer` can the local peer. + /// > **Note**: Keep in mind that `peer` can be the local peer. fn add_kad_discovered_address(&mut self, peer: PeerId, addr: Multiaddr, connection_ty: KadConnectionType);