From d5c0112fbb9d327a49a01a90ea8a1d6ae37c09cd Mon Sep 17 00:00:00 2001 From: folex <0xdxdy@gmail.com> Date: Tue, 24 Mar 2020 12:08:39 +0300 Subject: [PATCH] weighted bucket: implement insert() --- protocols/kad/src/kbucket.rs | 8 +- protocols/kad/src/kbucket/bucket.rs | 467 ++++++++++++------------ protocols/kad/src/kbucket/entry.rs | 9 +- protocols/kad/src/kbucket/sub_bucket.rs | 187 ++++++++++ protocols/kad/src/kbucket/swamp.rs | 84 +++++ protocols/kad/src/kbucket/weighted.rs | 168 +++++++++ protocols/kad/src/lib.rs | 3 + 7 files changed, 683 insertions(+), 243 deletions(-) create mode 100644 protocols/kad/src/kbucket/sub_bucket.rs create mode 100644 protocols/kad/src/kbucket/swamp.rs create mode 100644 protocols/kad/src/kbucket/weighted.rs diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs index acafaa87..32f9ae83 100644 --- a/protocols/kad/src/kbucket.rs +++ b/protocols/kad/src/kbucket.rs @@ -69,12 +69,16 @@ mod bucket; mod entry; mod key; +mod sub_bucket; +mod weighted; +mod swamp; pub use entry::*; +pub use sub_bucket::*; use arrayvec::{self, ArrayVec}; use bucket::KBucket; -use std::collections::VecDeque; +use std::collections::{VecDeque, HashMap}; use std::time::{Duration, Instant}; use libp2p_core::identity::ed25519::{Keypair, PublicKey}; @@ -462,7 +466,7 @@ where /// Returns true if the bucket has a pending node. pub fn has_pending(&self) -> bool { - self.bucket.pending().map_or(false, |n| !n.is_ready()) + self.bucket.has_pending() } /// Tests whether the given distance falls into this bucket. diff --git a/protocols/kad/src/kbucket/bucket.rs b/protocols/kad/src/kbucket/bucket.rs index 6c7ab09c..072d645e 100644 --- a/protocols/kad/src/kbucket/bucket.rs +++ b/protocols/kad/src/kbucket/bucket.rs @@ -25,125 +25,74 @@ //! > buckets in a `KBucketsTable` and hence is enforced by the public API //! > of the `KBucketsTable` and in particular the public `Entry` API. -pub use crate::K_VALUE; use super::*; +pub use crate::kbucket::sub_bucket::{Node, NodeStatus, PendingNode, Position, SubBucket}; +use crate::kbucket::swamp::Swamp; +use crate::kbucket::weighted::Weighted; +pub use crate::{K_VALUE, W_VALUE}; +use futures::StreamExt; -/// A `PendingNode` is a `Node` that is pending insertion into a `KBucket`. -#[derive(Debug, Clone)] -pub struct PendingNode { - /// The pending node to insert. - node: Node, - - /// The status of the pending node. - status: NodeStatus, - - /// The instant at which the pending node is eligible for insertion into a bucket. - replace: Instant, -} - -/// The status of a node in a bucket. -/// -/// The status of a node in a bucket together with the time of the -/// last status change determines the position of the node in a -/// bucket. -#[derive(PartialEq, Eq, Debug, Copy, Clone)] -pub enum NodeStatus { - /// The node is considered connected. - Connected, - /// The node is considered disconnected. - Disconnected -} - -impl PendingNode { - pub fn key(&self) -> &TKey { - &self.node.key - } - - pub fn status(&self) -> NodeStatus { - self.status - } - - pub fn value_mut(&mut self) -> &mut TVal { - &mut self.node.value - } - - pub fn is_ready(&self) -> bool { - Instant::now() >= self.replace - } - - pub fn set_ready_at(&mut self, t: Instant) { - self.replace = t; - } -} - -/// A `Node` in a bucket, representing a peer participating -/// in the Kademlia DHT together with an associated value (e.g. contact -/// information). -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Node { - /// The key of the node, identifying the peer. - pub key: TKey, - /// The associated value. - pub value: TVal, -} - -/// The position of a node in a `KBucket`, i.e. a non-negative integer -/// in the range `[0, K_VALUE)`. -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub struct Position(usize); - -/// A `KBucket` is a list of up to `K_VALUE` keys and associated values, -/// ordered from least-recently connected to most-recently connected. #[derive(Debug, Clone)] pub struct KBucket { - /// The nodes contained in the bucket. - nodes: ArrayVec<[Node; K_VALUE.get()]>, - - /// The position (index) in `nodes` that marks the first connected node. - /// - /// Since the entries in `nodes` are ordered from least-recently connected to - /// most-recently connected, all entries above this index are also considered - /// connected, i.e. the range `[0, first_connected_pos)` marks the sub-list of entries - /// that are considered disconnected and the range - /// `[first_connected_pos, K_VALUE)` marks sub-list of entries that are - /// considered connected. - /// - /// `None` indicates that there are no connected entries in the bucket, i.e. - /// the bucket is either empty, or contains only entries for peers that are - /// considered disconnected. - first_connected_pos: Option, - - /// A node that is pending to be inserted into a full bucket, should the - /// least-recently connected (and currently disconnected) node not be - /// marked as connected within `unresponsive_timeout`. - pending: Option>, - - /// The timeout window before a new pending node is eligible for insertion, - /// if the least-recently connected node is not updated as being connected - /// in the meantime. - pending_timeout: Duration + swamp: Swamp, + weighted: Weighted, + pending_timeout: Duration, } +/* +// /// A `KBucket` is a list of up to `K_VALUE` keys and associated values, +// /// ordered from least-recently connected to most-recently connected. +// #[derive(Debug, Clone)] +// pub struct KBucket { +// /// The nodes contained in the bucket. +// nodes: ArrayVec<[Node; K_VALUE.get()]>, +// +// /// The position (index) in `nodes` that marks the first connected node. +// /// +// /// Since the entries in `nodes` are ordered from least-recently connected to +// /// most-recently connected, all entries above this index are also considered +// /// connected, i.e. the range `[0, first_connected_pos)` marks the sub-list of entries +// /// that are considered disconnected and the range +// /// `[first_connected_pos, K_VALUE)` marks sub-list of entries that are +// /// considered connected. +// /// +// /// `None` indicates that there are no connected entries in the bucket, i.e. +// /// the bucket is either empty, or contains only entries for peers that are +// /// considered disconnected. +// first_connected_pos: Option, +// +// /// A node that is pending to be inserted into a full bucket, should the +// /// least-recently connected (and currently disconnected) node not be +// /// marked as connected within `unresponsive_timeout`. +// pending: Option>, +// +// /// The timeout window before a new pending node is eligible for insertion, +// /// if the least-recently connected node is not updated as being connected +// /// in the meantime. +// pending_timeout: Duration +// } +*/ + /// The result of inserting an entry into a bucket. #[must_use] #[derive(Debug, Clone, PartialEq, Eq)] pub enum InsertResult { - /// The entry has been successfully inserted. - Inserted, - /// The entry is pending insertion because the relevant bucket is currently full. - /// The entry is inserted after a timeout elapsed, if the status of the - /// least-recently connected (and currently disconnected) node in the bucket - /// is not updated before the timeout expires. - Pending { - /// The key of the least-recently connected entry that is currently considered - /// disconnected and whose corresponding peer should be checked for connectivity - /// in order to prevent it from being evicted. If connectivity to the peer is - /// re-established, the corresponding entry should be updated with - /// [`NodeStatus::Connected`]. - disconnected: TKey - }, - /// The entry was not inserted because the relevant bucket is full. - Full + /// The entry has been successfully inserted. + Inserted, + /// The entry is pending insertion because the relevant bucket is currently full. + /// The entry is inserted after a timeout elapsed, if the status of the + /// least-recently connected (and currently disconnected) node in the bucket + /// is not updated before the timeout expires. + Pending { + /// The key of the least-recently connected entry that is currently considered + /// disconnected and whose corresponding peer should be checked for connectivity + /// in order to prevent it from being evicted. If connectivity to the peer is + /// re-established, the corresponding entry should be updated with + /// [`NodeStatus::Connected`]. + disconnected: TKey, + }, + /// The entry was not inserted because the relevant bucket is full. + Full, } /// The result of applying a pending node to a bucket, possibly @@ -154,56 +103,55 @@ pub struct AppliedPending { pub inserted: Node, /// The node that has been evicted from the bucket to make room for the /// pending node, if any. - pub evicted: Option> + pub evicted: Option>, } enum ChangePosition { AddDisconnected, // num_entries – number of nodes in a bucket BEFORE appending - AppendConnected{ num_entries: usize }, + AppendConnected { num_entries: usize }, RemoveConnected, - RemoveDisconnected + RemoveDisconnected, } impl KBucket where TKey: Clone + AsRef, - TVal: Clone + TVal: Clone, { /// Creates a new `KBucket` with the given timeout for pending entries. pub fn new(pending_timeout: Duration) -> Self { KBucket { - nodes: ArrayVec::new(), - first_connected_pos: None, - pending: None, + swamp: Swamp::new(pending_timeout), + weighted: Weighted::new(pending_timeout), pending_timeout, } } + pub fn has_pending(&self) -> bool { + self.exists_active_pending(true) || self.exists_active_pending(false) + } + /// Returns a reference to the pending node of the bucket, if there is any. - pub fn pending(&self) -> Option<&PendingNode> { - self.pending.as_ref() + fn pending(&self) -> Option<&PendingNode> { + // self.swamp.as_ref() + unimplemented!("pending") } /// Returns a mutable reference to the pending node of the bucket, if there is any. pub fn pending_mut(&mut self) -> Option<&mut PendingNode> { - self.pending.as_mut() + // self.pending.as_mut() + unimplemented!("pending_mut") } /// Returns a reference to the pending node of the bucket, if there is any /// with a matching key. pub fn as_pending(&self, key: &TKey) -> Option<&PendingNode> { - self.pending().filter(|p| p.node.key.as_ref() == key.as_ref()) - } - - /// Returns a reference to a node in the bucket. - pub fn get(&self, key: &TKey) -> Option<&Node> { - self.position(key).map(|p| &self.nodes[p.0]) - } - - /// Returns an iterator over the nodes in the bucket, together with their status. - pub fn iter(&self) -> impl Iterator, NodeStatus)> { - self.nodes.iter().enumerate().map(move |(p, n)| (n, self.status(Position(p)))) + self.swamp_pending + .iter() + .chain(self.weighted_pending.iter()) + .find(|p| p.node.key.as_ref() == key.as_ref()) + // self.pending().filter(|p| p.node.key.as_ref() == key.as_ref()) } /// Inserts the pending node into the bucket, if its timeout has elapsed, @@ -214,32 +162,45 @@ where /// bucket remained unchanged. pub fn apply_pending(&mut self) -> Option> { if !self.pending_ready() { - return None + return None; } - self.pending.take().map(|PendingNode { node, status, .. }| { - let evicted = if self.is_full() { - Some(self.pop_node()) - } else { - None - }; + self.swamp_pending + .take() + .map(|PendingNode { node, status, .. }| { + let evicted = if self.is_full(node.weight > 0) { + Some(self.pop_node(node.weight)) + } else { + None + }; - - if let InsertResult::Inserted = self.insert(node.clone(), status) { - AppliedPending { inserted: node, evicted } - } else { - unreachable!("Bucket is not full, we just evicted a node.") - } - }) + if let InsertResult::Inserted = self.insert(node.clone(), status) { + AppliedPending { + inserted: node, + evicted, + } + } else { + unreachable!("Bucket is not full, we just evicted a node.") + } + }) } /// Updates the status of the pending node, if any. pub fn update_pending(&mut self, status: NodeStatus) { - if let Some(pending) = &mut self.pending { + if let Some(pending) = &mut self.swamp_pending { pending.status = status } } + /// Returns an iterator over the nodes in the bucket, together with their status. + pub fn iter(&self) -> impl Iterator, NodeStatus)> { + self.weighted + .values() + .map(|bucket| bucket.iter()) + .flatten() + .chain(self.swamp.iter()) + } + /// Updates the status of the node referred to by the given key, if it is /// in the bucket. pub fn update(&mut self, key: &TKey, new_status: NodeStatus) { @@ -258,8 +219,8 @@ where } // Reinsert the node with the desired status. match self.insert(node, new_status) { - InsertResult::Inserted => {}, - _ => unreachable!("The node is removed before being (re)inserted.") + InsertResult::Inserted => {} + _ => unreachable!("The node is removed before being (re)inserted."), } } } @@ -281,66 +242,61 @@ where /// i.e. as the most-recently disconnected node. If there are no connected nodes, /// the new node is added as the last element of the bucket. /// - pub fn insert(&mut self, node: Node, status: NodeStatus) -> InsertResult { - match status { - NodeStatus::Connected => { - if self.nodes.is_full() { - if self.all_nodes_connected() || self.exists_active_pending() { - return InsertResult::Full - } else { - self.set_pending(PendingNode { - node, - status: NodeStatus::Connected, - replace: Instant::now() + self.pending_timeout, - }); - return InsertResult::Pending { - // Schedule a dial-up to check if the node is reachable - // NOTE: nodes[0] is disconnected (see all_nodes_connected check above) - // and the least recently connected - disconnected: self.nodes[0].key.clone() - } - } - } - self.append_connected_node(node); - InsertResult::Inserted - } - NodeStatus::Disconnected => { - if self.nodes.is_full() { - return InsertResult::Full - } - self.insert_disconnected_node(node); - InsertResult::Inserted - } + pub fn insert(&mut self, node: Node, status: NodeStatus) -> InsertResult {} + + fn pending_ready(&self, weighted: bool) -> bool { + if weighted { + self.weighted.pending_ready() + } else { + self.swamp.pending_ready() } } - fn pending_ready(&self) -> bool { - self.pending.as_ref().map_or(false, |pending| pending.replace <= Instant::now()) + fn is_full(&self, weighted: bool) -> bool { + if weighted { + self.weighted.is_full() + } else { + self.swamp.is_full() + } } - fn is_full(&self) -> bool { - self.nodes.is_full() - } - - fn exists_active_pending(&self) -> bool { - self.pending.is_some() // TODO: check is replace has passed? + fn exists_active_pending(&self, weighted: bool) -> bool { + if weighted { + self.weighted.pending_active() + } else { + self.swamp.exists_active_pending() // TODO: check if replace has passed? + } } fn set_pending(&mut self, node: PendingNode) { - self.pending = Some(node); + if node.node.weight == 0 { + self.swamp.set_pending(node) + } else { + self.weighted.set_pending(node) + } } fn remove_pending(&mut self) { - self.pending = None + if node.node.weight == 0 { + self.swamp.remove_pending() + } else { + self.weighted.remove_pending() + } } - fn all_nodes_connected(&self) -> bool { - self.first_connected_pos == Some(0) + fn all_nodes_connected(&self, weight: u32) -> bool { + if weight == 0 { + self.swamp.all_nodes_connected() + } else { + self.weighted.all_nodes_connected(weight) + } } fn append_connected_node(&mut self, node: Node) { // `num_entries` MUST be calculated BEFORE insertion - self.change_connected_pos(ChangePosition::AppendConnected { num_entries: self.num_entries() }); + self.change_connected_pos(ChangePosition::AppendConnected { + num_entries: self.num_entries(), + }); self.nodes.push(node); } @@ -349,19 +305,21 @@ where self.change_connected_pos(ChangePosition::AddDisconnected); match current_position { Some(p) => self.nodes.insert(p, node), // Insert disconnected node just before the first connected node - None => self.nodes.push(node) // Or simply append disconnected node + None => self.nodes.push(node), // Or simply append disconnected node } } fn evict_node(&mut self, position: Position) -> Node { match self.status(position) { NodeStatus::Connected => self.change_connected_pos(ChangePosition::RemoveConnected), - NodeStatus::Disconnected => self.change_connected_pos(ChangePosition::RemoveDisconnected), + NodeStatus::Disconnected => { + self.change_connected_pos(ChangePosition::RemoveDisconnected) + } } self.nodes.remove(position.0) } - fn pop_node(&mut self) -> Node { + fn pop_node(&mut self, weight: u32) -> Node { self.evict_node(Position(0)) } @@ -370,14 +328,15 @@ where ChangePosition::AddDisconnected => { // New disconnected node added => position of the first connected node moved by 1 self.first_connected_pos = self.first_connected_pos.map(|p| p + 1) - }, + } ChangePosition::AppendConnected { num_entries } => { // If there were no previously connected nodes – set mark to the given one (usually the last one) // Otherwise – keep it the same self.first_connected_pos = self.first_connected_pos.or(Some(num_entries)); - }, + } ChangePosition::RemoveConnected => { - if self.num_connected() == 1 { // If it was the last connected node + if self.num_connected() == 1 { + // If it was the last connected node self.first_connected_pos = None // Then mark there is no connected nodes left } // Otherwise – keep mark the same @@ -385,7 +344,9 @@ where ChangePosition::RemoveDisconnected => { // If there are connected nodes – lower mark // Otherwise – keep it None - self.first_connected_pos = self.first_connected_pos.map(|p| p.checked_sub(1).unwrap_or(0)) + self.first_connected_pos = self + .first_connected_pos + .map(|p| p.checked_sub(1).unwrap_or(0)) } } } @@ -415,7 +376,8 @@ where /// Gets the number of entries in the bucket that are considered connected. pub fn num_connected(&self) -> usize { - self.first_connected_pos.map_or(0, |i| self.num_entries() - i) + self.first_connected_pos + .map_or(0, |i| self.num_entries() - i) } /// Gets the number of entries in the bucket that are considered disconnected. @@ -425,7 +387,10 @@ where /// Gets the position of an node in the bucket. pub fn position(&self, key: &TKey) -> Option { - self.nodes.iter().position(|p| p.key.as_ref() == key.as_ref()).map(Position) + self.nodes + .iter() + .position(|p| p.key.as_ref() == key.as_ref()) + .map(Position) } /// Gets a mutable reference to the node identified by the given key. @@ -433,30 +398,35 @@ where /// Returns `None` if the given key does not refer to a node in the /// bucket. pub fn get_mut(&mut self, key: &TKey) -> Option<&mut Node> { - self.nodes.iter_mut().find(move |p| p.key.as_ref() == key.as_ref()) + self.nodes + .iter_mut() + .find(move |p| p.key.as_ref() == key.as_ref()) } } #[cfg(test)] mod tests { + use super::*; use libp2p_core::PeerId; + use quickcheck::*; use rand::Rng; use std::collections::VecDeque; - use super::*; - use quickcheck::*; impl Arbitrary for KBucket, ()> { fn arbitrary(g: &mut G) -> KBucket, ()> { let timeout = Duration::from_secs(g.gen_range(1, g.size() as u64)); let mut bucket = KBucket::, ()>::new(timeout); let num_nodes = g.gen_range(1, K_VALUE.get() + 1); - for _ in 0 .. num_nodes { + for _ in 0..num_nodes { let key = Key::new(PeerId::random()); - let node = Node { key: key.clone(), value: () }; + let node = Node { + key: key.clone(), + value: (), + }; let status = NodeStatus::arbitrary(g); match bucket.insert(node, status) { InsertResult::Inserted => {} - _ => panic!() + _ => panic!(), } } bucket @@ -482,7 +452,7 @@ mod tests { // Fill a bucket with random nodes with the given status. fn fill_bucket(bucket: &mut KBucket, ()>, status: NodeStatus) { let num_entries_start = bucket.num_entries(); - for i in 0 .. K_VALUE.get() - num_entries_start { + for i in 0..K_VALUE.get() - num_entries_start { let key = Key::new(PeerId::random()); let node = Node { key, value: () }; assert_eq!(InsertResult::Inserted, bucket.insert(node, status)); @@ -502,13 +472,16 @@ mod tests { // Fill the bucket, thereby populating the expected lists in insertion order. for status in status { let key = Key::new(PeerId::random()); - let node = Node { key: key.clone(), value: () }; + let node = Node { + key: key.clone(), + value: (), + }; let full = bucket.num_entries() == K_VALUE.get(); match bucket.insert(node, status) { InsertResult::Inserted => { let vec = match status { NodeStatus::Connected => &mut connected, - NodeStatus::Disconnected => &mut disconnected + NodeStatus::Disconnected => &mut disconnected, }; if full { vec.pop_front(); @@ -520,21 +493,20 @@ mod tests { } // Get all nodes from the bucket, together with their status. - let mut nodes = bucket.iter() + let mut nodes = bucket + .iter() .map(|(n, s)| (s, n.key.clone())) .collect::>(); // Split the list of nodes at the first connected node. - let first_connected_pos = nodes.iter().position(|(s,_)| *s == NodeStatus::Connected); + let first_connected_pos = nodes.iter().position(|(s, _)| *s == NodeStatus::Connected); assert_eq!(bucket.first_connected_pos, first_connected_pos); let tail = first_connected_pos.map_or(Vec::new(), |p| nodes.split_off(p)); // All nodes before the first connected node must be disconnected and // in insertion order. Similarly, all remaining nodes must be connected // and in insertion order. - nodes == Vec::from(disconnected) - && - tail == Vec::from(connected) + nodes == Vec::from(disconnected) && tail == Vec::from(connected) } quickcheck(prop as fn(_) -> _); @@ -551,12 +523,12 @@ mod tests { let key = Key::new(PeerId::random()); let node = Node { key, value: () }; match bucket.insert(node, NodeStatus::Disconnected) { - InsertResult::Full => {}, - x => panic!("{:?}", x) + InsertResult::Full => {} + x => panic!("{:?}", x), } // One-by-one fill the bucket with connected nodes, replacing the disconnected ones. - for i in 0 .. K_VALUE.get() { + for i in 0..K_VALUE.get() { let (first, first_status) = bucket.iter().next().unwrap(); let first_disconnected = first.clone(); assert_eq!(first_status, NodeStatus::Disconnected); @@ -564,17 +536,21 @@ mod tests { // Add a connected node, which is expected to be pending, scheduled to // replace the first (i.e. least-recently connected) node. let key = Key::new(PeerId::random()); - let node = Node { key: key.clone(), value: () }; + let node = Node { + key: key.clone(), + value: (), + }; match bucket.insert(node.clone(), NodeStatus::Connected) { - InsertResult::Pending { disconnected } => - assert_eq!(disconnected, first_disconnected.key), - x => panic!("{:?}", x) + InsertResult::Pending { disconnected } => { + assert_eq!(disconnected, first_disconnected.key) + } + x => panic!("{:?}", x), } // Trying to insert another connected node fails. match bucket.insert(node.clone(), NodeStatus::Connected) { - InsertResult::Full => {}, - x => panic!("{:?}", x) + InsertResult::Full => {} + x => panic!("{:?}", x), } assert!(bucket.pending().is_some()); @@ -583,10 +559,13 @@ mod tests { let pending = bucket.pending_mut().expect("No pending node."); pending.set_ready_at(Instant::now() - Duration::from_secs(1)); let result = bucket.apply_pending(); - assert_eq!(result, Some(AppliedPending { - inserted: node.clone(), - evicted: Some(first_disconnected) - })); + assert_eq!( + result, + Some(AppliedPending { + inserted: node.clone(), + evicted: Some(first_disconnected) + }) + ); assert_eq!(Some((&node, NodeStatus::Connected)), bucket.iter().last()); assert!(bucket.pending().is_none()); assert_eq!(Some(K_VALUE.get() - (i + 1)), bucket.first_connected_pos); @@ -599,8 +578,8 @@ mod tests { let key = Key::new(PeerId::random()); let node = Node { key, value: () }; match bucket.insert(node, NodeStatus::Connected) { - InsertResult::Full => {}, - x => panic!("{:?}", x) + InsertResult::Full => {} + x => panic!("{:?}", x), } } @@ -613,7 +592,10 @@ mod tests { // Add a connected pending node. let key = Key::new(PeerId::random()); - let node = Node { key: key.clone(), value: () }; + let node = Node { + key: key.clone(), + value: (), + }; if let InsertResult::Pending { disconnected } = bucket.insert(node, NodeStatus::Connected) { assert_eq!(&disconnected, &first_disconnected.key); } else { @@ -626,16 +608,21 @@ mod tests { // The pending node has been discarded. assert!(bucket.pending().is_none()); - assert!(bucket.iter().all(|(n,_)| &n.key != &key)); + assert!(bucket.iter().all(|(n, _)| &n.key != &key)); // The initially disconnected node is now the most-recently connected. - assert_eq!(Some((&first_disconnected, NodeStatus::Connected)), bucket.iter().last()); - assert_eq!(bucket.position(&first_disconnected.key).map(|p| p.0), bucket.first_connected_pos); + assert_eq!( + Some((&first_disconnected, NodeStatus::Connected)), + bucket.iter().last() + ); + assert_eq!( + bucket.position(&first_disconnected.key).map(|p| p.0), + bucket.first_connected_pos + ); assert_eq!(1, bucket.num_connected()); assert_eq!(K_VALUE.get() - 1, bucket.num_disconnected()); } - #[test] fn bucket_update() { fn prop(mut bucket: KBucket, ()>, pos: Position, status: NodeStatus) -> bool { @@ -646,7 +633,10 @@ mod tests { let key = bucket.nodes[pos].key.clone(); // Record the (ordered) list of status of all nodes in the bucket. - let mut expected = bucket.iter().map(|(n,s)| (n.key.clone(), s)).collect::>(); + let mut expected = bucket + .iter() + .map(|(n, s)| (n.key.clone(), s)) + .collect::>(); // Update the node in the bucket. bucket.update(&key, status); @@ -655,14 +645,17 @@ mod tests { // preserving the status and relative order of all other nodes. let expected_pos = match status { NodeStatus::Connected => num_nodes - 1, - NodeStatus::Disconnected => bucket.first_connected_pos.unwrap_or(num_nodes) - 1 + NodeStatus::Disconnected => bucket.first_connected_pos.unwrap_or(num_nodes) - 1, }; expected.remove(pos); expected.insert(expected_pos, (key.clone(), status)); - let actual = bucket.iter().map(|(n,s)| (n.key.clone(), s)).collect::>(); + let actual = bucket + .iter() + .map(|(n, s)| (n.key.clone(), s)) + .collect::>(); expected == actual } - quickcheck(prop as fn(_,_,_) -> _); + quickcheck(prop as fn(_, _, _) -> _); } } diff --git a/protocols/kad/src/kbucket/entry.rs b/protocols/kad/src/kbucket/entry.rs index 4bb98671..3eff9c9b 100644 --- a/protocols/kad/src/kbucket/entry.rs +++ b/protocols/kad/src/kbucket/entry.rs @@ -115,7 +115,7 @@ where /// /// Returns `None` if the entry is neither present in a bucket nor /// pending insertion into a bucket. - pub fn view(&'a mut self) -> Option> { + pub fn view(&'a self) -> Option> { match self { Entry::Present(entry, status) => Some(EntryRefView { node: NodeRefView { @@ -189,7 +189,7 @@ where .value } - /// Sets the status of the entry to `NodeStatus::Disconnected`. + /// Sets the status of the entry. pub fn update(self, status: NodeStatus) -> Self { self.0.bucket.update(self.0.key, status); Self::new(self.0.bucket, self.0.key) @@ -218,7 +218,7 @@ where pub fn value(&mut self) -> &mut TVal { self.0.bucket .pending_mut() - .expect("We can only build a ConnectedPendingEntry if the entry is pending; QED") + .expect("We can only build a PendingEntry if the entry is pending; QED") .value_mut() } @@ -251,7 +251,8 @@ where pub fn insert(self, value: TVal, status: NodeStatus) -> InsertResult { self.0.bucket.insert(Node { key: self.0.key.clone(), - value + value, + weight: unimplemented!("TODO: pass weight to AbsentEntry") }, status) } } diff --git a/protocols/kad/src/kbucket/sub_bucket.rs b/protocols/kad/src/kbucket/sub_bucket.rs new file mode 100644 index 00000000..8be65aa1 --- /dev/null +++ b/protocols/kad/src/kbucket/sub_bucket.rs @@ -0,0 +1,187 @@ +/* + * Copyright 2019 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// #[derive(Debug, Clone)] +// struct WeightedSubBucket { +// nodes: Vec>, +// first_connected_pos: Option, +// pending: Option>, +// } +// +// impl WeightedSubBucket { +// pub fn new() -> Self { +// Self { +// nodes: Vec::new(), +// first_connected_pos: None, +// pending: None, +// } +// } +// } + +use crate::kbucket::InsertResult; +use crate::K_VALUE; +use arrayvec::ArrayVec; +use std::time::Instant; + +/// The status of a node in a bucket. +/// +/// The status of a node in a bucket together with the time of the +/// last status change determines the position of the node in a +/// bucket. +#[derive(PartialEq, Eq, Debug, Copy, Clone)] +pub enum NodeStatus { + /// The node is considered connected. + Connected, + /// The node is considered disconnected. + Disconnected, +} + +/// A `PendingNode` is a `Node` that is pending insertion into a `KBucket`. +#[derive(Debug, Clone)] +pub struct PendingNode { + /// The pending node to insert. + pub node: Node, + + /// The status of the pending node. + pub status: NodeStatus, + + /// The instant at which the pending node is eligible for insertion into a bucket. + pub replace: Instant, +} + +impl PendingNode { + pub fn key(&self) -> &TKey { + &self.node.key + } + + pub fn status(&self) -> NodeStatus { + self.status + } + + pub fn value_mut(&mut self) -> &mut TVal { + &mut self.node.value + } + + pub fn is_ready(&self) -> bool { + Instant::now() >= self.replace + } + + pub fn set_ready_at(&mut self, t: Instant) { + self.replace = t; + } +} + +/// A `Node` in a bucket, representing a peer participating +/// in the Kademlia DHT together with an associated value (e.g. contact +/// information). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Node { + /// The key of the node, identifying the peer. + pub key: TKey, + /// The associated value. + pub value: TVal, + pub weight: u32, +} + +/// The position of a node in a `KBucket`, i.e. a non-negative integer +/// in the range `[0, K_VALUE)`. +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct Position(pub usize); + +#[derive(Debug, Clone)] +pub struct SubBucket { + pub nodes: ArrayVec<[Node; K_VALUE.get()]>, + pub first_connected_pos: Option, + // pub pending: Option>, +} + +impl SubBucket { + pub fn new() -> Self { + Self { + nodes: ArrayVec::new(), + first_connected_pos: None, + } + } + + pub fn status(&self, pos: Position) -> NodeStatus { + if self.first_connected_pos.map_or(false, |i| pos.0 >= i) { + NodeStatus::Connected + } else { + NodeStatus::Disconnected + } + } + + /// Returns an iterator over the nodes in the bucket, together with their status. + pub fn iter(&self) -> impl Iterator { + self.nodes + .iter() + .enumerate() + .map(move |(p, n)| (n, self.status(Position(p)))) + } + + pub fn is_full(&self) -> bool { + self.nodes.is_full() + } + + pub fn all_nodes_connected(&self) -> bool { + self.first_connected_pos == Some(0) + } + + pub fn append_connected_node(&mut self, node: Node) { + // `num_entries` MUST be calculated BEFORE insertion + self.change_connected_pos(ChangePosition::AppendConnected { + num_entries: self.num_entries(), + }); + self.nodes.push(node); + } + + pub fn insert_disconnected_node(&mut self, node: Node) { + let current_position = self.first_connected_pos; + self.change_connected_pos(ChangePosition::AddDisconnected); + match current_position { + Some(p) => self.nodes.insert(p, node), // Insert disconnected node just before the first connected node + None => self.nodes.push(node), // Or simply append disconnected node + } + } + + fn change_connected_pos(&mut self, action: ChangePosition) { + match action { + ChangePosition::AddDisconnected => { + // New disconnected node added => position of the first connected node moved by 1 + self.first_connected_pos = self.first_connected_pos.map(|p| p + 1) + } + ChangePosition::AppendConnected { num_entries } => { + // If there were no previously connected nodes – set mark to the given one (usually the last one) + // Otherwise – keep it the same + self.first_connected_pos = self.first_connected_pos.or(Some(num_entries)); + } + ChangePosition::RemoveConnected => { + if self.num_connected() == 1 { + // If it was the last connected node + self.first_connected_pos = None // Then mark there is no connected nodes left + } + // Otherwise – keep mark the same + } + ChangePosition::RemoveDisconnected => { + // If there are connected nodes – lower mark + // Otherwise – keep it None + self.first_connected_pos = self + .first_connected_pos + .map(|p| p.checked_sub(1).unwrap_or(0)) + } + } + } +} diff --git a/protocols/kad/src/kbucket/swamp.rs b/protocols/kad/src/kbucket/swamp.rs new file mode 100644 index 00000000..2111c9e8 --- /dev/null +++ b/protocols/kad/src/kbucket/swamp.rs @@ -0,0 +1,84 @@ +/* + * Copyright 2019 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::kbucket::{InsertResult, Node, NodeStatus, PendingNode, SubBucket}; +use std::time::Instant; + +pub struct Swamp { + bucket: SubBucket>, + pending: Option>, +} + +impl Swamp { + pub fn new() -> Self { + Self { + bucket: SubBucket::new(), + pending: None, + } + } + + pub fn exists_active_pending(&self) -> bool { + self.pending.is_some() // TODO: check replace timeout + } + + pub fn set_pending(&mut self, node: PendingNode) { + self.pending = Some(node) + } + + pub fn remove_pending(&mut self) { + self.pending = None + } + + pub fn pending_ready(&self) -> bool { + self.pending + .as_ref() + .map_or(false, |pending| pending.replace <= Instant::now()) + } + + pub fn insert(&mut self, node: Node, status: NodeStatus) -> InsertResult { + match status { + NodeStatus::Connected => { + if self.bucket.is_full() { + if self.bucket.all_nodes_connected() || self.exists_active_pending() { + // TODO: check pending.replace in exists_active_pending & call apply_pending? + return InsertResult::Full; + } else { + self.set_pending(PendingNode { + node, + status: NodeStatus::Connected, + replace: Instant::now() + self.pending_timeout, + }); + return InsertResult::Pending { + // Schedule a dial-up to check if the node is reachable + // NOTE: nodes[0] is disconnected (see all_nodes_connected check above) + // and the least recently connected + disconnected: self.nodes[0].key.clone(), + }; + } + } + self.bucket.append_connected_node(node); + InsertResult::Inserted + } + NodeStatus::Disconnected => { + if self.bucket.is_full() { + return InsertResult::Full; + } + self.bucket.insert_disconnected_node(node); + InsertResult::Inserted + } + } + } +} diff --git a/protocols/kad/src/kbucket/weighted.rs b/protocols/kad/src/kbucket/weighted.rs new file mode 100644 index 00000000..98c23c8d --- /dev/null +++ b/protocols/kad/src/kbucket/weighted.rs @@ -0,0 +1,168 @@ +/* + * Copyright 2019 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::kbucket::{InsertResult, Node, NodeStatus, PendingNode, SubBucket}; +use crate::W_VALUE; +use std::cmp::Ordering; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::time::{Duration, Instant}; + +pub struct WeightedNode { + /// The key of the node, identifying the peer. + pub key: TKey, + /// The associated value. + pub value: TVal, + pub weight: u32, + pub last_contact_time: u128, +} + +impl Into> for WeightedNode { + fn into(self) -> Node { + Node { + key: self.key, + value: self.value, + weight: self.weight, + } + } +} + +pub struct Weighted { + map: HashMap>>, + pending: Option>, + capacity: usize, + pending_timeout: Duration, +} + +impl Weighted { + pub fn new(pending_timeout: Duration) -> Self { + Self { + map: HashMap::new(), + pending: None, + capacity: W_VALUE.get(), + pending_timeout, + } + } + + pub fn all_nodes_connected(&self, weight: u32) -> bool { + self.map + .get(&weight) + .map_or_else(false, |bucket| bucket.all_nodes_connected()) + } + + pub fn pending_active(&self) -> bool { + self.pending.is_some() // TODO: check replace timeout + } + + pub fn set_pending(&mut self, node: PendingNode) { + self.pending = Some(node) + } + + pub fn remove_pending(&mut self) { + self.pending = None + } + + pub fn pending_ready(&self) -> bool { + self.pending + .as_ref() + .map_or(false, |pending| pending.replace <= Instant::now()) + } + + fn num_entries(&self) -> usize { + self.map.values().map(|bucket| bucket.nodes.len()).sum() + } + + pub fn is_full(&self) -> bool { + self.num_entries() >= self.capacity + } + + fn get_bucket_mut(&mut self, weight: u32) -> &mut SubBucket> { + match self.map.entry(weight) { + Entry::Occupied(mut e) => e.get_mut(), + Entry::Vacant(e) => { + let mut bucket = SubBucket::new(); + bucket.append_connected_node(node); + bucket + } + } + } + + fn append_connected_node(&mut self, node: WeightedNode) { + self.get_bucket_mut(node.weight).append_connected_node(node) + } + + fn insert_disconnected_node(&mut self, node: WeightedNode) { + self.get_bucket_mut(node.weight) + .insert_disconnected_node(node) + } + + fn min_key(&self) -> Option { + self.map.keys().min().cloned() + } + + fn least_recent(&self, weight_bound: u32) -> Option> { + self.map + .iter() + .filter(|(&&key, _)| key <= weight_bound) + .map(|(_, bucket)| bucket.iter()) + .flatten() + .min_by(|(a, b)| Ord::cmp(a.last_contact_time, b.last_contact_time)) + } + + pub fn insert( + &mut self, + node: WeightedNode, + status: NodeStatus, + ) -> InsertResult { + match status { + NodeStatus::Connected => { + if !self.is_full() { + // If there's free space in bucket, append the node + self.append_connected_node(node); + InsertResult::Inserted + } else { + let min_key = self.min_key().expect("bucket MUST be full here"); + + if min_key < node.weight && !self.pending_active() { + // If bucket is full, but there's a sub-bucket with lower weight, and no pending node + // then set `node` to be pending, and schedule a dial-up check for the least recent node + match self.least_recent(node.weight) { + Some(least_recent) => { + self.set_pending(PendingNode { + node: node.into(), + status, + replace: Instant::now() + self.pending_timeout, + }); + InsertResult::Pending { + disconnected: least_recent, + } + } + // There's no node to evict + None => InsertResult::Full, + } + } else { + InsertResult::Full + } + } + } + NodeStatus::Disconnected if !self.is_full() => { + self.insert_disconnected_node(node); // TODO: maybe schedule a dial-up to this node? + InsertResult::Inserted + } + _ => InsertResult::Full, + } + } +} diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index 258a1e86..e30fb607 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -88,6 +88,9 @@ use std::num::NonZeroUsize; /// The current value is `20`. pub const K_VALUE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(20) }; +/// Total number of weighted nodes in weighted bucket +pub const W_VALUE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(20) }; + /// The `α` parameter of the Kademlia specification. /// /// This parameter determines the default parallelism for iterative queries,