From 1b3a30aa6cef3c5e90c4a6c7ea5c626c3e6c4252 Mon Sep 17 00:00:00 2001 From: folex <0xdxdy@gmail.com> Date: Tue, 17 Mar 2020 18:35:39 +0300 Subject: [PATCH] Add public key to buckets WIP --- protocols/kad/src/addresses.rs | 18 ++++++++++++++++-- protocols/kad/src/behaviour.rs | 24 +++++++++++++----------- protocols/kad/src/contact.rs | 14 +++++++++++--- 3 files changed, 40 insertions(+), 16 deletions(-) diff --git a/protocols/kad/src/addresses.rs b/protocols/kad/src/addresses.rs index 4c894a40..1198a927 100644 --- a/protocols/kad/src/addresses.rs +++ b/protocols/kad/src/addresses.rs @@ -28,6 +28,12 @@ pub struct Addresses { addrs: SmallVec<[Multiaddr; 6]>, } +#[derive(PartialEq, Eq, Debug)] +pub enum Remove { + Completely = 0, + KeepLast = 1 +} + impl Addresses { /// Creates a new list of addresses. pub fn new(addr: Multiaddr) -> Addresses { @@ -64,8 +70,8 @@ impl Addresses { /// /// An address should only be removed if is determined to be invalid or /// otherwise unreachable. - pub fn remove(&mut self, addr: &Multiaddr) -> Result<(),()> { - if self.addrs.len() == 1 { + pub fn remove(&mut self, addr: &Multiaddr, mode: Remove) -> Result<(),()> { + if mode == Remove::KeepLast && self.addrs.len() == 1 { return Err(()) } @@ -93,6 +99,14 @@ impl Addresses { } } +impl From> for Addresses { + fn from(addrs: SmallVec<[Multiaddr; 6]>) -> Self { + Addresses { + addrs + } + } +} + impl fmt::Debug for Addresses { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_list() diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index fc0e3e2f..1b55b554 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -23,7 +23,7 @@ mod test; use crate::K_VALUE; -use crate::addresses::Addresses; +use crate::addresses::{Addresses, Remove}; use crate::handler::{KademliaHandler, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn}; use crate::jobs::*; use crate::kbucket::{self, KBucketsTable, NodeStatus}; @@ -558,8 +558,7 @@ where for peer in others_iter.clone() { log::trace!("Peer {:?} reported by {:?} in query {:?}.", peer, source, query_id); - let addrs = peer.multiaddrs.iter().cloned().collect(); - query.inner.addresses.insert(peer.node_id.clone(), addrs); // TODO: ??? + query.inner.contacts.insert(peer.node_id.clone(), peer.clone().into()); } query.on_success(source, others_iter.cloned().map(|kp| kp.node_id)) } @@ -1077,7 +1076,7 @@ where // We add to that a temporary list of addresses from the ongoing queries. for query in self.queries.iter() { - if let Some(addrs) = query.inner.addresses.get(peer_id) { + if let Some(addrs) = query.inner.contacts.get(peer_id) { peer_addrs.extend(addrs.iter().cloned()) } } @@ -1096,6 +1095,8 @@ where self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id, event, handler: NotifyHandler::Any }); + + // TODO: collect inner.contacts here, and pass them to connection_updated } // The remote's address can only be put into the routing table, @@ -1120,13 +1121,13 @@ where if let Some(peer_id) = peer_id { let key = kbucket::Key::new(peer_id.clone()); - if let Some(addrs) = self.kbuckets.entry(&key).value() { + if let Some(contact) = self.kbuckets.entry(&key).value() { // TODO: Ideally, the address should only be removed if the error can // be classified as "permanent" but since `err` is currently a borrowed // trait object without a `'static` bound, even downcasting for inspection // of the error is not possible (and also not truly desirable or ergonomic). // The error passed in should rather be a dedicated enum. - if addrs.remove(addr).is_ok() { + if contact.addresses.remove(addr, Remove::KeepLast).is_ok() { debug!("Address '{}' removed from peer '{}' due to error: {}.", addr, peer_id, err); } else { @@ -1145,8 +1146,9 @@ where } for query in self.queries.iter_mut() { - if let Some(addrs) = query.inner.addresses.get_mut(peer_id) { - addrs.retain(|a| a != addr); + if let Some(contact) = query.inner.contacts.get_mut(peer_id) { + // It's safe to unwrap because there's no errors on Remove::Completely + contact.addresses.remove(addr, Remove::Completely).unwrap(); } } } @@ -1782,8 +1784,8 @@ impl From, Contact>> for KadPeer { struct QueryInner { /// The query-specific state. info: QueryInfo, - /// Addresses of peers discovered during a query. - addresses: FnvHashMap>, + /// Contacts of peers discovered during a query. + contacts: FnvHashMap, /// A map of pending requests to peers. /// /// A request is pending if the targeted peer is not currently connected @@ -1795,7 +1797,7 @@ impl QueryInner { fn new(info: QueryInfo) -> Self { QueryInner { info, - addresses: Default::default(), + contacts: Default::default(), pending_rpcs: SmallVec::default() } } diff --git a/protocols/kad/src/contact.rs b/protocols/kad/src/contact.rs index f7f7b6a1..a285b286 100644 --- a/protocols/kad/src/contact.rs +++ b/protocols/kad/src/contact.rs @@ -20,6 +20,8 @@ use crate::Addresses; use libp2p_core::{Multiaddr}; use libp2p_core::identity::ed25519::PublicKey; +use crate::protocol::KadPeer; +use smallvec::SmallVec; #[derive(Clone)] pub struct Contact { @@ -40,9 +42,6 @@ impl Contact { pub fn into_vec(self) -> Vec { self.addresses.into_vec() } - pub fn remove(&mut self, addr: &Multiaddr) -> Result<(),()> { - self.addresses.remove(addr) - } pub fn insert(&mut self, addr: Multiaddr) -> bool { self.addresses.insert(addr) } @@ -53,3 +52,12 @@ impl Into for Contact { self.addresses } } + +impl From for Contact { + fn from(peer: KadPeer) -> Self { + Contact { + addresses: peer.multiaddrs.iter().cloned().collect::>().into(), + public_key: peer.public_key + } + } +}