From d8b42a5cb11b8e75eebb867e9b64ab7e24e3ae17 Mon Sep 17 00:00:00 2001 From: folex <0xdxdy@gmail.com> Date: Fri, 20 Mar 2020 19:36:33 +0300 Subject: [PATCH] bucket: apply_pending works --- protocols/kad/src/kbucket/bucket.rs | 120 ++++++---------------------- 1 file changed, 26 insertions(+), 94 deletions(-) diff --git a/protocols/kad/src/kbucket/bucket.rs b/protocols/kad/src/kbucket/bucket.rs index 31feea26..83eb2a6a 100644 --- a/protocols/kad/src/kbucket/bucket.rs +++ b/protocols/kad/src/kbucket/bucket.rs @@ -213,103 +213,24 @@ where /// the node that was replaced. `None` indicates that the nodes in the /// bucket remained unchanged. pub fn apply_pending(&mut self) -> Option> { - if let Some(pending) = self.pending.take() { - if pending.replace <= Instant::now() { - if self.nodes.is_full() { - if self.all_nodes_connected() { - // The bucket is full with connected nodes. Drop the pending node. - return None - } - debug_assert!(self.first_connected_pos.map_or(true, |p| p > 0)); // (*) - return match pending.status { - NodeStatus::Connected => { - // A connected pending node goes at the end of the list for - // the connected peers, removing the least-recently connected. - let evicted = self.pop_node(); - self.append_connected_node(pending.node.clone()); - - Some(AppliedPending { inserted: pending.node, evicted: Some(evicted) }) - }, - NodeStatus::Disconnected => { - // A disconnected pending node goes at the end of the list - // for the disconnected peers. - let evicted = self.pop_node(); - self.insert_disconnected_node(pending.node.clone()); - - Some(AppliedPending { inserted: pending.node, evicted: Some(evicted) }) - }, - } - } else { - // There is room in the bucket, so just insert the pending node. - let inserted = pending.node.clone(); - match self.insert(pending.node, pending.status) { - InsertResult::Inserted => - return Some(AppliedPending { inserted, evicted: None }), - _ => unreachable!("Bucket is not full.") - } - } - } else { - self.pending = Some(pending); - } + if !self.pending_ready() { + return None } - return None - } + self.pending.take().map(|PendingNode { node, status, .. }| { + let evicted = if self.is_full() { + Some(self.pop_node()) + } else { + None + }; - // pub fn apply_pending(&mut self) -> Option> { - // if let Some(pending) = self.pending.take() { - // if pending.replace <= Instant::now() { - // if self.nodes.is_full() { - // if self.all_nodes_connected() { - // // The bucket is full with connected nodes. Drop the pending node. - // return None - // } - // debug_assert!(self.first_connected_pos.map_or(true, |p| p > 0)); // (*) - // // The pending node will be inserted. - // let inserted = pending.node.clone(); - // - // return match pending.status { - // NodeStatus::Connected => { - // // A connected pending node goes at the end of the list for - // // the connected peers, removing the least-recently connected. - // self.append_connected_node(pending.node); - // let evicted = Some(self.pop_node()); - // - // Some(AppliedPending { inserted, evicted }) - // }, - // NodeStatus::Disconnected if self.num_connected() > 0 => { - // let p = self.first_connected_pos.unwrap(); - // // A disconnected pending node goes at the end of the list - // // for the disconnected peers. - // let insert_pos = p.checked_sub(1).expect("by (*)"); - // let evicted = Some(self.nodes.remove(0)); - // self.nodes.insert(insert_pos, pending.node); - // Some(AppliedPending { inserted, evicted }) - // }, - // NodeStatus::Disconnected => { - // // All nodes are disconnected. Insert the new node as the most - // // recently disconnected, removing the least-recently disconnected. - // let evicted = Some(self.nodes.remove(0)); - // self.nodes.push(pending.node); - // Some(AppliedPending { inserted, evicted }) - // } - // } - // } else { - // // There is room in the bucket, so just insert the pending node. - // let inserted = pending.node.clone(); - // match self.insert(pending.node, pending.status) { - // InsertResult::Inserted => - // return Some(AppliedPending { inserted, evicted: None }), - // _ => unreachable!("Bucket is not full.") - // } - // } - // } else { - // self.pending = Some(pending); - // } - // } - // - // return None - // } + if let InsertResult::Inserted = self.insert(node.clone(), status) { + AppliedPending { inserted: node, evicted } + } else { + unreachable!("Bucket is not full, we just evicted a node.") + } + }) + } /// Updates the status of the pending node, if any. pub fn update_pending(&mut self, status: NodeStatus) { @@ -376,6 +297,9 @@ where replace: Instant::now() + self.pending_timeout, }); return InsertResult::Pending { + // Schedule a dial-up to check if the node is reachable + // NOTE: nodes[0] is disconnected (see all_nodes_connected check above) + // and the least recently connected disconnected: self.nodes[0].key.clone() } } @@ -393,6 +317,14 @@ where } } + fn pending_ready(&self) -> bool { + self.pending.as_ref().map_or(false, |pending| pending.replace <= Instant::now()) + } + + fn is_full(&self) -> bool { + self.nodes.is_full() + } + fn exists_active_pending(&self) -> bool { self.pending.is_some() // TODO: check is replace has passed? }