mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-05-30 11:11:21 +00:00
Report bucket size to prometheus (#19)
This commit is contained in:
parent
b7ec948326
commit
c36397b7de
@ -31,7 +31,7 @@ void = "1.0"
|
|||||||
bs58 = "0.3.0"
|
bs58 = "0.3.0"
|
||||||
derivative = "2.0.2"
|
derivative = "2.0.2"
|
||||||
|
|
||||||
trust-graph = { git = "https://github.com/fluencelabs/fluence", branch = "weighted_routing" }
|
trust-graph = { git = "https://github.com/fluencelabs/fluence", branch = "master" }
|
||||||
prometheus = "0.9.0"
|
prometheus = "0.9.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
@ -99,7 +99,7 @@ pub struct Kademlia<TStore> {
|
|||||||
store: TStore,
|
store: TStore,
|
||||||
|
|
||||||
pub trust: TrustGraph,
|
pub trust: TrustGraph,
|
||||||
pub metrics: Metrics,
|
pub(super) metrics: Metrics,
|
||||||
|
|
||||||
// TODO: maintenance job (periodic bootstrap) (first time: after a minute or less)
|
// TODO: maintenance job (periodic bootstrap) (first time: after a minute or less)
|
||||||
// TODO: "small" bootstrap function: lookup yourself
|
// TODO: "small" bootstrap function: lookup yourself
|
||||||
@ -482,6 +482,7 @@ where
|
|||||||
pub fn put_record(&mut self, mut record: Record, quorum: Quorum) -> Result<QueryId, store::Error> {
|
pub fn put_record(&mut self, mut record: Record, quorum: Quorum) -> Result<QueryId, store::Error> {
|
||||||
record.publisher = Some(self.kbuckets.local_key().preimage().clone());
|
record.publisher = Some(self.kbuckets.local_key().preimage().clone());
|
||||||
self.store.put(record.clone())?;
|
self.store.put(record.clone())?;
|
||||||
|
self.metrics.store_put();
|
||||||
record.expires = record.expires.or_else(||
|
record.expires = record.expires.or_else(||
|
||||||
self.record_ttl.map(|ttl| Instant::now() + ttl));
|
self.record_ttl.map(|ttl| Instant::now() + ttl));
|
||||||
let quorum = quorum.eval(self.queries.config().replication_factor);
|
let quorum = quorum.eval(self.queries.config().replication_factor);
|
||||||
@ -1341,7 +1342,10 @@ where
|
|||||||
// requirement to send back the value in the response, although this
|
// requirement to send back the value in the response, although this
|
||||||
// is a waste of resources.
|
// is a waste of resources.
|
||||||
match self.store.put(record.clone()) {
|
match self.store.put(record.clone()) {
|
||||||
Ok(()) => debug!("Record stored: {:?}; {} bytes", record.key, record.value.len()),
|
Ok(()) => {
|
||||||
|
self.metrics.store_put();
|
||||||
|
debug!("Record stored: {:?}; {} bytes", record.key, record.value.len());
|
||||||
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
info!("Record not stored: {:?}", e);
|
info!("Record not stored: {:?}", e);
|
||||||
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
|
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||||
@ -1417,11 +1421,14 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn print_bucket_table(&mut self) {
|
fn print_bucket_table(&mut self) {
|
||||||
|
let mut size = 0;
|
||||||
let buckets = self.kbuckets.buckets().filter_map(|KBucketRef { index, bucket }| {
|
let buckets = self.kbuckets.buckets().filter_map(|KBucketRef { index, bucket }| {
|
||||||
use multiaddr::Protocol::{Ip4, Ip6, Tcp};
|
use multiaddr::Protocol::{Ip4, Ip6, Tcp};
|
||||||
let elems = bucket.iter().collect::<Vec<_>>();
|
let elems = bucket.iter().collect::<Vec<_>>();
|
||||||
if elems.len() == 0 {
|
if elems.len() == 0 {
|
||||||
return None
|
return None
|
||||||
|
} else {
|
||||||
|
size += elems.len();
|
||||||
}
|
}
|
||||||
|
|
||||||
let header = format!("Bucket {:?}, elements: {}", index.get(), elems.len());
|
let header = format!("Bucket {:?}, elements: {}", index.get(), elems.len());
|
||||||
@ -1474,7 +1481,9 @@ where
|
|||||||
Some(format!("[bcktdbg] {}\n{}\n", header, elems))
|
Some(format!("[bcktdbg] {}\n{}\n", header, elems))
|
||||||
}).collect::<String>();
|
}).collect::<String>();
|
||||||
|
|
||||||
if buckets.trim().is_empty() {
|
self.metrics.report_routing_table_size(size);
|
||||||
|
|
||||||
|
if size == 0 {
|
||||||
log::info!("[bcktdbg] Bucket table is empty.")
|
log::info!("[bcktdbg] Bucket table is empty.")
|
||||||
} else {
|
} else {
|
||||||
log::info!("\n{}", buckets);
|
log::info!("\n{}", buckets);
|
||||||
|
@ -14,6 +14,8 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#![deny(dead_code)]
|
||||||
|
|
||||||
use prometheus::{IntCounterVec, IntGauge, Opts, Registry};
|
use prometheus::{IntCounterVec, IntGauge, Opts, Registry};
|
||||||
|
|
||||||
use libp2p_core::PeerId;
|
use libp2p_core::PeerId;
|
||||||
@ -22,7 +24,7 @@ use libp2p_swarm::NetworkBehaviourAction;
|
|||||||
use crate::handler::{KademliaHandlerEvent, KademliaHandlerIn};
|
use crate::handler::{KademliaHandlerEvent, KademliaHandlerIn};
|
||||||
use crate::{KademliaEvent, QueryId};
|
use crate::{KademliaEvent, QueryId};
|
||||||
|
|
||||||
pub enum Kind {
|
pub(super) enum Kind {
|
||||||
Request,
|
Request,
|
||||||
Response,
|
Response,
|
||||||
Error,
|
Error,
|
||||||
@ -51,18 +53,18 @@ enum Inner {
|
|||||||
Enabled(InnerMetrics),
|
Enabled(InnerMetrics),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Metrics {
|
pub(super) struct Metrics {
|
||||||
inner: Inner,
|
inner: Inner,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Metrics {
|
impl Metrics {
|
||||||
pub fn disabled() -> Self {
|
pub(super) fn disabled() -> Self {
|
||||||
Self {
|
Self {
|
||||||
inner: Inner::Disabled,
|
inner: Inner::Disabled,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn enabled(registry: &Registry, peer_id: &PeerId) -> Self {
|
pub(super) fn enabled(registry: &Registry, peer_id: &PeerId) -> Self {
|
||||||
let peer_id = bs58::encode(peer_id).into_string();
|
let peer_id = bs58::encode(peer_id).into_string();
|
||||||
let opts = |name: &str| -> Opts {
|
let opts = |name: &str| -> Opts {
|
||||||
let mut opts = Opts::new(name, name)
|
let mut opts = Opts::new(name, name)
|
||||||
@ -140,11 +142,11 @@ impl Metrics {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn node_connected(&self) {
|
pub(super) fn node_connected(&self) {
|
||||||
self.with_metrics(|m| m.connected_nodes.inc());
|
self.with_metrics(|m| m.connected_nodes.inc());
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn received(&self, event: &KademliaHandlerEvent<QueryId>) {
|
pub(super) fn received(&self, event: &KademliaHandlerEvent<QueryId>) {
|
||||||
use Kind::*;
|
use Kind::*;
|
||||||
|
|
||||||
let (name, kind) = match event {
|
let (name, kind) = match event {
|
||||||
@ -176,7 +178,7 @@ impl Metrics {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn sent(&self, event: &KademliaHandlerIn<QueryId>) {
|
pub(super) fn sent(&self, event: &KademliaHandlerIn<QueryId>) {
|
||||||
use Kind::*;
|
use Kind::*;
|
||||||
|
|
||||||
let (name, kind) = match event {
|
let (name, kind) = match event {
|
||||||
@ -208,7 +210,7 @@ impl Metrics {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn generated_event_name(event: &KademliaEvent) -> &str {
|
pub(super) fn generated_event_name(event: &KademliaEvent) -> &str {
|
||||||
match event {
|
match event {
|
||||||
KademliaEvent::QueryResult { .. } => "query_result",
|
KademliaEvent::QueryResult { .. } => "query_result",
|
||||||
KademliaEvent::Discovered { .. } => "discovered",
|
KademliaEvent::Discovered { .. } => "discovered",
|
||||||
@ -217,7 +219,7 @@ impl Metrics {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn polled_event(
|
pub(super) fn polled_event(
|
||||||
&self,
|
&self,
|
||||||
event: &NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>,
|
event: &NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>,
|
||||||
) {
|
) {
|
||||||
@ -235,15 +237,15 @@ impl Metrics {
|
|||||||
self.with_metrics(|m| Self::inc_by_name(name, &m.kademlia_events));
|
self.with_metrics(|m| Self::inc_by_name(name, &m.kademlia_events));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn store_put(&self) {
|
pub(super) fn store_put(&self) {
|
||||||
self.with_metrics(|m| m.records_stored.inc())
|
self.with_metrics(|m| m.records_stored.inc())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn record_removed(&self) {
|
pub(super) fn record_removed(&self) {
|
||||||
self.with_metrics(|m| m.records_stored.dec())
|
self.with_metrics(|m| m.records_stored.dec())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn report_routing_table_size(&self, size: usize) {
|
pub(super) fn report_routing_table_size(&self, size: usize) {
|
||||||
self.with_metrics(|m| m.routing_table_size.set(size as i64))
|
self.with_metrics(|m| m.routing_table_size.set(size as i64))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user