From 2d214185c79f596419806022f54f586b835198f5 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 16 Apr 2020 21:38:43 +0200 Subject: [PATCH] src/exporter: Measure uptime of nodes --- README.md | 7 +++- src/exporter.rs | 8 +++- src/exporter/node_store.rs | 84 +++++++++++++++++++++++++++++++++++--- 3 files changed, 90 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 7fd9235..3b7b21a 100644 --- a/README.md +++ b/README.md @@ -30,8 +30,11 @@ cargo +nightly run -- --dht-name --dht-bootnode --mad- ### Metrics -- Number of nodes discovered. - `kademlia_exporter_nodes{country,dht,last_seen_within}` +- Unique nodes discovered within the time bound through the Dht. + `kademlia_exporter_nodes_seen_within{country,dht,last_seen_within}` + +- Unique nodes discovered through the Dht and up since timebound. + `kademlia_exporter_nodes_up_since{country,dht,up_since}` - Libp2p network behaviour events. `kademlia_exporter_network_behaviour_event{behaviour,dht,event}` diff --git a/src/exporter.rs b/src/exporter.rs index 5c0174b..7d1597e 100644 --- a/src/exporter.rs +++ b/src/exporter.rs @@ -109,7 +109,13 @@ impl Exporter { } // Received a ping and sent back a pong. Ok(PingSuccess::Pong) => Some("received_ping"), - Err(_) => None, + Err(_) => { + self.node_stores + .get_mut(&name) + .unwrap() + .observed_down(&peer); + None + } }; if let Some(event) = event { diff --git a/src/exporter/node_store.rs b/src/exporter/node_store.rs index 8ebe99d..b0e889a 100644 --- a/src/exporter/node_store.rs +++ b/src/exporter/node_store.rs @@ -32,9 +32,17 @@ impl NodeStore { } } + pub fn observed_down(&mut self, peer_id: &PeerId) { + self.nodes.get_mut(peer_id).unwrap().up_since = None; + } + pub fn update_metrics(&self) { let now = Instant::now(); + // + // Seen within + // + let mut nodes_by_time_by_country = HashMap::>::new(); // Insert 3h, 6h, ... buckets. @@ -63,11 +71,53 @@ impl NodeStore { for (country, count) in countries { self.metrics - .nodes + .nodes_seen_within .with_label_values(&[&self.dht, &country, &last_seen_within]) .set(count as f64); } } + + // + // Up since + // + + let mut nodes_by_time_by_country = HashMap::>::new(); + + // Insert 3h, 6h, ... buckets. + for factor in &[3, 6, 12, 24, 48, 96] { + nodes_by_time_by_country.insert(Duration::from_secs(60 * 60 * *factor), HashMap::new()); + } + + for node in self.nodes.values() { + let up_since = match node.up_since { + Some(instant) => instant, + None => continue, + }; + + for (time_barrier, countries) in &mut nodes_by_time_by_country { + if Instant::now() - up_since > *time_barrier { + countries + .entry( + node.country + .clone() + .unwrap_or_else(|| "unknown".to_string()), + ) + .and_modify(|v| *v += 1) + .or_insert(1); + } + } + } + + for (time_barrier, countries) in nodes_by_time_by_country { + let up_since = format!("{:?}h", time_barrier.as_secs() / 60 / 60); + + for (country, count) in countries { + self.metrics + .nodes_up_since + .with_label_values(&[&self.dht, &country, &up_since]) + .set(count as f64); + } + } } pub fn get_peer(&self, peer_id: &PeerId) -> Option<&Node> { @@ -83,6 +133,7 @@ pub struct Node { pub peer_id: PeerId, pub country: Option, last_seen: Instant, + up_since: Option, } impl Node { @@ -91,6 +142,7 @@ impl Node { peer_id, country: None, last_seen: Instant::now(), + up_since: Some(Instant::now()), } } @@ -101,6 +153,7 @@ impl Node { fn merge(&mut self, other: Node) { self.country = self.country.take().or(other.country); + self.up_since = self.up_since.take().or(other.up_since); if self.last_seen < other.last_seen { self.last_seen = other.last_seen; @@ -110,18 +163,37 @@ impl Node { #[derive(Clone)] pub struct Metrics { - nodes: GaugeVec, + nodes_seen_within: GaugeVec, + nodes_up_since: GaugeVec, } impl Metrics { pub fn register(registry: &Registry) -> Metrics { - let nodes = GaugeVec::new( - Opts::new("nodes", "Unique nodes discovered through the Dht."), + let nodes_seen_within = GaugeVec::new( + Opts::new( + "nodes_seen_within", + "Unique nodes discovered within the time bound through the Dht.", + ), &["dht", "country", "last_seen_within"], ) .unwrap(); - registry.register(Box::new(nodes.clone())).unwrap(); + registry + .register(Box::new(nodes_seen_within.clone())) + .unwrap(); - Metrics { nodes } + let nodes_up_since = GaugeVec::new( + Opts::new( + "nodes_up_since", + "Unique nodes discovered through the Dht and up since timebound.", + ), + &["dht", "country", "up_since"], + ) + .unwrap(); + registry.register(Box::new(nodes_up_since.clone())).unwrap(); + + Metrics { + nodes_seen_within, + nodes_up_since, + } } }