mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-07-31 00:41:59 +00:00
Add public key to buckets WIP
This commit is contained in:
@@ -28,6 +28,12 @@ pub struct Addresses {
|
||||
addrs: SmallVec<[Multiaddr; 6]>,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub enum Remove {
|
||||
Completely = 0,
|
||||
KeepLast = 1
|
||||
}
|
||||
|
||||
impl Addresses {
|
||||
/// Creates a new list of addresses.
|
||||
pub fn new(addr: Multiaddr) -> Addresses {
|
||||
@@ -64,8 +70,8 @@ impl Addresses {
|
||||
///
|
||||
/// An address should only be removed if is determined to be invalid or
|
||||
/// otherwise unreachable.
|
||||
pub fn remove(&mut self, addr: &Multiaddr) -> Result<(),()> {
|
||||
if self.addrs.len() == 1 {
|
||||
pub fn remove(&mut self, addr: &Multiaddr, mode: Remove) -> Result<(),()> {
|
||||
if mode == Remove::KeepLast && self.addrs.len() == 1 {
|
||||
return Err(())
|
||||
}
|
||||
|
||||
@@ -93,6 +99,14 @@ impl Addresses {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SmallVec<[Multiaddr; 6]>> for Addresses {
|
||||
fn from(addrs: SmallVec<[Multiaddr; 6]>) -> Self {
|
||||
Addresses {
|
||||
addrs
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Addresses {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_list()
|
||||
|
@@ -23,7 +23,7 @@
|
||||
mod test;
|
||||
|
||||
use crate::K_VALUE;
|
||||
use crate::addresses::Addresses;
|
||||
use crate::addresses::{Addresses, Remove};
|
||||
use crate::handler::{KademliaHandler, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn};
|
||||
use crate::jobs::*;
|
||||
use crate::kbucket::{self, KBucketsTable, NodeStatus};
|
||||
@@ -558,8 +558,7 @@ where
|
||||
for peer in others_iter.clone() {
|
||||
log::trace!("Peer {:?} reported by {:?} in query {:?}.",
|
||||
peer, source, query_id);
|
||||
let addrs = peer.multiaddrs.iter().cloned().collect();
|
||||
query.inner.addresses.insert(peer.node_id.clone(), addrs); // TODO: ???
|
||||
query.inner.contacts.insert(peer.node_id.clone(), peer.clone().into());
|
||||
}
|
||||
query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
|
||||
}
|
||||
@@ -1077,7 +1076,7 @@ where
|
||||
|
||||
// We add to that a temporary list of addresses from the ongoing queries.
|
||||
for query in self.queries.iter() {
|
||||
if let Some(addrs) = query.inner.addresses.get(peer_id) {
|
||||
if let Some(addrs) = query.inner.contacts.get(peer_id) {
|
||||
peer_addrs.extend(addrs.iter().cloned())
|
||||
}
|
||||
}
|
||||
@@ -1096,6 +1095,8 @@ where
|
||||
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id, event, handler: NotifyHandler::Any
|
||||
});
|
||||
|
||||
// TODO: collect inner.contacts here, and pass them to connection_updated
|
||||
}
|
||||
|
||||
// The remote's address can only be put into the routing table,
|
||||
@@ -1120,13 +1121,13 @@ where
|
||||
if let Some(peer_id) = peer_id {
|
||||
let key = kbucket::Key::new(peer_id.clone());
|
||||
|
||||
if let Some(addrs) = self.kbuckets.entry(&key).value() {
|
||||
if let Some(contact) = self.kbuckets.entry(&key).value() {
|
||||
// TODO: Ideally, the address should only be removed if the error can
|
||||
// be classified as "permanent" but since `err` is currently a borrowed
|
||||
// trait object without a `'static` bound, even downcasting for inspection
|
||||
// of the error is not possible (and also not truly desirable or ergonomic).
|
||||
// The error passed in should rather be a dedicated enum.
|
||||
if addrs.remove(addr).is_ok() {
|
||||
if contact.addresses.remove(addr, Remove::KeepLast).is_ok() {
|
||||
debug!("Address '{}' removed from peer '{}' due to error: {}.",
|
||||
addr, peer_id, err);
|
||||
} else {
|
||||
@@ -1145,8 +1146,9 @@ where
|
||||
}
|
||||
|
||||
for query in self.queries.iter_mut() {
|
||||
if let Some(addrs) = query.inner.addresses.get_mut(peer_id) {
|
||||
addrs.retain(|a| a != addr);
|
||||
if let Some(contact) = query.inner.contacts.get_mut(peer_id) {
|
||||
// It's safe to unwrap because there's no errors on Remove::Completely
|
||||
contact.addresses.remove(addr, Remove::Completely).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1782,8 +1784,8 @@ impl From<kbucket::EntryView<kbucket::Key<PeerId>, Contact>> for KadPeer {
|
||||
struct QueryInner {
|
||||
/// The query-specific state.
|
||||
info: QueryInfo,
|
||||
/// Addresses of peers discovered during a query.
|
||||
addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
|
||||
/// Contacts of peers discovered during a query.
|
||||
contacts: FnvHashMap<PeerId, Contact>,
|
||||
/// A map of pending requests to peers.
|
||||
///
|
||||
/// A request is pending if the targeted peer is not currently connected
|
||||
@@ -1795,7 +1797,7 @@ impl QueryInner {
|
||||
fn new(info: QueryInfo) -> Self {
|
||||
QueryInner {
|
||||
info,
|
||||
addresses: Default::default(),
|
||||
contacts: Default::default(),
|
||||
pending_rpcs: SmallVec::default()
|
||||
}
|
||||
}
|
||||
|
@@ -20,6 +20,8 @@
|
||||
use crate::Addresses;
|
||||
use libp2p_core::{Multiaddr};
|
||||
use libp2p_core::identity::ed25519::PublicKey;
|
||||
use crate::protocol::KadPeer;
|
||||
use smallvec::SmallVec;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Contact {
|
||||
@@ -40,9 +42,6 @@ impl Contact {
|
||||
pub fn into_vec(self) -> Vec<Multiaddr> {
|
||||
self.addresses.into_vec()
|
||||
}
|
||||
pub fn remove(&mut self, addr: &Multiaddr) -> Result<(),()> {
|
||||
self.addresses.remove(addr)
|
||||
}
|
||||
pub fn insert(&mut self, addr: Multiaddr) -> bool {
|
||||
self.addresses.insert(addr)
|
||||
}
|
||||
@@ -53,3 +52,12 @@ impl Into<Addresses> for Contact {
|
||||
self.addresses
|
||||
}
|
||||
}
|
||||
|
||||
impl From<KadPeer> for Contact {
|
||||
fn from(peer: KadPeer) -> Self {
|
||||
Contact {
|
||||
addresses: peer.multiaddrs.iter().cloned().collect::<SmallVec<[Multiaddr; 6]>>().into(),
|
||||
public_key: peer.public_key
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user