weighted bucket: implement update_pending

This commit is contained in:
folex 2020-03-24 19:27:06 +03:00
parent 0c724c815f
commit 1e9e42065a
4 changed files with 74 additions and 38 deletions

View File

@ -126,9 +126,13 @@ where
} }
/// Returns a reference to the pending node of the bucket, if there is any. /// Returns a reference to the pending node of the bucket, if there is any.
fn pending(&self) -> Option<&PendingNode<TKey, TVal>> { // TODO: maybe return `impl Iterator`?
// self.swamp.as_ref() fn pending(&self) -> Vec<&PendingNode<TKey, TVal>> {
unimplemented!("pending") Iterator::chain(
self.weighted.pending().into_iter(),
self.swamp.pending().into_iter(),
)
.collect()
} }
/// Returns a mutable reference to the pending node of the bucket, if there is any. /// Returns a mutable reference to the pending node of the bucket, if there is any.
@ -149,12 +153,16 @@ where
} }
/// Updates the status of the pending node, if any. /// Updates the status of the pending node, if any.
pub fn update_pending(&mut self, status: NodeStatus) { pub fn update_pending(&mut self, key: &TKey, status: NodeStatus) {
unimplemented!("update_pending"); if !self.weighted.update_pending(key, status) {
if !self.swamp.update_pending(key, status) {
// if let Some(pending) = &mut self.swamp_pending { println!(
// pending.status = status "Didn't update pending node {:?} to {:?}",
// } key.as_ref(),
status
)
}
}
} }
/// Gets a mutable reference to the node identified by the given key. /// Gets a mutable reference to the node identified by the given key.
@ -414,7 +422,7 @@ mod tests {
x => panic!("Expected Full, got {:?}", x), x => panic!("Expected Full, got {:?}", x),
} }
assert!(bucket.pending().is_some()); assert!(!bucket.pending().is_empty());
// Apply the pending node. // Apply the pending node.
let pending = bucket.pending_mut().expect("No pending node."); let pending = bucket.pending_mut().expect("No pending node.");
@ -428,11 +436,11 @@ mod tests {
}] }]
); );
assert_eq!(Some((&node, NodeStatus::Connected)), bucket.iter().last()); assert_eq!(Some((&node, NodeStatus::Connected)), bucket.iter().last());
assert!(bucket.pending().is_none()); assert!(bucket.pending().is_empty());
/* assert_eq!(Some(K_VALUE.get() - (i + 1)), bucket.first_connected_pos); */ /* assert_eq!(Some(K_VALUE.get() - (i + 1)), bucket.first_connected_pos); */
} }
assert!(bucket.pending().is_none()); assert!(bucket.pending().is_empty());
assert_eq!(K_VALUE.get(), bucket.num_entries()); assert_eq!(K_VALUE.get(), bucket.num_entries());
// Trying to insert another connected node fails. // Trying to insert another connected node fails.
@ -467,13 +475,13 @@ mod tests {
} else { } else {
panic!() panic!()
} }
assert!(bucket.pending().is_some()); assert!(!bucket.pending().is_empty());
// Update the status of the first disconnected node to be connected. // Update the status of the first disconnected node to be connected.
bucket.update(&first_disconnected.key, NodeStatus::Connected); bucket.update(&first_disconnected.key, NodeStatus::Connected);
// The pending node has been discarded. // The pending node has been discarded.
assert!(bucket.pending().is_none()); assert!(bucket.pending().is_empty());
assert!(bucket.iter().all(|(n, _)| &n.key != &key)); assert!(bucket.iter().all(|(n, _)| &n.key != &key));
// The initially disconnected node is now the most-recently connected. // The initially disconnected node is now the most-recently connected.

View File

@ -226,7 +226,7 @@ where
/// Updates the status of the pending entry. /// Updates the status of the pending entry.
pub fn update(self, status: NodeStatus) -> PendingEntry<'a, TKey, TVal> { pub fn update(self, status: NodeStatus) -> PendingEntry<'a, TKey, TVal> {
self.0.bucket.update_pending(status); self.0.bucket.update_pending(self.0.key, status);
PendingEntry::new(self.0.bucket, self.0.key) PendingEntry::new(self.0.bucket, self.0.key)
} }
} }

View File

@ -176,4 +176,21 @@ where
.position(|node| node.key.as_ref() == key.as_ref()) .position(|node| node.key.as_ref() == key.as_ref())
.map(|p| self.bucket.status(p)) .map(|p| self.bucket.status(p))
} }
pub fn update_pending(&mut self, key: &TKey, status: NodeStatus) -> bool {
if let Some(mut pending) = self.pending.as_mut() {
if pending.key().as_ref() == key.as_ref() {
pending.status = status;
true
} else {
false
}
} else {
false
}
}
pub fn pending(&self) -> Option<&PendingNode<TKey, TVal>> {
self.pending.as_ref()
}
} }

View File

@ -15,24 +15,12 @@
*/ */
use crate::kbucket::{ use crate::kbucket::{
AppliedPending, InsertResult, KeyBytes, Node, NodeStatus, Position, SubBucket, AppliedPending, InsertResult, KeyBytes, Node, NodeStatus, PendingNode, Position, SubBucket,
}; };
use crate::W_VALUE; use crate::W_VALUE;
use std::collections::HashMap; use std::collections::HashMap;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WeightedPendingNode<TKey, TVal> {
/// The pending node to insert.
pub node: WeightedNode<TKey, TVal>,
/// The status of the pending node.
pub status: NodeStatus,
/// The instant at which the pending node is eligible for insertion into a bucket.
pub replace: Instant,
}
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct WeightedNode<TKey, TVal> { pub struct WeightedNode<TKey, TVal> {
pub inner: Node<TKey, TVal>, pub inner: Node<TKey, TVal>,
@ -86,7 +74,7 @@ impl WeightedPosition {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Weighted<TKey, TVal> { pub struct Weighted<TKey, TVal> {
map: HashMap<u32, SubBucket<WeightedNode<TKey, TVal>>>, map: HashMap<u32, SubBucket<WeightedNode<TKey, TVal>>>,
pending: Option<WeightedPendingNode<TKey, TVal>>, pending: Option<PendingNode<TKey, TVal>>,
capacity: usize, capacity: usize,
pending_timeout: Duration, pending_timeout: Duration,
} }
@ -111,7 +99,7 @@ where
.map_or(false, |bucket| bucket.all_nodes_connected()) .map_or(false, |bucket| bucket.all_nodes_connected())
} }
pub fn set_pending(&mut self, node: WeightedPendingNode<TKey, TVal>) { pub fn set_pending(&mut self, node: PendingNode<TKey, TVal>) {
self.pending = Some(node) self.pending = Some(node)
} }
@ -164,7 +152,8 @@ where
} }
fn append_connected_node(&mut self, node: WeightedNode<TKey, TVal>) { fn append_connected_node(&mut self, node: WeightedNode<TKey, TVal>) {
self.get_bucket_mut(node.inner.weight).append_connected_node(node) self.get_bucket_mut(node.inner.weight)
.append_connected_node(node)
} }
fn insert_disconnected_node(&mut self, node: WeightedNode<TKey, TVal>) { fn insert_disconnected_node(&mut self, node: WeightedNode<TKey, TVal>) {
@ -221,7 +210,9 @@ where
fn is_least_recently_connected(&self, node: &WeightedNode<TKey, TVal>) -> bool { fn is_least_recently_connected(&self, node: &WeightedNode<TKey, TVal>) -> bool {
let least_recent = self.least_recent(node.inner.weight); let least_recent = self.least_recent(node.inner.weight);
least_recent.map_or(false, |l_r| l_r.inner.key.as_ref() == node.inner.key.as_ref()) least_recent.map_or(false, |l_r| {
l_r.inner.key.as_ref() == node.inner.key.as_ref()
})
} }
pub fn insert<Node: Into<WeightedNode<TKey, TVal>>>( pub fn insert<Node: Into<WeightedNode<TKey, TVal>>>(
@ -244,10 +235,13 @@ where
if min_key < node.inner.weight && !self.pending_exists() { if min_key < node.inner.weight && !self.pending_exists() {
// If bucket is full, but there's a sub-bucket with lower weight, and no pending node // If bucket is full, but there's a sub-bucket with lower weight, and no pending node
// then set `node` to be pending, and schedule a dial-up check for the least recent node // then set `node` to be pending, and schedule a dial-up check for the least recent node
match self.least_recent(node.inner.weight).map(|lr| lr.inner.key.clone()) { match self
.least_recent(node.inner.weight)
.map(|lr| lr.inner.key.clone())
{
Some(least_recent_key) => { Some(least_recent_key) => {
self.set_pending(WeightedPendingNode { self.set_pending(PendingNode {
node, node: node.inner,
status, status,
replace: Instant::now() + self.pending_timeout, replace: Instant::now() + self.pending_timeout,
}); });
@ -278,16 +272,16 @@ where
self.pending self.pending
.take() .take()
.and_then(|WeightedPendingNode { node, status, .. }| { .and_then(|PendingNode { node, status, .. }| {
let evicted = if self.is_full() { let evicted = if self.is_full() {
self.pop_node(node.inner.weight) self.pop_node(node.weight)
} else { } else {
None None
}; };
if let InsertResult::Inserted = self.insert(node.clone(), status) { if let InsertResult::Inserted = self.insert(node.clone(), status) {
Some(AppliedPending { Some(AppliedPending {
inserted: node.into(), inserted: node,
evicted: evicted.map(|e| e.into()), evicted: evicted.map(|e| e.into()),
}) })
} else { } else {
@ -336,4 +330,21 @@ where
.map(|bucket| bucket.status(Position(position.position))) .map(|bucket| bucket.status(Position(position.position)))
}) })
} }
pub fn update_pending(&mut self, key: &TKey, status: NodeStatus) -> bool {
if let Some(mut pending) = self.pending.as_mut() {
if pending.key().as_ref() == key.as_ref() {
pending.status = status;
true
} else {
false
}
} else {
false
}
}
pub fn pending(&self) -> Option<&PendingNode<TKey, TVal>> {
self.pending.as_ref()
}
} }