Mirror of https://github.com/fluencelabs/rust-libp2p, synced 2025-04-25 11:02:12 +00:00
Kademlia: Optimise iteration over closest keys / entries. (#1117)
* Kademlia: Optimise iteration over closest entries.

  The current implementation for finding the entries whose keys are closest to
  some target key in the Kademlia routing table involves copying the keys of
  all buckets into a new `Vec`, which is then sorted based on the distances to
  the target and turned into an iterator, from which only a small number of
  elements (by default 20) are drawn.

  This commit introduces an iterator over buckets for finding the closest keys
  to a target that visits the buckets in the optimal order, based on the
  information contained in the distance bit-string representing the distance
  between the local key and the target. Correctness is tested against
  full-table scans.

  Also included:

  * Updated documentation.
  * The `Entry` API was moved to the `kbucket::entry` sub-module for ease of
    maintenance.
  * The pending node handling has been slightly refactored in order to bring
    code and documentation into agreement and to clarify the semantics a
    little.

* Rewrite pending node handling and add tests.
This commit is contained in:
Parent: 8adc5fa069
Commit: 09f54df44d
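The optimisation can be summarised as follows: for a target at XOR distance `d` from the local key, the bucket covering the most significant 1-bit of `d` contains the candidates closest to the target; the remaining buckets whose index is a 1-bit of `d` follow in decreasing bit order ("zooming in"), and the buckets whose index is a 0-bit follow in increasing bit order ("zooming out"). Below is a minimal, self-contained sketch of that visiting order — a hypothetical helper, not the actual iterator introduced by this commit — assuming a 256-bit distance in big-endian byte order:

    fn closest_bucket_order(d: [u8; 32]) -> Vec<usize> {
        // Bit i of the distance, with bit 0 the least significant.
        let bit = |i: usize| (d[31 - i / 8] >> (i % 8)) & 1 == 1;
        let mut order = Vec::with_capacity(256);
        // "Zoom in": buckets whose index is a 1-bit of d, most significant first.
        for i in (0..256).rev().filter(|&i| bit(i)) { order.push(i); }
        // "Zoom out": the remaining buckets, in ascending index order.
        for i in (0..256).filter(|&i| !bit(i)) { order.push(i); }
        order
    }

Drawing the first `k` entries in this order yields the same result as the previous full-table sort, without copying and sorting the keys of all buckets.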
@@ -44,16 +44,16 @@ fn main() {
-        // to insert our local node in the DHT. However here we use `without_init` because this
-        // example is very ephemeral and we don't want to pollute the DHT. In a real world
-        // application, you want to use `new` instead.
-        let mut behaviour = libp2p::kad::Kademlia::without_init(local_peer_id.clone());
-        behaviour.add_connected_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap(), "/ip4/104.131.131.82/tcp/4001".parse().unwrap());
-        behaviour.add_connected_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap());
-        behaviour.add_connected_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap());
-        behaviour.add_connected_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap());
-        behaviour.add_connected_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap());
-        behaviour.add_connected_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap());
-        behaviour.add_connected_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap());
-        behaviour.add_connected_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap());
-        behaviour.add_connected_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap());
+        let mut behaviour = libp2p::kad::Kademlia::new(local_peer_id.clone());
+        behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap(), "/ip4/104.131.131.82/tcp/4001".parse().unwrap());
+        behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap());
+        behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap());
+        behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap());
+        behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap());
+        behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap());
+        behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap());
+        behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap());
+        behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap());
         libp2p::core::Swarm::new(transport, behaviour, local_peer_id)
     };
@@ -36,11 +36,16 @@ impl Addresses {
         }
     }
 
-    /// Returns the list of addresses.
+    /// Returns an iterator over the list of addresses.
     pub fn iter(&self) -> impl Iterator<Item = &Multiaddr> {
         self.addrs.iter()
     }
 
+    /// Converts the addresses into a `Vec`.
+    pub fn into_vec(self) -> Vec<Multiaddr> {
+        self.addrs.into_vec()
+    }
+
     /// Returns true if the list of addresses is empty.
     pub fn is_empty(&self) -> bool {
        self.addrs.is_empty()
@@ -20,7 +20,7 @@
 
 use crate::addresses::Addresses;
 use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn};
-use crate::kbucket::{self, KBucketsTable};
+use crate::kbucket::{self, KBucketsTable, NodeStatus};
 use crate::protocol::{KadConnectionType, KadPeer};
 use crate::query::{QueryConfig, QueryState, QueryStatePollOut};
 use fnv::{FnvHashMap, FnvHashSet};
@@ -74,13 +74,14 @@ pub struct Kademlia<TSubstream> {
     /// perform in parallel.
     parallelism: usize,
 
-    /// `k` in the Kademlia reference papers. Number of results in a find node query.
+    /// The number of results to return from a query. Defaults to the maximum number
+    /// of entries in a single k-bucket, i.e. the `k` parameter.
     num_results: usize,
 
-    /// Timeout for each individual RPC query.
+    /// Timeout for a single RPC.
     rpc_timeout: Duration,
 
-    /// Events to return when polling.
+    /// Queued events to return when the behaviour is being polled.
     queued_events: SmallVec<[NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaOut>; 32]>,
 
     /// List of providers to add to the topology as soon as we are in `poll()`.
@@ -200,42 +201,37 @@ impl<TSubstream> Kademlia<TSubstream> {
         Self::new_inner(local_peer_id)
     }
 
-    /// Adds a known address for the given `PeerId`. We are connected to this address.
-    // TODO: report if the address was inserted? also, semantics unclear
-    pub fn add_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) {
-        self.add_address(peer_id, address, true)
-    }
-
-    /// Adds a known address for the given `PeerId`. We are not connected or don't know whether we
-    /// are connected to this address.
-    // TODO: report if the address was inserted? also, semantics unclear
-    pub fn add_not_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) {
-        self.add_address(peer_id, address, false)
-    }
-
-    /// Underlying implementation for `add_connected_address` and `add_not_connected_address`.
-    fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, _connected: bool) {
+    /// Adds a known address of a peer participating in the Kademlia DHT to the
+    /// routing table.
+    ///
+    /// This allows prepopulating the Kademlia routing table with known
+    /// addresses, so that they can be used immediately in following DHT
+    /// operations involving one of these peers, without having to dial
+    /// them upfront.
+    pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) {
         let key = kbucket::Key::new(peer_id.clone());
         match self.kbuckets.entry(&key) {
-            kbucket::Entry::InKbucketConnected(mut entry) => entry.value().insert(address),
-            kbucket::Entry::InKbucketConnectedPending(mut entry) => entry.value().insert(address),
-            kbucket::Entry::InKbucketDisconnected(mut entry) => entry.value().insert(address),
-            kbucket::Entry::InKbucketDisconnectedPending(mut entry) => entry.value().insert(address),
-            kbucket::Entry::NotInKbucket(entry) => {
+            kbucket::Entry::Present(mut entry, _) => {
+                entry.value().insert(address);
+            }
+            kbucket::Entry::Pending(mut entry, _) => {
+                entry.value().insert(address);
+            }
+            kbucket::Entry::Absent(entry) => {
                 let mut addresses = Addresses::new();
                 addresses.insert(address);
-                match entry.insert_disconnected(addresses) {
-                    kbucket::InsertOutcome::Inserted => {
+                match entry.insert(addresses, NodeStatus::Disconnected) {
+                    kbucket::InsertResult::Inserted => {
                         let event = KademliaOut::KBucketAdded {
                             peer_id: peer_id.clone(),
                             replaced: None,
                         };
                         self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
                     },
-                    kbucket::InsertOutcome::Full => (),
-                    kbucket::InsertOutcome::Pending { to_ping } => {
+                    kbucket::InsertResult::Full => (),
+                    kbucket::InsertResult::Pending { disconnected } => {
                         self.queued_events.push(NetworkBehaviourAction::DialPeer {
-                            peer_id: to_ping.into_preimage(),
+                            peer_id: disconnected.into_preimage(),
                         })
                    },
                }
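As a usage illustration of the new API — a hedged sketch, with the peer ID and address values borrowed from the example diff above and `behaviour` assumed to be a `libp2p::kad::Kademlia`:

    let peer: PeerId = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap();
    let addr: Multiaddr = "/ip4/104.131.131.82/tcp/4001".parse().unwrap();
    // Prepopulate the routing table before issuing DHT queries.
    behaviour.add_address(&peer, addr);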
@@ -261,16 +257,17 @@ impl<TSubstream> Kademlia<TSubstream> {
             providing_keys: FnvHashSet::default(),
             refresh_add_providers: Interval::new_interval(Duration::from_secs(60)).fuse(), // TODO: constant
             parallelism,
-            num_results: 20,
+            num_results: kbucket::MAX_NODES_PER_BUCKET,
             rpc_timeout: Duration::from_secs(8),
             add_provider: SmallVec::new(),
             marker: PhantomData,
         }
     }
 
-    /// Returns an iterator to all the peer IDs in the bucket, without the pending nodes.
-    pub fn kbuckets_entries(&self) -> impl Iterator<Item = &PeerId> {
-        self.kbuckets.entries_not_pending().map(|(key, _)| key.preimage())
+    /// Returns an iterator over all peer IDs of nodes currently contained in a bucket
+    /// of the Kademlia routing table.
+    pub fn kbuckets_entries(&mut self) -> impl Iterator<Item = &PeerId> {
+        self.kbuckets.iter().map(|entry| entry.node.key.preimage())
     }
 
     /// Starts an iterative `FIND_NODE` request.
@@ -334,8 +331,10 @@ impl<TSubstream> Kademlia<TSubstream> {
             untrusted_addresses: Default::default(),
         };
 
+        let target_key = kbucket::Key::new(target.clone());
+
         let known_closest_peers = self.kbuckets
-            .find_closest(&kbucket::Key::new(target.clone()))
+            .closest_keys(&target_key)
             .take(self.num_results);
 
         self.active_queries.insert(
@@ -376,6 +375,83 @@ impl<TSubstream> Kademlia<TSubstream> {
             query.inject_rpc_result(source, others_iter.cloned().map(|kp| kp.node_id))
         }
     }
 
+    /// Finds the closest peers to a `target` in the context of a request by
+    /// the `source` peer, such that the `source` peer is never included in the
+    /// result.
+    fn find_closest<T: Clone>(&mut self, target: &kbucket::Key<T>, source: &PeerId) -> Vec<KadPeer> {
+        self.kbuckets
+            .closest(target)
+            .filter(|e| e.node.key.preimage() != source)
+            .take(self.num_results)
+            .map(KadPeer::from)
+            .collect()
+    }
+
+    /// Collects all peers who are known to be providers of the value for a given `Multihash`.
+    fn provider_peers(&mut self, key: &Multihash, source: &PeerId) -> Vec<KadPeer> {
+        let kbuckets = &mut self.kbuckets;
+        self.values_providers
+            .get(key)
+            .into_iter()
+            .flat_map(|peers| peers)
+            .filter_map(move |p|
+                if p != source {
+                    let key = kbucket::Key::new(p.clone());
+                    kbuckets.entry(&key).view().map(|e| KadPeer::from(e.to_owned()))
+                } else {
+                    None
+                })
+            .collect()
+    }
+
+    /// Update the connection status of a peer in the Kademlia routing table.
+    fn connection_updated(&mut self, peer: PeerId, address: Option<Multiaddr>, new_status: NodeStatus) {
+        let key = kbucket::Key::new(peer.clone());
+        match self.kbuckets.entry(&key) {
+            kbucket::Entry::Present(mut entry, old_status) => {
+                if let Some(address) = address {
+                    entry.value().insert(address);
+                }
+                if old_status != new_status {
+                    entry.update(new_status);
+                }
+            },
+
+            kbucket::Entry::Pending(mut entry, old_status) => {
+                if let Some(address) = address {
+                    entry.value().insert(address);
+                }
+                if old_status != new_status {
+                    entry.update(new_status);
+                }
+            },
+
+            kbucket::Entry::Absent(entry) => if new_status == NodeStatus::Connected {
+                let mut addresses = Addresses::new();
+                if let Some(address) = address {
+                    addresses.insert(address);
+                }
+                match entry.insert(addresses, new_status) {
+                    kbucket::InsertResult::Inserted => {
+                        let event = KademliaOut::KBucketAdded {
+                            peer_id: peer.clone(),
+                            replaced: None,
+                        };
+                        self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
+                    },
+                    kbucket::InsertResult::Full => (),
+                    kbucket::InsertResult::Pending { disconnected } => {
+                        debug_assert!(!self.connected_peers.contains(disconnected.preimage()));
+                        self.queued_events.push(NetworkBehaviourAction::DialPeer {
+                            peer_id: disconnected.into_preimage(),
+                        })
+                    },
+                }
+            },
+            _ => {}
+        }
+    }
 }
 
 impl<TSubstream> NetworkBehaviour for Kademlia<TSubstream>
@@ -396,11 +472,13 @@ where
     fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
         // We should order addresses from decreasing likelyhood of connectivity, so start with
         // the addresses of that peer in the k-buckets.
-        let mut out_list = self.kbuckets
-            .entry(&kbucket::Key::new(peer_id.clone()))
-            .value_not_pending()
-            .map(|l| l.iter().cloned().collect::<Vec<_>>())
-            .unwrap_or_else(Vec::new);
+        let key = kbucket::Key::new(peer_id.clone());
+        let mut out_list =
+            if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) {
+                entry.value().iter().cloned().collect::<Vec<_>>()
+            } else {
+                Vec::new()
+            };
 
         // We add to that a temporary list of addresses from the ongoing queries.
         for query in self.active_queries.values() {
@@ -428,57 +506,7 @@ where
             ConnectedPoint::Listener { .. } => None,
         };
 
-        let key = kbucket::Key::new(id.clone());
-
-        match self.kbuckets.entry(&key) {
-            kbucket::Entry::InKbucketConnected(_) => {
-                unreachable!("Kbuckets are always kept in sync with the connection state; QED")
-            },
-            kbucket::Entry::InKbucketConnectedPending(_) => {
-                unreachable!("Kbuckets are always kept in sync with the connection state; QED")
-            },
-
-            kbucket::Entry::InKbucketDisconnected(mut entry) => {
-                if let Some(address) = address {
-                    entry.value().insert(address);
-                }
-                entry.set_connected();
-            },
-
-            kbucket::Entry::InKbucketDisconnectedPending(mut entry) => {
-                if let Some(address) = address {
-                    entry.value().insert(address);
-                }
-                entry.set_connected();
-            },
-
-            kbucket::Entry::NotInKbucket(entry) => {
-                let mut addresses = Addresses::new();
-                if let Some(address) = address {
-                    addresses.insert(address);
-                }
-                match entry.insert_connected(addresses) {
-                    kbucket::InsertOutcome::Inserted => {
-                        let event = KademliaOut::KBucketAdded {
-                            peer_id: id.clone(),
-                            replaced: None,
-                        };
-                        self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
-                    },
-                    kbucket::InsertOutcome::Full => (),
-                    kbucket::InsertOutcome::Pending { to_ping } => {
-                        self.queued_events.push(NetworkBehaviourAction::DialPeer {
-                            peer_id: to_ping.into_preimage(),
-                        })
-                    },
-                }
-            },
-
-            kbucket::Entry::SelfEntry => {
-                unreachable!("Guaranteed to never receive disconnected even for self; QED")
-            },
-        }
-
+        self.connection_updated(id.clone(), address, NodeStatus::Connected);
         self.connected_peers.insert(id);
     }
 
@@ -486,10 +514,10 @@ where
         if let Some(peer_id) = peer_id {
             let key = kbucket::Key::new(peer_id.clone());
 
-            if let Some(list) = self.kbuckets.entry(&key).value() {
+            if let Some(addrs) = self.kbuckets.entry(&key).value() {
                 // TODO: don't remove the address if the error is that we are already connected
                 // to this peer
-                list.remove(addr);
+                addrs.remove(addr);
             }
 
             for query in self.active_queries.values_mut() {
@@ -507,40 +535,11 @@ where
     }
 
     fn inject_disconnected(&mut self, id: &PeerId, _old_endpoint: ConnectedPoint) {
-        let was_in = self.connected_peers.remove(id);
-        debug_assert!(was_in);
-
         for query in self.active_queries.values_mut() {
             query.inject_rpc_error(id);
         }
-
-        match self.kbuckets.entry(&kbucket::Key::new(id.clone())) {
-            kbucket::Entry::InKbucketConnected(entry) => {
-                match entry.set_disconnected() {
-                    kbucket::SetDisconnectedOutcome::Kept(_) => {},
-                    kbucket::SetDisconnectedOutcome::Replaced { replacement, .. } => {
-                        let event = KademliaOut::KBucketAdded {
-                            peer_id: replacement.into_preimage(),
-                            replaced: Some(id.clone()),
-                        };
-                        self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
-                    },
-                }
-            },
-            kbucket::Entry::InKbucketConnectedPending(entry) => {
-                entry.set_disconnected();
-            },
-            kbucket::Entry::InKbucketDisconnected(_) => {
-                unreachable!("Kbuckets are always kept in sync with the connection state; QED")
-            },
-            kbucket::Entry::InKbucketDisconnectedPending(_) => {
-                unreachable!("Kbuckets are always kept in sync with the connection state; QED")
-            },
-            kbucket::Entry::NotInKbucket(_) => {},
-            kbucket::Entry::SelfEntry => {
-                unreachable!("Guaranteed to never receive disconnected even for self; QED")
-            },
-        }
+        self.connection_updated(id.clone(), None, NodeStatus::Disconnected);
+        self.connected_peers.remove(id);
     }
 
     fn inject_replaced(&mut self, peer_id: PeerId, _old: ConnectedPoint, new_endpoint: ConnectedPoint) {
@@ -554,9 +553,9 @@ where
             }
         }
 
-        if let Some(list) = self.kbuckets.entry(&kbucket::Key::new(peer_id)).value() {
+        if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::new(peer_id)).value() {
             if let ConnectedPoint::Dialer { address } = new_endpoint {
-                list.insert(address);
+                addrs.insert(address);
             }
         }
     }
@@ -564,13 +563,7 @@ where
     fn inject_node_event(&mut self, source: PeerId, event: KademliaHandlerEvent<QueryId>) {
         match event {
             KademliaHandlerEvent::FindNodeReq { key, request_id } => {
-                let closer_peers = self.kbuckets
-                    .find_closest(&kbucket::Key::new(key))
-                    .filter(|p| p.preimage() != &source)
-                    .take(self.num_results)
-                    .map(|key| build_kad_peer(&key, &mut self.kbuckets))
-                    .collect();
-
+                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
                 self.queued_events.push(NetworkBehaviourAction::SendEvent {
                     peer_id: source,
                     event: KademliaHandlerIn::FindNodeRes {
@@ -586,23 +579,8 @@ where
                 self.discovered(&user_data, &source, closer_peers.iter());
             }
             KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
-                let provider_peers = {
-                    let kbuckets = &mut self.kbuckets;
-                    self.values_providers
-                        .get(&key)
-                        .into_iter()
-                        .flat_map(|peers| peers)
-                        .filter(|p| *p != &source)
-                        .map(move |peer_id| build_kad_peer(&kbucket::Key::new(peer_id.clone()), kbuckets))
-                        .collect()
-                };
-
-                let closer_peers = self.kbuckets
-                    .find_closest(&kbucket::Key::from(key))
-                    .take(self.num_results)
-                    .map(|key| build_kad_peer(&key, &mut self.kbuckets))
-                    .collect();
-
+                let provider_peers = self.provider_peers(&key, &source);
+                let closer_peers = self.find_closest(&kbucket::Key::from(key), &source);
                 self.queued_events.push(NetworkBehaviourAction::SendEvent {
                     peer_id: source,
                     event: KademliaHandlerIn::GetProvidersRes {
@@ -660,7 +638,7 @@ where
         // Flush the changes to the topology that we want to make.
         for (key, provider) in self.add_provider.drain() {
             // Don't add ourselves to the providers.
-            if provider == *self.kbuckets.local_key().preimage() {
+            if &provider == self.kbuckets.local_key().preimage() {
                 continue;
             }
             let providers = self.values_providers.entry(key).or_insert_with(Default::default);
@@ -683,12 +661,21 @@ where
         }
 
         loop {
-            // Handle events queued by other parts of this struct
+            // Drain queued events first.
             if !self.queued_events.is_empty() {
                 return Async::Ready(self.queued_events.remove(0));
             }
             self.queued_events.shrink_to_fit();
 
+            // Drain applied pending entries from the routing table.
+            if let Some(entry) = self.kbuckets.take_applied_pending() {
+                let event = KademliaOut::KBucketAdded {
+                    peer_id: entry.inserted.into_preimage(),
+                    replaced: entry.evicted.map(|n| n.key.into_preimage())
+                };
+                return Async::Ready(NetworkBehaviourAction::GenerateEvent(event))
+            }
+
             // If iterating finds a query that is finished, stores it here and stops looping.
             let mut finished_query = None;
 
@@ -751,16 +738,18 @@ where
                             break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
                         },
                         QueryInfoInner::AddProvider { target } => {
-                            let local_key = kbucket::Key::new(parameters.local_peer_id().clone());
                             for closest in closer_peers {
                                 let event = NetworkBehaviourAction::SendEvent {
                                     peer_id: closest,
                                     event: KademliaHandlerIn::AddProvider {
                                         key: target.clone(),
-                                        provider_peer: build_kad_peer(&local_key, &mut self.kbuckets),
+                                        provider_peer: KadPeer {
+                                            node_id: parameters.local_peer_id().clone(),
+                                            multiaddrs: parameters.external_addresses().cloned().collect(),
+                                            connection_ty: KadConnectionType::Connected,
+                                        }
                                     },
                                 };
 
                                 self.queued_events.push(event);
                             }
                         },
@@ -815,26 +804,16 @@ pub enum KademliaOut {
     },
 }
 
-/// Builds a `KadPeer` struct corresponding to the given `PeerId`.
-/// The `PeerId` cannot be the same as the local one.
-///
-/// > **Note**: This is just a convenience function that doesn't do anything note-worthy.
-fn build_kad_peer(
-    key: &kbucket::Key<PeerId>,
-    kbuckets: &mut KBucketsTable<PeerId, Addresses>
-) -> KadPeer {
-    let (multiaddrs, connection_ty) = match kbuckets.entry(key) {
-        kbucket::Entry::NotInKbucket(_) => (Vec::new(), KadConnectionType::NotConnected), // TODO: pending connection?
-        kbucket::Entry::InKbucketConnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected),
-        kbucket::Entry::InKbucketDisconnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected),
-        kbucket::Entry::InKbucketConnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected),
-        kbucket::Entry::InKbucketDisconnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected),
-        kbucket::Entry::SelfEntry => panic!("build_kad_peer expects not to be called with the kbucket::Key of the local ID"),
-    };
-
-    KadPeer {
-        node_id: key.preimage().clone(),
-        multiaddrs,
-        connection_ty,
+impl From<kbucket::EntryView<PeerId, Addresses>> for KadPeer {
+    fn from(e: kbucket::EntryView<PeerId, Addresses>) -> KadPeer {
+        KadPeer {
+            node_id: e.node.key.into_preimage(),
+            multiaddrs: e.node.value.into_vec(),
+            connection_ty: match e.status {
+                NodeStatus::Connected => KadConnectionType::Connected,
+                NodeStatus::Disconnected => KadConnectionType::NotConnected
+            }
+        }
     }
 }
@@ -97,7 +97,7 @@ fn query_iter() {
 
     // Connect each swarm in the list to its predecessor in the list.
     for (i, (swarm, peer)) in &mut swarms.iter_mut().skip(1).zip(swarm_ids.clone()).enumerate() {
-        swarm.add_not_connected_address(&peer, Protocol::Memory(port_base + i as u64).into())
+        swarm.add_address(&peer, Protocol::Memory(port_base + i as u64).into())
     }
 
     // Ask the last peer in the list to search a random peer. The search should
@@ -150,7 +150,7 @@ fn unresponsive_not_returned_direct() {
 
     // Add fake addresses.
     for _ in 0 .. 10 {
-        swarms[0].add_not_connected_address(&PeerId::random(), Protocol::Udp(10u16).into());
+        swarms[0].add_address(&PeerId::random(), Protocol::Udp(10u16).into());
     }
 
     // Ask first to search a random value.
@@ -189,14 +189,14 @@ fn unresponsive_not_returned_indirect() {
     // Add fake addresses to first.
     let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone();
     for _ in 0 .. 10 {
-        swarms[0].add_not_connected_address(
+        swarms[0].add_address(
             &PeerId::random(),
             multiaddr![Udp(10u16)]
         );
     }
 
     // Connect second to first.
-    swarms[1].add_not_connected_address(&first_peer_id, Protocol::Memory(port_base).into());
+    swarms[1].add_address(&first_peer_id, Protocol::Memory(port_base).into());
 
     // Ask second to search a random value.
     let search_target = PeerId::random();
File diff suppressed because it is too large.

protocols/kad/src/kbucket/bucket.rs (new file, 552 lines)
@@ -0,0 +1,552 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! The internal API for a single `KBucket` in a `KBucketsTable`.
//!
//! > **Note**: Uniqueness of entries w.r.t. a `Key` in a `KBucket` is not
//! > checked in this module. This is an invariant that must hold across all
//! > buckets in a `KBucketsTable` and hence is enforced by the public API
//! > of the `KBucketsTable` and in particular the public `Entry` API.

use super::*;

/// Maximum number of nodes in a bucket, i.e. the (fixed) `k` parameter.
pub const MAX_NODES_PER_BUCKET: usize = 20;

/// A `PendingNode` is a `Node` that is pending insertion into a `KBucket`.
#[derive(Debug, Clone)]
pub struct PendingNode<TPeerId, TVal> {
    /// The pending node to insert.
    node: Node<TPeerId, TVal>,

    /// The status of the pending node.
    status: NodeStatus,

    /// The instant at which the pending node is eligible for insertion into a bucket.
    replace: Instant,
}

/// The status of a node in a bucket.
///
/// The status of a node in a bucket together with the time of the
/// last status change determines the position of the node in a
/// bucket.
#[derive(PartialEq, Eq, Debug, Copy, Clone)]
pub enum NodeStatus {
    /// The node is considered connected.
    Connected,
    /// The node is considered disconnected.
    Disconnected
}

impl<TPeerId, TVal> PendingNode<TPeerId, TVal> {
    pub fn key(&self) -> &Key<TPeerId> {
        &self.node.key
    }

    pub fn status(&self) -> NodeStatus {
        self.status
    }

    pub fn value_mut(&mut self) -> &mut TVal {
        &mut self.node.value
    }

    pub fn is_ready(&self) -> bool {
        Instant::now() >= self.replace
    }

    pub fn set_ready_at(&mut self, t: Instant) {
        self.replace = t;
    }
}

/// A `Node` in a bucket, representing a peer participating
/// in the Kademlia DHT together with an associated value (e.g. contact
/// information).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node<TPeerId, TVal> {
    /// The key of the node, identifying the peer.
    pub key: Key<TPeerId>,
    /// The associated value.
    pub value: TVal,
}

/// The position of a node in a `KBucket`, i.e. a non-negative integer
/// in the range `[0, MAX_NODES_PER_BUCKET)`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Position(usize);

/// A `KBucket` is a list of up to `MAX_NODES_PER_BUCKET` `Key`s and associated values,
/// ordered from least-recently connected to most-recently connected.
#[derive(Debug, Clone)]
pub struct KBucket<TPeerId, TVal> {
    /// The nodes contained in the bucket.
    nodes: ArrayVec<[Node<TPeerId, TVal>; MAX_NODES_PER_BUCKET]>,

    /// The position (index) in `nodes` that marks the first connected node.
    ///
    /// Since the entries in `nodes` are ordered from least-recently connected to
    /// most-recently connected, all entries above this index are also considered
    /// connected, i.e. the range `[0, first_connected_pos)` marks the sub-list of
    /// entries that are considered disconnected and the range
    /// `[first_connected_pos, MAX_NODES_PER_BUCKET)` marks the sub-list of entries
    /// that are considered connected.
    ///
    /// `None` indicates that there are no connected entries in the bucket, i.e.
    /// the bucket is either empty, or contains only entries for peers that are
    /// considered disconnected.
    first_connected_pos: Option<usize>,

    /// A node that is pending to be inserted into a full bucket, should the
    /// least-recently connected (and currently disconnected) node not be
    /// marked as connected within `unresponsive_timeout`.
    pending: Option<PendingNode<TPeerId, TVal>>,

    /// The timeout window before a new pending node is eligible for insertion,
    /// if the least-recently connected node is not updated as being connected
    /// in the meantime.
    pending_timeout: Duration
}
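To illustrate the ordering invariant behind `first_connected_pos` (hypothetical bucket contents, not part of the file):

    // nodes:               [d0, d1, c0, c1]  // d* disconnected, c* connected
    // first_connected_pos: Some(2)           // nodes[..2] disconnected, nodes[2..] connected
    //
    // With no connected entries at all, first_connected_pos is None.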
/// The result of inserting an entry into a bucket.
#[must_use]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InsertResult<TPeerId> {
    /// The entry has been successfully inserted.
    Inserted,
    /// The entry is pending insertion because the relevant bucket is currently full.
    /// The entry is inserted after a timeout elapsed, if the status of the
    /// least-recently connected (and currently disconnected) node in the bucket
    /// is not updated before the timeout expires.
    Pending {
        /// The key of the least-recently connected entry that is currently considered
        /// disconnected and whose corresponding peer should be checked for connectivity
        /// in order to prevent it from being evicted. If connectivity to the peer is
        /// re-established, the corresponding entry should be updated with
        /// [`NodeStatus::Connected`].
        disconnected: Key<TPeerId>
    },
    /// The entry was not inserted because the relevant bucket is full.
    Full
}

/// The result of applying a pending node to a bucket, possibly
/// replacing an existing node.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppliedPending<TPeerId, TVal> {
    /// The key of the inserted pending node.
    pub inserted: Key<TPeerId>,
    /// The node that has been evicted from the bucket to make room for the
    /// pending node, if any.
    pub evicted: Option<Node<TPeerId, TVal>>
}

impl<TPeerId, TVal> KBucket<TPeerId, TVal>
where
    TPeerId: Clone
{
    /// Creates a new `KBucket` with the given timeout for pending entries.
    pub fn new(pending_timeout: Duration) -> Self {
        KBucket {
            nodes: ArrayVec::new(),
            first_connected_pos: None,
            pending: None,
            pending_timeout,
        }
    }

    /// Returns a reference to the pending node of the bucket, if there is any.
    pub fn pending(&self) -> Option<&PendingNode<TPeerId, TVal>> {
        self.pending.as_ref()
    }

    /// Returns a mutable reference to the pending node of the bucket, if there is any.
    pub fn pending_mut(&mut self) -> Option<&mut PendingNode<TPeerId, TVal>> {
        self.pending.as_mut()
    }

    /// Returns a reference to the pending node of the bucket, if there is any
    /// with a matching key.
    pub fn as_pending(&self, key: &Key<TPeerId>) -> Option<&PendingNode<TPeerId, TVal>> {
        self.pending().filter(|p| &p.node.key == key)
    }

    /// Returns a reference to a node in the bucket.
    pub fn get(&self, key: &Key<TPeerId>) -> Option<&Node<TPeerId, TVal>> {
        self.position(key).map(|p| &self.nodes[p.0])
    }

    /// Returns an iterator over the nodes in the bucket, together with their status.
    pub fn iter(&self) -> impl Iterator<Item = (&Node<TPeerId, TVal>, NodeStatus)> {
        self.nodes.iter().enumerate().map(move |(p, n)| (n, self.status(Position(p))))
    }

    /// Inserts the pending node into the bucket, if its timeout has elapsed,
    /// replacing the least-recently connected node.
    ///
    /// If a pending node has been inserted, its key is returned together with
    /// the node that was replaced. `None` indicates that the nodes in the
    /// bucket remained unchanged.
    pub fn apply_pending(&mut self) -> Option<AppliedPending<TPeerId, TVal>> {
        if let Some(pending) = self.pending.take() {
            if pending.replace <= Instant::now() {
                if self.nodes.is_full() {
                    if self.status(Position(0)) == NodeStatus::Connected {
                        // The bucket is full with connected nodes. Drop the pending node.
                        return None
                    }
                    // The pending node will be inserted.
                    let inserted = pending.node.key.clone();
                    // A connected pending node goes at the end of the list for
                    // the connected peers, removing the least-recently connected.
                    if pending.status == NodeStatus::Connected {
                        let evicted = Some(self.nodes.remove(0));
                        self.first_connected_pos = self.first_connected_pos
                            .map_or_else(
                                || Some(self.nodes.len()),
                                |p| p.checked_sub(1));
                        self.nodes.push(pending.node);
                        return Some(AppliedPending { inserted, evicted })
                    }
                    // A disconnected pending node goes at the end of the list
                    // for the disconnected peers.
                    else if let Some(p) = self.first_connected_pos {
                        if let Some(insert_pos) = p.checked_sub(1) {
                            let evicted = Some(self.nodes.remove(0));
                            self.nodes.insert(insert_pos, pending.node);
                            return Some(AppliedPending { inserted, evicted })
                        }
                    } else {
                        // All nodes are disconnected. Insert the new node as the most
                        // recently disconnected, removing the least-recently disconnected.
                        let evicted = Some(self.nodes.remove(0));
                        self.nodes.push(pending.node);
                        return Some(AppliedPending { inserted, evicted })
                    }
                } else {
                    // There is room in the bucket, so just insert the pending node.
                    let inserted = pending.node.key.clone();
                    match self.insert(pending.node, pending.status) {
                        InsertResult::Inserted =>
                            return Some(AppliedPending { inserted, evicted: None }),
                        _ => unreachable!("Bucket is not full.")
                    }
                }
            } else {
                self.pending = Some(pending);
            }
        }

        return None
    }
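A hedged usage sketch of the pending-node life cycle, mirroring the tests further below (`bucket` is assumed to hold a pending node):

    // Force the pending node to be eligible for insertion, then apply it.
    if let Some(pending) = bucket.pending_mut() {
        pending.set_ready_at(Instant::now() - Duration::from_secs(1));
    }
    if let Some(AppliedPending { inserted, evicted }) = bucket.apply_pending() {
        // `inserted` is the key of the newly added node; `evicted` is the
        // least-recently connected node removed to make room, if any.
        let _ = (inserted, evicted);
    }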
    /// Updates the status of the pending node, if any.
    pub fn update_pending(&mut self, status: NodeStatus) {
        if let Some(pending) = &mut self.pending {
            pending.status = status
        }
    }

    /// Updates the status of the node referred to by the given key, if it is
    /// in the bucket.
    pub fn update(&mut self, key: &Key<TPeerId>, status: NodeStatus) {
        if let Some(pos) = self.position(key) {
            let node = self.nodes.remove(pos.0);
            if pos == Position(0) && status == NodeStatus::Connected {
                self.pending = None
            }
            match self.insert(node, status) {
                InsertResult::Inserted => {},
                _ => unreachable!("The node is removed before being (re)inserted.")
            }
        }
    }

    /// Inserts a new node into the bucket with the given status.
    ///
    /// The status of the node to insert determines the result as follows:
    ///
    /// * `NodeStatus::Connected`: If the bucket is full and either all nodes are connected
    ///   or there is already a pending node, insertion fails with `InsertResult::Full`.
    ///   If the bucket is full but at least one node is disconnected and there is no pending
    ///   node, the new node is inserted as pending, yielding `InsertResult::Pending`.
    ///   Otherwise the bucket has free slots and the new node is added to the end of the
    ///   bucket as the most-recently connected node.
    ///
    /// * `NodeStatus::Disconnected`: If the bucket is full, insertion fails with
    ///   `InsertResult::Full`. Otherwise the bucket has free slots and the new node
    ///   is inserted at the position preceding the first connected node,
    ///   i.e. as the most-recently disconnected node. If there are no connected nodes,
    ///   the new node is added as the last element of the bucket.
    pub fn insert(&mut self, node: Node<TPeerId, TVal>, status: NodeStatus) -> InsertResult<TPeerId> {
        match status {
            NodeStatus::Connected => {
                if self.nodes.is_full() {
                    if self.first_connected_pos == Some(0) || self.pending.is_some() {
                        return InsertResult::Full
                    } else {
                        self.pending = Some(PendingNode {
                            node,
                            status: NodeStatus::Connected,
                            replace: Instant::now() + self.pending_timeout,
                        });
                        return InsertResult::Pending {
                            disconnected: self.nodes[0].key.clone()
                        }
                    }
                }
                let pos = self.nodes.len();
                self.first_connected_pos = self.first_connected_pos.or(Some(pos));
                self.nodes.push(node);
                InsertResult::Inserted
            }
            NodeStatus::Disconnected => {
                if self.nodes.is_full() {
                    return InsertResult::Full
                }
                if let Some(ref mut first_connected_pos) = self.first_connected_pos {
                    self.nodes.insert(*first_connected_pos, node);
                    *first_connected_pos += 1;
                } else {
                    self.nodes.push(node);
                }
                InsertResult::Inserted
            }
        }
    }
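A sketch of how a caller is expected to react to the result of `insert` — this mirrors `connection_updated` in behaviour.rs above; `bucket` and `node` are assumed:

    match bucket.insert(node, NodeStatus::Connected) {
        InsertResult::Inserted => { /* e.g. emit a `KBucketAdded`-style event */ }
        InsertResult::Full => { /* drop the candidate */ }
        InsertResult::Pending { disconnected } => {
            // Re-check connectivity of the least-recently connected node,
            // e.g. by dialing `disconnected.into_preimage()`.
        }
    }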
    /// Returns the status of the node at the given position.
    pub fn status(&self, pos: Position) -> NodeStatus {
        if self.first_connected_pos.map_or(false, |i| pos.0 >= i) {
            NodeStatus::Connected
        } else {
            NodeStatus::Disconnected
        }
    }

    /// Checks whether the given position refers to a connected node.
    pub fn is_connected(&self, pos: Position) -> bool {
        self.status(pos) == NodeStatus::Connected
    }

    /// Gets the number of entries currently in the bucket.
    pub fn num_entries(&self) -> usize {
        self.nodes.len()
    }

    /// Gets the number of entries in the bucket that are considered connected.
    pub fn num_connected(&self) -> usize {
        self.first_connected_pos.map_or(0, |i| self.nodes.len() - i)
    }

    /// Gets the number of entries in the bucket that are considered disconnected.
    pub fn num_disconnected(&self) -> usize {
        self.nodes.len() - self.num_connected()
    }

    /// Gets the position of a node in the bucket.
    pub fn position(&self, key: &Key<TPeerId>) -> Option<Position> {
        self.nodes.iter().position(|p| &p.key == key).map(Position)
    }

    /// Gets a mutable reference to the node identified by the given key.
    ///
    /// Returns `None` if the given key does not refer to a node in the
    /// bucket.
    pub fn get_mut(&mut self, key: &Key<TPeerId>) -> Option<&mut Node<TPeerId, TVal>> {
        self.nodes.iter_mut().find(move |p| &p.key == key)
    }
}

#[cfg(test)]
mod tests {
    use libp2p_core::PeerId;
    use rand::Rng;
    use std::collections::VecDeque;
    use super::*;
    use quickcheck::*;

    impl Arbitrary for NodeStatus {
        fn arbitrary<G: Gen>(g: &mut G) -> NodeStatus {
            if g.gen() {
                NodeStatus::Connected
            } else {
                NodeStatus::Disconnected
            }
        }
    }

    fn fill_bucket(bucket: &mut KBucket<PeerId, ()>, status: NodeStatus) {
        for i in 0 .. MAX_NODES_PER_BUCKET - bucket.num_entries() {
            let key = Key::new(PeerId::random());
            let node = Node { key, value: () };
            assert_eq!(InsertResult::Inserted, bucket.insert(node, status));
            assert_eq!(bucket.num_entries(), i + 1);
        }
        assert!(bucket.pending().is_none());
    }

    #[test]
    fn ordering() {
        fn prop(status: Vec<NodeStatus>) -> bool {
            let mut bucket = KBucket::<PeerId, ()>::new(Duration::from_secs(1));

            // The expected lists of connected and disconnected nodes.
            let mut connected = VecDeque::new();
            let mut disconnected = VecDeque::new();

            // Fill the bucket, thereby populating the expected lists in insertion order.
            for status in status {
                let key = Key::new(PeerId::random());
                let node = Node { key: key.clone(), value: () };
                let full = bucket.num_entries() == MAX_NODES_PER_BUCKET;
                match bucket.insert(node, status) {
                    InsertResult::Inserted => {
                        let vec = match status {
                            NodeStatus::Connected => &mut connected,
                            NodeStatus::Disconnected => &mut disconnected
                        };
                        if full {
                            vec.pop_front();
                        }
                        vec.push_back((status, key.clone()));
                    }
                    _ => {}
                }
            }

            // Get all nodes from the bucket, together with their status.
            let mut nodes = bucket.iter()
                .map(|(n, s)| (s, n.key.clone()))
                .collect::<Vec<_>>();

            // Split the list of nodes at the first connected node.
            let first_connected_pos = nodes.iter().position(|(s,_)| *s == NodeStatus::Connected);
            assert_eq!(bucket.first_connected_pos, first_connected_pos);
            let tail = first_connected_pos.map_or(Vec::new(), |p| nodes.split_off(p));

            // All nodes before the first connected node must be disconnected and
            // in insertion order. Similarly, all remaining nodes must be connected
            // and in insertion order.
            nodes == Vec::from(disconnected)
                &&
            tail == Vec::from(connected)
        }

        quickcheck(prop as fn(_) -> _);
    }

    #[test]
    fn full_bucket() {
        let mut bucket = KBucket::<PeerId, ()>::new(Duration::from_secs(1));

        // Fill the bucket with disconnected nodes.
        fill_bucket(&mut bucket, NodeStatus::Disconnected);

        // Trying to insert another disconnected node fails.
        let key = Key::new(PeerId::random());
        let node = Node { key, value: () };
        match bucket.insert(node, NodeStatus::Disconnected) {
            InsertResult::Full => {},
            x => panic!("{:?}", x)
        }

        // One-by-one fill the bucket with connected nodes, replacing the disconnected ones.
        for i in 0 .. MAX_NODES_PER_BUCKET {
            let (first, first_status) = bucket.iter().next().unwrap();
            let first_disconnected = first.clone();
            assert_eq!(first_status, NodeStatus::Disconnected);

            // Add a connected node, which is expected to be pending, scheduled to
            // replace the first (i.e. least-recently connected) node.
            let key = Key::new(PeerId::random());
            let node = Node { key: key.clone(), value: () };
            match bucket.insert(node.clone(), NodeStatus::Connected) {
                InsertResult::Pending { disconnected } =>
                    assert_eq!(disconnected, first_disconnected.key),
                x => panic!("{:?}", x)
            }

            // Trying to insert another connected node fails.
            match bucket.insert(node.clone(), NodeStatus::Connected) {
                InsertResult::Full => {},
                x => panic!("{:?}", x)
            }

            assert!(bucket.pending().is_some());

            // Apply the pending node.
            let pending = bucket.pending_mut().expect("No pending node.");
            pending.set_ready_at(Instant::now() - Duration::from_secs(1));
            let result = bucket.apply_pending();
            assert_eq!(result, Some(AppliedPending {
                inserted: key.clone(),
                evicted: Some(first_disconnected)
            }));
            assert_eq!(Some((&node, NodeStatus::Connected)), bucket.iter().last());
            assert!(bucket.pending().is_none());
            assert_eq!(Some(MAX_NODES_PER_BUCKET - (i + 1)), bucket.first_connected_pos);
        }

        assert!(bucket.pending().is_none());
        assert_eq!(MAX_NODES_PER_BUCKET, bucket.num_entries());

        // Trying to insert another connected node fails.
        let key = Key::new(PeerId::random());
        let node = Node { key, value: () };
        match bucket.insert(node, NodeStatus::Connected) {
            InsertResult::Full => {},
            x => panic!("{:?}", x)
        }
    }

    #[test]
    fn full_bucket_discard_pending() {
        let mut bucket = KBucket::<PeerId, ()>::new(Duration::from_secs(1));
        fill_bucket(&mut bucket, NodeStatus::Disconnected);
        let (first, _) = bucket.iter().next().unwrap();
        let first_disconnected = first.clone();

        // Add a connected pending node.
        let key = Key::new(PeerId::random());
        let node = Node { key: key.clone(), value: () };
        if let InsertResult::Pending { disconnected } = bucket.insert(node, NodeStatus::Connected) {
            assert_eq!(&disconnected, &first_disconnected.key);
        } else {
            panic!()
        }
        assert!(bucket.pending().is_some());

        // Update the status of the first disconnected node to be connected.
        bucket.update(&first_disconnected.key, NodeStatus::Connected);

        // The pending node has been discarded.
        assert!(bucket.pending().is_none());
        assert!(bucket.iter().all(|(n,_)| &n.key != &key));

        // The initially disconnected node is now the most-recently connected.
        assert_eq!(Some((&first_disconnected, NodeStatus::Connected)), bucket.iter().last());
        assert_eq!(bucket.position(&first_disconnected.key).map(|p| p.0), bucket.first_connected_pos);
        assert_eq!(1, bucket.num_connected());
        assert_eq!(MAX_NODES_PER_BUCKET - 1, bucket.num_disconnected());
    }
}
protocols/kad/src/kbucket/entry.rs (new file, 254 lines)
@@ -0,0 +1,254 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! The `Entry` API for querying and modifying the entries of a `KBucketsTable`
//! representing the nodes participating in the Kademlia DHT.

pub use super::bucket::{Node, NodeStatus, InsertResult, AppliedPending, MAX_NODES_PER_BUCKET};
pub use super::key::*;

use super::*;

/// An immutable by-reference view of a bucket entry.
pub struct EntryRefView<'a, TPeerId, TVal> {
    /// The node represented by the entry.
    pub node: NodeRefView<'a, TPeerId, TVal>,
    /// The status of the node identified by the key.
    pub status: NodeStatus
}

/// An immutable by-reference view of a `Node`.
pub struct NodeRefView<'a, TPeerId, TVal> {
    pub key: &'a Key<TPeerId>,
    pub value: &'a TVal
}

impl<TPeerId, TVal> EntryRefView<'_, TPeerId, TVal> {
    pub fn to_owned(&self) -> EntryView<TPeerId, TVal>
    where
        TPeerId: Clone,
        TVal: Clone
    {
        EntryView {
            node: Node {
                key: self.node.key.clone(),
                value: self.node.value.clone()
            },
            status: self.status
        }
    }
}

/// A cloned, immutable view of an entry that is either present in a bucket
/// or pending insertion.
#[derive(Clone, Debug)]
pub struct EntryView<TPeerId, TVal> {
    /// The node represented by the entry.
    pub node: Node<TPeerId, TVal>,
    /// The status of the node.
    pub status: NodeStatus
}

impl<TPeerId, TVal> AsRef<Key<TPeerId>> for EntryView<TPeerId, TVal> {
    fn as_ref(&self) -> &Key<TPeerId> {
        &self.node.key
    }
}

/// A reference into a single entry of a `KBucketsTable`.
#[derive(Debug)]
pub enum Entry<'a, TPeerId, TVal> {
    /// The entry is present in a bucket.
    Present(PresentEntry<'a, TPeerId, TVal>, NodeStatus),
    /// The entry is pending insertion in a bucket.
    Pending(PendingEntry<'a, TPeerId, TVal>, NodeStatus),
    /// The entry is absent and may be inserted.
    Absent(AbsentEntry<'a, TPeerId, TVal>),
    /// The entry represents the local node.
    SelfEntry,
}
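A hedged usage sketch of dispatching on the entry state — this mirrors `Kademlia::add_address` in the diff above; `table`, `key` and `value` are assumed:

    match table.entry(&key) {
        Entry::Present(mut e, _status) => { /* mutate *e.value() in place */ }
        Entry::Pending(mut e, _status) => { /* mutate the pending value */ }
        Entry::Absent(e) => {
            // Attempt insertion; may yield Inserted, Full or Pending.
            let _ = e.insert(value, NodeStatus::Disconnected);
        }
        Entry::SelfEntry => { /* the local key has no entry */ }
    }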
/// The internal representation of the different states of an `Entry`,
|
||||
/// referencing the associated key and bucket.
|
||||
#[derive(Debug)]
|
||||
struct EntryRef<'a, TPeerId, TVal> {
|
||||
bucket: &'a mut KBucket<TPeerId, TVal>,
|
||||
key: &'a Key<TPeerId>,
|
||||
}
|
||||
|
||||
impl<'a, TPeerId, TVal> Entry<'a, TPeerId, TVal>
|
||||
where
|
||||
TPeerId: Clone,
|
||||
{
|
||||
/// Creates a new `Entry` for a `Key`, encapsulating access to a bucket.
|
||||
pub(super) fn new(bucket: &'a mut KBucket<TPeerId, TVal>, key: &'a Key<TPeerId>) -> Self {
|
||||
if let Some(pos) = bucket.position(key) {
|
||||
let status = bucket.status(pos);
|
||||
Entry::Present(PresentEntry::new(bucket, key), status)
|
||||
} else if let Some(pending) = bucket.as_pending(key) {
|
||||
let status = pending.status();
|
||||
Entry::Pending(PendingEntry::new(bucket, key), status)
|
||||
} else {
|
||||
Entry::Absent(AbsentEntry::new(bucket, key))
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates an immutable by-reference view of the entry.
|
||||
///
|
||||
/// Returns `None` if the entry is neither present in a bucket nor
|
||||
/// pending insertion into a bucket.
|
||||
pub fn view(&'a mut self) -> Option<EntryRefView<'a, TPeerId, TVal>> {
|
||||
match self {
|
||||
Entry::Present(entry, status) => Some(EntryRefView {
|
||||
node: NodeRefView {
|
||||
key: entry.0.key,
|
||||
value: entry.value()
|
||||
},
|
||||
status: *status
|
||||
}),
|
||||
Entry::Pending(entry, status) => Some(EntryRefView {
|
||||
node: NodeRefView {
|
||||
key: entry.0.key,
|
||||
value: entry.value()
|
||||
},
|
||||
status: *status
|
||||
}),
|
||||
_ => None
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the key of the entry.
|
||||
///
|
||||
/// Returns `None` if the `Key` used to construct this `Entry` is not a valid
|
||||
/// key for an entry in a bucket, which is the case for the `local_key` of
|
||||
/// the `KBucketsTable` referring to the local node.
|
||||
pub fn key(&self) -> Option<&Key<TPeerId>> {
|
||||
match self {
|
||||
Entry::Present(entry, _) => Some(entry.key()),
|
||||
Entry::Pending(entry, _) => Some(entry.key()),
|
||||
Entry::Absent(entry) => Some(entry.key()),
|
||||
Entry::SelfEntry => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the value associated with the entry.
|
||||
///
|
||||
/// Returns `None` if the entry absent from any bucket or refers to the
|
||||
/// local node.
|
||||
pub fn value(&mut self) -> Option<&mut TVal> {
|
||||
match self {
|
||||
Entry::Present(entry, _) => Some(entry.value()),
|
||||
Entry::Pending(entry, _) => Some(entry.value()),
|
||||
Entry::Absent(_) => None,
|
||||
Entry::SelfEntry => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An entry present in a bucket.
|
||||
#[derive(Debug)]
|
||||
pub struct PresentEntry<'a, TPeerId, TVal>(EntryRef<'a, TPeerId, TVal>);
|
||||
|
||||
impl<'a, TPeerId, TVal> PresentEntry<'a, TPeerId, TVal>
|
||||
where
|
||||
TPeerId: Clone,
|
||||
{
|
||||
fn new(bucket: &'a mut KBucket<TPeerId, TVal>, key: &'a Key<TPeerId>) -> Self {
|
||||
PresentEntry(EntryRef { bucket, key })
|
||||
}
|
||||
|
||||
/// Returns the key of the entry.
|
||||
pub fn key(&self) -> &Key<TPeerId> {
|
||||
self.0.key
|
||||
}
|
||||
|
||||
/// Returns the value associated with the key.
|
||||
pub fn value(&mut self) -> &mut TVal {
|
||||
&mut self.0.bucket
|
||||
.get_mut(self.0.key)
|
||||
.expect("We can only build a ConnectedEntry if the entry is in the bucket; QED")
|
||||
.value
|
||||
}
|
||||
|
||||
/// Sets the status of the entry to `NodeStatus::Disconnected`.
|
||||
    pub fn update(self, status: NodeStatus) -> Self {
        self.0.bucket.update(self.0.key, status);
        Self::new(self.0.bucket, self.0.key)
    }
}

/// An entry waiting for a slot to be available in a bucket.
#[derive(Debug)]
pub struct PendingEntry<'a, TPeerId, TVal>(EntryRef<'a, TPeerId, TVal>);

impl<'a, TPeerId, TVal> PendingEntry<'a, TPeerId, TVal>
where
    TPeerId: Clone,
{
    fn new(bucket: &'a mut KBucket<TPeerId, TVal>, key: &'a Key<TPeerId>) -> Self {
        PendingEntry(EntryRef { bucket, key })
    }

    /// Returns the key of the entry.
    pub fn key(&self) -> &Key<TPeerId> {
        self.0.key
    }

    /// Returns the value associated with the key.
    pub fn value(&mut self) -> &mut TVal {
        self.0.bucket
            .pending_mut()
.expect("We can only build a ConnectedPendingEntry if the entry is pending; QED")
|
||||
            .value_mut()
    }

    /// Updates the status of the pending entry.
    pub fn update(self, status: NodeStatus) -> PendingEntry<'a, TPeerId, TVal> {
        self.0.bucket.update_pending(status);
        PendingEntry::new(self.0.bucket, self.0.key)
    }
}

/// An entry that is not present in any bucket.
#[derive(Debug)]
pub struct AbsentEntry<'a, TPeerId, TVal>(EntryRef<'a, TPeerId, TVal>);

impl<'a, TPeerId, TVal> AbsentEntry<'a, TPeerId, TVal>
where
    TPeerId: Clone,
{
    fn new(bucket: &'a mut KBucket<TPeerId, TVal>, key: &'a Key<TPeerId>) -> Self {
        AbsentEntry(EntryRef { bucket, key })
    }

    /// Returns the key of the entry.
    pub fn key(&self) -> &Key<TPeerId> {
        self.0.key
    }

    /// Attempts to insert the entry into a bucket.
    pub fn insert(self, value: TVal, status: NodeStatus) -> InsertResult<TPeerId> {
        self.0.bucket.insert(Node {
            key: self.0.key.clone(),
            value
        }, status)
    }
}
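Before moving on to the new `key.rs`, here is a hedged usage sketch of the `Entry` API above; it is not part of this diff. The function name, the `table` and `key` parameters, and the `TVal: Default` bound are assumptions for illustration; the `Entry` and `InsertResult` variant shapes are the ones introduced by this commit's `kbucket` module.

fn on_connected<TVal: Default>(table: &mut KBucketsTable<PeerId, TVal>, key: Key<PeerId>) {
    // `entry` is the table accessor returning the `Entry` enum shown above.
    match table.entry(&key) {
        // The node already occupies a bucket slot: refresh its status.
        Entry::Present(entry, _) => { entry.update(NodeStatus::Connected); }
        // The node is queued for insertion: update the pending status.
        Entry::Pending(entry, _) => { entry.update(NodeStatus::Connected); }
        // The node is unknown: attempt an insertion.
        Entry::Absent(entry) => match entry.insert(TVal::default(), NodeStatus::Connected) {
            InsertResult::Inserted => { /* added to a bucket */ }
            InsertResult::Pending { .. } => { /* queued until a slot frees up */ }
            InsertResult::Full => { /* bucket is full of connected nodes */ }
        },
        // The local key never resides in a bucket.
        Entry::SelfEntry => {}
    }
}

Note that `update` consumes the entry and returns a fresh handle, since updating a status may reposition the node within its bucket.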
156
protocols/kad/src/kbucket/key.rs
Normal file
@ -0,0 +1,156 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use bigint::U256;
use libp2p_core::PeerId;
use multihash::Multihash;
use sha2::{Digest, Sha256, digest::generic_array::{GenericArray, typenum::U32}};
/// A `Key` is a cryptographic hash, identifying both the nodes participating in
/// the Kademlia DHT, as well as records stored in the DHT.
///
/// The set of all `Key`s defines the Kademlia keyspace.
///
/// `Key`s have an XOR metric as defined in the Kademlia paper, i.e. the bitwise XOR of
/// the hash digests, interpreted as an integer. See [`Key::distance`].
///
/// A `Key` preserves the preimage of type `T` of the hash function. See [`Key::preimage`].
#[derive(Clone, Debug)]
pub struct Key<T> {
    preimage: T,
    hash: GenericArray<u8, U32>,
}

impl<T> PartialEq for Key<T> {
    fn eq(&self, other: &Key<T>) -> bool {
        self.hash == other.hash
    }
}

impl<T> Eq for Key<T> {}

impl<TPeerId> AsRef<Key<TPeerId>> for Key<TPeerId> {
    fn as_ref(&self) -> &Key<TPeerId> {
        self
    }
}

impl<T> Key<T> {
    /// Construct a new `Key` by hashing the bytes of the given `preimage`.
    ///
    /// The preimage of type `T` is preserved. See [`Key::preimage`] and
    /// [`Key::into_preimage`].
    pub fn new(preimage: T) -> Key<T>
    where
        T: AsRef<[u8]>
    {
        let hash = Sha256::digest(preimage.as_ref());
        Key { preimage, hash }
    }

    /// Borrows the preimage of the key.
    pub fn preimage(&self) -> &T {
        &self.preimage
    }

    /// Converts the key into its preimage.
    pub fn into_preimage(self) -> T {
        self.preimage
    }

    /// Computes the distance of the keys according to the XOR metric.
    pub fn distance<U>(&self, other: &Key<U>) -> Distance {
        let a = U256::from(self.hash.as_ref());
        let b = U256::from(other.hash.as_ref());
        Distance(a ^ b)
    }
}
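As a quick illustration of the metric just defined, a minimal sketch; it is not part of this file, and the byte-string preimages are arbitrary:

fn xor_metric_example() {
    // Keys hash their preimage with SHA-256; the preimage itself is kept.
    let a = Key::new(b"hello".to_vec());
    let b = Key::new(b"world".to_vec());
    assert_eq!(a.distance(&a), Distance::default()); // identity: d(a, a) = 0
    assert_eq!(a.distance(&b), b.distance(&a));      // symmetry: d(a, b) = d(b, a)
    assert_eq!(a.preimage().as_slice(), &b"hello"[..]); // the preimage survives hashing
}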
impl From<Multihash> for Key<Multihash> {
    fn from(h: Multihash) -> Self {
        let k = Key::new(h.clone().into_bytes());
        Key { preimage: h, hash: k.hash }
    }
}

impl From<PeerId> for Key<PeerId> {
    fn from(peer_id: PeerId) -> Self {
        Key::new(peer_id)
    }
}

/// A distance between two `Key`s.
#[derive(Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord, Debug)]
pub struct Distance(pub(super) bigint::U256);
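Because a `Distance` is an ordinary 256-bit integer, the bucket a key falls into can be read off the position of the most significant set bit of its distance to the local key. A hedged sketch of that relationship; the helper below is illustrative only, as the routing table computes the index internally:

// Maps a distance to a zero-based bucket index; a distance of zero
// (the local key itself) belongs to no bucket.
fn bucket_index(d: &Distance) -> Option<usize> {
    (256 - d.0.leading_zeros() as usize).checked_sub(1)
}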
#[cfg(test)]
mod tests {
    use super::*;
    use quickcheck::*;

    impl Arbitrary for Key<PeerId> {
        fn arbitrary<G: Gen>(_: &mut G) -> Key<PeerId> {
            Key::from(PeerId::random())
        }
    }

    #[test]
    fn identity() {
        fn prop(a: Key<PeerId>) -> bool {
            a.distance(&a) == Distance::default()
        }
        quickcheck(prop as fn(_) -> _)
    }

    #[test]
    fn symmetry() {
        fn prop(a: Key<PeerId>, b: Key<PeerId>) -> bool {
            a.distance(&b) == b.distance(&a)
        }
        quickcheck(prop as fn(_,_) -> _)
    }

    #[test]
    fn triangle_inequality() {
        fn prop(a: Key<PeerId>, b: Key<PeerId>, c: Key<PeerId>) -> TestResult {
            let ab = a.distance(&b);
            let bc = b.distance(&c);
            let (ab_plus_bc, overflow) = ab.0.overflowing_add(bc.0);
            if overflow {
                TestResult::discard()
            } else {
                TestResult::from_bool(a.distance(&c) <= Distance(ab_plus_bc))
            }
        }
        quickcheck(prop as fn(_,_,_) -> _)
    }

    #[test]
    fn unidirectionality() {
        fn prop(a: Key<PeerId>, b: Key<PeerId>) -> bool {
            let d = a.distance(&b);
            (0 .. 100).all(|_| {
                let c = Key::from(PeerId::random());
                a.distance(&c) != d || b == c
            })
        }
        quickcheck(prop as fn(_,_) -> _)
    }
}
@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Kademlia protocol. Allows peer discovery, records store and records fetch.
//! Implementation of the Kademlia protocol for libp2p.

// TODO: we allow dead_code for now because this library contains a lot of unused code that will
// be useful later for record store
@ -25,6 +25,10 @@
//! The upgrade's output is a `Sink + Stream` of messages. The `Stream` component is used
//! to poll the underlying transport for incoming messages, and the `Sink` component
//! is used to send messages to remote peers.
//!
//! [`KademliaProtocolConfig`]: protocol::KademliaProtocolConfig
//! [`KadRequestMsg`]: protocol::KadRequestMsg
//! [`KadResponseMsg`]: protocol::KadResponseMsg

use bytes::BytesMut;
use codec::UviBytes;
@ -350,7 +350,7 @@ where
    }
}

/// Consumes the query and returns the targe tand known closest peers.
/// Consumes the query and returns the target and known closest peers.
///
/// > **Note**: This can be called at any time, but you normally only do that once the query
/// > is finished.