diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml
index b7caa59a..2b145cfc 100644
--- a/protocols/kad/Cargo.toml
+++ b/protocols/kad/Cargo.toml
@@ -10,6 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
 categories = ["network-programming", "asynchronous"]
 
 [dependencies]
+arrayref = "0.3"
 arrayvec = "0.4.7"
 bs58 = "0.2.0"
 bigint = "4.2"
diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs
index 0276268c..cda99fda 100644
--- a/protocols/kad/src/behaviour.rs
+++ b/protocols/kad/src/behaviour.rs
@@ -20,6 +20,7 @@
 
 use crate::addresses::Addresses;
 use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn};
+use crate::kad_hash::KadHash;
 use crate::kbucket::{self, KBucketsTable, KBucketsPeerId};
 use crate::protocol::{KadConnectionType, KadPeer};
 use crate::query::{QueryConfig, QueryState, QueryStatePollOut};
@@ -28,9 +29,8 @@
 use futures::{prelude::*, stream};
 use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
 use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId};
 use multihash::Multihash;
-use rand;
 use smallvec::SmallVec;
-use std::{cmp::Ordering, error, marker::PhantomData, num::NonZeroUsize, time::Duration, time::Instant};
+use std::{error, marker::PhantomData, num::NonZeroUsize, time::Duration, time::Instant};
 use tokio_io::{AsyncRead, AsyncWrite};
 use tokio_timer::Interval;
@@ -39,7 +39,7 @@ mod test;
 /// Network behaviour that handles Kademlia.
 pub struct Kademlia<TSubstream> {
     /// Storage for the nodes. Contains the known multiaddresses for this node.
-    kbuckets: KBucketsTable<PeerId, Addresses>,
+    kbuckets: KBucketsTable<KadHash, Addresses>,
 
     /// All the iterative queries we are currently performing, with their ID. The last parameter
     /// is the list of accumulated providers for `GET_PROVIDERS` queries.
@@ -174,7 +174,7 @@ impl QueryInfo {
                 key: target.clone().into(),
                 user_data,
             },
-            QueryInfoInner::AddProvider { target, .. } => KademliaHandlerIn::FindNodeReq {
+            QueryInfoInner::AddProvider { .. } => KademliaHandlerIn::FindNodeReq {
                 key: unimplemented!(), // TODO: target.clone(),
                 user_data,
             },
@@ -186,7 +186,7 @@ impl<TSubstream> Kademlia<TSubstream> {
     /// Creates a `Kademlia`.
     #[inline]
     pub fn new(local_peer_id: PeerId) -> Self {
-        Self::new_inner(local_peer_id, true)
+        Self::new_inner(local_peer_id)
     }
 
     /// Creates a `Kademlia`.
@@ -194,8 +194,9 @@ impl<TSubstream> Kademlia<TSubstream> {
     /// Contrary to `new`, doesn't perform the initialization queries that store our local ID into
     /// the DHT and fill our buckets.
     #[inline]
+    #[deprecated(note="this function is now equivalent to new() and will be removed in the future")]
     pub fn without_init(local_peer_id: PeerId) -> Self {
-        Self::new_inner(local_peer_id, false)
+        Self::new_inner(local_peer_id)
    }
 
     /// Adds a known address for the given `PeerId`. We are connected to this address.
@@ -212,8 +213,10 @@ impl<TSubstream> Kademlia<TSubstream> {
     }
 
     /// Underlying implementation for `add_connected_address` and `add_not_connected_address`.
-    fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, connected: bool) {
-        match self.kbuckets.entry(peer_id) {
+    fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, _connected: bool) {
+        let kad_hash = KadHash::from(peer_id.clone());
+
+        match self.kbuckets.entry(&kad_hash) {
             kbucket::Entry::InKbucketConnected(mut entry) => entry.value().insert(address),
             kbucket::Entry::InKbucketConnectedPending(mut entry) => entry.value().insert(address),
             kbucket::Entry::InKbucketDisconnected(mut entry) => entry.value().insert(address),
@@ -232,7 +235,7 @@ impl<TSubstream> Kademlia<TSubstream> {
                     kbucket::InsertOutcome::Full => (),
                     kbucket::InsertOutcome::Pending { to_ping } => {
                         self.queued_events.push(NetworkBehaviourAction::DialPeer {
-                            peer_id: to_ping.clone(),
+                            peer_id: to_ping.peer_id().clone(),
                         })
                     },
                 }
@@ -243,11 +246,11 @@ impl<TSubstream> Kademlia<TSubstream> {
     }
 
     /// Inner implementation of the constructors.
-    fn new_inner(local_peer_id: PeerId, initialize: bool) -> Self {
+    fn new_inner(local_peer_id: PeerId) -> Self {
         let parallelism = 3;
 
-        let mut behaviour = Kademlia {
-            kbuckets: KBucketsTable::new(local_peer_id, Duration::from_secs(60)),   // TODO: constant
+        Kademlia {
+            kbuckets: KBucketsTable::new(KadHash::from(local_peer_id), Duration::from_secs(60)),   // TODO: constant
             queued_events: SmallVec::new(),
             active_queries: Default::default(),
             connected_peers: Default::default(),
@@ -261,37 +264,12 @@ impl<TSubstream> Kademlia<TSubstream> {
             rpc_timeout: Duration::from_secs(8),
             add_provider: SmallVec::new(),
             marker: PhantomData,
-        };
-
-        if initialize {
-            behaviour.initialize();
         }
-
-        behaviour
     }
 
     /// Returns an iterator to all the peer IDs in the bucket, without the pending nodes.
     pub fn kbuckets_entries(&self) -> impl Iterator<Item = &PeerId> {
-        self.kbuckets.entries_not_pending().map(|(id, _)| id)
-    }
-
-    /// Performs the Kademlia initialization process.
-    ///
-    /// If you called `new` to create the `Kademlia`, then this has been started. Calling this
-    /// method manually is useful in order to re-perform the initialization later.
-    ///
-    /// Starts one query per bucket with the intention of connecting to nodes along the way and
-    /// fill our own buckets. This also adds the effect of adding our local node to other nodes'
-    /// buckets.
-    pub fn initialize(&mut self) {
-        for n in 0..256 {   // TODO: 256 should be grabbed from the kbuckets module
-            let target = match gen_random_id(self.kbuckets.my_id(), n) {
-                Ok(p) => p,
-                Err(()) => continue,
-            };
-
-            self.start_query(QueryInfoInner::Initialization { target });
-        }
+        self.kbuckets.entries_not_pending().map(|(kad_hash, _)| kad_hash.peer_id())
     }
 
     /// Starts an iterative `FIND_NODE` request.
@@ -319,8 +297,8 @@ impl<TSubstream> Kademlia<TSubstream> {
         self.providing_keys.insert(key.clone().into());
         let providers = self.values_providers.entry(key.into()).or_insert_with(Default::default);
         let my_id = self.kbuckets.my_id();
-        if !providers.iter().any(|k| k == my_id) {
-            providers.push(my_id.clone());
+        if !providers.iter().any(|peer_id| peer_id == my_id.peer_id()) {
+            providers.push(my_id.peer_id().clone());
         }
 
         // Trigger the next refresh now.
@@ -357,7 +335,8 @@ impl<TSubstream> Kademlia<TSubstream> {
 
         let known_closest_peers = self.kbuckets
             .find_closest(target.as_ref())
-            .take(self.num_results);
+            .take(self.num_results)
+            .map(|h| h.peer_id().clone());
 
         self.active_queries.insert(
             query_id,
@@ -387,7 +366,7 @@ where
         // We should order addresses from decreasing likelyhood of connectivity, so start with
         // the addresses of that peer in the k-buckets.
         let mut out_list = self.kbuckets
-            .entry(peer_id)
+            .entry(&KadHash::from(peer_id.clone()))
             .value_not_pending()
             .map(|l| l.iter().cloned().collect::<Vec<_>>())
             .unwrap_or_else(Vec::new);
@@ -418,7 +397,9 @@ where
             ConnectedPoint::Listener { .. } => None,
         };
 
-        match self.kbuckets.entry(&id) {
+        let id_kad_hash = KadHash::from(id.clone());
+
+        match self.kbuckets.entry(&id_kad_hash) {
             kbucket::Entry::InKbucketConnected(_) => {
                 unreachable!("Kbuckets are always kept in sync with the connection state; QED")
             },
@@ -456,7 +437,7 @@ where
                     kbucket::InsertOutcome::Full => (),
                     kbucket::InsertOutcome::Pending { to_ping } => {
                         self.queued_events.push(NetworkBehaviourAction::DialPeer {
-                            peer_id: to_ping.clone(),
+                            peer_id: to_ping.peer_id().clone(),
                         })
                     },
                 }
@@ -472,14 +453,16 @@ where
 
     fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, _: &dyn error::Error) {
         if let Some(peer_id) = peer_id {
-            if let Some(list) = self.kbuckets.entry(peer_id).value() {
+            let id_kad_hash = KadHash::from(peer_id.clone());
+
+            if let Some(list) = self.kbuckets.entry(&id_kad_hash).value() {
                 // TODO: don't remove the address if the error is that we are already connected
                 // to this peer
                 list.remove(addr);
             }
 
             for query in self.active_queries.values_mut() {
-                if let Some(addrs) = query.target_mut().untrusted_addresses.get_mut(peer_id) {
+                if let Some(addrs) = query.target_mut().untrusted_addresses.get_mut(id_kad_hash.peer_id()) {
                     addrs.retain(|a| a != addr);
                 }
             }
@@ -492,7 +475,7 @@ where
         }
     }
 
-    fn inject_disconnected(&mut self, id: &PeerId, old_endpoint: ConnectedPoint) {
+    fn inject_disconnected(&mut self, id: &PeerId, _old_endpoint: ConnectedPoint) {
         let was_in = self.connected_peers.remove(id);
         debug_assert!(was_in);
 
@@ -500,13 +483,13 @@ where
             query.inject_rpc_error(id);
         }
 
-        match self.kbuckets.entry(id) {
+        match self.kbuckets.entry(&KadHash::from(id.clone())) {
             kbucket::Entry::InKbucketConnected(entry) => {
                 match entry.set_disconnected() {
                     kbucket::SetDisconnectedOutcome::Kept(_) => {},
                     kbucket::SetDisconnectedOutcome::Replaced { replacement, .. } => {
                         let event = KademliaOut::KBucketAdded {
-                            peer_id: replacement,
+                            peer_id: replacement.peer_id().clone(),
                             replaced: Some(id.clone()),
                         };
                         self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
@@ -540,7 +523,7 @@ where
             }
         }
 
-        if let Some(list) = self.kbuckets.entry(&peer_id).value() {
+        if let Some(list) = self.kbuckets.entry(&KadHash::from(peer_id)).value() {
             if let ConnectedPoint::Dialer { address } = new_endpoint {
                 list.insert(address);
             }
@@ -551,9 +534,9 @@ where
         match event {
             KademliaHandlerEvent::FindNodeReq { key, request_id } => {
                 let closer_peers = self.kbuckets
-                    .find_closest(&key)
+                    .find_closest(&KadHash::from(key.clone()))
                     .take(self.num_results)
-                    .map(|peer_id| build_kad_peer(peer_id, &mut self.kbuckets))
+                    .map(|kad_hash| build_kad_peer(&kad_hash, &mut self.kbuckets))
                     .collect();
 
                 self.queued_events.push(NetworkBehaviourAction::SendEvent {
@@ -589,7 +572,7 @@ where
                 let closer_peers = self.kbuckets
                     .find_closest(&key)
                     .take(self.num_results)
-                    .map(|peer_id| build_kad_peer(peer_id, &mut self.kbuckets))
+                    .map(|kad_hash| build_kad_peer(&kad_hash, &mut self.kbuckets))
                     .collect();
 
                 let provider_peers = {
@@ -598,7 +581,7 @@ where
                         .get(&key)
                         .into_iter()
                         .flat_map(|peers| peers)
-                        .map(move |peer_id| build_kad_peer(peer_id.clone(), kbuckets))
+                        .map(move |peer_id| build_kad_peer(&KadHash::from(peer_id.clone()), kbuckets))
                         .collect()
                 };
 
@@ -670,11 +653,11 @@ where
         // Flush the changes to the topology that we want to make.
         for (key, provider) in self.add_provider.drain() {
             // Don't add ourselves to the providers.
-            if provider == *self.kbuckets.my_id() {
+            if provider == *self.kbuckets.my_id().peer_id() {
                 continue;
             }
             let providers = self.values_providers.entry(key).or_insert_with(Default::default);
-            if !providers.iter().any(|k| k == &provider) {
+            if !providers.iter().any(|peer_id| peer_id == &provider) {
                 providers.push(provider);
             }
         }
@@ -766,7 +749,7 @@ where
                     peer_id: closest,
                     event: KademliaHandlerIn::AddProvider {
                         key: target.clone(),
-                        provider_peer: build_kad_peer(parameters.local_peer_id().clone(), &mut self.kbuckets),
+                        provider_peer: build_kad_peer(&KadHash::from(parameters.local_peer_id().clone()), &mut self.kbuckets),
                     },
                 };
 
@@ -824,58 +807,25 @@ pub enum KademliaOut {
     },
 }
 
-// Generates a random `PeerId` that belongs to the given bucket.
-//
-// Returns an error if `bucket_num` is out of range.
-fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result<PeerId, ()> {
-    let my_id_len = my_id.as_bytes().len();
-
-    // TODO: this 2 is magic here; it is the length of the hash of the multihash
-    let bits_diff = bucket_num + 1;
-    if bits_diff > 8 * (my_id_len - 2) {
-        return Err(());
-    }
-
-    let mut random_id = [0; 64];
-    for byte in 0..my_id_len {
-        match byte.cmp(&(my_id_len - bits_diff / 8 - 1)) {
-            Ordering::Less => {
-                random_id[byte] = my_id.as_bytes()[byte];
-            }
-            Ordering::Equal => {
-                let mask: u8 = (1 << (bits_diff % 8)) - 1;
-                random_id[byte] = (my_id.as_bytes()[byte] & !mask) | (rand::random::<u8>() & mask);
-            }
-            Ordering::Greater => {
-                random_id[byte] = rand::random();
-            }
-        }
-    }
-
-    let peer_id = PeerId::from_bytes(random_id[..my_id_len].to_owned())
-        .expect("randomly-generated peer ID should always be valid");
-    Ok(peer_id)
-}
-
 /// Builds a `KadPeer` struct corresponding to the given `PeerId`.
 /// The `PeerId` cannot be the same as the local one.
 ///
 /// > **Note**: This is just a convenience function that doesn't do anything note-worthy.
 fn build_kad_peer(
-    peer_id: PeerId,
-    kbuckets: &mut KBucketsTable<PeerId, Addresses>
+    kad_hash: &KadHash,
+    kbuckets: &mut KBucketsTable<KadHash, Addresses>
 ) -> KadPeer {
-    let (multiaddrs, connection_ty) = match kbuckets.entry(&peer_id) {
+    let (multiaddrs, connection_ty) = match kbuckets.entry(kad_hash) {
         kbucket::Entry::NotInKbucket(_) => (Vec::new(), KadConnectionType::NotConnected),      // TODO: pending connection?
         kbucket::Entry::InKbucketConnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected),
         kbucket::Entry::InKbucketDisconnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected),
         kbucket::Entry::InKbucketConnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected),
         kbucket::Entry::InKbucketDisconnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected),
-        kbucket::Entry::SelfEntry => panic!("build_kad_peer expects not to be called with the local ID"),
+        kbucket::Entry::SelfEntry => panic!("build_kad_peer expects not to be called with the KadHash of the local ID"),
     };
 
     KadPeer {
-        node_id: peer_id,
+        node_id: kad_hash.peer_id().clone(),
         multiaddrs,
         connection_ty,
     }
diff --git a/protocols/kad/src/kad_hash.rs b/protocols/kad/src/kad_hash.rs
new file mode 100644
index 00000000..f630ea84
--- /dev/null
+++ b/protocols/kad/src/kad_hash.rs
@@ -0,0 +1,73 @@
+// Copyright 2019 Parity Technologies (UK) Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+//! Inside a KBucketsTable we would like to store the hash of a PeerId,
+//! even though a PeerId is itself already a hash. When querying the table
+//! we may be interested in getting the PeerId back. This module provides
+//! a struct, KadHash, that stores a PeerId together with its hash for
+//! convenience.
+
+use arrayref::array_ref;
+use libp2p_core::PeerId;
+
+/// Used as key in a KBucketsTable for Kademlia. Stores the hash of a
+/// PeerId, and the PeerId itself because it may need to be queried.
+#[derive(Clone, Debug, PartialEq)]
+pub struct KadHash {
+    peer_id: PeerId,
+    hash: [u8; 32],
+}
+
+/// Provide convenience getters.
+impl KadHash {
+    pub fn peer_id(&self) -> &PeerId {
+        &self.peer_id
+    }
+
+    pub fn hash(&self) -> &[u8; 32] {
+        &self.hash
+    }
+}
+
+impl From<PeerId> for KadHash {
+    fn from(peer_id: PeerId) -> Self {
+        let encoding = multihash::encode(multihash::Hash::SHA2256, peer_id.as_bytes()).expect("sha2-256 is always supported");
+
+        KadHash {
+            peer_id: peer_id,
+            hash: array_ref!(encoding.digest(), 0, 32).clone(),
+        }
+    }
+}
+
+impl PartialEq<multihash::Multihash> for KadHash {
+    #[inline]
+    fn eq(&self, other: &multihash::Multihash) -> bool {
+        self.hash() == other.digest()
+    }
+}
+
+
+impl PartialEq<KadHash> for multihash::Multihash {
+    #[inline]
+    fn eq(&self, other: &KadHash) -> bool {
+        self.digest() == other.hash()
+    }
+}
diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs
index e3e19e0b..87c85f69 100644
--- a/protocols/kad/src/kbucket.rs
+++ b/protocols/kad/src/kbucket.rs
@@ -26,7 +26,8 @@
 //! corresponding to its distance with the reference key.
 
 use arrayvec::ArrayVec;
-use bigint::U512;
+use bigint::{U512, U256};
+use crate::kad_hash::KadHash;
 use libp2p_core::PeerId;
 use multihash::Multihash;
 use std::num::NonZeroUsize;
@@ -120,6 +121,36 @@ impl KBucketsPeerId for Multihash {
     }
 }
 
+impl KBucketsPeerId for KadHash {
+    fn distance_with(&self, other: &Self) -> u32 {
+        // Note that we don't compare the hash functions because there's no chance of collision
+        // of the same value hashed with two different hash functions.
+        let my_hash = U256::from(self.hash());
+        let other_hash = U256::from(other.hash());
+        let xor = my_hash ^ other_hash;
+        256 - xor.leading_zeros()
+    }
+
+    fn max_distance() -> NonZeroUsize {
+        // Hash is SHA2256, so fixed value
+        NonZeroUsize::new(256).expect("256 is not zero; QED")
+    }
+}
+
+impl KBucketsPeerId<KadHash> for Multihash {
+    fn distance_with(&self, other: &KadHash) -> u32 {
+        let my_hash = U512::from(self.digest());
+        let other_hash = U512::from(U256::from(other.hash()));
+
+        let xor = my_hash ^ other_hash;
+        512 - xor.leading_zeros()
+    }
+
+    fn max_distance() -> NonZeroUsize {
+        NonZeroUsize::new(512).expect("512 is not zero; QED")
+    }
+}
+
 impl KBucketsPeerId for Multihash {
     fn distance_with(&self, other: &Self) -> u32 {
         // Note that we don't compare the hash functions because there's no chance of collision
diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs
index f91f209c..6d523f8e 100644
--- a/protocols/kad/src/lib.rs
+++ b/protocols/kad/src/lib.rs
@@ -66,5 +66,6 @@ pub mod protocol;
 
 mod addresses;
 mod behaviour;
+mod kad_hash;
 mod protobuf_structs;
 mod query;