diff --git a/protocols/kad/src/kbucket/bucket.rs b/protocols/kad/src/kbucket/bucket.rs index 815fd919..61ba64bb 100644 --- a/protocols/kad/src/kbucket/bucket.rs +++ b/protocols/kad/src/kbucket/bucket.rs @@ -126,9 +126,13 @@ where } /// Returns a reference to the pending node of the bucket, if there is any. - fn pending(&self) -> Option<&PendingNode> { - // self.swamp.as_ref() - unimplemented!("pending") + // TODO: maybe return `impl Iterator`? + fn pending(&self) -> Vec<&PendingNode> { + Iterator::chain( + self.weighted.pending().into_iter(), + self.swamp.pending().into_iter(), + ) + .collect() } /// Returns a mutable reference to the pending node of the bucket, if there is any. @@ -149,12 +153,16 @@ where } /// Updates the status of the pending node, if any. - pub fn update_pending(&mut self, status: NodeStatus) { - unimplemented!("update_pending"); - - // if let Some(pending) = &mut self.swamp_pending { - // pending.status = status - // } + pub fn update_pending(&mut self, key: &TKey, status: NodeStatus) { + if !self.weighted.update_pending(key, status) { + if !self.swamp.update_pending(key, status) { + println!( + "Didn't update pending node {:?} to {:?}", + key.as_ref(), + status + ) + } + } } /// Gets a mutable reference to the node identified by the given key. @@ -414,7 +422,7 @@ mod tests { x => panic!("Expected Full, got {:?}", x), } - assert!(bucket.pending().is_some()); + assert!(!bucket.pending().is_empty()); // Apply the pending node. let pending = bucket.pending_mut().expect("No pending node."); @@ -428,11 +436,11 @@ mod tests { }] ); assert_eq!(Some((&node, NodeStatus::Connected)), bucket.iter().last()); - assert!(bucket.pending().is_none()); + assert!(bucket.pending().is_empty()); /* assert_eq!(Some(K_VALUE.get() - (i + 1)), bucket.first_connected_pos); */ } - assert!(bucket.pending().is_none()); + assert!(bucket.pending().is_empty()); assert_eq!(K_VALUE.get(), bucket.num_entries()); // Trying to insert another connected node fails. @@ -467,13 +475,13 @@ mod tests { } else { panic!() } - assert!(bucket.pending().is_some()); + assert!(!bucket.pending().is_empty()); // Update the status of the first disconnected node to be connected. bucket.update(&first_disconnected.key, NodeStatus::Connected); // The pending node has been discarded. - assert!(bucket.pending().is_none()); + assert!(bucket.pending().is_empty()); assert!(bucket.iter().all(|(n, _)| &n.key != &key)); // The initially disconnected node is now the most-recently connected. diff --git a/protocols/kad/src/kbucket/entry.rs b/protocols/kad/src/kbucket/entry.rs index 1dc77b31..be61b731 100644 --- a/protocols/kad/src/kbucket/entry.rs +++ b/protocols/kad/src/kbucket/entry.rs @@ -226,7 +226,7 @@ where /// Updates the status of the pending entry. pub fn update(self, status: NodeStatus) -> PendingEntry<'a, TKey, TVal> { - self.0.bucket.update_pending(status); + self.0.bucket.update_pending(self.0.key, status); PendingEntry::new(self.0.bucket, self.0.key) } } diff --git a/protocols/kad/src/kbucket/swamp.rs b/protocols/kad/src/kbucket/swamp.rs index 6b6ca209..a77635dc 100644 --- a/protocols/kad/src/kbucket/swamp.rs +++ b/protocols/kad/src/kbucket/swamp.rs @@ -176,4 +176,21 @@ where .position(|node| node.key.as_ref() == key.as_ref()) .map(|p| self.bucket.status(p)) } + + pub fn update_pending(&mut self, key: &TKey, status: NodeStatus) -> bool { + if let Some(mut pending) = self.pending.as_mut() { + if pending.key().as_ref() == key.as_ref() { + pending.status = status; + true + } else { + false + } + } else { + false + } + } + + pub fn pending(&self) -> Option<&PendingNode> { + self.pending.as_ref() + } } diff --git a/protocols/kad/src/kbucket/weighted.rs b/protocols/kad/src/kbucket/weighted.rs index 460f9b80..0422ee97 100644 --- a/protocols/kad/src/kbucket/weighted.rs +++ b/protocols/kad/src/kbucket/weighted.rs @@ -15,24 +15,12 @@ */ use crate::kbucket::{ - AppliedPending, InsertResult, KeyBytes, Node, NodeStatus, Position, SubBucket, + AppliedPending, InsertResult, KeyBytes, Node, NodeStatus, PendingNode, Position, SubBucket, }; use crate::W_VALUE; use std::collections::HashMap; use std::time::{Duration, Instant}; -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct WeightedPendingNode { - /// The pending node to insert. - pub node: WeightedNode, - - /// The status of the pending node. - pub status: NodeStatus, - - /// The instant at which the pending node is eligible for insertion into a bucket. - pub replace: Instant, -} - #[derive(Debug, Clone, PartialEq, Eq)] pub struct WeightedNode { pub inner: Node, @@ -86,7 +74,7 @@ impl WeightedPosition { #[derive(Debug, Clone)] pub struct Weighted { map: HashMap>>, - pending: Option>, + pending: Option>, capacity: usize, pending_timeout: Duration, } @@ -111,7 +99,7 @@ where .map_or(false, |bucket| bucket.all_nodes_connected()) } - pub fn set_pending(&mut self, node: WeightedPendingNode) { + pub fn set_pending(&mut self, node: PendingNode) { self.pending = Some(node) } @@ -164,7 +152,8 @@ where } fn append_connected_node(&mut self, node: WeightedNode) { - self.get_bucket_mut(node.inner.weight).append_connected_node(node) + self.get_bucket_mut(node.inner.weight) + .append_connected_node(node) } fn insert_disconnected_node(&mut self, node: WeightedNode) { @@ -221,7 +210,9 @@ where fn is_least_recently_connected(&self, node: &WeightedNode) -> bool { let least_recent = self.least_recent(node.inner.weight); - least_recent.map_or(false, |l_r| l_r.inner.key.as_ref() == node.inner.key.as_ref()) + least_recent.map_or(false, |l_r| { + l_r.inner.key.as_ref() == node.inner.key.as_ref() + }) } pub fn insert>>( @@ -244,10 +235,13 @@ where if min_key < node.inner.weight && !self.pending_exists() { // If bucket is full, but there's a sub-bucket with lower weight, and no pending node // then set `node` to be pending, and schedule a dial-up check for the least recent node - match self.least_recent(node.inner.weight).map(|lr| lr.inner.key.clone()) { + match self + .least_recent(node.inner.weight) + .map(|lr| lr.inner.key.clone()) + { Some(least_recent_key) => { - self.set_pending(WeightedPendingNode { - node, + self.set_pending(PendingNode { + node: node.inner, status, replace: Instant::now() + self.pending_timeout, }); @@ -278,16 +272,16 @@ where self.pending .take() - .and_then(|WeightedPendingNode { node, status, .. }| { + .and_then(|PendingNode { node, status, .. }| { let evicted = if self.is_full() { - self.pop_node(node.inner.weight) + self.pop_node(node.weight) } else { None }; if let InsertResult::Inserted = self.insert(node.clone(), status) { Some(AppliedPending { - inserted: node.into(), + inserted: node, evicted: evicted.map(|e| e.into()), }) } else { @@ -336,4 +330,21 @@ where .map(|bucket| bucket.status(Position(position.position))) }) } + + pub fn update_pending(&mut self, key: &TKey, status: NodeStatus) -> bool { + if let Some(mut pending) = self.pending.as_mut() { + if pending.key().as_ref() == key.as_ref() { + pending.status = status; + true + } else { + false + } + } else { + false + } + } + + pub fn pending(&self) -> Option<&PendingNode> { + self.pending.as_ref() + } }