diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 57675028..3bd76ef7 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -26,13 +26,13 @@ use crate::K_VALUE; use crate::addresses::{Addresses, Remove}; use crate::handler::{KademliaHandler, KademliaHandlerConfig, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn}; use crate::jobs::*; -use crate::kbucket::{self, KBucketsTable, NodeStatus}; +use crate::kbucket::{self, KBucketsTable, NodeStatus, KBucketRef}; use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer}; use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState}; use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord}; use crate::contact::Contact; use fnv::{FnvHashMap, FnvHashSet}; -use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId}; +use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId, multiaddr}; use libp2p_swarm::{ NetworkBehaviour, NetworkBehaviourAction, @@ -500,9 +500,11 @@ where /// The results of the (repeated) provider announcements sent by this node are /// delivered in [`AddProviderResult`]. pub fn start_providing(&mut self, key: record::Key) { + self.print_bucket_table(); // TODO: calculate weight for self? let record = ProviderRecord::new(key.clone(), self.kbuckets.local_key().preimage().clone()); if let Err(err) = self.store.add_provider(record) { + log::error!("Error on add_provider to local storage {}: {:?}", bs58::encode(key.as_ref()).into_string(), err); self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::StartProvidingResult(Err( AddProviderError::LocalStorageError { key, cause: err } @@ -528,6 +530,12 @@ where /// This is a local operation. The local node will still be considered as a /// provider for the key by other nodes until these provider records expire. pub fn stop_providing(&mut self, key: &record::Key) { + let target = kbucket::Key::new(key.clone()); + debug!( + "stop_providing for key {} ; kademlia key {}", + bs58::encode(key.as_ref()).into_string(), // peer id + bs58::encode(target.as_ref()).into_string(), // sha256 + ); self.store.remove_provider(key, self.kbuckets.local_key().preimage()); } @@ -535,6 +543,7 @@ where /// /// The result of this operation is delivered in [`KademliaEvent::GetProvidersResult`]. pub fn get_providers(&mut self, key: record::Key) { + self.print_bucket_table(); let info = QueryInfo::GetProviders { key: key.clone(), providers: Vec::new(), @@ -740,10 +749,12 @@ where fn query_finished(&mut self, q: Query, params: &mut impl PollParameters) -> Option { - log::trace!("Query {:?} finished.", q.id()); let result = q.into_result(); + + log::info!("Query {} finished", format!("{:#?}", result.inner.info).lines().take(1).next().unwrap()); match result.inner.info { QueryInfo::Bootstrap { peer } => { + self.print_bucket_table(); let local_key = self.kbuckets.local_key().clone(); if &peer == local_key.preimage() { // The lookup for the local key finished. To complete the bootstrap process, @@ -821,6 +832,7 @@ where } QueryInfo::AddProvider { key, context, .. } => { + log::info!("AddProvider finished {:?}!", context); match context { AddProviderContext::Publish => { Some(KademliaEvent::StartProvidingResult(Ok( @@ -1105,6 +1117,71 @@ where } } } + + fn print_bucket_table(&mut self) { + let buckets = self.kbuckets.buckets().filter_map(|KBucketRef { index, bucket }| { + use multiaddr::Protocol::{Ip4, Ip6, Tcp}; + let elems = bucket.iter().collect::>(); + if elems.len() == 0 { + return None + } + + let header = format!("Bucket {:?}, elements: {}", index.get(), elems.len()); + let elems = elems.into_iter().map(|(node, status)| { + let status_s = match status { + NodeStatus::Connected => "C", + NodeStatus::Disconnected => "D" + }; + + let address_s = node.value.addresses + .iter() + .next() + .map(|ma| + ma.iter().fold(String::new(), |acc, proto| + match proto { + Ip4(addr) => format!("{}", addr), + Ip6(addr) => format!("{}", addr), + Tcp(port) => format!("{}:{}", acc, port), + _ => acc + } + ) + ).unwrap_or("NOADDR".to_string()); + + let address_plus = node.value.addresses + .len() + .checked_sub(1) + .filter(|l| *l != 0) + .map(|l| format!(" (+{})", l)) + .unwrap_or("".to_string()); + + let kademlia_key = bs58::encode(node.key.as_ref()).into_string(); + let len = kademlia_key.len(); + let kademlia_key = &kademlia_key[len - 10..]; + + let peer_id = node.key.preimage().to_base58(); + let len = peer_id.len(); + let peer_id = &peer_id[len - 10..]; + + format!( + "\t{} {} {} {}{} {}\n", + status_s, + node.weight, + peer_id, + address_s, + address_plus, + kademlia_key + ) + }).collect::(); + + Some(format!("{}\n{}\n", header, elems)) + }).collect::(); + + if buckets.trim().is_empty() { + log::info!("Bucket table is empty.") + } else { + log::info!("\n{}", buckets); + } + } } /// Exponentially decrease the given duration (base 2). @@ -1251,6 +1328,7 @@ where ) { match event { KademliaHandlerEvent::FindNodeReq { key, request_id } => { + self.print_bucket_table(); let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: source, @@ -1270,6 +1348,7 @@ where } KademliaHandlerEvent::GetProvidersReq { key, request_id } => { + self.print_bucket_table(); let provider_peers = self.provider_peers(&key, &source); debug!( "provider peers: {}", @@ -1423,6 +1502,7 @@ where Self::OutEvent, >, > { + log::info!("Kademlia NetworkBehaviour poll"); let now = Instant::now(); // Calculate the available capacity for queries triggered by background jobs. diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs index 52c06d0f..bbc0ccdc 100644 --- a/protocols/kad/src/kbucket.rs +++ b/protocols/kad/src/kbucket.rs @@ -103,7 +103,7 @@ pub struct KBucketsTable { /// A (type-safe) index into a `KBucketsTable`, i.e. a non-negative integer in the /// interval `[0, NUM_BUCKETS)`. #[derive(Debug, Copy, Clone, PartialEq, Eq)] -struct BucketIndex(usize); +pub struct BucketIndex(usize); impl BucketIndex { /// Creates a new `BucketIndex` for a `Distance`. @@ -120,7 +120,7 @@ impl BucketIndex { } /// Gets the index value as an unsigned integer. - fn get(&self) -> usize { + pub fn get(&self) -> usize { self.0 } @@ -506,8 +506,8 @@ where /// A reference to a bucket in a `KBucketsTable`. pub struct KBucketRef<'a, TPeerId, TVal> { - index: BucketIndex, - bucket: &'a mut KBucket, + pub index: BucketIndex, + pub bucket: &'a mut KBucket, } impl KBucketRef<'_, TKey, TVal>