diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 89d86125..f2ce1adb 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -19,8 +19,8 @@ // DEALINGS IN THE SOFTWARE. use crate::addresses::Addresses; -use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn, KademliaRequestId}; -use crate::kbucket::{KBucketsTable, KBucketsPeerId, Update}; +use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn}; +use crate::kbucket::{self, KBucketsTable, KBucketsPeerId}; use crate::protocol::{KadConnectionType, KadPeer}; use crate::query::{QueryConfig, QueryState, QueryStatePollOut}; use fnv::{FnvHashMap, FnvHashSet}; @@ -30,7 +30,7 @@ use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId}; use multihash::Multihash; use rand; use smallvec::SmallVec; -use std::{cmp::Ordering, error, marker::PhantomData, time::Duration, time::Instant}; +use std::{cmp::Ordering, error, marker::PhantomData, num::NonZeroUsize, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::Interval; @@ -136,7 +136,7 @@ impl KBucketsPeerId for QueryInfo { self.as_ref().distance_with(other) } - fn max_distance() -> usize { + fn max_distance() -> NonZeroUsize { ::max_distance() } } @@ -192,31 +192,62 @@ impl Kademlia { /// Creates a `Kademlia`. /// /// Contrary to `new`, doesn't perform the initialization queries that store our local ID into - /// the DHT. + /// the DHT and fill our buckets. #[inline] pub fn without_init(local_peer_id: PeerId) -> Self { Self::new_inner(local_peer_id, false) } - /// Adds a known address for the given `PeerId`. - #[deprecated(note = "Use add_connected_address or add_not_connected_address instead")] - pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) { - self.add_connected_address(peer_id, address) - } - /// Adds a known address for the given `PeerId`. We are connected to this address. + // TODO: report if the address was inserted? also, semantics unclear pub fn add_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) { - if let Some(list) = self.kbuckets.entry_mut(peer_id) { - list.insert_connected(address); - } + self.add_address(peer_id, address, true) } /// Adds a known address for the given `PeerId`. We are not connected or don't know whether we /// are connected to this address. + // TODO: report if the address was inserted? also, semantics unclear pub fn add_not_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) { - if let Some(list) = self.kbuckets.entry_mut(peer_id) { - list.insert_not_connected(address); - } + self.add_address(peer_id, address, false) + } + + /// Underlying implementation for `add_connected_address` and `add_not_connected_address`. + fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, connected: bool) { + match self.kbuckets.entry(peer_id) { + kbucket::Entry::InKbucketConnected(mut entry) => if connected { + entry.value().insert_connected(address) + } else { + entry.value().insert_not_connected(address) + }, + kbucket::Entry::InKbucketConnectedPending(mut entry) => if connected { + entry.value().insert_connected(address) + } else { + entry.value().insert_not_connected(address) + }, + kbucket::Entry::InKbucketDisconnected(mut entry) => entry.value().insert_not_connected(address), + kbucket::Entry::InKbucketDisconnectedPending(mut entry) => entry.value().insert_not_connected(address), + kbucket::Entry::NotInKbucket(entry) => { + let mut addresses = Addresses::new(); + addresses.insert_not_connected(address); + match entry.insert_disconnected(addresses) { + kbucket::InsertOutcome::Inserted => { + let event = KademliaOut::KBucketAdded { + peer_id: peer_id.clone(), + replaced: None, + }; + self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event)); + }, + kbucket::InsertOutcome::Full => (), + kbucket::InsertOutcome::Pending { to_ping } => { + self.queued_events.push(NetworkBehaviourAction::DialPeer { + peer_id: to_ping.clone(), + }) + }, + } + return; + }, + kbucket::Entry::SelfEntry => return, + }; } /// Inner implementation of the constructors. @@ -241,16 +272,7 @@ impl Kademlia { }; if initialize { - // As part of the initialization process, we start one `FIND_NODE` for each bit of the - // possible range of peer IDs. - for n in 0..256 { - let target = match gen_random_id(behaviour.kbuckets.my_id(), n) { - Ok(p) => p, - Err(()) => continue, - }; - - behaviour.start_query(QueryInfoInner::Initialization { target }); - } + behaviour.initialize(); } behaviour @@ -258,17 +280,34 @@ impl Kademlia { } impl Kademlia { + /// Performs the Kademlia initialization process. + /// + /// If you called `new` to create the `Kademlia`, then this has been started. Calling this + /// method manually is useful in order to re-perform the initialization later. + /// + /// Starts one query per bucket with the intention of connecting to nodes along the way and + /// fill our own buckets. This also adds the effect of adding our local node to other nodes' + /// buckets. + pub fn initialize(&mut self) { + for n in 0..256 { // TODO: 256 should be grabbed from the kbuckets module + let target = match gen_random_id(self.kbuckets.my_id(), n) { + Ok(p) => p, + Err(()) => continue, + }; + + self.start_query(QueryInfoInner::Initialization { target }); + } + } + /// Starts an iterative `FIND_NODE` request. /// /// This will eventually produce an event containing the nodes of the DHT closest to the /// requested `PeerId`. - #[inline] pub fn find_node(&mut self, peer_id: PeerId) { self.start_query(QueryInfoInner::FindPeer(peer_id)); } /// Starts an iterative `GET_PROVIDERS` request. - #[inline] pub fn get_providers(&mut self, target: Multihash) { self.start_query(QueryInfoInner::GetProviders { target, pending_results: Vec::new() }); } @@ -322,7 +361,7 @@ impl Kademlia { }; let known_closest_peers = self.kbuckets - .find_closest::(target.as_ref()) + .find_closest(target.as_ref()) .take(self.num_results); self.active_queries.insert( @@ -353,7 +392,8 @@ where // We should order addresses from decreasing likelyhood of connectivity, so start with // the addresses of that peer in the k-buckets. let mut out_list = self.kbuckets - .get(peer_id) + .entry(peer_id) + .value_not_pending() .map(|l| l.iter().cloned().collect::>()) .unwrap_or_else(Vec::new); @@ -378,16 +418,58 @@ where }); } - if let Update::Pending(to_ping) = self.kbuckets.set_connected(&id) { - self.queued_events.push(NetworkBehaviourAction::DialPeer { - peer_id: to_ping.clone(), - }) - } + let address = match endpoint { + ConnectedPoint::Dialer { address } => Some(address), + ConnectedPoint::Listener { .. } => None, + }; - if let ConnectedPoint::Dialer { address } = endpoint { - if let Some(list) = self.kbuckets.entry_mut(&id) { - list.insert_connected(address); - } + match self.kbuckets.entry(&id) { + kbucket::Entry::InKbucketConnected(_) | + kbucket::Entry::InKbucketConnectedPending(_) => { + unreachable!("Kbuckets are always kept in sync with the connection state; QED") + }, + + kbucket::Entry::InKbucketDisconnected(mut entry) => { + debug_assert!(!entry.value().is_connected()); + if let Some(address) = address { + entry.value().insert_connected(address); + } + entry.set_connected(); + }, + + kbucket::Entry::InKbucketDisconnectedPending(mut entry) => { + debug_assert!(!entry.value().is_connected()); + if let Some(address) = address { + entry.value().insert_connected(address); + } + entry.set_connected(); + }, + + kbucket::Entry::NotInKbucket(entry) => { + let mut addresses = Addresses::new(); + if let Some(address) = address { + addresses.insert_connected(address); + } + match entry.insert_connected(addresses) { + kbucket::InsertOutcome::Inserted => { + let event = KademliaOut::KBucketAdded { + peer_id: id.clone(), + replaced: None, + }; + self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event)); + }, + kbucket::InsertOutcome::Full => (), + kbucket::InsertOutcome::Pending { to_ping } => { + self.queued_events.push(NetworkBehaviourAction::DialPeer { + peer_id: to_ping.clone(), + }) + }, + } + }, + + kbucket::Entry::SelfEntry => { + unreachable!("Guaranteed to never receive disconnected even for self; QED") + }, } self.connected_peers.insert(id); @@ -395,7 +477,7 @@ where fn inject_dial_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, _: &dyn error::Error) { if let Some(peer_id) = peer_id { - if let Some(list) = self.kbuckets.get_mut(peer_id) { + if let Some(list) = self.kbuckets.entry(peer_id).value() { // TODO: don't remove the address if the error is that we are already connected // to this peer list.remove_addr(addr); @@ -412,13 +494,38 @@ where } if let ConnectedPoint::Dialer { address } = old_endpoint { - if let Some(list) = self.kbuckets.get_mut(id) { - debug_assert!(list.is_connected()); - list.set_disconnected(&address); + match self.kbuckets.entry(id) { + kbucket::Entry::InKbucketConnected(mut entry) => { + debug_assert!(entry.value().is_connected()); + entry.value().set_disconnected(&address); + match entry.set_disconnected() { + kbucket::SetDisconnectedOutcome::Kept(_) => {}, + kbucket::SetDisconnectedOutcome::Replaced { replacement, .. } => { + let event = KademliaOut::KBucketAdded { + peer_id: replacement, + replaced: Some(id.clone()), + }; + self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event)); + }, + } + }, + kbucket::Entry::InKbucketConnectedPending(mut entry) => { + debug_assert!(entry.value().is_connected()); + entry.value().set_disconnected(&address); + entry.set_disconnected(); + }, + kbucket::Entry::InKbucketDisconnected(_) => { + unreachable!("Kbuckets are always kept in sync with the connection state; QED") + }, + kbucket::Entry::InKbucketDisconnectedPending(_) => { + unreachable!("Kbuckets are always kept in sync with the connection state; QED") + }, + kbucket::Entry::NotInKbucket(_) => (), + kbucket::Entry::SelfEntry => { + unreachable!("Guaranteed to never receive disconnected even for self; QED") + }, } } - - self.kbuckets.set_disconnected(&id); } fn inject_replaced(&mut self, peer_id: PeerId, old_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { @@ -432,14 +539,12 @@ where } } - if let ConnectedPoint::Dialer { address } = old_endpoint { - if let Some(list) = self.kbuckets.get_mut(&peer_id) { + if let Some(list) = self.kbuckets.entry(&peer_id).value() { + if let ConnectedPoint::Dialer { address } = old_endpoint { list.set_disconnected(&address); } - } - if let ConnectedPoint::Dialer { address } = new_endpoint { - if let Some(list) = self.kbuckets.entry_mut(&peer_id) { + if let ConnectedPoint::Dialer { address } = new_endpoint { list.insert_connected(address); } } @@ -451,7 +556,7 @@ where let closer_peers = self.kbuckets .find_closest(&key) .take(self.num_results) - .map(|peer_id| build_kad_peer(peer_id, &self.kbuckets)) + .map(|peer_id| build_kad_peer(peer_id, &mut self.kbuckets)) .collect(); self.queued_events.push(NetworkBehaviourAction::SendEvent { @@ -487,15 +592,18 @@ where let closer_peers = self.kbuckets .find_closest(&key) .take(self.num_results) - .map(|peer_id| build_kad_peer(peer_id, &self.kbuckets)) + .map(|peer_id| build_kad_peer(peer_id, &mut self.kbuckets)) .collect(); - let provider_peers = self.values_providers - .get(&key) - .into_iter() - .flat_map(|peers| peers) - .map(|peer_id| build_kad_peer(peer_id.clone(), &self.kbuckets)) - .collect(); + let provider_peers = { + let kbuckets = &mut self.kbuckets; + self.values_providers + .get(&key) + .into_iter() + .flat_map(|peers| peers) + .map(move |peer_id| build_kad_peer(peer_id.clone(), kbuckets)) + .collect() + }; self.queued_events.push(NetworkBehaviourAction::SendEvent { peer_id: source, @@ -661,7 +769,7 @@ where peer_id: closest, event: KademliaHandlerIn::AddProvider { key: target.clone(), - provider_peer: build_kad_peer(parameters.local_peer_id().clone(), &self.kbuckets), + provider_peer: build_kad_peer(parameters.local_peer_id().clone(), &mut self.kbuckets), }, }; @@ -680,6 +788,9 @@ where #[derive(Debug, Clone)] pub enum KademliaOut { /// We have discovered a node. + /// + /// > **Note**: The Kademlia behaviour doesn't store the addresses of this node, and therefore + /// > attempting to connect to this node may or may not work. Discovered { /// Id of the node that was discovered. peer_id: PeerId, @@ -689,6 +800,14 @@ pub enum KademliaOut { ty: KadConnectionType, }, + /// A node has been added to a k-bucket. + KBucketAdded { + /// Id of the node that was added. + peer_id: PeerId, + /// If `Some`, this addition replaced the value that is inside the option. + replaced: Option, + }, + /// Result of a `FIND_NODE` iterative query. FindNodeResult { /// The key that we looked for in the query. @@ -747,11 +866,11 @@ fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result { /// > **Note**: This is just a convenience function that doesn't do anything note-worthy. fn build_kad_peer( peer_id: PeerId, - kbuckets: &KBucketsTable + kbuckets: &mut KBucketsTable ) -> KadPeer { debug_assert_ne!(*kbuckets.my_id(), peer_id); - let (multiaddrs, connection_ty) = if let Some(addresses) = kbuckets.get(&peer_id) { + let (multiaddrs, connection_ty) = if let Some(addresses) = kbuckets.entry(&peer_id).value() { let connected = if addresses.is_connected() { KadConnectionType::Connected } else { diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs index c19aea37..49b33fad 100644 --- a/protocols/kad/src/kbucket.rs +++ b/protocols/kad/src/kbucket.rs @@ -18,8 +18,6 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! Key-value storage, with a refresh and a time-to-live system. -//! //! A k-buckets table allows one to store a value identified by keys, ordered by their distance //! to a reference key passed to the constructor. //! @@ -31,6 +29,7 @@ use arrayvec::ArrayVec; use bigint::U512; use libp2p_core::PeerId; use multihash::Multihash; +use std::num::NonZeroUsize; use std::slice::IterMut as SliceIterMut; use std::time::{Duration, Instant}; use std::vec::IntoIter as VecIntoIter; @@ -45,7 +44,7 @@ pub struct KBucketsTable { my_id: TPeerId, /// The actual tables that store peers or values. tables: Vec>, - /// The timeout when trying to reach the first node after which we consider it unresponsive. + /// The timeout when trying to reach the youngest node after which we consider it unresponsive. unresponsive_timeout: Duration, } @@ -53,21 +52,30 @@ pub struct KBucketsTable { #[derive(Debug, Clone)] struct KBucket { /// Nodes are always ordered from oldest to newest. The nodes we are connected to are always - /// all on top of the nodes we are not connected to. + /// all on top (ie. have higher indices) of the nodes we are not connected to. nodes: ArrayVec<[Node; MAX_NODES_PER_BUCKET]>, /// Index in `nodes` over which all nodes are connected. Must always be <= to the length /// of `nodes`. first_connected_pos: usize, - /// Node received when the bucket was full. Will be added to the list if the first node doesn't - /// respond in time to our reach attempt. The second element is the time when the pending node - /// was added. If it is too old we drop the first node and add the pending node to the end of - /// the list. - pending_node: Option<(Node, Instant)>, + /// Node received when the bucket was full. Will be added to the list if the youngest node + /// doesn't respond in time to our reach attempt. + pending_node: Option>, +} - /// Last time this bucket was updated. - latest_update: Instant, +/// State of the pending node. +#[derive(Debug, Clone)] +struct PendingNode { + /// Node to insert. + node: Node, + + /// If true, we are connected to the pending node. + connected: bool, + + /// When the pending node will replace an existing node, provided that the youngest node + /// doesn't become responsive before. + replace: Instant, } /// A single node in a k-bucket. @@ -79,60 +87,40 @@ struct Node { value: TVal, } -impl KBucket { - /// Puts the kbucket into a coherent state. - /// If a node is pending and the timeout has expired, removes the first element of `nodes` - /// and puts the node back in `pending_node`. - fn flush(&mut self, timeout: Duration) { - if let Some((pending_node, instant)) = self.pending_node.take() { - if instant.elapsed() >= timeout { - let _ = self.nodes.remove(0); - self.nodes.push(pending_node); - } else { - self.pending_node = Some((pending_node, instant)); - } - } - } -} - /// Trait that must be implemented on types that can be used as an identifier in a k-bucket. -pub trait KBucketsPeerId: PartialEq + Clone { +/// +/// If `TOther` is not the same as `Self`, it represents an entry already in the k-buckets that +/// `Self` can compare against. +pub trait KBucketsPeerId: PartialEq { /// Computes the XOR of this value and another one. The lower the closer. fn distance_with(&self, other: &TOther) -> u32; /// Returns then number of bits that are necessary to store the distance between peer IDs. /// Used for pre-allocations. - /// - /// > **Note**: Returning 0 would lead to a panic. - fn max_distance() -> usize; + fn max_distance() -> NonZeroUsize; } impl KBucketsPeerId for PeerId { - #[inline] fn distance_with(&self, other: &Self) -> u32 { - Multihash::distance_with(self.as_ref(), other.as_ref()) + >::distance_with(self.as_ref(), other.as_ref()) } - #[inline] - fn max_distance() -> usize { + fn max_distance() -> NonZeroUsize { ::max_distance() } } -impl KBucketsPeerId for PeerId { - #[inline] - fn distance_with(&self, other: &Multihash) -> u32 { - Multihash::distance_with(self.as_ref(), other) +impl KBucketsPeerId for Multihash { + fn distance_with(&self, other: &PeerId) -> u32 { + >::distance_with(self, other.as_ref()) } - #[inline] - fn max_distance() -> usize { - ::max_distance() + fn max_distance() -> NonZeroUsize { + ::max_distance() } } impl KBucketsPeerId for Multihash { - #[inline] fn distance_with(&self, other: &Self) -> u32 { // Note that we don't compare the hash functions because there's no chance of collision // of the same value hashed with two different hash functions. @@ -142,273 +130,554 @@ impl KBucketsPeerId for Multihash { 512 - xor.leading_zeros() } - #[inline] - fn max_distance() -> usize { - 512 + fn max_distance() -> NonZeroUsize { + NonZeroUsize::new(512).expect("512 is not zero; QED") + } +} + +impl KBucketsPeerId for (A, B) +where + A: KBucketsPeerId + PartialEq, + B: KBucketsPeerId + PartialEq, +{ + fn distance_with(&self, other: &(A, B)) -> u32 { + A::distance_with(&self.0, &other.0) + B::distance_with(&self.1, &other.1) + } + + fn max_distance() -> NonZeroUsize { + let n = >::max_distance().get() + .saturating_add(>::max_distance().get()); + NonZeroUsize::new(n).expect("Saturating-add of two non-zeros can't be zero; QED") + } +} + +impl<'a, T> KBucketsPeerId for &'a T +where + T: KBucketsPeerId, +{ + fn distance_with(&self, other: &&'a T) -> u32 { + T::distance_with(*self, *other) + } + + fn max_distance() -> NonZeroUsize { + ::max_distance() } } impl KBucketsTable where - TPeerId: KBucketsPeerId, + TPeerId: KBucketsPeerId + Clone, { /// Builds a new routing table. pub fn new(my_id: TPeerId, unresponsive_timeout: Duration) -> Self { KBucketsTable { my_id, - tables: (0..TPeerId::max_distance()) + tables: (0..TPeerId::max_distance().get()) .map(|_| KBucket { nodes: ArrayVec::new(), first_connected_pos: 0, pending_node: None, - latest_update: Instant::now(), }) .collect(), unresponsive_timeout, } } - // Returns the id of the bucket that should contain the peer with the given ID. - // - // Returns `None` if out of range, which happens if `id` is the same as the local peer id. - #[inline] + /// Returns the ID of the local node. + pub fn my_id(&self) -> &TPeerId { + &self.my_id + } + + /// Returns the id of the bucket that should contain the peer with the given ID. + /// + /// Returns `None` if out of range, which happens if `id` is the same as the local peer id. fn bucket_num(&self, id: &TPeerId) -> Option { (self.my_id.distance_with(id) as usize).checked_sub(1) } + /// Returns an object containing the state of the given entry. + pub fn entry<'a>(&'a mut self, peer_id: &'a TPeerId) -> Entry<'a, TPeerId, TVal> { + let bucket_num = if let Some(num) = self.bucket_num(peer_id) { + num + } else { + return Entry::SelfEntry; + }; + + // Update the pending node state. + // TODO: must be reported to the user somehow, in a non-annoying API + if let Some(pending) = self.tables[bucket_num].pending_node.take() { + if pending.replace < Instant::now() { + let table = &mut self.tables[bucket_num]; + let first_connected_pos = &mut table.first_connected_pos; + // If all the nodes in the bucket are connected, then there shouldn't be any + // pending node. + debug_assert!(*first_connected_pos >= 1); + table.nodes.remove(0); + if pending.connected { + *first_connected_pos -= 1; + table.nodes.insert(*first_connected_pos, pending.node); + } else { + table.nodes.insert(*first_connected_pos - 1, pending.node); + } + } else { + self.tables[bucket_num].pending_node = Some(pending); + } + } + + // Try to find the node in the bucket. + if let Some(pos) = self.tables[bucket_num].nodes.iter().position(|p| p.id == *peer_id) { + if pos >= self.tables[bucket_num].first_connected_pos { + Entry::InKbucketConnected(EntryInKbucketConn { + parent: self, + peer_id, + }) + + } else { + Entry::InKbucketDisconnected(EntryInKbucketDisc { + parent: self, + peer_id, + }) + } + + } else if self.tables[bucket_num].pending_node.as_ref().map(|p| p.node.id == *peer_id).unwrap_or(false) { + // Node is pending. + if self.tables[bucket_num].pending_node.as_ref().map(|p| p.connected).unwrap_or(false) { + Entry::InKbucketConnectedPending(EntryInKbucketConnPending { + parent: self, + peer_id, + }) + } else { + Entry::InKbucketDisconnectedPending(EntryInKbucketDiscPending { + parent: self, + peer_id, + }) + } + + } else { + Entry::NotInKbucket(EntryNotInKbucket { + parent: self, + peer_id, + }) + } + } + + /// Returns an iterator to all the peer IDs in the bucket, without the pending nodes. + pub fn entries_not_pending(&self) -> impl Iterator { + self.tables + .iter() + .flat_map(|table| table.nodes.iter()) + .map(|node| (&node.id, &node.value)) + } + /// Returns an iterator to all the buckets of this table. /// /// Ordered by proximity to the local node. Closest bucket (with max. one node in it) comes /// first. - #[inline] pub fn buckets(&mut self) -> BucketsIter<'_, TPeerId, TVal> { BucketsIter(self.tables.iter_mut(), self.unresponsive_timeout) } - /// Returns the ID of the local node. - #[inline] - pub fn my_id(&self) -> &TPeerId { - &self.my_id - } - - /// Returns the value associated to a node, if any is present. + /// Finds the nodes closest to `id`, ordered by distance. /// - /// Does **not** include pending nodes. - pub fn get(&self, id: &TPeerId) -> Option<&TVal> { - let table = match self.bucket_num(&id) { - Some(n) => &self.tables[n], - None => return None, - }; + /// Pending nodes are ignored. + pub fn find_closest(&mut self, id: &impl KBucketsPeerId) -> VecIntoIter { + // TODO: optimize + let mut out = Vec::new(); + for table in self.tables.iter_mut() { + for node in table.nodes.iter() { + out.push(node.id.clone()); + } - for elem in &table.nodes { - if elem.id == *id { - return Some(&elem.value); + // TODO: this code that handles the pending_node should normally be shared with + // the one in `entry()`; however right now there's no mechanism to notify the + // user when a pending node has been inserted in the table, and thus we need to + // rework this pending node handling code anyway; when that is being done, we + // should rewrite this code properly + if let Some(ref pending) = table.pending_node { + if pending.replace <= Instant::now() && pending.connected { + out.pop(); + out.push(pending.node.id.clone()); + } } } + out.sort_by(|a, b| id.distance_with(a).cmp(&id.distance_with(b))); + out.into_iter() + } +} - None +/// Represents an entry or a potential entry in the k-buckets. +pub enum Entry<'a, TPeerId, TVal> { + /// Entry in a k-bucket that we're connected to. + InKbucketConnected(EntryInKbucketConn<'a, TPeerId, TVal>), + /// Entry pending waiting for a free slot to enter a k-bucket. We're connected to it. + InKbucketConnectedPending(EntryInKbucketConnPending<'a, TPeerId, TVal>), + /// Entry in a k-bucket but that we're not connected to. + InKbucketDisconnected(EntryInKbucketDisc<'a, TPeerId, TVal>), + /// Entry pending waiting for a free slot to enter a k-bucket. We're not connected to it. + InKbucketDisconnectedPending(EntryInKbucketDiscPending<'a, TPeerId, TVal>), + /// Entry is not present in any k-bucket. + NotInKbucket(EntryNotInKbucket<'a, TPeerId, TVal>), + /// Entry is the local peer ID. + SelfEntry, +} + +impl<'a, TPeerId, TVal> Entry<'a, TPeerId, TVal> +where + TPeerId: KBucketsPeerId + Clone, +{ + /// Returns the value associated to the entry in the bucket, including if the node is pending. + pub fn value(&mut self) -> Option<&mut TVal> { + match self { + Entry::InKbucketConnected(entry) => Some(entry.value()), + Entry::InKbucketConnectedPending(entry) => Some(entry.value()), + Entry::InKbucketDisconnected(entry) => Some(entry.value()), + Entry::InKbucketDisconnectedPending(entry) => Some(entry.value()), + Entry::NotInKbucket(_entry) => None, + Entry::SelfEntry => None, + } } - /// Returns the value associated to a node, if any is present. - /// - /// Does **not** include pending nodes. - pub fn get_mut(&mut self, id: &TPeerId) -> Option<&mut TVal> { - let table = match self.bucket_num(&id) { - Some(n) => &mut self.tables[n], - None => return None, + /// Returns the value associated to the entry in the bucket. + pub fn value_not_pending(&mut self) -> Option<&mut TVal> { + match self { + Entry::InKbucketConnected(entry) => Some(entry.value()), + Entry::InKbucketConnectedPending(_entry) => None, + Entry::InKbucketDisconnected(entry) => Some(entry.value()), + Entry::InKbucketDisconnectedPending(_entry) => None, + Entry::NotInKbucket(_entry) => None, + Entry::SelfEntry => None, + } + } +} + +/// Represents an entry in a k-bucket. +pub struct EntryInKbucketConn<'a, TPeerId, TVal> { + parent: &'a mut KBucketsTable, + peer_id: &'a TPeerId, +} + +impl<'a, TPeerId, TVal> EntryInKbucketConn<'a, TPeerId, TVal> +where + TPeerId: KBucketsPeerId + Clone, +{ + /// Returns the value associated to the entry in the bucket. + pub fn value(&mut self) -> &mut TVal { + let table = { + let num = self.parent.bucket_num(&self.peer_id) + .expect("we can only build a EntryInKbucketConn if we know of a bucket; QED"); + &mut self.parent.tables[num] }; - table.flush(self.unresponsive_timeout); - - for elem in &mut table.nodes { - if elem.id == *id { - return Some(&mut elem.value); - } - } - - None - } - - /// Returns the value associated to a node if any is present. Otherwise, tries to add the - /// node to the table in a disconnected state and return its value. Returns `None` if `id` is - /// the local peer, or if the table is full. - pub fn entry_mut(&mut self, id: &TPeerId) -> Option<&mut TVal> - where - TVal: Default, - { - if let Some((bucket, entry)) = self.entry_mut_inner(id) { - Some(&mut self.tables[bucket].nodes[entry].value) - } else { - None - } - } - - /// Apparently non-lexical lifetimes still aren't working properly in some situations, so we - /// delegate `entry_mut` to this method that returns an index within `self.tables` and the - /// node index within that table. - fn entry_mut_inner(&mut self, id: &TPeerId) -> Option<(usize, usize)> - where - TVal: Default, - { - let (bucket_num, table) = match self.bucket_num(&id) { - Some(n) => (n, &mut self.tables[n]), - None => return None, - }; - - table.flush(self.unresponsive_timeout); - - if let Some(pos) = table.nodes.iter().position(|elem| elem.id == *id) { - return Some((bucket_num, pos)); - } - - if !table.nodes.is_full() { - table.nodes.insert(table.first_connected_pos, Node { - id: id.clone(), - value: Default::default(), - }); - table.first_connected_pos += 1; - table.latest_update = Instant::now(); - return Some((bucket_num, table.first_connected_pos - 1)); - } - - None - } - - /// Reports that we are connected to the given node. - /// - /// This inserts the node in the k-buckets, if possible. If it is already in a k-bucket, puts - /// it above the disconnected nodes. If it is not already in a k-bucket, then the value will - /// be built with the `Default` trait. - pub fn set_connected(&mut self, id: &TPeerId) -> Update<'_, TPeerId> - where - TVal: Default, - { - let table = match self.bucket_num(&id) { - Some(n) => &mut self.tables[n], - None => return Update::FailSelfUpdate, - }; - - table.flush(self.unresponsive_timeout); - - if let Some(pos) = table.nodes.iter().position(|elem| elem.id == *id) { - // Node is already in the table; move it over `first_connected_pos` if necessary. - // We do a `saturating_sub(1)`, because if `first_connected_pos` is 0 then - // `pos < first_connected_pos` can never be true anyway. - if pos < table.first_connected_pos.saturating_sub(1) { - let elem = table.nodes.remove(pos); - table.first_connected_pos -= 1; - table.nodes.insert(table.first_connected_pos, elem); - } - table.latest_update = Instant::now(); - Update::Updated - - } else if !table.nodes.is_full() { - // Node is not in the table yet, but there's plenty of space for it. - table.nodes.insert(table.first_connected_pos, Node { - id: id.clone(), - value: Default::default(), - }); - table.latest_update = Instant::now(); - Update::Added - - } else if table.first_connected_pos > 0 && table.pending_node.is_none() { - // Node is not in the table yet, but there could be room for it if we drop the first - // element. However we first add the node to add to `pending_node` and try to reconnect - // to the oldest node. - let pending_node = Node { - id: id.clone(), - value: Default::default(), - }; - table.pending_node = Some((pending_node, Instant::now())); - Update::Pending(&table.nodes[0].id) - - } else { - debug_assert!(table.first_connected_pos == 0 || table.pending_node.is_some()); - Update::Discarded - } + let peer_id = self.peer_id; + &mut table.nodes.iter_mut() + .find(move |p| p.id == *peer_id) + .expect("We can only build a EntryInKbucketConn if we know that the peer is in its \ + bucket; QED") + .value } /// Reports that we are now disconnected from the given node. /// - /// This does *not* remove the node from the k-buckets, but moves it underneath the nodes we - /// are still connected to. - pub fn set_disconnected(&mut self, id: &TPeerId) { - let table = match self.bucket_num(&id) { - Some(n) => &mut self.tables[n], - None => return, + /// This moves the node down in its bucket. There are two possible outcomes: + /// + /// - Either we had a pending node which replaces the current node. `Replaced` is returned. + /// - Or we had no pending node, and the current node is kept. `Kept` is returned. + /// + pub fn set_disconnected(self) -> SetDisconnectedOutcome<'a, TPeerId, TVal> { + let table = { + let num = self.parent.bucket_num(&self.peer_id) + .expect("we can only build a EntryInKbucketConn if we know of a bucket; QED"); + &mut self.parent.tables[num] }; - table.flush(self.unresponsive_timeout); + let peer_id = self.peer_id; + let pos = table.nodes.iter().position(move |elem| elem.id == *peer_id) + .expect("we can only build a EntryInKbucketConn if the node is in its bucket; QED"); + debug_assert!(table.first_connected_pos <= pos); - let pos = match table.nodes.iter().position(|elem| elem.id == *id) { - Some(pos) => pos, - None => return, - }; + // We replace it with the pending node, if any. + if let Some(pending) = table.pending_node.take() { + if pending.connected { + let removed = table.nodes.remove(pos); + let ret = SetDisconnectedOutcome::Replaced { + replacement: pending.node.id.clone(), + old_val: removed.value, + }; + table.nodes.insert(table.first_connected_pos, pending.node); + return ret; + } else { + table.pending_node = Some(pending); + } + } - if pos > table.first_connected_pos { + // Move the node in the bucket. + if pos != table.first_connected_pos { let elem = table.nodes.remove(pos); table.nodes.insert(table.first_connected_pos, elem); - table.first_connected_pos += 1; - } else if pos == table.first_connected_pos { - table.first_connected_pos += 1; } - } + table.first_connected_pos += 1; - /// Finds the `num` nodes closest to `id`, ordered by distance. - pub fn find_closest(&mut self, id: &TOther) -> VecIntoIter - where - TPeerId: Clone + KBucketsPeerId, - { - // TODO: optimize - let mut out = Vec::new(); - for table in self.tables.iter_mut() { - table.flush(self.unresponsive_timeout); - if table.latest_update.elapsed() > self.unresponsive_timeout { - continue; // ignore bucket with expired nodes - } - for node in table.nodes.iter() { - out.push(node.id.clone()); - } - } - out.sort_by(|a, b| b.distance_with(id).cmp(&a.distance_with(id))); - out.into_iter() - } - - /// Same as `find_closest`, but includes the local peer as well. - pub fn find_closest_with_self(&mut self, id: &TOther) -> VecIntoIter - where - TPeerId: Clone + KBucketsPeerId, - { - // TODO: optimize - let mut intermediate: Vec<_> = self.find_closest(id).collect(); - if let Some(pos) = intermediate - .iter() - .position(|e| e.distance_with(id) >= self.my_id.distance_with(id)) - { - if intermediate[pos] != self.my_id { - intermediate.insert(pos, self.my_id.clone()); - } - } else { - intermediate.push(self.my_id.clone()); - } - intermediate.into_iter() + // And return a EntryInKbucketDisc. + SetDisconnectedOutcome::Kept(EntryInKbucketDisc { + parent: self.parent, + peer_id: self.peer_id, + }) } } -/// Return value of the `set_connected()` method. -#[derive(Debug)] +/// Outcome of calling `set_disconnected`. #[must_use] -pub enum Update<'a, TPeerId> { - /// The node has been added to the bucket. - Added, - /// The node was already in the bucket and has been updated. - Updated, - /// The node has been added as pending. We need to try connect to the node passed as parameter. - Pending(&'a TPeerId), - /// The node wasn't added at all because a node was already pending. - Discarded, - /// Tried to update the local peer ID. This is an invalid operation. - FailSelfUpdate, +pub enum SetDisconnectedOutcome<'a, TPeerId, TVal> { + /// Node is kept in the bucket. + Kept(EntryInKbucketDisc<'a, TPeerId, TVal>), + /// Node is pushed out of the bucket. + Replaced { + /// Node that replaced the node. + // TODO: could be a EntryInKbucketConn, but we have borrow issues with the new peer id + replacement: TPeerId, + /// Value os the node that has been pushed out. + old_val: TVal, + }, +} + +/// Represents an entry waiting for a slot to be available in its k-bucket. +pub struct EntryInKbucketConnPending<'a, TPeerId, TVal> { + parent: &'a mut KBucketsTable, + peer_id: &'a TPeerId, +} + +impl<'a, TPeerId, TVal> EntryInKbucketConnPending<'a, TPeerId, TVal> +where + TPeerId: KBucketsPeerId + Clone, +{ + /// Returns the value associated to the entry in the bucket. + pub fn value(&mut self) -> &mut TVal { + let table = { + let num = self.parent.bucket_num(&self.peer_id) + .expect("we can only build a EntryInKbucketConnPending if we know of a bucket; QED"); + &mut self.parent.tables[num] + }; + + assert!(table.pending_node.as_ref().map(|n| &n.node.id) == Some(self.peer_id)); + &mut table.pending_node + .as_mut() + .expect("we can only build a EntryInKbucketConnPending if the node is pending; QED") + .node.value + } + + /// Reports that we are now disconnected from the given node. + pub fn set_disconnected(self) -> EntryInKbucketDiscPending<'a, TPeerId, TVal> { + { + let table = { + let num = self.parent.bucket_num(&self.peer_id) + .expect("we can only build a EntryInKbucketConnPending if we know of a bucket; QED"); + &mut self.parent.tables[num] + }; + + let mut pending = table.pending_node.as_mut() + .expect("we can only build a EntryInKbucketConnPending if there's a pending node; QED"); + debug_assert!(pending.connected); + pending.connected = false; + } + + EntryInKbucketDiscPending { + parent: self.parent, + peer_id: self.peer_id, + } + } +} + +/// Represents an entry waiting for a slot to be available in its k-bucket. +pub struct EntryInKbucketDiscPending<'a, TPeerId, TVal> { + parent: &'a mut KBucketsTable, + peer_id: &'a TPeerId, +} + +impl<'a, TPeerId, TVal> EntryInKbucketDiscPending<'a, TPeerId, TVal> +where + TPeerId: KBucketsPeerId + Clone, +{ + /// Returns the value associated to the entry in the bucket. + pub fn value(&mut self) -> &mut TVal { + let table = { + let num = self.parent.bucket_num(&self.peer_id) + .expect("we can only build a EntryInKbucketDiscPending if we know of a bucket; QED"); + &mut self.parent.tables[num] + }; + + assert!(table.pending_node.as_ref().map(|n| &n.node.id) == Some(self.peer_id)); + &mut table.pending_node + .as_mut() + .expect("we can only build a EntryInKbucketDiscPending if the node is pending; QED") + .node.value + } + + /// Reports that we are now connected to the given node. + pub fn set_connected(self) -> EntryInKbucketConnPending<'a, TPeerId, TVal> { + { + let table = { + let num = self.parent.bucket_num(&self.peer_id) + .expect("we can only build a EntryInKbucketDiscPending if we know of a bucket; QED"); + &mut self.parent.tables[num] + }; + + let mut pending = table.pending_node.as_mut() + .expect("we can only build a EntryInKbucketDiscPending if there's a pending node; QED"); + debug_assert!(!pending.connected); + pending.connected = true; + } + + EntryInKbucketConnPending { + parent: self.parent, + peer_id: self.peer_id, + } + } +} + +/// Represents an entry in a k-bucket. +pub struct EntryInKbucketDisc<'a, TPeerId, TVal> { + parent: &'a mut KBucketsTable, + peer_id: &'a TPeerId, +} + +impl<'a, TPeerId, TVal> EntryInKbucketDisc<'a, TPeerId, TVal> +where + TPeerId: KBucketsPeerId + Clone, +{ + /// Returns the value associated to the entry in the bucket. + pub fn value(&mut self) -> &mut TVal { + let table = { + let num = self.parent.bucket_num(&self.peer_id) + .expect("we can only build a EntryInKbucketDisc if we know of a bucket; QED"); + &mut self.parent.tables[num] + }; + + let peer_id = self.peer_id; + &mut table.nodes.iter_mut() + .find(move |p| p.id == *peer_id) + .expect("We can only build a EntryInKbucketDisc if we know that the peer is in its \ + bucket; QED") + .value + } + + /// Sets the node as connected. This moves the entry in the bucket. + pub fn set_connected(self) -> EntryInKbucketConn<'a, TPeerId, TVal> { + let table = { + let num = self.parent.bucket_num(&self.peer_id) + .expect("we can only build a EntryInKbucketDisc if we know of a bucket; QED"); + &mut self.parent.tables[num] + }; + + let pos = { + let peer_id = self.peer_id; + table.nodes.iter().position(move |p| p.id == *peer_id) + .expect("We can only build a EntryInKbucketDisc if we know that the peer is in \ + its bucket; QED") + }; + + // If we are the youngest node, we are now connected, which means that we have to drop the + // pending node. + // Note that it is theoretically possible that the replacement should have occurred between + // the moment when we build the `EntryInKbucketConn` and the moment when we call + // `set_connected`, but we don't take that into account. + if pos == 0 { + table.pending_node = None; + } + + debug_assert!(pos < table.first_connected_pos); + table.first_connected_pos -= 1; + if pos != table.first_connected_pos { + let entry = table.nodes.remove(pos); + table.nodes.insert(table.first_connected_pos, entry); + } + + // There shouldn't be a pending node if all slots are full of connected nodes. + debug_assert!(!(table.first_connected_pos == 0 && table.pending_node.is_some())); + + EntryInKbucketConn { + parent: self.parent, + peer_id: self.peer_id, + } + } +} + +/// Represents an entry not in any k-bucket. +pub struct EntryNotInKbucket<'a, TPeerId, TVal> { + parent: &'a mut KBucketsTable, + peer_id: &'a TPeerId, +} + +impl<'a, TPeerId, TVal> EntryNotInKbucket<'a, TPeerId, TVal> +where + TPeerId: KBucketsPeerId + Clone, +{ + /// Inserts the node as connected, if possible. + pub fn insert_connected(self, value: TVal) -> InsertOutcome { + let table = { + let num = self.parent.bucket_num(&self.peer_id) + .expect("we can only build a EntryNotInKbucket if we know of a bucket; QED"); + &mut self.parent.tables[num] + }; + + if table.nodes.is_full() { + if table.first_connected_pos == 0 || table.pending_node.is_some() { + InsertOutcome::Full + } else { + table.pending_node = Some(PendingNode { + node: Node { id: self.peer_id.clone(), value }, + replace: Instant::now() + self.parent.unresponsive_timeout, + connected: true, + }); + InsertOutcome::Pending { + to_ping: table.nodes[0].id.clone() + } + } + } else { + table.nodes.insert(table.first_connected_pos, Node { + id: self.peer_id.clone(), + value, + }); + InsertOutcome::Inserted + } + } + + /// Inserts the node as disconnected, if possible. + /// + /// > **Note**: This function will never return `Pending`. If the bucket is full, we simply + /// > do nothing. + pub fn insert_disconnected(self, value: TVal) -> InsertOutcome { + let table = { + let num = self.parent.bucket_num(&self.peer_id) + .expect("we can only build a EntryNotInKbucket if we know of a bucket; QED"); + &mut self.parent.tables[num] + }; + + if table.nodes.is_full() { + InsertOutcome::Full + } else { + table.nodes.insert(table.first_connected_pos, Node { + id: self.peer_id.clone(), + value, + }); + table.first_connected_pos += 1; + InsertOutcome::Inserted + } + } +} + +/// Outcome of calling `insert`. +#[must_use] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum InsertOutcome { + /// The entry has been successfully inserted. + Inserted, + /// The entry has been inserted as a pending node. + Pending { + /// We have to try connect to the returned node. + to_ping: TPeerId, + }, + /// The entry was not inserted because the bucket was full of connected nodes. + Full, } /// Iterator giving access to a bucket. @@ -417,15 +686,12 @@ pub struct BucketsIter<'a, TPeerId, TVal>(SliceIterMut<'a, KBucket Iterator for BucketsIter<'a, TPeerId, TVal> { type Item = Bucket<'a, TPeerId, TVal>; - #[inline] fn next(&mut self) -> Option { self.0.next().map(|bucket| { - bucket.flush(self.1); Bucket(bucket) }) } - #[inline] fn size_hint(&self) -> (usize, Option) { self.0.size_hint() } @@ -442,30 +708,23 @@ impl<'a, TPeerId, TVal> Bucket<'a, TPeerId, TVal> { /// > **Note**: Keep in mind that this operation can be racy. If `update()` is called on the /// > table while this function is running, the `update()` may or may not be taken /// > into account. - #[inline] pub fn num_entries(&self) -> usize { self.0.nodes.len() } /// Returns true if this bucket has a pending node. - #[inline] pub fn has_pending(&self) -> bool { - self.0.pending_node.is_some() - } - - /// Returns the time when any of the values in this bucket was last updated. - /// - /// If the bucket is empty, this returns the time when the whole table was created. - #[inline] - pub fn latest_update(&self) -> Instant { - self.0.latest_update + if let Some(ref node) = self.0.pending_node { + node.replace > Instant::now() + } else { + false + } } } #[cfg(test)] mod tests { - use rand::random; - use crate::kbucket::{KBucketsPeerId, KBucketsTable, Update, MAX_NODES_PER_BUCKET}; + use crate::kbucket::{Entry, InsertOutcome, KBucketsPeerId, KBucketsTable, MAX_NODES_PER_BUCKET}; use multihash::{Multihash, Hash}; use std::thread; use std::time::Duration; @@ -476,7 +735,14 @@ mod tests { let other_id = Multihash::random(Hash::SHA2256); let mut table = KBucketsTable::<_, ()>::new(my_id, Duration::from_secs(5)); - table.entry_mut(&other_id); + if let Entry::NotInKbucket(entry) = table.entry(&other_id) { + match entry.insert_connected(()) { + InsertOutcome::Inserted => (), + _ => panic!() + } + } else { + panic!() + } let res = table.find_closest(&other_id).collect::>(); assert_eq!(res.len(), 1); @@ -488,46 +754,12 @@ mod tests { let my_id = Multihash::random(Hash::SHA2256); let mut table = KBucketsTable::<_, ()>::new(my_id.clone(), Duration::from_secs(5)); - assert!(table.entry_mut(&my_id).is_none()); - match table.set_connected(&my_id) { - Update::FailSelfUpdate => (), + match table.entry(&my_id) { + Entry::SelfEntry => (), _ => panic!(), } } - #[test] - fn update_time_last_refresh() { - let my_id = Multihash::random(Hash::SHA2256); - - // Generate some other IDs varying by just one bit. - let other_ids = (0..random::() % 20) - .map(|_| { - let bit_num = random::() % 256; - let mut id = my_id.as_bytes().to_vec().clone(); - id[33 - (bit_num / 8)] ^= 1 << (bit_num % 8); - (Multihash::from_bytes(id).unwrap(), bit_num) - }) - .collect::>(); - - let mut table = KBucketsTable::<_, ()>::new(my_id, Duration::from_secs(5)); - let before_update = table.buckets().map(|b| b.latest_update()).collect::>(); - - thread::sleep(Duration::from_secs(2)); - for &(ref id, _) in &other_ids { - table.entry_mut(&id); - } - - let after_update = table.buckets().map(|b| b.latest_update()).collect::>(); - - for (offset, (bef, aft)) in before_update.iter().zip(after_update.iter()).enumerate() { - if other_ids.iter().any(|&(_, bucket)| bucket == offset) { - assert_ne!(bef, aft); - } else { - assert_eq!(bef, aft); - } - } - } - #[test] fn full_kbucket() { let my_id = Multihash::random(Hash::SHA2256); @@ -548,11 +780,14 @@ mod tests { let mut table = KBucketsTable::<_, ()>::new(my_id.clone(), Duration::from_secs(1)); for (num, id) in fill_ids.drain(..MAX_NODES_PER_BUCKET).enumerate() { - match table.set_connected(&id) { - Update::Added => (), - _ => panic!() + if let Entry::NotInKbucket(entry) = table.entry(&id) { + match entry.insert_disconnected(()) { + InsertOutcome::Inserted => (), + _ => panic!() + } + } else { + panic!() } - table.set_disconnected(&id); assert_eq!(table.buckets().nth(255).unwrap().num_entries(), num + 1); } @@ -561,11 +796,13 @@ mod tests { MAX_NODES_PER_BUCKET ); assert!(!table.buckets().nth(255).unwrap().has_pending()); - match table.set_connected(&fill_ids.remove(0)) { - Update::Pending(to_ping) => { - assert_eq!(*to_ping, first_node); - }, - _ => panic!() + if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) { + match entry.insert_connected(()) { + InsertOutcome::Pending { ref to_ping } if *to_ping == first_node => (), + _ => panic!() + } + } else { + panic!() } assert_eq!( @@ -573,18 +810,24 @@ mod tests { MAX_NODES_PER_BUCKET ); assert!(table.buckets().nth(255).unwrap().has_pending()); - match table.set_connected(&fill_ids.remove(0)) { - Update::Discarded => (), - _ => panic!() + if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) { + match entry.insert_connected(()) { + InsertOutcome::Full => (), + _ => panic!() + } + } else { + panic!() } thread::sleep(Duration::from_secs(2)); assert!(!table.buckets().nth(255).unwrap().has_pending()); - match table.set_connected(&fill_ids.remove(0)) { - Update::Pending(to_ping) => { - assert_eq!(*to_ping, second_node); - }, - _ => panic!() + if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) { + match entry.insert_connected(()) { + InsertOutcome::Pending { ref to_ping } if *to_ping == second_node => (), + _ => panic!() + } + } else { + panic!() } }