Log bucket table (#5)

This commit is contained in:
folex 2020-04-01 12:52:37 +03:00 committed by GitHub
parent 034c3e41fe
commit ec0b5dc364
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 87 additions and 7 deletions

View File

@ -26,13 +26,13 @@ use crate::K_VALUE;
use crate::addresses::{Addresses, Remove};
use crate::handler::{KademliaHandler, KademliaHandlerConfig, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn};
use crate::jobs::*;
use crate::kbucket::{self, KBucketsTable, NodeStatus};
use crate::kbucket::{self, KBucketsTable, NodeStatus, KBucketRef};
use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
use crate::contact::Contact;
use fnv::{FnvHashMap, FnvHashSet};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId, multiaddr};
use libp2p_swarm::{
NetworkBehaviour,
NetworkBehaviourAction,
@ -500,9 +500,11 @@ where
/// The results of the (repeated) provider announcements sent by this node are
/// delivered in [`AddProviderResult`].
pub fn start_providing(&mut self, key: record::Key) {
self.print_bucket_table();
// TODO: calculate weight for self?
let record = ProviderRecord::new(key.clone(), self.kbuckets.local_key().preimage().clone());
if let Err(err) = self.store.add_provider(record) {
log::error!("Error on add_provider to local storage {}: {:?}", bs58::encode(key.as_ref()).into_string(), err);
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::StartProvidingResult(Err(
AddProviderError::LocalStorageError { key, cause: err }
@ -528,6 +530,12 @@ where
/// This is a local operation. The local node will still be considered as a
/// provider for the key by other nodes until these provider records expire.
pub fn stop_providing(&mut self, key: &record::Key) {
let target = kbucket::Key::new(key.clone());
debug!(
"stop_providing for key {} ; kademlia key {}",
bs58::encode(key.as_ref()).into_string(), // peer id
bs58::encode(target.as_ref()).into_string(), // sha256
);
self.store.remove_provider(key, self.kbuckets.local_key().preimage());
}
@ -535,6 +543,7 @@ where
///
/// The result of this operation is delivered in [`KademliaEvent::GetProvidersResult`].
pub fn get_providers(&mut self, key: record::Key) {
self.print_bucket_table();
let info = QueryInfo::GetProviders {
key: key.clone(),
providers: Vec::new(),
@ -740,10 +749,12 @@ where
fn query_finished(&mut self, q: Query<QueryInner>, params: &mut impl PollParameters)
-> Option<KademliaEvent>
{
log::trace!("Query {:?} finished.", q.id());
let result = q.into_result();
log::info!("Query {} finished", format!("{:#?}", result.inner.info).lines().take(1).next().unwrap());
match result.inner.info {
QueryInfo::Bootstrap { peer } => {
self.print_bucket_table();
let local_key = self.kbuckets.local_key().clone();
if &peer == local_key.preimage() {
// The lookup for the local key finished. To complete the bootstrap process,
@ -821,6 +832,7 @@ where
}
QueryInfo::AddProvider { key, context, .. } => {
log::info!("AddProvider finished {:?}!", context);
match context {
AddProviderContext::Publish => {
Some(KademliaEvent::StartProvidingResult(Ok(
@ -1105,6 +1117,71 @@ where
}
}
}
fn print_bucket_table(&mut self) {
let buckets = self.kbuckets.buckets().filter_map(|KBucketRef { index, bucket }| {
use multiaddr::Protocol::{Ip4, Ip6, Tcp};
let elems = bucket.iter().collect::<Vec<_>>();
if elems.len() == 0 {
return None
}
let header = format!("Bucket {:?}, elements: {}", index.get(), elems.len());
let elems = elems.into_iter().map(|(node, status)| {
let status_s = match status {
NodeStatus::Connected => "C",
NodeStatus::Disconnected => "D"
};
let address_s = node.value.addresses
.iter()
.next()
.map(|ma|
ma.iter().fold(String::new(), |acc, proto|
match proto {
Ip4(addr) => format!("{}", addr),
Ip6(addr) => format!("{}", addr),
Tcp(port) => format!("{}:{}", acc, port),
_ => acc
}
)
).unwrap_or("NOADDR".to_string());
let address_plus = node.value.addresses
.len()
.checked_sub(1)
.filter(|l| *l != 0)
.map(|l| format!(" (+{})", l))
.unwrap_or("".to_string());
let kademlia_key = bs58::encode(node.key.as_ref()).into_string();
let len = kademlia_key.len();
let kademlia_key = &kademlia_key[len - 10..];
let peer_id = node.key.preimage().to_base58();
let len = peer_id.len();
let peer_id = &peer_id[len - 10..];
format!(
"\t{} {} {} {}{} {}\n",
status_s,
node.weight,
peer_id,
address_s,
address_plus,
kademlia_key
)
}).collect::<String>();
Some(format!("{}\n{}\n", header, elems))
}).collect::<String>();
if buckets.trim().is_empty() {
log::info!("Bucket table is empty.")
} else {
log::info!("\n{}", buckets);
}
}
}
/// Exponentially decrease the given duration (base 2).
@ -1251,6 +1328,7 @@ where
) {
match event {
KademliaHandlerEvent::FindNodeReq { key, request_id } => {
self.print_bucket_table();
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source,
@ -1270,6 +1348,7 @@ where
}
KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
self.print_bucket_table();
let provider_peers = self.provider_peers(&key, &source);
debug!(
"provider peers: {}",
@ -1423,6 +1502,7 @@ where
Self::OutEvent,
>,
> {
log::info!("Kademlia NetworkBehaviour poll");
let now = Instant::now();
// Calculate the available capacity for queries triggered by background jobs.

View File

@ -103,7 +103,7 @@ pub struct KBucketsTable<TKey, TVal> {
/// A (type-safe) index into a `KBucketsTable`, i.e. a non-negative integer in the
/// interval `[0, NUM_BUCKETS)`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct BucketIndex(usize);
pub struct BucketIndex(usize);
impl BucketIndex {
/// Creates a new `BucketIndex` for a `Distance`.
@ -120,7 +120,7 @@ impl BucketIndex {
}
/// Gets the index value as an unsigned integer.
fn get(&self) -> usize {
pub fn get(&self) -> usize {
self.0
}
@ -506,8 +506,8 @@ where
/// A reference to a bucket in a `KBucketsTable`.
pub struct KBucketRef<'a, TPeerId, TVal> {
index: BucketIndex,
bucket: &'a mut KBucket<TPeerId, TVal>,
pub index: BucketIndex,
pub bucket: &'a mut KBucket<TPeerId, TVal>,
}
impl<TKey, TVal> KBucketRef<'_, TKey, TVal>