diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index f48a43d1..9431fc20 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -31,7 +31,7 @@ void = "1.0" bs58 = "0.3.0" derivative = "2.0.2" -trust-graph = { git = "https://github.com/fluencelabs/fluence", branch = "weighted_routing" } +trust-graph = { git = "https://github.com/fluencelabs/fluence", branch = "master" } prometheus = "0.9.0" [dev-dependencies] diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index e651c609..efcc3e76 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -99,7 +99,7 @@ pub struct Kademlia { store: TStore, pub trust: TrustGraph, - pub metrics: Metrics, + pub(super) metrics: Metrics, // TODO: maintenance job (periodic bootstrap) (first time: after a minute or less) // TODO: "small" bootstrap function: lookup yourself @@ -482,6 +482,7 @@ where pub fn put_record(&mut self, mut record: Record, quorum: Quorum) -> Result { record.publisher = Some(self.kbuckets.local_key().preimage().clone()); self.store.put(record.clone())?; + self.metrics.store_put(); record.expires = record.expires.or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl)); let quorum = quorum.eval(self.queries.config().replication_factor); @@ -1341,7 +1342,10 @@ where // requirement to send back the value in the response, although this // is a waste of resources. match self.store.put(record.clone()) { - Ok(()) => debug!("Record stored: {:?}; {} bytes", record.key, record.value.len()), + Ok(()) => { + self.metrics.store_put(); + debug!("Record stored: {:?}; {} bytes", record.key, record.value.len()); + }, Err(e) => { info!("Record not stored: {:?}", e); self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler { @@ -1417,11 +1421,14 @@ where } fn print_bucket_table(&mut self) { + let mut size = 0; let buckets = self.kbuckets.buckets().filter_map(|KBucketRef { index, bucket }| { use multiaddr::Protocol::{Ip4, Ip6, Tcp}; let elems = bucket.iter().collect::>(); if elems.len() == 0 { return None + } else { + size += elems.len(); } let header = format!("Bucket {:?}, elements: {}", index.get(), elems.len()); @@ -1474,7 +1481,9 @@ where Some(format!("[bcktdbg] {}\n{}\n", header, elems)) }).collect::(); - if buckets.trim().is_empty() { + self.metrics.report_routing_table_size(size); + + if size == 0 { log::info!("[bcktdbg] Bucket table is empty.") } else { log::info!("\n{}", buckets); diff --git a/protocols/kad/src/metrics.rs b/protocols/kad/src/metrics.rs index 18183a94..890058ff 100644 --- a/protocols/kad/src/metrics.rs +++ b/protocols/kad/src/metrics.rs @@ -14,6 +14,8 @@ * limitations under the License. */ +#![deny(dead_code)] + use prometheus::{IntCounterVec, IntGauge, Opts, Registry}; use libp2p_core::PeerId; @@ -22,7 +24,7 @@ use libp2p_swarm::NetworkBehaviourAction; use crate::handler::{KademliaHandlerEvent, KademliaHandlerIn}; use crate::{KademliaEvent, QueryId}; -pub enum Kind { +pub(super) enum Kind { Request, Response, Error, @@ -51,18 +53,18 @@ enum Inner { Enabled(InnerMetrics), } -pub struct Metrics { +pub(super) struct Metrics { inner: Inner, } impl Metrics { - pub fn disabled() -> Self { + pub(super) fn disabled() -> Self { Self { inner: Inner::Disabled, } } - pub fn enabled(registry: &Registry, peer_id: &PeerId) -> Self { + pub(super) fn enabled(registry: &Registry, peer_id: &PeerId) -> Self { let peer_id = bs58::encode(peer_id).into_string(); let opts = |name: &str| -> Opts { let mut opts = Opts::new(name, name) @@ -140,11 +142,11 @@ impl Metrics { } } - pub fn node_connected(&self) { + pub(super) fn node_connected(&self) { self.with_metrics(|m| m.connected_nodes.inc()); } - pub fn received(&self, event: &KademliaHandlerEvent) { + pub(super) fn received(&self, event: &KademliaHandlerEvent) { use Kind::*; let (name, kind) = match event { @@ -176,7 +178,7 @@ impl Metrics { }); } - pub fn sent(&self, event: &KademliaHandlerIn) { + pub(super) fn sent(&self, event: &KademliaHandlerIn) { use Kind::*; let (name, kind) = match event { @@ -208,7 +210,7 @@ impl Metrics { }); } - pub fn generated_event_name(event: &KademliaEvent) -> &str { + pub(super) fn generated_event_name(event: &KademliaEvent) -> &str { match event { KademliaEvent::QueryResult { .. } => "query_result", KademliaEvent::Discovered { .. } => "discovered", @@ -217,7 +219,7 @@ impl Metrics { } } - pub fn polled_event( + pub(super) fn polled_event( &self, event: &NetworkBehaviourAction, KademliaEvent>, ) { @@ -235,15 +237,15 @@ impl Metrics { self.with_metrics(|m| Self::inc_by_name(name, &m.kademlia_events)); } - pub fn store_put(&self) { + pub(super) fn store_put(&self) { self.with_metrics(|m| m.records_stored.inc()) } - pub fn record_removed(&self) { + pub(super) fn record_removed(&self) { self.with_metrics(|m| m.records_stored.dec()) } - pub fn report_routing_table_size(&self, size: usize) { + pub(super) fn report_routing_table_size(&self, size: usize) { self.with_metrics(|m| m.routing_table_size.set(size as i64)) } }