Rewrite the Kademlia k-buckets to be more explicit (#996)

* Some k-buckets improvements

* Apply suggestions from code review

Co-Authored-By: tomaka <pierre.krieger1708@gmail.com>

* Use NonZeroUsize for the distance

* Update TODO comment
This commit is contained in:
Pierre Krieger
2019-03-20 17:09:48 +01:00
committed by GitHub
parent 96e559b503
commit 7be97b4f93
2 changed files with 747 additions and 385 deletions

View File

@ -19,8 +19,8 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::addresses::Addresses; use crate::addresses::Addresses;
use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn, KademliaRequestId}; use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn};
use crate::kbucket::{KBucketsTable, KBucketsPeerId, Update}; use crate::kbucket::{self, KBucketsTable, KBucketsPeerId};
use crate::protocol::{KadConnectionType, KadPeer}; use crate::protocol::{KadConnectionType, KadPeer};
use crate::query::{QueryConfig, QueryState, QueryStatePollOut}; use crate::query::{QueryConfig, QueryState, QueryStatePollOut};
use fnv::{FnvHashMap, FnvHashSet}; use fnv::{FnvHashMap, FnvHashSet};
@ -30,7 +30,7 @@ use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId};
use multihash::Multihash; use multihash::Multihash;
use rand; use rand;
use smallvec::SmallVec; use smallvec::SmallVec;
use std::{cmp::Ordering, error, marker::PhantomData, time::Duration, time::Instant}; use std::{cmp::Ordering, error, marker::PhantomData, num::NonZeroUsize, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Interval; use tokio_timer::Interval;
@ -136,7 +136,7 @@ impl KBucketsPeerId<PeerId> for QueryInfo {
self.as_ref().distance_with(other) self.as_ref().distance_with(other)
} }
fn max_distance() -> usize { fn max_distance() -> NonZeroUsize {
<PeerId as KBucketsPeerId>::max_distance() <PeerId as KBucketsPeerId>::max_distance()
} }
} }
@ -192,31 +192,62 @@ impl<TSubstream> Kademlia<TSubstream> {
/// Creates a `Kademlia`. /// Creates a `Kademlia`.
/// ///
/// Contrary to `new`, doesn't perform the initialization queries that store our local ID into /// Contrary to `new`, doesn't perform the initialization queries that store our local ID into
/// the DHT. /// the DHT and fill our buckets.
#[inline] #[inline]
pub fn without_init(local_peer_id: PeerId) -> Self { pub fn without_init(local_peer_id: PeerId) -> Self {
Self::new_inner(local_peer_id, false) Self::new_inner(local_peer_id, false)
} }
/// Adds a known address for the given `PeerId`.
#[deprecated(note = "Use add_connected_address or add_not_connected_address instead")]
pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) {
self.add_connected_address(peer_id, address)
}
/// Adds a known address for the given `PeerId`. We are connected to this address. /// Adds a known address for the given `PeerId`. We are connected to this address.
// TODO: report if the address was inserted? also, semantics unclear
pub fn add_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) { pub fn add_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) {
if let Some(list) = self.kbuckets.entry_mut(peer_id) { self.add_address(peer_id, address, true)
list.insert_connected(address);
}
} }
/// Adds a known address for the given `PeerId`. We are not connected or don't know whether we /// Adds a known address for the given `PeerId`. We are not connected or don't know whether we
/// are connected to this address. /// are connected to this address.
// TODO: report if the address was inserted? also, semantics unclear
pub fn add_not_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) { pub fn add_not_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) {
if let Some(list) = self.kbuckets.entry_mut(peer_id) { self.add_address(peer_id, address, false)
list.insert_not_connected(address); }
}
/// Underlying implementation for `add_connected_address` and `add_not_connected_address`.
fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, connected: bool) {
match self.kbuckets.entry(peer_id) {
kbucket::Entry::InKbucketConnected(mut entry) => if connected {
entry.value().insert_connected(address)
} else {
entry.value().insert_not_connected(address)
},
kbucket::Entry::InKbucketConnectedPending(mut entry) => if connected {
entry.value().insert_connected(address)
} else {
entry.value().insert_not_connected(address)
},
kbucket::Entry::InKbucketDisconnected(mut entry) => entry.value().insert_not_connected(address),
kbucket::Entry::InKbucketDisconnectedPending(mut entry) => entry.value().insert_not_connected(address),
kbucket::Entry::NotInKbucket(entry) => {
let mut addresses = Addresses::new();
addresses.insert_not_connected(address);
match entry.insert_disconnected(addresses) {
kbucket::InsertOutcome::Inserted => {
let event = KademliaOut::KBucketAdded {
peer_id: peer_id.clone(),
replaced: None,
};
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
},
kbucket::InsertOutcome::Full => (),
kbucket::InsertOutcome::Pending { to_ping } => {
self.queued_events.push(NetworkBehaviourAction::DialPeer {
peer_id: to_ping.clone(),
})
},
}
return;
},
kbucket::Entry::SelfEntry => return,
};
} }
/// Inner implementation of the constructors. /// Inner implementation of the constructors.
@ -241,16 +272,7 @@ impl<TSubstream> Kademlia<TSubstream> {
}; };
if initialize { if initialize {
// As part of the initialization process, we start one `FIND_NODE` for each bit of the behaviour.initialize();
// possible range of peer IDs.
for n in 0..256 {
let target = match gen_random_id(behaviour.kbuckets.my_id(), n) {
Ok(p) => p,
Err(()) => continue,
};
behaviour.start_query(QueryInfoInner::Initialization { target });
}
} }
behaviour behaviour
@ -258,17 +280,34 @@ impl<TSubstream> Kademlia<TSubstream> {
} }
impl<TSubstream> Kademlia<TSubstream> { impl<TSubstream> Kademlia<TSubstream> {
/// Performs the Kademlia initialization process.
///
/// If you called `new` to create the `Kademlia`, then this has been started. Calling this
/// method manually is useful in order to re-perform the initialization later.
///
/// Starts one query per bucket with the intention of connecting to nodes along the way and
/// fill our own buckets. This also adds the effect of adding our local node to other nodes'
/// buckets.
pub fn initialize(&mut self) {
for n in 0..256 { // TODO: 256 should be grabbed from the kbuckets module
let target = match gen_random_id(self.kbuckets.my_id(), n) {
Ok(p) => p,
Err(()) => continue,
};
self.start_query(QueryInfoInner::Initialization { target });
}
}
/// Starts an iterative `FIND_NODE` request. /// Starts an iterative `FIND_NODE` request.
/// ///
/// This will eventually produce an event containing the nodes of the DHT closest to the /// This will eventually produce an event containing the nodes of the DHT closest to the
/// requested `PeerId`. /// requested `PeerId`.
#[inline]
pub fn find_node(&mut self, peer_id: PeerId) { pub fn find_node(&mut self, peer_id: PeerId) {
self.start_query(QueryInfoInner::FindPeer(peer_id)); self.start_query(QueryInfoInner::FindPeer(peer_id));
} }
/// Starts an iterative `GET_PROVIDERS` request. /// Starts an iterative `GET_PROVIDERS` request.
#[inline]
pub fn get_providers(&mut self, target: Multihash) { pub fn get_providers(&mut self, target: Multihash) {
self.start_query(QueryInfoInner::GetProviders { target, pending_results: Vec::new() }); self.start_query(QueryInfoInner::GetProviders { target, pending_results: Vec::new() });
} }
@ -322,7 +361,7 @@ impl<TSubstream> Kademlia<TSubstream> {
}; };
let known_closest_peers = self.kbuckets let known_closest_peers = self.kbuckets
.find_closest::<Multihash>(target.as_ref()) .find_closest(target.as_ref())
.take(self.num_results); .take(self.num_results);
self.active_queries.insert( self.active_queries.insert(
@ -353,7 +392,8 @@ where
// We should order addresses from decreasing likelyhood of connectivity, so start with // We should order addresses from decreasing likelyhood of connectivity, so start with
// the addresses of that peer in the k-buckets. // the addresses of that peer in the k-buckets.
let mut out_list = self.kbuckets let mut out_list = self.kbuckets
.get(peer_id) .entry(peer_id)
.value_not_pending()
.map(|l| l.iter().cloned().collect::<Vec<_>>()) .map(|l| l.iter().cloned().collect::<Vec<_>>())
.unwrap_or_else(Vec::new); .unwrap_or_else(Vec::new);
@ -378,16 +418,58 @@ where
}); });
} }
if let Update::Pending(to_ping) = self.kbuckets.set_connected(&id) { let address = match endpoint {
self.queued_events.push(NetworkBehaviourAction::DialPeer { ConnectedPoint::Dialer { address } => Some(address),
peer_id: to_ping.clone(), ConnectedPoint::Listener { .. } => None,
}) };
}
if let ConnectedPoint::Dialer { address } = endpoint { match self.kbuckets.entry(&id) {
if let Some(list) = self.kbuckets.entry_mut(&id) { kbucket::Entry::InKbucketConnected(_) |
list.insert_connected(address); kbucket::Entry::InKbucketConnectedPending(_) => {
} unreachable!("Kbuckets are always kept in sync with the connection state; QED")
},
kbucket::Entry::InKbucketDisconnected(mut entry) => {
debug_assert!(!entry.value().is_connected());
if let Some(address) = address {
entry.value().insert_connected(address);
}
entry.set_connected();
},
kbucket::Entry::InKbucketDisconnectedPending(mut entry) => {
debug_assert!(!entry.value().is_connected());
if let Some(address) = address {
entry.value().insert_connected(address);
}
entry.set_connected();
},
kbucket::Entry::NotInKbucket(entry) => {
let mut addresses = Addresses::new();
if let Some(address) = address {
addresses.insert_connected(address);
}
match entry.insert_connected(addresses) {
kbucket::InsertOutcome::Inserted => {
let event = KademliaOut::KBucketAdded {
peer_id: id.clone(),
replaced: None,
};
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
},
kbucket::InsertOutcome::Full => (),
kbucket::InsertOutcome::Pending { to_ping } => {
self.queued_events.push(NetworkBehaviourAction::DialPeer {
peer_id: to_ping.clone(),
})
},
}
},
kbucket::Entry::SelfEntry => {
unreachable!("Guaranteed to never receive disconnected even for self; QED")
},
} }
self.connected_peers.insert(id); self.connected_peers.insert(id);
@ -395,7 +477,7 @@ where
fn inject_dial_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, _: &dyn error::Error) { fn inject_dial_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, _: &dyn error::Error) {
if let Some(peer_id) = peer_id { if let Some(peer_id) = peer_id {
if let Some(list) = self.kbuckets.get_mut(peer_id) { if let Some(list) = self.kbuckets.entry(peer_id).value() {
// TODO: don't remove the address if the error is that we are already connected // TODO: don't remove the address if the error is that we are already connected
// to this peer // to this peer
list.remove_addr(addr); list.remove_addr(addr);
@ -412,13 +494,38 @@ where
} }
if let ConnectedPoint::Dialer { address } = old_endpoint { if let ConnectedPoint::Dialer { address } = old_endpoint {
if let Some(list) = self.kbuckets.get_mut(id) { match self.kbuckets.entry(id) {
debug_assert!(list.is_connected()); kbucket::Entry::InKbucketConnected(mut entry) => {
list.set_disconnected(&address); debug_assert!(entry.value().is_connected());
entry.value().set_disconnected(&address);
match entry.set_disconnected() {
kbucket::SetDisconnectedOutcome::Kept(_) => {},
kbucket::SetDisconnectedOutcome::Replaced { replacement, .. } => {
let event = KademliaOut::KBucketAdded {
peer_id: replacement,
replaced: Some(id.clone()),
};
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
},
}
},
kbucket::Entry::InKbucketConnectedPending(mut entry) => {
debug_assert!(entry.value().is_connected());
entry.value().set_disconnected(&address);
entry.set_disconnected();
},
kbucket::Entry::InKbucketDisconnected(_) => {
unreachable!("Kbuckets are always kept in sync with the connection state; QED")
},
kbucket::Entry::InKbucketDisconnectedPending(_) => {
unreachable!("Kbuckets are always kept in sync with the connection state; QED")
},
kbucket::Entry::NotInKbucket(_) => (),
kbucket::Entry::SelfEntry => {
unreachable!("Guaranteed to never receive disconnected even for self; QED")
},
} }
} }
self.kbuckets.set_disconnected(&id);
} }
fn inject_replaced(&mut self, peer_id: PeerId, old_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { fn inject_replaced(&mut self, peer_id: PeerId, old_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) {
@ -432,14 +539,12 @@ where
} }
} }
if let ConnectedPoint::Dialer { address } = old_endpoint { if let Some(list) = self.kbuckets.entry(&peer_id).value() {
if let Some(list) = self.kbuckets.get_mut(&peer_id) { if let ConnectedPoint::Dialer { address } = old_endpoint {
list.set_disconnected(&address); list.set_disconnected(&address);
} }
}
if let ConnectedPoint::Dialer { address } = new_endpoint { if let ConnectedPoint::Dialer { address } = new_endpoint {
if let Some(list) = self.kbuckets.entry_mut(&peer_id) {
list.insert_connected(address); list.insert_connected(address);
} }
} }
@ -451,7 +556,7 @@ where
let closer_peers = self.kbuckets let closer_peers = self.kbuckets
.find_closest(&key) .find_closest(&key)
.take(self.num_results) .take(self.num_results)
.map(|peer_id| build_kad_peer(peer_id, &self.kbuckets)) .map(|peer_id| build_kad_peer(peer_id, &mut self.kbuckets))
.collect(); .collect();
self.queued_events.push(NetworkBehaviourAction::SendEvent { self.queued_events.push(NetworkBehaviourAction::SendEvent {
@ -487,15 +592,18 @@ where
let closer_peers = self.kbuckets let closer_peers = self.kbuckets
.find_closest(&key) .find_closest(&key)
.take(self.num_results) .take(self.num_results)
.map(|peer_id| build_kad_peer(peer_id, &self.kbuckets)) .map(|peer_id| build_kad_peer(peer_id, &mut self.kbuckets))
.collect(); .collect();
let provider_peers = self.values_providers let provider_peers = {
.get(&key) let kbuckets = &mut self.kbuckets;
.into_iter() self.values_providers
.flat_map(|peers| peers) .get(&key)
.map(|peer_id| build_kad_peer(peer_id.clone(), &self.kbuckets)) .into_iter()
.collect(); .flat_map(|peers| peers)
.map(move |peer_id| build_kad_peer(peer_id.clone(), kbuckets))
.collect()
};
self.queued_events.push(NetworkBehaviourAction::SendEvent { self.queued_events.push(NetworkBehaviourAction::SendEvent {
peer_id: source, peer_id: source,
@ -661,7 +769,7 @@ where
peer_id: closest, peer_id: closest,
event: KademliaHandlerIn::AddProvider { event: KademliaHandlerIn::AddProvider {
key: target.clone(), key: target.clone(),
provider_peer: build_kad_peer(parameters.local_peer_id().clone(), &self.kbuckets), provider_peer: build_kad_peer(parameters.local_peer_id().clone(), &mut self.kbuckets),
}, },
}; };
@ -680,6 +788,9 @@ where
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum KademliaOut { pub enum KademliaOut {
/// We have discovered a node. /// We have discovered a node.
///
/// > **Note**: The Kademlia behaviour doesn't store the addresses of this node, and therefore
/// > attempting to connect to this node may or may not work.
Discovered { Discovered {
/// Id of the node that was discovered. /// Id of the node that was discovered.
peer_id: PeerId, peer_id: PeerId,
@ -689,6 +800,14 @@ pub enum KademliaOut {
ty: KadConnectionType, ty: KadConnectionType,
}, },
/// A node has been added to a k-bucket.
KBucketAdded {
/// Id of the node that was added.
peer_id: PeerId,
/// If `Some`, this addition replaced the value that is inside the option.
replaced: Option<PeerId>,
},
/// Result of a `FIND_NODE` iterative query. /// Result of a `FIND_NODE` iterative query.
FindNodeResult { FindNodeResult {
/// The key that we looked for in the query. /// The key that we looked for in the query.
@ -747,11 +866,11 @@ fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result<PeerId, ()> {
/// > **Note**: This is just a convenience function that doesn't do anything note-worthy. /// > **Note**: This is just a convenience function that doesn't do anything note-worthy.
fn build_kad_peer( fn build_kad_peer(
peer_id: PeerId, peer_id: PeerId,
kbuckets: &KBucketsTable<PeerId, Addresses> kbuckets: &mut KBucketsTable<PeerId, Addresses>
) -> KadPeer { ) -> KadPeer {
debug_assert_ne!(*kbuckets.my_id(), peer_id); debug_assert_ne!(*kbuckets.my_id(), peer_id);
let (multiaddrs, connection_ty) = if let Some(addresses) = kbuckets.get(&peer_id) { let (multiaddrs, connection_ty) = if let Some(addresses) = kbuckets.entry(&peer_id).value() {
let connected = if addresses.is_connected() { let connected = if addresses.is_connected() {
KadConnectionType::Connected KadConnectionType::Connected
} else { } else {

File diff suppressed because it is too large Load Diff