Some improvements to k-buckets (#791)

* Rework the KBucketsPeerId trait

* Remove interior mutability from k-buckets

* Reexport the kbuckets module

* Fix tests

* Apply suggestions from code review

Co-Authored-By: tomaka <pierre.krieger1708@gmail.com>
This commit is contained in:
Pierre Krieger 2019-01-02 13:33:44 +01:00 committed by GitHub
parent 02e704195b
commit 36c8e9e3f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 84 deletions

View File

@ -30,65 +30,53 @@
use arrayvec::ArrayVec; use arrayvec::ArrayVec;
use bigint::U512; use bigint::U512;
use multihash::Multihash; use multihash::Multihash;
use parking_lot::{Mutex, MutexGuard};
use std::mem; use std::mem;
use std::slice::Iter as SliceIter; use std::slice::IterMut as SliceIterMut;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::vec::IntoIter as VecIntoIter; use std::vec::IntoIter as VecIntoIter;
/// Maximum number of nodes in a bucket. /// Maximum number of nodes in a bucket.
pub const MAX_NODES_PER_BUCKET: usize = 20; pub const MAX_NODES_PER_BUCKET: usize = 20;
/// Table of k-buckets with interior mutability. /// Table of k-buckets.
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct KBucketsTable<Id, Val> { pub struct KBucketsTable<Id, Val> {
/// Peer ID of the local node.
my_id: Id, my_id: Id,
tables: Vec<Mutex<KBucket<Id, Val>>>, /// The actual tables that store peers or values.
// The timeout when pinging the first node after which we consider that it no longer responds. tables: Vec<KBucket<Id, Val>>,
// The timeout when pinging the first node after which we consider it unresponsive.
ping_timeout: Duration, ping_timeout: Duration,
} }
impl<Id, Val> Clone for KBucketsTable<Id, Val> /// An individual table that stores peers or values.
where
Id: Clone,
Val: Clone,
{
#[inline]
fn clone(&self) -> Self {
KBucketsTable {
my_id: self.my_id.clone(),
tables: self
.tables
.iter()
.map(|t| t.lock().clone())
.map(Mutex::new)
.collect(),
ping_timeout: self.ping_timeout.clone(),
}
}
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct KBucket<Id, Val> { struct KBucket<Id, Val> {
// Nodes are always ordered from oldest to newest. /// Nodes are always ordered from oldest to newest.
// Note that we will very often move elements to the end of this. No benchmarking has been /// Note that we will very often move elements to the end of this. No benchmarking has been
// performed, but it is very likely that a `ArrayVec` is the most performant data structure. /// performed, but it is very likely that a `ArrayVec` is the most performant data structure.
nodes: ArrayVec<[Node<Id, Val>; MAX_NODES_PER_BUCKET]>, nodes: ArrayVec<[Node<Id, Val>; MAX_NODES_PER_BUCKET]>,
// Node received when the bucket was full. Will be added to the list if the first node doesn't /// Node received when the bucket was full. Will be added to the list if the first node doesn't
// respond in time to our ping. The second element is the time when the pending node was added. /// respond in time to our ping. The second element is the time when the pending node was added.
// If it is too much in the past, then we drop the first node and add the pending node to the /// If it is too old we drop the first node and add the pending node to the
// end of the list. /// end of the list.
pending_node: Option<(Node<Id, Val>, Instant)>, pending_node: Option<(Node<Id, Val>, Instant)>,
// Last time this bucket was updated. /// Last time this bucket was updated.
last_update: Instant, last_update: Instant,
} }
#[derive(Debug, Clone)]
struct Node<Id, Val> {
id: Id,
value: Val,
}
impl<Id, Val> KBucket<Id, Val> { impl<Id, Val> KBucket<Id, Val> {
// Puts the kbucket into a coherent state. /// Puts the kbucket into a coherent state.
// If a node is pending and the timeout has expired, removes the first element of `nodes` /// If a node is pending and the timeout has expired, removes the first element of `nodes`
// and pushes back the node in `pending_node`. /// and puts the node back in `pending_node`.
fn flush(&mut self, timeout: Duration) { fn flush(&mut self, timeout: Duration) {
if let Some((pending_node, instant)) = self.pending_node.take() { if let Some((pending_node, instant)) = self.pending_node.take() {
if instant.elapsed() >= timeout { if instant.elapsed() >= timeout {
@ -101,50 +89,32 @@ impl<Id, Val> KBucket<Id, Val> {
} }
} }
#[derive(Debug, Clone)]
struct Node<Id, Val> {
id: Id,
value: Val,
}
/// Trait that must be implemented on types that can be used as an identifier in a k-bucket. /// Trait that must be implemented on types that can be used as an identifier in a k-bucket.
pub trait KBucketsPeerId: Eq + Clone { pub trait KBucketsPeerId: Eq + Clone {
/// Distance between two peer IDs. /// Computes the XOR of this value and another one. The lower the closer.
type Distance: Ord; fn distance_with(&self, other: &Self) -> u32;
/// Computes the XOR of this value and another one.
fn distance_with(&self, other: &Self) -> Self::Distance;
/// Returns then number of bits that are necessary to store the distance between peer IDs. /// Returns then number of bits that are necessary to store the distance between peer IDs.
/// Used for pre-allocations. /// Used for pre-allocations.
/// ///
/// > **Note**: Returning 0 would lead to a panic. /// > **Note**: Returning 0 would lead to a panic.
fn num_bits() -> usize; fn max_distance() -> usize;
/// Returns the number of leading zeroes of the distance between peer IDs.
fn leading_zeros(Self::Distance) -> u32;
} }
impl KBucketsPeerId for Multihash { impl KBucketsPeerId for Multihash {
type Distance = U512;
#[inline] #[inline]
fn num_bits() -> usize { fn distance_with(&self, other: &Self) -> u32 {
512
}
#[inline]
fn distance_with(&self, other: &Self) -> Self::Distance {
// Note that we don't compare the hash functions because there's no chance of collision // Note that we don't compare the hash functions because there's no chance of collision
// of the same value hashed with two different hash functions. // of the same value hashed with two different hash functions.
let my_hash = U512::from(self.digest()); let my_hash = U512::from(self.digest());
let other_hash = U512::from(other.digest()); let other_hash = U512::from(other.digest());
my_hash ^ other_hash let xor = my_hash ^ other_hash;
xor.leading_zeros()
} }
#[inline] #[inline]
fn leading_zeros(distance: Self::Distance) -> u32 { fn max_distance() -> usize {
distance.leading_zeros() 512
} }
} }
@ -156,13 +126,12 @@ where
pub fn new(my_id: Id, ping_timeout: Duration) -> Self { pub fn new(my_id: Id, ping_timeout: Duration) -> Self {
KBucketsTable { KBucketsTable {
my_id: my_id, my_id: my_id,
tables: (0..Id::num_bits()) tables: (0..Id::max_distance())
.map(|_| KBucket { .map(|_| KBucket {
nodes: ArrayVec::new(), nodes: ArrayVec::new(),
pending_node: None, pending_node: None,
last_update: Instant::now(), last_update: Instant::now(),
}) })
.map(Mutex::new)
.collect(), .collect(),
ping_timeout: ping_timeout, ping_timeout: ping_timeout,
} }
@ -173,7 +142,7 @@ where
// Returns `None` if out of range, which happens if `id` is the same as the local peer id. // Returns `None` if out of range, which happens if `id` is the same as the local peer id.
#[inline] #[inline]
fn bucket_num(&self, id: &Id) -> Option<usize> { fn bucket_num(&self, id: &Id) -> Option<usize> {
(Id::num_bits() - 1).checked_sub(Id::leading_zeros(self.my_id.distance_with(id)) as usize) (Id::max_distance() - 1).checked_sub(self.my_id.distance_with(id) as usize)
} }
/// Returns an iterator to all the buckets of this table. /// Returns an iterator to all the buckets of this table.
@ -181,8 +150,8 @@ where
/// Ordered by proximity to the local node. Closest bucket (with max. one node in it) comes /// Ordered by proximity to the local node. Closest bucket (with max. one node in it) comes
/// first. /// first.
#[inline] #[inline]
pub fn buckets(&self) -> BucketsIter<Id, Val> { pub fn buckets(&mut self) -> BucketsIter<Id, Val> {
BucketsIter(self.tables.iter(), self.ping_timeout) BucketsIter(self.tables.iter_mut(), self.ping_timeout)
} }
/// Returns the ID of the local node. /// Returns the ID of the local node.
@ -192,14 +161,13 @@ where
} }
/// Finds the `num` nodes closest to `id`, ordered by distance. /// Finds the `num` nodes closest to `id`, ordered by distance.
pub fn find_closest(&self, id: &Id) -> VecIntoIter<Id> pub fn find_closest(&mut self, id: &Id) -> VecIntoIter<Id>
where where
Id: Clone, Id: Clone,
{ {
// TODO: optimize // TODO: optimize
let mut out = Vec::new(); let mut out = Vec::new();
for table in self.tables.iter() { for table in self.tables.iter_mut() {
let mut table = table.lock();
table.flush(self.ping_timeout); table.flush(self.ping_timeout);
if table.last_update.elapsed() > self.ping_timeout { if table.last_update.elapsed() > self.ping_timeout {
continue; // ignore bucket with expired nodes continue; // ignore bucket with expired nodes
@ -213,7 +181,7 @@ where
} }
/// Same as `find_closest`, but includes the local peer as well. /// Same as `find_closest`, but includes the local peer as well.
pub fn find_closest_with_self(&self, id: &Id) -> VecIntoIter<Id> pub fn find_closest_with_self(&mut self, id: &Id) -> VecIntoIter<Id>
where where
Id: Clone, Id: Clone,
{ {
@ -234,13 +202,12 @@ where
/// Marks the node as "most recent" in its bucket and modifies the value associated to it. /// Marks the node as "most recent" in its bucket and modifies the value associated to it.
/// This function should be called whenever we receive a communication from a node. /// This function should be called whenever we receive a communication from a node.
pub fn update(&self, id: Id, value: Val) -> UpdateOutcome<Id, Val> { pub fn update(&mut self, id: Id, value: Val) -> UpdateOutcome<Id, Val> {
let table = match self.bucket_num(&id) { let table = match self.bucket_num(&id) {
Some(n) => &self.tables[n], Some(n) => &mut self.tables[n],
None => return UpdateOutcome::FailSelfUpdate, None => return UpdateOutcome::FailSelfUpdate,
}; };
let mut table = table.lock();
table.flush(self.ping_timeout); table.flush(self.ping_timeout);
if let Some(pos) = table.nodes.iter().position(|n| n.id == id) { if let Some(pos) = table.nodes.iter().position(|n| n.id == id) {
@ -302,7 +269,7 @@ pub enum UpdateOutcome<Id, Val> {
} }
/// Iterator giving access to a bucket. /// Iterator giving access to a bucket.
pub struct BucketsIter<'a, Id: 'a, Val: 'a>(SliceIter<'a, Mutex<KBucket<Id, Val>>>, Duration); pub struct BucketsIter<'a, Id: 'a, Val: 'a>(SliceIterMut<'a, KBucket<Id, Val>>, Duration);
impl<'a, Id: 'a, Val: 'a> Iterator for BucketsIter<'a, Id, Val> { impl<'a, Id: 'a, Val: 'a> Iterator for BucketsIter<'a, Id, Val> {
type Item = Bucket<'a, Id, Val>; type Item = Bucket<'a, Id, Val>;
@ -310,7 +277,6 @@ impl<'a, Id: 'a, Val: 'a> Iterator for BucketsIter<'a, Id, Val> {
#[inline] #[inline]
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|bucket| { self.0.next().map(|bucket| {
let mut bucket = bucket.lock();
bucket.flush(self.1); bucket.flush(self.1);
Bucket(bucket) Bucket(bucket)
}) })
@ -325,7 +291,7 @@ impl<'a, Id: 'a, Val: 'a> Iterator for BucketsIter<'a, Id, Val> {
impl<'a, Id: 'a, Val: 'a> ExactSizeIterator for BucketsIter<'a, Id, Val> {} impl<'a, Id: 'a, Val: 'a> ExactSizeIterator for BucketsIter<'a, Id, Val> {}
/// Access to a bucket. /// Access to a bucket.
pub struct Bucket<'a, Id: 'a, Val: 'a>(MutexGuard<'a, KBucket<Id, Val>>); pub struct Bucket<'a, Id: 'a, Val: 'a>(&'a mut KBucket<Id, Val>);
impl<'a, Id: 'a, Val: 'a> Bucket<'a, Id, Val> { impl<'a, Id: 'a, Val: 'a> Bucket<'a, Id, Val> {
/// Returns the number of entries in that bucket. /// Returns the number of entries in that bucket.
@ -384,7 +350,7 @@ mod tests {
)) ))
}; };
let table = KBucketsTable::new(my_id, Duration::from_secs(5)); let mut table = KBucketsTable::new(my_id, Duration::from_secs(5));
let _ = table.update(other_id.clone(), ()); let _ = table.update(other_id.clone(), ());
let res = table.find_closest(&other_id).collect::<Vec<_>>(); let res = table.find_closest(&other_id).collect::<Vec<_>>();
@ -401,7 +367,7 @@ mod tests {
Multihash::from_bytes(bytes).unwrap() Multihash::from_bytes(bytes).unwrap()
}; };
let table = KBucketsTable::new(my_id.clone(), Duration::from_secs(5)); let mut table = KBucketsTable::new(my_id.clone(), Duration::from_secs(5));
match table.update(my_id, ()) { match table.update(my_id, ()) {
UpdateOutcome::FailSelfUpdate => (), UpdateOutcome::FailSelfUpdate => (),
_ => panic!(), _ => panic!(),
@ -427,7 +393,7 @@ mod tests {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let table = KBucketsTable::new(my_id, Duration::from_secs(5)); let mut table = KBucketsTable::new(my_id, Duration::from_secs(5));
let before_update = table.buckets().map(|b| b.last_update()).collect::<Vec<_>>(); let before_update = table.buckets().map(|b| b.last_update()).collect::<Vec<_>>();
thread::sleep(Duration::from_secs(2)); thread::sleep(Duration::from_secs(2));
@ -468,7 +434,7 @@ mod tests {
let first_node = fill_ids[0].clone(); let first_node = fill_ids[0].clone();
let second_node = fill_ids[1].clone(); let second_node = fill_ids[1].clone();
let table = KBucketsTable::new(my_id.clone(), Duration::from_secs(1)); let mut table = KBucketsTable::new(my_id.clone(), Duration::from_secs(1));
for (num, id) in fill_ids.drain(..MAX_NODES_PER_BUCKET).enumerate() { for (num, id) in fill_ids.drain(..MAX_NODES_PER_BUCKET).enumerate() {
assert_eq!(table.update(id, ()), UpdateOutcome::Added); assert_eq!(table.update(id, ()), UpdateOutcome::Added);

View File

@ -88,10 +88,10 @@ pub use self::protocol::KadConnectionType;
pub use self::topology::KademliaTopology; pub use self::topology::KademliaTopology;
pub mod handler; pub mod handler;
pub mod kbucket;
pub mod protocol; pub mod protocol;
mod behaviour; mod behaviour;
mod kbucket;
mod protobuf_structs; mod protobuf_structs;
mod query; mod query;
mod topology; mod topology;

View File

@ -34,7 +34,7 @@ pub trait KademliaTopology: Topology {
/// Adds an address discovered through Kademlia to the topology. /// Adds an address discovered through Kademlia to the topology.
/// ///
/// > **Note**: Keep in mind that `peer` can the local peer. /// > **Note**: Keep in mind that `peer` can be the local peer.
fn add_kad_discovered_address(&mut self, peer: PeerId, addr: Multiaddr, fn add_kad_discovered_address(&mut self, peer: PeerId, addr: Multiaddr,
connection_ty: KadConnectionType); connection_ty: KadConnectionType);