weighted bucket: implement insert()

This commit is contained in:
folex 2020-03-24 12:08:39 +03:00
parent 760e6baac3
commit d5c0112fbb
7 changed files with 683 additions and 243 deletions

View File

@ -69,12 +69,16 @@
mod bucket;
mod entry;
mod key;
mod sub_bucket;
mod weighted;
mod swamp;
pub use entry::*;
pub use sub_bucket::*;
use arrayvec::{self, ArrayVec};
use bucket::KBucket;
use std::collections::VecDeque;
use std::collections::{VecDeque, HashMap};
use std::time::{Duration, Instant};
use libp2p_core::identity::ed25519::{Keypair, PublicKey};
@ -462,7 +466,7 @@ where
/// Returns true if the bucket has a pending node.
pub fn has_pending(&self) -> bool {
self.bucket.pending().map_or(false, |n| !n.is_ready())
self.bucket.has_pending()
}
/// Tests whether the given distance falls into this bucket.

View File

@ -25,125 +25,74 @@
//! > buckets in a `KBucketsTable` and hence is enforced by the public API
//! > of the `KBucketsTable` and in particular the public `Entry` API.
pub use crate::K_VALUE;
use super::*;
pub use crate::kbucket::sub_bucket::{Node, NodeStatus, PendingNode, Position, SubBucket};
use crate::kbucket::swamp::Swamp;
use crate::kbucket::weighted::Weighted;
pub use crate::{K_VALUE, W_VALUE};
use futures::StreamExt;
/// A `PendingNode` is a `Node` that is pending insertion into a `KBucket`.
#[derive(Debug, Clone)]
pub struct PendingNode<TKey, TVal> {
/// The pending node to insert.
node: Node<TKey, TVal>,
/// The status of the pending node.
status: NodeStatus,
/// The instant at which the pending node is eligible for insertion into a bucket.
replace: Instant,
}
/// The status of a node in a bucket.
///
/// The status of a node in a bucket together with the time of the
/// last status change determines the position of the node in a
/// bucket.
#[derive(PartialEq, Eq, Debug, Copy, Clone)]
pub enum NodeStatus {
/// The node is considered connected.
Connected,
/// The node is considered disconnected.
Disconnected
}
impl<TKey, TVal> PendingNode<TKey, TVal> {
pub fn key(&self) -> &TKey {
&self.node.key
}
pub fn status(&self) -> NodeStatus {
self.status
}
pub fn value_mut(&mut self) -> &mut TVal {
&mut self.node.value
}
pub fn is_ready(&self) -> bool {
Instant::now() >= self.replace
}
pub fn set_ready_at(&mut self, t: Instant) {
self.replace = t;
}
}
/// A `Node` in a bucket, representing a peer participating
/// in the Kademlia DHT together with an associated value (e.g. contact
/// information).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node<TKey, TVal> {
/// The key of the node, identifying the peer.
pub key: TKey,
/// The associated value.
pub value: TVal,
}
/// The position of a node in a `KBucket`, i.e. a non-negative integer
/// in the range `[0, K_VALUE)`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Position(usize);
/// A `KBucket` is a list of up to `K_VALUE` keys and associated values,
/// ordered from least-recently connected to most-recently connected.
#[derive(Debug, Clone)]
pub struct KBucket<TKey, TVal> {
/// The nodes contained in the bucket.
nodes: ArrayVec<[Node<TKey, TVal>; K_VALUE.get()]>,
/// The position (index) in `nodes` that marks the first connected node.
///
/// Since the entries in `nodes` are ordered from least-recently connected to
/// most-recently connected, all entries above this index are also considered
/// connected, i.e. the range `[0, first_connected_pos)` marks the sub-list of entries
/// that are considered disconnected and the range
/// `[first_connected_pos, K_VALUE)` marks sub-list of entries that are
/// considered connected.
///
/// `None` indicates that there are no connected entries in the bucket, i.e.
/// the bucket is either empty, or contains only entries for peers that are
/// considered disconnected.
first_connected_pos: Option<usize>,
/// A node that is pending to be inserted into a full bucket, should the
/// least-recently connected (and currently disconnected) node not be
/// marked as connected within `unresponsive_timeout`.
pending: Option<PendingNode<TKey, TVal>>,
/// The timeout window before a new pending node is eligible for insertion,
/// if the least-recently connected node is not updated as being connected
/// in the meantime.
pending_timeout: Duration
swamp: Swamp<TKey, TVal>,
weighted: Weighted<TKey, TVal>,
pending_timeout: Duration,
}
/*
// /// A `KBucket` is a list of up to `K_VALUE` keys and associated values,
// /// ordered from least-recently connected to most-recently connected.
// #[derive(Debug, Clone)]
// pub struct KBucket<TKey, TVal> {
// /// The nodes contained in the bucket.
// nodes: ArrayVec<[Node<TKey, TVal>; K_VALUE.get()]>,
//
// /// The position (index) in `nodes` that marks the first connected node.
// ///
// /// Since the entries in `nodes` are ordered from least-recently connected to
// /// most-recently connected, all entries above this index are also considered
// /// connected, i.e. the range `[0, first_connected_pos)` marks the sub-list of entries
// /// that are considered disconnected and the range
// /// `[first_connected_pos, K_VALUE)` marks sub-list of entries that are
// /// considered connected.
// ///
// /// `None` indicates that there are no connected entries in the bucket, i.e.
// /// the bucket is either empty, or contains only entries for peers that are
// /// considered disconnected.
// first_connected_pos: Option<usize>,
//
// /// A node that is pending to be inserted into a full bucket, should the
// /// least-recently connected (and currently disconnected) node not be
// /// marked as connected within `unresponsive_timeout`.
// pending: Option<PendingNode<TKey, TVal>>,
//
// /// The timeout window before a new pending node is eligible for insertion,
// /// if the least-recently connected node is not updated as being connected
// /// in the meantime.
// pending_timeout: Duration
// }
*/
/// The result of inserting an entry into a bucket.
#[must_use]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InsertResult<TKey> {
/// The entry has been successfully inserted.
Inserted,
/// The entry is pending insertion because the relevant bucket is currently full.
/// The entry is inserted after a timeout elapsed, if the status of the
/// least-recently connected (and currently disconnected) node in the bucket
/// is not updated before the timeout expires.
Pending {
/// The key of the least-recently connected entry that is currently considered
/// disconnected and whose corresponding peer should be checked for connectivity
/// in order to prevent it from being evicted. If connectivity to the peer is
/// re-established, the corresponding entry should be updated with
/// [`NodeStatus::Connected`].
disconnected: TKey
},
/// The entry was not inserted because the relevant bucket is full.
Full
/// The entry has been successfully inserted.
Inserted,
/// The entry is pending insertion because the relevant bucket is currently full.
/// The entry is inserted after a timeout elapsed, if the status of the
/// least-recently connected (and currently disconnected) node in the bucket
/// is not updated before the timeout expires.
Pending {
/// The key of the least-recently connected entry that is currently considered
/// disconnected and whose corresponding peer should be checked for connectivity
/// in order to prevent it from being evicted. If connectivity to the peer is
/// re-established, the corresponding entry should be updated with
/// [`NodeStatus::Connected`].
disconnected: TKey,
},
/// The entry was not inserted because the relevant bucket is full.
Full,
}
/// The result of applying a pending node to a bucket, possibly
@ -154,56 +103,55 @@ pub struct AppliedPending<TKey, TVal> {
pub inserted: Node<TKey, TVal>,
/// The node that has been evicted from the bucket to make room for the
/// pending node, if any.
pub evicted: Option<Node<TKey, TVal>>
pub evicted: Option<Node<TKey, TVal>>,
}
enum ChangePosition {
AddDisconnected,
// num_entries number of nodes in a bucket BEFORE appending
AppendConnected{ num_entries: usize },
AppendConnected { num_entries: usize },
RemoveConnected,
RemoveDisconnected
RemoveDisconnected,
}
impl<TKey, TVal> KBucket<TKey, TVal>
where
TKey: Clone + AsRef<KeyBytes>,
TVal: Clone
TVal: Clone,
{
/// Creates a new `KBucket` with the given timeout for pending entries.
pub fn new(pending_timeout: Duration) -> Self {
KBucket {
nodes: ArrayVec::new(),
first_connected_pos: None,
pending: None,
swamp: Swamp::new(pending_timeout),
weighted: Weighted::new(pending_timeout),
pending_timeout,
}
}
pub fn has_pending(&self) -> bool {
self.exists_active_pending(true) || self.exists_active_pending(false)
}
/// Returns a reference to the pending node of the bucket, if there is any.
pub fn pending(&self) -> Option<&PendingNode<TKey, TVal>> {
self.pending.as_ref()
fn pending(&self) -> Option<&PendingNode<TKey, TVal>> {
// self.swamp.as_ref()
unimplemented!("pending")
}
/// Returns a mutable reference to the pending node of the bucket, if there is any.
pub fn pending_mut(&mut self) -> Option<&mut PendingNode<TKey, TVal>> {
self.pending.as_mut()
// self.pending.as_mut()
unimplemented!("pending_mut")
}
/// Returns a reference to the pending node of the bucket, if there is any
/// with a matching key.
pub fn as_pending(&self, key: &TKey) -> Option<&PendingNode<TKey, TVal>> {
self.pending().filter(|p| p.node.key.as_ref() == key.as_ref())
}
/// Returns a reference to a node in the bucket.
pub fn get(&self, key: &TKey) -> Option<&Node<TKey, TVal>> {
self.position(key).map(|p| &self.nodes[p.0])
}
/// Returns an iterator over the nodes in the bucket, together with their status.
pub fn iter(&self) -> impl Iterator<Item = (&Node<TKey, TVal>, NodeStatus)> {
self.nodes.iter().enumerate().map(move |(p, n)| (n, self.status(Position(p))))
self.swamp_pending
.iter()
.chain(self.weighted_pending.iter())
.find(|p| p.node.key.as_ref() == key.as_ref())
// self.pending().filter(|p| p.node.key.as_ref() == key.as_ref())
}
/// Inserts the pending node into the bucket, if its timeout has elapsed,
@ -214,32 +162,45 @@ where
/// bucket remained unchanged.
pub fn apply_pending(&mut self) -> Option<AppliedPending<TKey, TVal>> {
if !self.pending_ready() {
return None
return None;
}
self.pending.take().map(|PendingNode { node, status, .. }| {
let evicted = if self.is_full() {
Some(self.pop_node())
} else {
None
};
self.swamp_pending
.take()
.map(|PendingNode { node, status, .. }| {
let evicted = if self.is_full(node.weight > 0) {
Some(self.pop_node(node.weight))
} else {
None
};
if let InsertResult::Inserted = self.insert(node.clone(), status) {
AppliedPending { inserted: node, evicted }
} else {
unreachable!("Bucket is not full, we just evicted a node.")
}
})
if let InsertResult::Inserted = self.insert(node.clone(), status) {
AppliedPending {
inserted: node,
evicted,
}
} else {
unreachable!("Bucket is not full, we just evicted a node.")
}
})
}
/// Updates the status of the pending node, if any.
pub fn update_pending(&mut self, status: NodeStatus) {
if let Some(pending) = &mut self.pending {
if let Some(pending) = &mut self.swamp_pending {
pending.status = status
}
}
/// Returns an iterator over the nodes in the bucket, together with their status.
pub fn iter(&self) -> impl Iterator<Item = (&Node<TKey, TVal>, NodeStatus)> {
self.weighted
.values()
.map(|bucket| bucket.iter())
.flatten()
.chain(self.swamp.iter())
}
/// Updates the status of the node referred to by the given key, if it is
/// in the bucket.
pub fn update(&mut self, key: &TKey, new_status: NodeStatus) {
@ -258,8 +219,8 @@ where
}
// Reinsert the node with the desired status.
match self.insert(node, new_status) {
InsertResult::Inserted => {},
_ => unreachable!("The node is removed before being (re)inserted.")
InsertResult::Inserted => {}
_ => unreachable!("The node is removed before being (re)inserted."),
}
}
}
@ -281,66 +242,61 @@ where
/// i.e. as the most-recently disconnected node. If there are no connected nodes,
/// the new node is added as the last element of the bucket.
///
pub fn insert(&mut self, node: Node<TKey, TVal>, status: NodeStatus) -> InsertResult<TKey> {
match status {
NodeStatus::Connected => {
if self.nodes.is_full() {
if self.all_nodes_connected() || self.exists_active_pending() {
return InsertResult::Full
} else {
self.set_pending(PendingNode {
node,
status: NodeStatus::Connected,
replace: Instant::now() + self.pending_timeout,
});
return InsertResult::Pending {
// Schedule a dial-up to check if the node is reachable
// NOTE: nodes[0] is disconnected (see all_nodes_connected check above)
// and the least recently connected
disconnected: self.nodes[0].key.clone()
}
}
}
self.append_connected_node(node);
InsertResult::Inserted
}
NodeStatus::Disconnected => {
if self.nodes.is_full() {
return InsertResult::Full
}
self.insert_disconnected_node(node);
InsertResult::Inserted
}
pub fn insert(&mut self, node: Node<TKey, TVal>, status: NodeStatus) -> InsertResult<TKey> {}
fn pending_ready(&self, weighted: bool) -> bool {
if weighted {
self.weighted.pending_ready()
} else {
self.swamp.pending_ready()
}
}
fn pending_ready(&self) -> bool {
self.pending.as_ref().map_or(false, |pending| pending.replace <= Instant::now())
fn is_full(&self, weighted: bool) -> bool {
if weighted {
self.weighted.is_full()
} else {
self.swamp.is_full()
}
}
fn is_full(&self) -> bool {
self.nodes.is_full()
}
fn exists_active_pending(&self) -> bool {
self.pending.is_some() // TODO: check is replace has passed?
fn exists_active_pending(&self, weighted: bool) -> bool {
if weighted {
self.weighted.pending_active()
} else {
self.swamp.exists_active_pending() // TODO: check if replace has passed?
}
}
fn set_pending(&mut self, node: PendingNode<TKey, TVal>) {
self.pending = Some(node);
if node.node.weight == 0 {
self.swamp.set_pending(node)
} else {
self.weighted.set_pending(node)
}
}
fn remove_pending(&mut self) {
self.pending = None
if node.node.weight == 0 {
self.swamp.remove_pending()
} else {
self.weighted.remove_pending()
}
}
fn all_nodes_connected(&self) -> bool {
self.first_connected_pos == Some(0)
fn all_nodes_connected(&self, weight: u32) -> bool {
if weight == 0 {
self.swamp.all_nodes_connected()
} else {
self.weighted.all_nodes_connected(weight)
}
}
fn append_connected_node(&mut self, node: Node<TKey, TVal>) {
// `num_entries` MUST be calculated BEFORE insertion
self.change_connected_pos(ChangePosition::AppendConnected { num_entries: self.num_entries() });
self.change_connected_pos(ChangePosition::AppendConnected {
num_entries: self.num_entries(),
});
self.nodes.push(node);
}
@ -349,19 +305,21 @@ where
self.change_connected_pos(ChangePosition::AddDisconnected);
match current_position {
Some(p) => self.nodes.insert(p, node), // Insert disconnected node just before the first connected node
None => self.nodes.push(node) // Or simply append disconnected node
None => self.nodes.push(node), // Or simply append disconnected node
}
}
fn evict_node(&mut self, position: Position) -> Node<TKey, TVal> {
match self.status(position) {
NodeStatus::Connected => self.change_connected_pos(ChangePosition::RemoveConnected),
NodeStatus::Disconnected => self.change_connected_pos(ChangePosition::RemoveDisconnected),
NodeStatus::Disconnected => {
self.change_connected_pos(ChangePosition::RemoveDisconnected)
}
}
self.nodes.remove(position.0)
}
fn pop_node(&mut self) -> Node<TKey, TVal> {
fn pop_node(&mut self, weight: u32) -> Node<TKey, TVal> {
self.evict_node(Position(0))
}
@ -370,14 +328,15 @@ where
ChangePosition::AddDisconnected => {
// New disconnected node added => position of the first connected node moved by 1
self.first_connected_pos = self.first_connected_pos.map(|p| p + 1)
},
}
ChangePosition::AppendConnected { num_entries } => {
// If there were no previously connected nodes set mark to the given one (usually the last one)
// Otherwise keep it the same
self.first_connected_pos = self.first_connected_pos.or(Some(num_entries));
},
}
ChangePosition::RemoveConnected => {
if self.num_connected() == 1 { // If it was the last connected node
if self.num_connected() == 1 {
// If it was the last connected node
self.first_connected_pos = None // Then mark there is no connected nodes left
}
// Otherwise keep mark the same
@ -385,7 +344,9 @@ where
ChangePosition::RemoveDisconnected => {
// If there are connected nodes lower mark
// Otherwise keep it None
self.first_connected_pos = self.first_connected_pos.map(|p| p.checked_sub(1).unwrap_or(0))
self.first_connected_pos = self
.first_connected_pos
.map(|p| p.checked_sub(1).unwrap_or(0))
}
}
}
@ -415,7 +376,8 @@ where
/// Gets the number of entries in the bucket that are considered connected.
pub fn num_connected(&self) -> usize {
self.first_connected_pos.map_or(0, |i| self.num_entries() - i)
self.first_connected_pos
.map_or(0, |i| self.num_entries() - i)
}
/// Gets the number of entries in the bucket that are considered disconnected.
@ -425,7 +387,10 @@ where
/// Gets the position of an node in the bucket.
pub fn position(&self, key: &TKey) -> Option<Position> {
self.nodes.iter().position(|p| p.key.as_ref() == key.as_ref()).map(Position)
self.nodes
.iter()
.position(|p| p.key.as_ref() == key.as_ref())
.map(Position)
}
/// Gets a mutable reference to the node identified by the given key.
@ -433,30 +398,35 @@ where
/// Returns `None` if the given key does not refer to a node in the
/// bucket.
pub fn get_mut(&mut self, key: &TKey) -> Option<&mut Node<TKey, TVal>> {
self.nodes.iter_mut().find(move |p| p.key.as_ref() == key.as_ref())
self.nodes
.iter_mut()
.find(move |p| p.key.as_ref() == key.as_ref())
}
}
#[cfg(test)]
mod tests {
use super::*;
use libp2p_core::PeerId;
use quickcheck::*;
use rand::Rng;
use std::collections::VecDeque;
use super::*;
use quickcheck::*;
impl Arbitrary for KBucket<Key<PeerId>, ()> {
fn arbitrary<G: Gen>(g: &mut G) -> KBucket<Key<PeerId>, ()> {
let timeout = Duration::from_secs(g.gen_range(1, g.size() as u64));
let mut bucket = KBucket::<Key<PeerId>, ()>::new(timeout);
let num_nodes = g.gen_range(1, K_VALUE.get() + 1);
for _ in 0 .. num_nodes {
for _ in 0..num_nodes {
let key = Key::new(PeerId::random());
let node = Node { key: key.clone(), value: () };
let node = Node {
key: key.clone(),
value: (),
};
let status = NodeStatus::arbitrary(g);
match bucket.insert(node, status) {
InsertResult::Inserted => {}
_ => panic!()
_ => panic!(),
}
}
bucket
@ -482,7 +452,7 @@ mod tests {
// Fill a bucket with random nodes with the given status.
fn fill_bucket(bucket: &mut KBucket<Key<PeerId>, ()>, status: NodeStatus) {
let num_entries_start = bucket.num_entries();
for i in 0 .. K_VALUE.get() - num_entries_start {
for i in 0..K_VALUE.get() - num_entries_start {
let key = Key::new(PeerId::random());
let node = Node { key, value: () };
assert_eq!(InsertResult::Inserted, bucket.insert(node, status));
@ -502,13 +472,16 @@ mod tests {
// Fill the bucket, thereby populating the expected lists in insertion order.
for status in status {
let key = Key::new(PeerId::random());
let node = Node { key: key.clone(), value: () };
let node = Node {
key: key.clone(),
value: (),
};
let full = bucket.num_entries() == K_VALUE.get();
match bucket.insert(node, status) {
InsertResult::Inserted => {
let vec = match status {
NodeStatus::Connected => &mut connected,
NodeStatus::Disconnected => &mut disconnected
NodeStatus::Disconnected => &mut disconnected,
};
if full {
vec.pop_front();
@ -520,21 +493,20 @@ mod tests {
}
// Get all nodes from the bucket, together with their status.
let mut nodes = bucket.iter()
let mut nodes = bucket
.iter()
.map(|(n, s)| (s, n.key.clone()))
.collect::<Vec<_>>();
// Split the list of nodes at the first connected node.
let first_connected_pos = nodes.iter().position(|(s,_)| *s == NodeStatus::Connected);
let first_connected_pos = nodes.iter().position(|(s, _)| *s == NodeStatus::Connected);
assert_eq!(bucket.first_connected_pos, first_connected_pos);
let tail = first_connected_pos.map_or(Vec::new(), |p| nodes.split_off(p));
// All nodes before the first connected node must be disconnected and
// in insertion order. Similarly, all remaining nodes must be connected
// and in insertion order.
nodes == Vec::from(disconnected)
&&
tail == Vec::from(connected)
nodes == Vec::from(disconnected) && tail == Vec::from(connected)
}
quickcheck(prop as fn(_) -> _);
@ -551,12 +523,12 @@ mod tests {
let key = Key::new(PeerId::random());
let node = Node { key, value: () };
match bucket.insert(node, NodeStatus::Disconnected) {
InsertResult::Full => {},
x => panic!("{:?}", x)
InsertResult::Full => {}
x => panic!("{:?}", x),
}
// One-by-one fill the bucket with connected nodes, replacing the disconnected ones.
for i in 0 .. K_VALUE.get() {
for i in 0..K_VALUE.get() {
let (first, first_status) = bucket.iter().next().unwrap();
let first_disconnected = first.clone();
assert_eq!(first_status, NodeStatus::Disconnected);
@ -564,17 +536,21 @@ mod tests {
// Add a connected node, which is expected to be pending, scheduled to
// replace the first (i.e. least-recently connected) node.
let key = Key::new(PeerId::random());
let node = Node { key: key.clone(), value: () };
let node = Node {
key: key.clone(),
value: (),
};
match bucket.insert(node.clone(), NodeStatus::Connected) {
InsertResult::Pending { disconnected } =>
assert_eq!(disconnected, first_disconnected.key),
x => panic!("{:?}", x)
InsertResult::Pending { disconnected } => {
assert_eq!(disconnected, first_disconnected.key)
}
x => panic!("{:?}", x),
}
// Trying to insert another connected node fails.
match bucket.insert(node.clone(), NodeStatus::Connected) {
InsertResult::Full => {},
x => panic!("{:?}", x)
InsertResult::Full => {}
x => panic!("{:?}", x),
}
assert!(bucket.pending().is_some());
@ -583,10 +559,13 @@ mod tests {
let pending = bucket.pending_mut().expect("No pending node.");
pending.set_ready_at(Instant::now() - Duration::from_secs(1));
let result = bucket.apply_pending();
assert_eq!(result, Some(AppliedPending {
inserted: node.clone(),
evicted: Some(first_disconnected)
}));
assert_eq!(
result,
Some(AppliedPending {
inserted: node.clone(),
evicted: Some(first_disconnected)
})
);
assert_eq!(Some((&node, NodeStatus::Connected)), bucket.iter().last());
assert!(bucket.pending().is_none());
assert_eq!(Some(K_VALUE.get() - (i + 1)), bucket.first_connected_pos);
@ -599,8 +578,8 @@ mod tests {
let key = Key::new(PeerId::random());
let node = Node { key, value: () };
match bucket.insert(node, NodeStatus::Connected) {
InsertResult::Full => {},
x => panic!("{:?}", x)
InsertResult::Full => {}
x => panic!("{:?}", x),
}
}
@ -613,7 +592,10 @@ mod tests {
// Add a connected pending node.
let key = Key::new(PeerId::random());
let node = Node { key: key.clone(), value: () };
let node = Node {
key: key.clone(),
value: (),
};
if let InsertResult::Pending { disconnected } = bucket.insert(node, NodeStatus::Connected) {
assert_eq!(&disconnected, &first_disconnected.key);
} else {
@ -626,16 +608,21 @@ mod tests {
// The pending node has been discarded.
assert!(bucket.pending().is_none());
assert!(bucket.iter().all(|(n,_)| &n.key != &key));
assert!(bucket.iter().all(|(n, _)| &n.key != &key));
// The initially disconnected node is now the most-recently connected.
assert_eq!(Some((&first_disconnected, NodeStatus::Connected)), bucket.iter().last());
assert_eq!(bucket.position(&first_disconnected.key).map(|p| p.0), bucket.first_connected_pos);
assert_eq!(
Some((&first_disconnected, NodeStatus::Connected)),
bucket.iter().last()
);
assert_eq!(
bucket.position(&first_disconnected.key).map(|p| p.0),
bucket.first_connected_pos
);
assert_eq!(1, bucket.num_connected());
assert_eq!(K_VALUE.get() - 1, bucket.num_disconnected());
}
#[test]
fn bucket_update() {
fn prop(mut bucket: KBucket<Key<PeerId>, ()>, pos: Position, status: NodeStatus) -> bool {
@ -646,7 +633,10 @@ mod tests {
let key = bucket.nodes[pos].key.clone();
// Record the (ordered) list of status of all nodes in the bucket.
let mut expected = bucket.iter().map(|(n,s)| (n.key.clone(), s)).collect::<Vec<_>>();
let mut expected = bucket
.iter()
.map(|(n, s)| (n.key.clone(), s))
.collect::<Vec<_>>();
// Update the node in the bucket.
bucket.update(&key, status);
@ -655,14 +645,17 @@ mod tests {
// preserving the status and relative order of all other nodes.
let expected_pos = match status {
NodeStatus::Connected => num_nodes - 1,
NodeStatus::Disconnected => bucket.first_connected_pos.unwrap_or(num_nodes) - 1
NodeStatus::Disconnected => bucket.first_connected_pos.unwrap_or(num_nodes) - 1,
};
expected.remove(pos);
expected.insert(expected_pos, (key.clone(), status));
let actual = bucket.iter().map(|(n,s)| (n.key.clone(), s)).collect::<Vec<_>>();
let actual = bucket
.iter()
.map(|(n, s)| (n.key.clone(), s))
.collect::<Vec<_>>();
expected == actual
}
quickcheck(prop as fn(_,_,_) -> _);
quickcheck(prop as fn(_, _, _) -> _);
}
}

View File

@ -115,7 +115,7 @@ where
///
/// Returns `None` if the entry is neither present in a bucket nor
/// pending insertion into a bucket.
pub fn view(&'a mut self) -> Option<EntryRefView<'a, TKey, TVal>> {
pub fn view(&'a self) -> Option<EntryRefView<'a, TKey, TVal>> {
match self {
Entry::Present(entry, status) => Some(EntryRefView {
node: NodeRefView {
@ -189,7 +189,7 @@ where
.value
}
/// Sets the status of the entry to `NodeStatus::Disconnected`.
/// Sets the status of the entry.
pub fn update(self, status: NodeStatus) -> Self {
self.0.bucket.update(self.0.key, status);
Self::new(self.0.bucket, self.0.key)
@ -218,7 +218,7 @@ where
pub fn value(&mut self) -> &mut TVal {
self.0.bucket
.pending_mut()
.expect("We can only build a ConnectedPendingEntry if the entry is pending; QED")
.expect("We can only build a PendingEntry if the entry is pending; QED")
.value_mut()
}
@ -251,7 +251,8 @@ where
pub fn insert(self, value: TVal, status: NodeStatus) -> InsertResult<TKey> {
self.0.bucket.insert(Node {
key: self.0.key.clone(),
value
value,
weight: unimplemented!("TODO: pass weight to AbsentEntry")
}, status)
}
}

View File

@ -0,0 +1,187 @@
/*
* Copyright 2019 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// #[derive(Debug, Clone)]
// struct WeightedSubBucket<TKey, TVal> {
// nodes: Vec<Node<TKey, TVal>>,
// first_connected_pos: Option<usize>,
// pending: Option<PendingNode<TKey, TVal>>,
// }
//
// impl<TKey, TVal> WeightedSubBucket<TKey, TVal> {
// pub fn new() -> Self {
// Self {
// nodes: Vec::new(),
// first_connected_pos: None,
// pending: None,
// }
// }
// }
use crate::kbucket::InsertResult;
use crate::K_VALUE;
use arrayvec::ArrayVec;
use std::time::Instant;
/// The status of a node in a bucket.
///
/// The status of a node in a bucket together with the time of the
/// last status change determines the position of the node in a
/// bucket.
#[derive(PartialEq, Eq, Debug, Copy, Clone)]
pub enum NodeStatus {
/// The node is considered connected.
Connected,
/// The node is considered disconnected.
Disconnected,
}
/// A `PendingNode` is a `Node` that is pending insertion into a `KBucket`.
#[derive(Debug, Clone)]
pub struct PendingNode<TKey, TVal> {
/// The pending node to insert.
pub node: Node<TKey, TVal>,
/// The status of the pending node.
pub status: NodeStatus,
/// The instant at which the pending node is eligible for insertion into a bucket.
pub replace: Instant,
}
impl<TKey, TVal> PendingNode<TKey, TVal> {
pub fn key(&self) -> &TKey {
&self.node.key
}
pub fn status(&self) -> NodeStatus {
self.status
}
pub fn value_mut(&mut self) -> &mut TVal {
&mut self.node.value
}
pub fn is_ready(&self) -> bool {
Instant::now() >= self.replace
}
pub fn set_ready_at(&mut self, t: Instant) {
self.replace = t;
}
}
/// A `Node` in a bucket, representing a peer participating
/// in the Kademlia DHT together with an associated value (e.g. contact
/// information).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node<TKey, TVal> {
/// The key of the node, identifying the peer.
pub key: TKey,
/// The associated value.
pub value: TVal,
pub weight: u32,
}
/// The position of a node in a `KBucket`, i.e. a non-negative integer
/// in the range `[0, K_VALUE)`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Position(pub usize);
#[derive(Debug, Clone)]
pub struct SubBucket<Node> {
pub nodes: ArrayVec<[Node; K_VALUE.get()]>,
pub first_connected_pos: Option<usize>,
// pub pending: Option<PendingNode<TKey, TVal>>,
}
impl<Node> SubBucket<Node> {
pub fn new() -> Self {
Self {
nodes: ArrayVec::new(),
first_connected_pos: None,
}
}
pub fn status(&self, pos: Position) -> NodeStatus {
if self.first_connected_pos.map_or(false, |i| pos.0 >= i) {
NodeStatus::Connected
} else {
NodeStatus::Disconnected
}
}
/// Returns an iterator over the nodes in the bucket, together with their status.
pub fn iter(&self) -> impl Iterator<Item = (&Node, NodeStatus)> {
self.nodes
.iter()
.enumerate()
.map(move |(p, n)| (n, self.status(Position(p))))
}
pub fn is_full(&self) -> bool {
self.nodes.is_full()
}
pub fn all_nodes_connected(&self) -> bool {
self.first_connected_pos == Some(0)
}
pub fn append_connected_node(&mut self, node: Node) {
// `num_entries` MUST be calculated BEFORE insertion
self.change_connected_pos(ChangePosition::AppendConnected {
num_entries: self.num_entries(),
});
self.nodes.push(node);
}
pub fn insert_disconnected_node(&mut self, node: Node) {
let current_position = self.first_connected_pos;
self.change_connected_pos(ChangePosition::AddDisconnected);
match current_position {
Some(p) => self.nodes.insert(p, node), // Insert disconnected node just before the first connected node
None => self.nodes.push(node), // Or simply append disconnected node
}
}
fn change_connected_pos(&mut self, action: ChangePosition) {
match action {
ChangePosition::AddDisconnected => {
// New disconnected node added => position of the first connected node moved by 1
self.first_connected_pos = self.first_connected_pos.map(|p| p + 1)
}
ChangePosition::AppendConnected { num_entries } => {
// If there were no previously connected nodes set mark to the given one (usually the last one)
// Otherwise keep it the same
self.first_connected_pos = self.first_connected_pos.or(Some(num_entries));
}
ChangePosition::RemoveConnected => {
if self.num_connected() == 1 {
// If it was the last connected node
self.first_connected_pos = None // Then mark there is no connected nodes left
}
// Otherwise keep mark the same
}
ChangePosition::RemoveDisconnected => {
// If there are connected nodes lower mark
// Otherwise keep it None
self.first_connected_pos = self
.first_connected_pos
.map(|p| p.checked_sub(1).unwrap_or(0))
}
}
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright 2019 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::kbucket::{InsertResult, Node, NodeStatus, PendingNode, SubBucket};
use std::time::Instant;
pub struct Swamp<TKey, TVal> {
bucket: SubBucket<Node<TKey, TVal>>,
pending: Option<PendingNode<TKey, TVal>>,
}
impl<TKey, TVal> Swamp<TKey, TVal> {
pub fn new() -> Self {
Self {
bucket: SubBucket::new(),
pending: None,
}
}
pub fn exists_active_pending(&self) -> bool {
self.pending.is_some() // TODO: check replace timeout
}
pub fn set_pending(&mut self, node: PendingNode<TKey, TVal>) {
self.pending = Some(node)
}
pub fn remove_pending(&mut self) {
self.pending = None
}
pub fn pending_ready(&self) -> bool {
self.pending
.as_ref()
.map_or(false, |pending| pending.replace <= Instant::now())
}
pub fn insert(&mut self, node: Node<TKey, TVal>, status: NodeStatus) -> InsertResult<TKey> {
match status {
NodeStatus::Connected => {
if self.bucket.is_full() {
if self.bucket.all_nodes_connected() || self.exists_active_pending() {
// TODO: check pending.replace in exists_active_pending & call apply_pending?
return InsertResult::Full;
} else {
self.set_pending(PendingNode {
node,
status: NodeStatus::Connected,
replace: Instant::now() + self.pending_timeout,
});
return InsertResult::Pending {
// Schedule a dial-up to check if the node is reachable
// NOTE: nodes[0] is disconnected (see all_nodes_connected check above)
// and the least recently connected
disconnected: self.nodes[0].key.clone(),
};
}
}
self.bucket.append_connected_node(node);
InsertResult::Inserted
}
NodeStatus::Disconnected => {
if self.bucket.is_full() {
return InsertResult::Full;
}
self.bucket.insert_disconnected_node(node);
InsertResult::Inserted
}
}
}
}

View File

@ -0,0 +1,168 @@
/*
* Copyright 2019 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::kbucket::{InsertResult, Node, NodeStatus, PendingNode, SubBucket};
use crate::W_VALUE;
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::time::{Duration, Instant};
pub struct WeightedNode<TKey, TVal> {
/// The key of the node, identifying the peer.
pub key: TKey,
/// The associated value.
pub value: TVal,
pub weight: u32,
pub last_contact_time: u128,
}
impl<TKey, TVal> Into<Node<TKey, TVal>> for WeightedNode<TKey, TVal> {
fn into(self) -> Node<TKey, TVal> {
Node {
key: self.key,
value: self.value,
weight: self.weight,
}
}
}
pub struct Weighted<TKey, TVal> {
map: HashMap<u32, SubBucket<WeightedNode<TKey, TVal>>>,
pending: Option<PendingNode<TKey, TVal>>,
capacity: usize,
pending_timeout: Duration,
}
impl<TKey, TVal> Weighted<TKey, TVal> {
pub fn new(pending_timeout: Duration) -> Self {
Self {
map: HashMap::new(),
pending: None,
capacity: W_VALUE.get(),
pending_timeout,
}
}
pub fn all_nodes_connected(&self, weight: u32) -> bool {
self.map
.get(&weight)
.map_or_else(false, |bucket| bucket.all_nodes_connected())
}
pub fn pending_active(&self) -> bool {
self.pending.is_some() // TODO: check replace timeout
}
pub fn set_pending(&mut self, node: PendingNode<TKey, TVal>) {
self.pending = Some(node)
}
pub fn remove_pending(&mut self) {
self.pending = None
}
pub fn pending_ready(&self) -> bool {
self.pending
.as_ref()
.map_or(false, |pending| pending.replace <= Instant::now())
}
fn num_entries(&self) -> usize {
self.map.values().map(|bucket| bucket.nodes.len()).sum()
}
pub fn is_full(&self) -> bool {
self.num_entries() >= self.capacity
}
fn get_bucket_mut(&mut self, weight: u32) -> &mut SubBucket<WeightedNode<TKey, TVal>> {
match self.map.entry(weight) {
Entry::Occupied(mut e) => e.get_mut(),
Entry::Vacant(e) => {
let mut bucket = SubBucket::new();
bucket.append_connected_node(node);
bucket
}
}
}
fn append_connected_node(&mut self, node: WeightedNode<TKey, TVal>) {
self.get_bucket_mut(node.weight).append_connected_node(node)
}
fn insert_disconnected_node(&mut self, node: WeightedNode<TKey, TVal>) {
self.get_bucket_mut(node.weight)
.insert_disconnected_node(node)
}
fn min_key(&self) -> Option<u32> {
self.map.keys().min().cloned()
}
fn least_recent(&self, weight_bound: u32) -> Option<WeightedNode<TKey, TVal>> {
self.map
.iter()
.filter(|(&&key, _)| key <= weight_bound)
.map(|(_, bucket)| bucket.iter())
.flatten()
.min_by(|(a, b)| Ord::cmp(a.last_contact_time, b.last_contact_time))
}
pub fn insert(
&mut self,
node: WeightedNode<TKey, TVal>,
status: NodeStatus,
) -> InsertResult<TKey> {
match status {
NodeStatus::Connected => {
if !self.is_full() {
// If there's free space in bucket, append the node
self.append_connected_node(node);
InsertResult::Inserted
} else {
let min_key = self.min_key().expect("bucket MUST be full here");
if min_key < node.weight && !self.pending_active() {
// If bucket is full, but there's a sub-bucket with lower weight, and no pending node
// then set `node` to be pending, and schedule a dial-up check for the least recent node
match self.least_recent(node.weight) {
Some(least_recent) => {
self.set_pending(PendingNode {
node: node.into(),
status,
replace: Instant::now() + self.pending_timeout,
});
InsertResult::Pending {
disconnected: least_recent,
}
}
// There's no node to evict
None => InsertResult::Full,
}
} else {
InsertResult::Full
}
}
}
NodeStatus::Disconnected if !self.is_full() => {
self.insert_disconnected_node(node); // TODO: maybe schedule a dial-up to this node?
InsertResult::Inserted
}
_ => InsertResult::Full,
}
}
}

View File

@ -88,6 +88,9 @@ use std::num::NonZeroUsize;
/// The current value is `20`.
pub const K_VALUE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(20) };
/// Total number of weighted nodes in weighted bucket
pub const W_VALUE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(20) };
/// The `α` parameter of the Kademlia specification.
///
/// This parameter determines the default parallelism for iterative queries,