[libp2p-kad] More control & insight for k-buckets. (#1628)

* More control & insight for k-buckets.

1) More control: It is now possible to disable automatic
insertion of peers into the routing table via the new
`KademliaBucketInserts` configuration option. The
default is `OnConnected`, but it can be set to `Manual`,
in which case `add_address` must be called explicitly.
To communicate all situations in which a user of
`Kademlia` may want to manually update the routing
table, two new events are introduced:

  * `KademliaEvent::RoutablePeer`: Emitted when a connection
    is established to a peer with a known listen address
    that may be added to the routing table. This is
    also emitted when automatic inserts are allowed but
    the corresponding k-bucket is full.
  * `KademliaEvent::PendingRoutablePeer`: Emitted when a connection
    is established to a peer with a known listen address
    that is pending insertion into the routing table
    (but may not make it). This is only emitted when
    `OnConnected` (i.e. automatic inserts) is used.

These complement the existing `UnroutablePeer` and
`RoutingUpdated` events. It is now also possible to
explicitly remove peers and addresses from the
routing table; a usage sketch follows below.
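
For illustration, here is a minimal sketch of opting into
manual inserts and admitting peers in response to the new
event. The policy function `should_admit` is hypothetical,
and exact import paths may differ between versions:

use libp2p_core::PeerId;
use libp2p_kad::{
    record::store::MemoryStore,
    Kademlia, KademliaBucketInserts, KademliaConfig, KademliaEvent,
};

// Build a `Kademlia` behaviour that never inserts peers on its own.
fn manual_kademlia(local_id: PeerId) -> Kademlia<MemoryStore> {
    let mut cfg = KademliaConfig::default();
    cfg.set_kbucket_inserts(KademliaBucketInserts::Manual);
    let store = MemoryStore::new(local_id.clone());
    Kademlia::with_config(local_id, store, cfg)
}

// With `Manual` inserts, `RoutablePeer` is the cue for deciding
// whether a connected peer should enter the routing table.
fn on_event(kad: &mut Kademlia<MemoryStore>, event: KademliaEvent) {
    if let KademliaEvent::RoutablePeer { peer, address } = event {
        if should_admit(&peer) { // hypothetical application policy
            let _ = kad.add_address(&peer, address); // returns a `RoutingUpdate`
        }
    }
}

// Placeholder policy; a real application would vet the peer here.
fn should_admit(_peer: &PeerId) -> bool { true }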

2) More insight: `Kademlia::kbuckets` now returns an
iterator over `KBucketRef`s, and `Kademlia::kbucket`
the `KBucketRef` for a particular bucket. A `KBucketRef`
in turn allows iteration over its entries. In this way,
the full contents of the routing table can be
inspected, e.g. in order to decide which peer(s)
to remove, as in the sketch below.
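
A sketch of this inspection API, assuming the public
`kbucket` module and the usual `MemoryStore` re-export of
`libp2p-kad` (nothing here is specific to the store type):

use libp2p_core::PeerId;
use libp2p_kad::{kbucket::NodeStatus, record::store::MemoryStore, Kademlia};

// Walk every non-empty bucket and its entries, e.g. to pick a
// peer to evict before manually adding another one.
fn dump_routing_table(kad: &mut Kademlia<MemoryStore>, peer: &PeerId) {
    for bucket in kad.kbuckets() {
        for entry in bucket.iter() {
            println!(
                "peer={:?} connected={} first_addr={}",
                entry.node.key.preimage(),
                entry.status == NodeStatus::Connected,
                entry.node.value.first(), // the peer's `Addresses`
            );
        }
    }

    // Inspect only the k-bucket that `peer` falls into.
    if let Some(bucket) = kad.kbucket(peer.clone()) {
        println!("bucket for {:?} holds {} entries", peer, bucket.num_entries());
    }
}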

* Update protocols/kad/src/behaviour.rs

* Update protocols/kad/src/behaviour.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Update CHANGELOG.

Co-authored-by: Max Inden <mail@max-inden.de>
Author: Roman Borschel
Date: 2020-06-29 16:44:44 +02:00
Committed by: GitHub
Parent: b61c3f9d04
Commit: 7270ed8721
6 changed files with 360 additions and 69 deletions

protocols/kad/CHANGELOG.md

@@ -1,3 +1,15 @@
# 0.21.0 [????-??-??]
- More control and insight for k-buckets
([PR 1628](https://github.com/libp2p/rust-libp2p/pull/1628)).
In particular, `Kademlia::kbuckets_entries` has been removed and
replaced by `Kademlia::kbuckets`/`Kademlia::kbucket` which provide
more information than just the peer IDs. Furthermore, `Kademlia::add_address`
now returns a result, and two new events, `KademliaEvent::RoutablePeer`
and `KademliaEvent::PendingRoutablePeer`, are introduced (but need not
be acted upon in order to retain existing behaviour).
For more details, see the PR description.
# 0.20.1 [2020-06-23]
Maintenance release ([PR 1623](https://github.com/libp2p/rust-libp2p/pull/1623)).

protocols/kad/src/behaviour.rs

@@ -52,11 +52,15 @@ use wasm_timer::Instant;
pub use crate::query::QueryStats;
/// Network behaviour that handles Kademlia.
/// `Kademlia` is a `NetworkBehaviour` that implements the libp2p
/// Kademlia protocol.
pub struct Kademlia<TStore> {
/// The Kademlia routing table.
kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,
/// The k-bucket insertion strategy.
kbucket_inserts: KademliaBucketInserts,
/// Configuration of the wire protocol.
protocol_config: KademliaProtocolConfig,
@@ -92,6 +96,30 @@ pub struct Kademlia<TStore> {
store: TStore,
}
/// The configurable strategies for the insertion of peers
/// and their addresses into the k-buckets of the Kademlia
/// routing table.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum KademliaBucketInserts {
/// Whenever a connection to a peer is established as a
/// result of a dialing attempt and that peer is not yet
/// in the routing table, it is inserted as long as there
/// is a free slot in the corresponding k-bucket. If the
/// k-bucket is full but still has a free pending slot,
/// it may be inserted into the routing table at a later time if an unresponsive
/// disconnected peer is evicted from the bucket.
OnConnected,
/// New peers and addresses are only added to the routing table via
/// explicit calls to [`Kademlia::add_address`].
///
/// > **Note**: Even though peers can only get into the
/// > routing table as a result of [`Kademlia::add_address`],
/// > routing table entries are still updated as peers
/// > connect and disconnect (i.e. the order of the entries
/// > as well as the network addresses).
Manual,
}
/// The configuration for the `Kademlia` behaviour.
///
/// The configuration is consumed by [`Kademlia::new`].
@@ -106,6 +134,7 @@ pub struct KademliaConfig {
provider_record_ttl: Option<Duration>,
provider_publication_interval: Option<Duration>,
connection_idle_timeout: Duration,
kbucket_inserts: KademliaBucketInserts,
}
impl Default for KademliaConfig {
@@ -120,6 +149,7 @@ impl Default for KademliaConfig {
provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
connection_idle_timeout: Duration::from_secs(10),
kbucket_inserts: KademliaBucketInserts::OnConnected,
}
}
}
@@ -275,6 +305,12 @@ impl KademliaConfig {
self.protocol_config.set_max_packet_size(size);
self
}
/// Sets the k-bucket insertion strategy for the Kademlia routing table.
pub fn set_kbucket_inserts(&mut self, inserts: KademliaBucketInserts) -> &mut Self {
self.kbucket_inserts = inserts;
self
}
}
impl<TStore> Kademlia<TStore>
@@ -312,6 +348,7 @@ where
Kademlia {
store,
kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
kbucket_inserts: config.kbucket_inserts,
protocol_config: config.protocol_config,
queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
queries: QueryPool::new(config.query_config),
@@ -381,7 +418,7 @@ where
///
/// If the routing table has been updated as a result of this operation,
/// a [`KademliaEvent::RoutingUpdated`] event is emitted.
pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) {
pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
let key = kbucket::Key::new(peer.clone());
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry, _) => {
@@ -394,9 +431,11 @@ where
}
))
}
RoutingUpdate::Success
}
kbucket::Entry::Pending(mut entry, _) => {
entry.value().insert(address);
RoutingUpdate::Pending
}
kbucket::Entry::Absent(entry) => {
let addresses = Addresses::new(address);
@@ -415,26 +454,97 @@ where
old_peer: None,
}
));
RoutingUpdate::Success
},
kbucket::InsertResult::Full => {
debug!("Bucket full. Peer not added to routing table: {}", peer)
debug!("Bucket full. Peer not added to routing table: {}", peer);
RoutingUpdate::Failed
},
kbucket::InsertResult::Pending { disconnected } => {
self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: disconnected.into_preimage(),
condition: DialPeerCondition::Disconnected
})
});
RoutingUpdate::Pending
},
}
},
kbucket::Entry::SelfEntry => {},
kbucket::Entry::SelfEntry => RoutingUpdate::Failed,
}
}
/// Returns an iterator over all peer IDs of nodes currently contained in a bucket
/// of the Kademlia routing table.
pub fn kbuckets_entries(&mut self) -> impl Iterator<Item = &PeerId> {
self.kbuckets.iter().map(|entry| entry.node.key.preimage())
/// Removes an address of a peer from the routing table.
///
/// If the given address is the last address of the peer in the
/// routing table, the peer is removed from the routing table
/// and `Some` is returned with a view of the removed entry.
/// The same applies if the peer is currently pending insertion
/// into the routing table.
///
/// If the given peer or address is not in the routing table,
/// this is a no-op.
pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr)
-> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>>
{
let key = kbucket::Key::new(peer.clone());
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry, _) => {
if entry.value().remove(address).is_err() {
Some(entry.remove()) // it is the last address, thus remove the peer.
} else {
None
}
}
kbucket::Entry::Pending(mut entry, _) => {
if entry.value().remove(address).is_err() {
Some(entry.remove()) // it is the last address, thus remove the peer.
} else {
None
}
}
kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => {
None
}
}
}
/// Removes a peer from the routing table.
///
/// Returns `None` if the peer was not in the routing table,
/// not even pending insertion.
pub fn remove_peer(&mut self, peer: &PeerId)
-> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>>
{
let key = kbucket::Key::new(peer.clone());
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(entry, _) => {
Some(entry.remove())
}
kbucket::Entry::Pending(entry, _) => {
Some(entry.remove())
}
kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => {
None
}
}
}
/// Returns an iterator over all non-empty buckets in the routing table.
pub fn kbuckets(&mut self)
-> impl Iterator<Item = kbucket::KBucketRef<kbucket::Key<PeerId>, Addresses>>
{
self.kbuckets.iter().filter(|b| !b.is_empty())
}
/// Returns the k-bucket for the distance to the given key.
///
/// Returns `None` if the given key refers to the local key.
pub fn kbucket<K>(&mut self, key: K)
-> Option<kbucket::KBucketRef<kbucket::Key<PeerId>, Addresses>>
where
K: Borrow<[u8]> + Clone
{
self.kbuckets.bucket(&kbucket::Key::new(key))
}
/// Initiates an iterative query for the closest peers to the given key.
@@ -723,7 +833,7 @@ where
self.queries.add_iter_closest(target.clone(), peers, inner);
}
/// Updates the connection status of a peer in the Kademlia routing table.
/// Updates the routing table with a new connection status and address of a peer.
fn connection_updated(&mut self, peer: PeerId, address: Option<Multiaddr>, new_status: NodeStatus) {
let key = kbucket::Key::new(peer.clone());
match self.kbuckets.entry(&key) {
@@ -755,9 +865,22 @@ where
kbucket::Entry::Absent(entry) => {
// Only connected nodes with a known address are newly inserted.
if new_status == NodeStatus::Connected {
if let Some(address) = address {
let addresses = Addresses::new(address);
if new_status != NodeStatus::Connected {
return
}
match (address, self.kbucket_inserts) {
(None, _) => {
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::UnroutablePeer { peer }
));
}
(Some(a), KademliaBucketInserts::Manual) => {
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::RoutablePeer { peer, address: a }
));
}
(Some(a), KademliaBucketInserts::OnConnected) => {
let addresses = Addresses::new(a);
match entry.insert(addresses.clone(), new_status) {
kbucket::InsertResult::Inserted => {
let event = KademliaEvent::RoutingUpdated {
@@ -769,20 +892,24 @@ where
NetworkBehaviourAction::GenerateEvent(event));
},
kbucket::InsertResult::Full => {
debug!("Bucket full. Peer not added to routing table: {}", peer)
debug!("Bucket full. Peer not added to routing table: {}", peer);
let address = addresses.first().clone();
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::RoutablePeer { peer, address }
));
},
kbucket::InsertResult::Pending { disconnected } => {
debug_assert!(!self.connected_peers.contains(disconnected.preimage()));
let address = addresses.first().clone();
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::PendingRoutablePeer { peer, address }
));
self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: disconnected.into_preimage(),
condition: DialPeerCondition::Disconnected
})
},
}
} else {
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::UnroutablePeer { peer }
));
}
}
},
@@ -806,8 +933,8 @@ where
// a bucket refresh should be performed for every bucket farther away than
// the first non-empty bucket (which are most likely no more than the last
// few, i.e. farthest, buckets).
self.kbuckets.buckets()
.skip_while(|b| b.num_entries() == 0)
self.kbuckets.iter()
.skip_while(|b| b.is_empty())
.skip(1) // Skip the bucket with the closest neighbour.
.map(|b| {
// Try to find a key that falls into the bucket. While such keys can
@@ -1770,10 +1897,42 @@ pub enum KademliaEvent {
/// A peer has connected for whom no listen address is known.
///
/// If the peer is to be added to the local node's routing table, a known
/// If the peer is to be added to the routing table, a known
/// listen address for the peer must be provided via [`Kademlia::add_address`].
UnroutablePeer {
peer: PeerId
},
/// A connection to a peer has been established for whom a listen address
/// is known but the peer has not been added to the routing table either
/// because [`KademliaBucketInserts::Manual`] is configured or because
/// the corresponding bucket is full.
///
/// If the peer is to be included in the routing table, it must
/// be explicitly added via [`Kademlia::add_address`], possibly after
/// removing another peer.
///
/// See [`Kademlia::kbucket`] for insight into the contents of
/// the k-bucket of `peer`.
RoutablePeer {
peer: PeerId,
address: Multiaddr,
},
/// A connection to a peer has been established for whom a listen address
/// is known but the peer is only pending insertion into the routing table
/// if the least-recently disconnected peer is unresponsive, i.e. the peer
/// may not make it into the routing table.
///
/// If the peer is to be unconditionally included in the routing table,
/// it should be explicitly added via [`Kademlia::add_address`] after
/// removing another peer.
///
/// See [`Kademlia::kbucket`] for insight into the contents of
/// the k-bucket of `peer`.
PendingRoutablePeer {
peer: PeerId,
address: Multiaddr,
}
}
@@ -2294,3 +2453,22 @@ impl fmt::Display for NoKnownPeers {
}
impl std::error::Error for NoKnownPeers {}
/// The possible outcomes of [`Kademlia::add_address`].
pub enum RoutingUpdate {
/// The given peer and address have been added to the routing
/// table.
Success,
/// The peer and address are pending insertion into
/// the routing table, if a disconnected peer fails
/// to respond. If the given peer and address end up
/// in the routing table, [`KademliaEvent::RoutingUpdated`]
/// is eventually emitted.
Pending,
/// The routing table update failed, either because the
/// corresponding bucket for the peer is full and the
/// pending slot(s) are occupied, or because the given
/// peer ID is deemed invalid (e.g. refers to the local
/// peer ID).
Failed,
}
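
For completeness, a sketch of how the removal APIs and the new
`RoutingUpdate` result compose; `kad`, `old_peer`, `new_peer` and
`new_addr` are assumed to be in scope:

// Free a slot by evicting `old_peer`, then admit `new_peer`.
if let Some(evicted) = kad.remove_peer(&old_peer) {
    // `evicted` is a `kbucket::EntryView` of the removed entry.
    println!("evicted {:?}", evicted.node.key.preimage());
}
match kad.add_address(&new_peer, new_addr) {
    RoutingUpdate::Success => { /* inserted; `RoutingUpdated` was emitted */ }
    RoutingUpdate::Pending => { /* may enter once a disconnected peer is evicted */ }
    RoutingUpdate::Failed => { /* bucket still full, or `new_peer` is the local peer */ }
}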

protocols/kad/src/behaviour/test.rs

@@ -196,9 +196,12 @@ fn bootstrap() {
}
first = false;
if ok.num_remaining == 0 {
let known = swarm.kbuckets.iter()
.map(|e| e.node.key.preimage().clone())
.collect::<HashSet<_>>();
let mut known = HashSet::new();
for b in swarm.kbuckets.iter() {
for e in b.iter() {
known.insert(e.node.key.preimage().clone());
}
}
assert_eq!(expected_known, known);
return Poll::Ready(())
}
@@ -1052,3 +1055,47 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
record: record_trudy,
}));
}
/// Tests that peers are not automatically inserted into
/// the routing table with `KademliaBucketInserts::Manual`.
#[test]
fn manual_bucket_inserts() {
let mut cfg = KademliaConfig::default();
cfg.set_kbucket_inserts(KademliaBucketInserts::Manual);
// 1 -> 2 -> [3 -> ...]
let mut swarms = build_connected_nodes_with_config(3, 1, cfg);
// The peers and their addresses for which we expect `RoutablePeer` events.
let mut expected = swarms.iter().skip(2)
.map(|(a, s)| (a.clone(), Swarm::local_peer_id(s).clone()))
.collect::<HashMap<_,_>>();
// We collect the peers for which a `RoutablePeer` event
// was received in here to check at the end of the test
// that none of them was inserted into a bucket.
let mut routable = Vec::new();
// Start an iterative query from the first peer.
swarms[0].1.get_closest_peers(PeerId::random());
block_on(poll_fn(move |ctx| {
for (_, swarm) in swarms.iter_mut() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::RoutablePeer {
peer, address
})) => {
assert_eq!(peer, expected.remove(&address).expect("Unexpected address"));
routable.push(peer);
if expected.is_empty() {
for peer in routable.iter() {
let bucket = swarm.kbucket(peer.clone()).unwrap();
assert!(bucket.iter().all(|e| e.node.key.preimage() != peer));
}
return Poll::Ready(())
}
}
Poll::Ready(..) => {},
Poll::Pending => break
}
}
}
Poll::Pending
}));
}

protocols/kad/src/kbucket.rs

@@ -170,31 +170,11 @@ where
}
}
/// Returns an iterator over all the entries in the routing table.
pub fn iter<'a>(&'a mut self) -> impl Iterator<Item = EntryRefView<'a, TKey, TVal>> {
let applied_pending = &mut self.applied_pending;
self.buckets.iter_mut().flat_map(move |table| {
if let Some(applied) = table.apply_pending() {
applied_pending.push_back(applied)
}
let table = &*table;
table.iter().map(move |(n, status)| {
EntryRefView {
node: NodeRefView {
key: &n.key,
value: &n.value
},
status
}
})
})
}
/// Returns a by-reference iterator over all buckets.
/// Returns an iterator over all buckets.
///
/// The buckets are ordered by proximity to the `local_key`, i.e. the first
/// bucket is the closest bucket (containing at most one key).
pub fn buckets<'a>(&'a mut self) -> impl Iterator<Item = KBucketRef<'a, TKey, TVal>> + 'a {
pub fn iter<'a>(&'a mut self) -> impl Iterator<Item = KBucketRef<'a, TKey, TVal>> + 'a {
let applied_pending = &mut self.applied_pending;
self.buckets.iter_mut().enumerate().map(move |(i, b)| {
if let Some(applied) = b.apply_pending() {
@@ -207,6 +187,25 @@ where
})
}
/// Returns the bucket for the distance to the given key.
///
/// Returns `None` if the given key refers to the local key.
pub fn bucket<K>(&mut self, key: &K) -> Option<KBucketRef<'_, TKey, TVal>>
where
K: AsRef<KeyBytes>,
{
let d = self.local_key.as_ref().distance(key);
if let Some(index) = BucketIndex::new(&d) {
let bucket = &mut self.buckets[index.0];
if let Some(applied) = bucket.apply_pending() {
self.applied_pending.push_back(applied)
}
Some(KBucketRef { bucket, index })
} else {
None
}
}
/// Consumes the next applied pending entry, if any.
///
/// When an entry is attempted to be inserted and the respective bucket is full,
@@ -437,17 +436,22 @@ where
}
}
/// A reference to a bucket in a `KBucketsTable`.
pub struct KBucketRef<'a, TPeerId, TVal> {
/// A reference to a bucket in a [`KBucketsTable`].
pub struct KBucketRef<'a, TKey, TVal> {
index: BucketIndex,
bucket: &'a mut KBucket<TPeerId, TVal>
bucket: &'a mut KBucket<TKey, TVal>
}
impl<TKey, TVal> KBucketRef<'_, TKey, TVal>
impl<'a, TKey, TVal> KBucketRef<'a, TKey, TVal>
where
TKey: Clone + AsRef<KeyBytes>,
TVal: Clone
{
/// Checks whether the bucket is empty.
pub fn is_empty(&self) -> bool {
self.num_entries() == 0
}
/// Returns the number of entries in the bucket.
pub fn num_entries(&self) -> usize {
self.bucket.num_entries()
@@ -472,6 +476,19 @@ where
pub fn rand_distance(&self, rng: &mut impl rand::Rng) -> Distance {
self.index.rand_distance(rng)
}
/// Returns an iterator over the entries in the bucket.
pub fn iter(&'a self) -> impl Iterator<Item = EntryRefView<'a, TKey, TVal>> {
self.bucket.iter().map(move |(n, status)| {
EntryRefView {
node: NodeRefView {
key: &n.key,
value: &n.value
},
status
}
})
}
}
#[cfg(test)]

protocols/kad/src/kbucket/bucket.rs

@@ -74,6 +74,10 @@ impl<TKey, TVal> PendingNode<TKey, TVal> {
pub fn set_ready_at(&mut self, t: Instant) {
self.replace = t;
}
pub fn into_node(self) -> Node<TKey, TVal> {
self.node
}
}
/// A `Node` in a bucket, representing a peer participating
@@ -264,6 +268,11 @@ where
}
}
/// Removes the pending node from the bucket, if any.
pub fn remove_pending(&mut self) -> Option<PendingNode<TKey, TVal>> {
self.pending.take()
}
/// Updates the status of the node referred to by the given key, if it is
/// in the bucket.
pub fn update(&mut self, key: &TKey, status: NodeStatus) {
@@ -272,24 +281,7 @@ where
// prefix list of disconnected nodes or the suffix list of connected
// nodes (i.e. most-recently disconnected or most-recently connected,
// respectively).
if let Some(pos) = self.position(key) {
// Remove the node from its current position.
let old_status = self.status(pos);
let node = self.nodes.remove(pos.0);
// Adjust `first_connected_pos` accordingly.
match old_status {
NodeStatus::Connected =>
if self.first_connected_pos.map_or(false, |p| p == pos.0) {
if pos.0 == self.nodes.len() {
// It was the last connected node.
self.first_connected_pos = None
}
}
NodeStatus::Disconnected =>
if let Some(ref mut p) = self.first_connected_pos {
*p -= 1;
}
}
if let Some((node, _status, pos)) = self.remove(key) {
// If the least-recently connected node re-establishes its
// connected status, drop the pending node.
if pos == Position(0) && status == NodeStatus::Connected {
@@ -357,6 +349,32 @@ where
}
}
/// Removes the node with the given key from the bucket, if it exists.
pub fn remove(&mut self, key: &TKey) -> Option<(Node<TKey, TVal>, NodeStatus, Position)> {
if let Some(pos) = self.position(key) {
// Remove the node from its current position.
let status = self.status(pos);
let node = self.nodes.remove(pos.0);
// Adjust `first_connected_pos` accordingly.
match status {
NodeStatus::Connected =>
if self.first_connected_pos.map_or(false, |p| p == pos.0) {
if pos.0 == self.nodes.len() {
// It was the last connected node.
self.first_connected_pos = None
}
}
NodeStatus::Disconnected =>
if let Some(ref mut p) = self.first_connected_pos {
*p -= 1;
}
}
Some((node, status, pos))
} else {
None
}
}
/// Returns the status of the node at the given position.
pub fn status(&self, pos: Position) -> NodeStatus {
if self.first_connected_pos.map_or(false, |i| pos.0 >= i) {

protocols/kad/src/kbucket/entry.rs

@@ -185,7 +185,7 @@ where
pub fn value(&mut self) -> &mut TVal {
&mut self.0.bucket
.get_mut(self.0.key)
.expect("We can only build a ConnectedEntry if the entry is in the bucket; QED")
.expect("We can only build a PresentEntry if the entry is in the bucket; QED")
.value
}
@@ -194,6 +194,14 @@ where
self.0.bucket.update(self.0.key, status);
Self::new(self.0.bucket, self.0.key)
}
/// Removes the entry from the bucket.
pub fn remove(self) -> EntryView<TKey, TVal> {
let (node, status, _pos) = self.0.bucket
.remove(&self.0.key)
.expect("We can only build a PresentEntry if the entry is in the bucket; QED");
EntryView { node, status }
}
}
/// An entry waiting for a slot to be available in a bucket.
@@ -227,6 +235,17 @@ where
self.0.bucket.update_pending(status);
PendingEntry::new(self.0.bucket, self.0.key)
}
/// Removes the pending entry from the bucket.
pub fn remove(self) -> EntryView<TKey, TVal> {
let pending = self.0.bucket
.remove_pending()
.expect("We can only build a PendingEntry if the entry is pending insertion
into the bucket; QED");
let status = pending.status();
let node = pending.into_node();
EntryView { node, status }
}
}
/// An entry that is not present in any bucket.