[Kademlia] Rehash PeerId before inserting in a KBucketsTable (#1025)

Add `KadHash` as the key type used within `KBucketsTable`, replacing `PeerId`.
This commit is contained in:
elferdo 2019-03-26 16:17:34 +01:00 committed by GitHub
parent 538c3dffdf
commit 603c7be7c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 152 additions and 96 deletions

View File

@ -10,6 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]
[dependencies]
arrayref = "0.3"
arrayvec = "0.4.7"
bs58 = "0.2.0"
bigint = "4.2"

View File

@ -20,6 +20,7 @@
use crate::addresses::Addresses;
use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn};
use crate::kad_hash::KadHash;
use crate::kbucket::{self, KBucketsTable, KBucketsPeerId};
use crate::protocol::{KadConnectionType, KadPeer};
use crate::query::{QueryConfig, QueryState, QueryStatePollOut};
@ -28,9 +29,8 @@ use futures::{prelude::*, stream};
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId};
use multihash::Multihash;
use rand;
use smallvec::SmallVec;
use std::{cmp::Ordering, error, marker::PhantomData, num::NonZeroUsize, time::Duration, time::Instant};
use std::{error, marker::PhantomData, num::NonZeroUsize, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Interval;
@ -39,7 +39,7 @@ mod test;
/// Network behaviour that handles Kademlia.
pub struct Kademlia<TSubstream> {
/// Storage for the nodes. Contains the known multiaddresses for this node.
kbuckets: KBucketsTable<PeerId, Addresses>,
kbuckets: KBucketsTable<KadHash, Addresses>,
/// All the iterative queries we are currently performing, with their ID. The last parameter
/// is the list of accumulated providers for `GET_PROVIDERS` queries.
@ -174,7 +174,7 @@ impl QueryInfo {
key: target.clone().into(),
user_data,
},
QueryInfoInner::AddProvider { target, .. } => KademliaHandlerIn::FindNodeReq {
QueryInfoInner::AddProvider { .. } => KademliaHandlerIn::FindNodeReq {
key: unimplemented!(), // TODO: target.clone(),
user_data,
},
@ -186,7 +186,7 @@ impl<TSubstream> Kademlia<TSubstream> {
/// Creates a `Kademlia`.
#[inline]
pub fn new(local_peer_id: PeerId) -> Self {
Self::new_inner(local_peer_id, true)
Self::new_inner(local_peer_id)
}
/// Creates a `Kademlia`.
@ -194,8 +194,9 @@ impl<TSubstream> Kademlia<TSubstream> {
/// Equivalent to `new`: the initialization queries that used to store our local ID into
/// the DHT and fill our buckets are no longer performed by either constructor.
#[inline]
#[deprecated(note="this function is now equivalent to new() and will be removed in the future")]
pub fn without_init(local_peer_id: PeerId) -> Self {
Self::new_inner(local_peer_id, false)
Self::new_inner(local_peer_id)
}
/// Adds a known address for the given `PeerId`. We are connected to this address.
@ -212,8 +213,10 @@ impl<TSubstream> Kademlia<TSubstream> {
}
/// Underlying implementation for `add_connected_address` and `add_not_connected_address`.
fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, connected: bool) {
match self.kbuckets.entry(peer_id) {
fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, _connected: bool) {
let kad_hash = KadHash::from(peer_id.clone());
match self.kbuckets.entry(&kad_hash) {
kbucket::Entry::InKbucketConnected(mut entry) => entry.value().insert(address),
kbucket::Entry::InKbucketConnectedPending(mut entry) => entry.value().insert(address),
kbucket::Entry::InKbucketDisconnected(mut entry) => entry.value().insert(address),
@ -232,7 +235,7 @@ impl<TSubstream> Kademlia<TSubstream> {
kbucket::InsertOutcome::Full => (),
kbucket::InsertOutcome::Pending { to_ping } => {
self.queued_events.push(NetworkBehaviourAction::DialPeer {
peer_id: to_ping.clone(),
peer_id: to_ping.peer_id().clone(),
})
},
}
@ -243,11 +246,11 @@ impl<TSubstream> Kademlia<TSubstream> {
}
/// Inner implementation of the constructors.
fn new_inner(local_peer_id: PeerId, initialize: bool) -> Self {
fn new_inner(local_peer_id: PeerId) -> Self {
let parallelism = 3;
let mut behaviour = Kademlia {
kbuckets: KBucketsTable::new(local_peer_id, Duration::from_secs(60)), // TODO: constant
Kademlia {
kbuckets: KBucketsTable::new(KadHash::from(local_peer_id), Duration::from_secs(60)), // TODO: constant
queued_events: SmallVec::new(),
active_queries: Default::default(),
connected_peers: Default::default(),
@ -261,37 +264,12 @@ impl<TSubstream> Kademlia<TSubstream> {
rpc_timeout: Duration::from_secs(8),
add_provider: SmallVec::new(),
marker: PhantomData,
};
if initialize {
behaviour.initialize();
}
behaviour
}
/// Returns an iterator to all the peer IDs in the bucket, without the pending nodes.
pub fn kbuckets_entries(&self) -> impl Iterator<Item = &PeerId> {
self.kbuckets.entries_not_pending().map(|(id, _)| id)
}
/// Performs the Kademlia initialization process.
///
/// If you called `new` to create the `Kademlia`, then this has been started. Calling this
/// method manually is useful in order to re-perform the initialization later.
///
/// Starts one query per bucket with the intention of connecting to nodes along the way and
/// fill our own buckets. This also adds the effect of adding our local node to other nodes'
/// buckets.
pub fn initialize(&mut self) {
for n in 0..256 { // TODO: 256 should be grabbed from the kbuckets module
let target = match gen_random_id(self.kbuckets.my_id(), n) {
Ok(p) => p,
Err(()) => continue,
};
self.start_query(QueryInfoInner::Initialization { target });
}
self.kbuckets.entries_not_pending().map(|(kad_hash, _)| kad_hash.peer_id())
}
/// Starts an iterative `FIND_NODE` request.
@ -319,8 +297,8 @@ impl<TSubstream> Kademlia<TSubstream> {
self.providing_keys.insert(key.clone().into());
let providers = self.values_providers.entry(key.into()).or_insert_with(Default::default);
let my_id = self.kbuckets.my_id();
if !providers.iter().any(|k| k == my_id) {
providers.push(my_id.clone());
if !providers.iter().any(|peer_id| peer_id == my_id.peer_id()) {
providers.push(my_id.peer_id().clone());
}
// Trigger the next refresh now.
@ -357,7 +335,8 @@ impl<TSubstream> Kademlia<TSubstream> {
let known_closest_peers = self.kbuckets
.find_closest(target.as_ref())
.take(self.num_results);
.take(self.num_results)
.map(|h| h.peer_id().clone());
self.active_queries.insert(
query_id,
@ -387,7 +366,7 @@ where
// We should order addresses by decreasing likelihood of connectivity, so start with
// the addresses of that peer in the k-buckets.
let mut out_list = self.kbuckets
.entry(peer_id)
.entry(&KadHash::from(peer_id.clone()))
.value_not_pending()
.map(|l| l.iter().cloned().collect::<Vec<_>>())
.unwrap_or_else(Vec::new);
@ -418,7 +397,9 @@ where
ConnectedPoint::Listener { .. } => None,
};
match self.kbuckets.entry(&id) {
let id_kad_hash = KadHash::from(id.clone());
match self.kbuckets.entry(&id_kad_hash) {
kbucket::Entry::InKbucketConnected(_) => {
unreachable!("Kbuckets are always kept in sync with the connection state; QED")
},
@ -456,7 +437,7 @@ where
kbucket::InsertOutcome::Full => (),
kbucket::InsertOutcome::Pending { to_ping } => {
self.queued_events.push(NetworkBehaviourAction::DialPeer {
peer_id: to_ping.clone(),
peer_id: to_ping.peer_id().clone(),
})
},
}
@ -472,14 +453,16 @@ where
fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, _: &dyn error::Error) {
if let Some(peer_id) = peer_id {
if let Some(list) = self.kbuckets.entry(peer_id).value() {
let id_kad_hash = KadHash::from(peer_id.clone());
if let Some(list) = self.kbuckets.entry(&id_kad_hash).value() {
// TODO: don't remove the address if the error is that we are already connected
// to this peer
list.remove(addr);
}
for query in self.active_queries.values_mut() {
if let Some(addrs) = query.target_mut().untrusted_addresses.get_mut(peer_id) {
if let Some(addrs) = query.target_mut().untrusted_addresses.get_mut(id_kad_hash.peer_id()) {
addrs.retain(|a| a != addr);
}
}
@ -492,7 +475,7 @@ where
}
}
fn inject_disconnected(&mut self, id: &PeerId, old_endpoint: ConnectedPoint) {
fn inject_disconnected(&mut self, id: &PeerId, _old_endpoint: ConnectedPoint) {
let was_in = self.connected_peers.remove(id);
debug_assert!(was_in);
@ -500,13 +483,13 @@ where
query.inject_rpc_error(id);
}
match self.kbuckets.entry(id) {
match self.kbuckets.entry(&KadHash::from(id.clone())) {
kbucket::Entry::InKbucketConnected(entry) => {
match entry.set_disconnected() {
kbucket::SetDisconnectedOutcome::Kept(_) => {},
kbucket::SetDisconnectedOutcome::Replaced { replacement, .. } => {
let event = KademliaOut::KBucketAdded {
peer_id: replacement,
peer_id: replacement.peer_id().clone(),
replaced: Some(id.clone()),
};
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
@ -540,7 +523,7 @@ where
}
}
if let Some(list) = self.kbuckets.entry(&peer_id).value() {
if let Some(list) = self.kbuckets.entry(&KadHash::from(peer_id)).value() {
if let ConnectedPoint::Dialer { address } = new_endpoint {
list.insert(address);
}
@ -551,9 +534,9 @@ where
match event {
KademliaHandlerEvent::FindNodeReq { key, request_id } => {
let closer_peers = self.kbuckets
.find_closest(&key)
.find_closest(&KadHash::from(key.clone()))
.take(self.num_results)
.map(|peer_id| build_kad_peer(peer_id, &mut self.kbuckets))
.map(|kad_hash| build_kad_peer(&kad_hash, &mut self.kbuckets))
.collect();
self.queued_events.push(NetworkBehaviourAction::SendEvent {
@ -589,7 +572,7 @@ where
let closer_peers = self.kbuckets
.find_closest(&key)
.take(self.num_results)
.map(|peer_id| build_kad_peer(peer_id, &mut self.kbuckets))
.map(|kad_hash| build_kad_peer(&kad_hash, &mut self.kbuckets))
.collect();
let provider_peers = {
@ -598,7 +581,7 @@ where
.get(&key)
.into_iter()
.flat_map(|peers| peers)
.map(move |peer_id| build_kad_peer(peer_id.clone(), kbuckets))
.map(move |peer_id| build_kad_peer(&KadHash::from(peer_id.clone()), kbuckets))
.collect()
};
@ -670,11 +653,11 @@ where
// Flush the changes to the topology that we want to make.
for (key, provider) in self.add_provider.drain() {
// Don't add ourselves to the providers.
if provider == *self.kbuckets.my_id() {
if provider == *self.kbuckets.my_id().peer_id() {
continue;
}
let providers = self.values_providers.entry(key).or_insert_with(Default::default);
if !providers.iter().any(|k| k == &provider) {
if !providers.iter().any(|peer_id| peer_id == &provider) {
providers.push(provider);
}
}
@ -766,7 +749,7 @@ where
peer_id: closest,
event: KademliaHandlerIn::AddProvider {
key: target.clone(),
provider_peer: build_kad_peer(parameters.local_peer_id().clone(), &mut self.kbuckets),
provider_peer: build_kad_peer(&KadHash::from(parameters.local_peer_id().clone()), &mut self.kbuckets),
},
};
@ -824,58 +807,25 @@ pub enum KademliaOut {
},
}
// Generates a random `PeerId` that belongs to the given bucket.
//
// Starts from the bytes of `my_id`, keeps a leading prefix of them untouched and replaces the
// trailing `bucket_num + 1` bits with random data, then re-parses the result as a `PeerId`.
//
// Returns an error if `bucket_num` is out of range, i.e. if `bucket_num + 1` exceeds the
// bit length of `my_id` minus two bytes.
//
// Panics if `PeerId::from_bytes` rejects the generated bytes.
//
// NOTE(review): the highest randomized bit is not forced to differ from `my_id`, so the
// result may presumably end up closer to `my_id` than bucket `bucket_num` — confirm.
fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result<PeerId, ()> {
let my_id_len = my_id.as_bytes().len();
// TODO: this 2 is magic here; it is the length of the hash of the multihash
// Number of trailing bits of the ID that get randomized.
let bits_diff = bucket_num + 1;
// NOTE(review): `my_id_len - 2` assumes the ID is at least two bytes long — confirm.
if bits_diff > 8 * (my_id_len - 2) {
return Err(());
}
// Scratch buffer; only the first `my_id_len` bytes are used.
// NOTE(review): assumes `my_id_len <= 64`, otherwise the indexing below panics — confirm.
let mut random_id = [0; 64];
for byte in 0..my_id_len {
// `my_id_len - bits_diff / 8 - 1` is the index of the byte that is only partially randomized.
match byte.cmp(&(my_id_len - bits_diff / 8 - 1)) {
// Bytes before the pivot are copied from `my_id` unchanged.
Ordering::Less => {
random_id[byte] = my_id.as_bytes()[byte];
}
// Pivot byte: keep the high bits of `my_id`, randomize the low `bits_diff % 8` bits.
// When `bits_diff % 8 == 0` the mask is zero and the byte is copied unchanged.
Ordering::Equal => {
let mask: u8 = (1 << (bits_diff % 8)) - 1;
random_id[byte] = (my_id.as_bytes()[byte] & !mask) | (rand::random::<u8>() & mask);
}
// Bytes after the pivot are fully random.
Ordering::Greater => {
random_id[byte] = rand::random();
}
}
}
let peer_id = PeerId::from_bytes(random_id[..my_id_len].to_owned())
.expect("randomly-generated peer ID should always be valid");
Ok(peer_id)
}
/// Builds a `KadPeer` struct corresponding to the given `PeerId`.
/// The `PeerId` cannot be the same as the local one.
///
/// > **Note**: This is just a convenience function that doesn't do anything note-worthy.
fn build_kad_peer(
peer_id: PeerId,
kbuckets: &mut KBucketsTable<PeerId, Addresses>
kad_hash: &KadHash,
kbuckets: &mut KBucketsTable<KadHash, Addresses>
) -> KadPeer {
let (multiaddrs, connection_ty) = match kbuckets.entry(&peer_id) {
let (multiaddrs, connection_ty) = match kbuckets.entry(kad_hash) {
kbucket::Entry::NotInKbucket(_) => (Vec::new(), KadConnectionType::NotConnected), // TODO: pending connection?
kbucket::Entry::InKbucketConnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected),
kbucket::Entry::InKbucketDisconnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected),
kbucket::Entry::InKbucketConnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected),
kbucket::Entry::InKbucketDisconnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected),
kbucket::Entry::SelfEntry => panic!("build_kad_peer expects not to be called with the local ID"),
kbucket::Entry::SelfEntry => panic!("build_kad_peer expects not to be called with the KadHash of the local ID"),
};
KadPeer {
node_id: peer_id,
node_id: kad_hash.peer_id().clone(),
multiaddrs,
connection_ty,
}

View File

@ -0,0 +1,73 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Inside a KBucketsTable we would like to store the hash of a PeerId
//! even if a PeerId is itself already a hash. When querying the table
//! we may be interested in getting the PeerId back. This module provides
//! a struct, KadHash that stores together a PeerId and its hash for
//! convenience.
use arrayref::array_ref;
use libp2p_core::PeerId;
/// Used as key in a KBucketsTable for Kademlia. Stores the hash of a
/// PeerId, and the PeerId itself because it may need to be queried.
#[derive(Clone, Debug, PartialEq)]
pub struct KadHash {
peer_id: PeerId,
hash: [u8; 32],
}
/// Provide convenience getters.
impl KadHash {
pub fn peer_id(&self) -> &PeerId {
&self.peer_id
}
pub fn hash(&self) -> &[u8; 32] {
&self.hash
}
}
impl From<PeerId> for KadHash {
    /// Builds a `KadHash` by SHA2-256-hashing the bytes of `peer_id` and storing the first
    /// 32 bytes of the resulting digest next to the `PeerId` itself.
    fn from(peer_id: PeerId) -> Self {
        let encoded = multihash::encode(multihash::Hash::SHA2256, peer_id.as_bytes())
            .expect("sha2-256 is always supported");
        // `[u8; 32]` is `Copy`, so dereferencing the array reference copies the digest out.
        let hash = *array_ref!(encoded.digest(), 0, 32);
        KadHash { peer_id, hash }
    }
}
impl PartialEq<multihash::Multihash> for KadHash {
    /// A `KadHash` equals a `Multihash` when the stored hash bytes are identical to the
    /// multihash's digest bytes (same length, same contents).
    #[inline]
    fn eq(&self, other: &multihash::Multihash) -> bool {
        let ours: &[u8] = &self.hash()[..];
        ours == other.digest()
    }
}
impl PartialEq<KadHash> for multihash::Multihash {
    /// A `Multihash` equals a `KadHash` when its digest bytes are identical to the hash
    /// bytes stored in the `KadHash` (same length, same contents).
    #[inline]
    fn eq(&self, other: &KadHash) -> bool {
        let theirs: &[u8] = &other.hash()[..];
        self.digest() == theirs
    }
}

View File

@ -26,7 +26,8 @@
//! corresponding to its distance with the reference key.
use arrayvec::ArrayVec;
use bigint::U512;
use bigint::{U512, U256};
use crate::kad_hash::KadHash;
use libp2p_core::PeerId;
use multihash::Multihash;
use std::num::NonZeroUsize;
@ -120,6 +121,36 @@ impl KBucketsPeerId<PeerId> for Multihash {
}
}
impl KBucketsPeerId for KadHash {
    /// Distance between two `KadHash`es: both 32-byte hashes are read as 256-bit integers,
    /// XORed together, and the result is 256 minus the number of leading zero bits of
    /// that XOR (so identical hashes are at distance 0).
    fn distance_with(&self, other: &Self) -> u32 {
        let differing_bits = U256::from(self.hash()) ^ U256::from(other.hash());
        256 - differing_bits.leading_zeros()
    }

    /// Largest possible distance: the hash is always 32 bytes, i.e. 256 bits.
    fn max_distance() -> NonZeroUsize {
        NonZeroUsize::new(256).expect("256 is not zero; QED")
    }
}
impl KBucketsPeerId<KadHash> for Multihash {
    /// Distance between a `Multihash` and a `KadHash`: the multihash digest and the
    /// 32-byte hash (widened from 256 to 512 bits) are XORed as 512-bit integers, and the
    /// result is 512 minus the number of leading zero bits of that XOR.
    ///
    /// NOTE(review): presumably assumes the digest fits into 512 bits — confirm how
    /// `U512::from` treats longer slices.
    fn distance_with(&self, other: &KadHash) -> u32 {
        let differing_bits = U512::from(self.digest()) ^ U512::from(U256::from(other.hash()));
        512 - differing_bits.leading_zeros()
    }

    /// Largest possible distance: values are compared as 512-bit integers.
    fn max_distance() -> NonZeroUsize {
        NonZeroUsize::new(512).expect("512 is not zero; QED")
    }
}
impl KBucketsPeerId for Multihash {
fn distance_with(&self, other: &Self) -> u32 {
// Note that we don't compare the hash functions because there's no chance of collision

View File

@ -66,5 +66,6 @@ pub mod protocol;
mod addresses;
mod behaviour;
mod kad_hash;
mod protobuf_structs;
mod query;