src/exporter: Expose time since last seen for nodes

This commit is contained in:
Max Inden 2020-04-13 18:05:00 +02:00
parent 393c3a8ff0
commit 29635d12f9
No known key found for this signature in database
GPG Key ID: 5403C5464810BC26
2 changed files with 154 additions and 34 deletions

View File

@ -1,10 +1,12 @@
use client::Client;
use node_store::{Node, NodeStore};
use futures::prelude::*;
use futures_timer::Delay;
use libp2p::{
identify::IdentifyEvent,
kad::{GetClosestPeersOk, KademliaEvent},
multiaddr::{Multiaddr, Protocol},
ping::PingEvent,
PeerId,
};
use maxminddb::{geoip2, Reader};
@ -21,21 +23,22 @@ use std::{
};
mod client;
mod node_store;
const TICK_INTERVAL: Duration = Duration::from_secs(10);
pub(crate) struct Exporter {
// TODO: Introduce dht id new type.
clients: HashMap<String, Client>,
metrics: Metrics,
node_stores: HashMap<String, NodeStore>,
ip_db: Option<Reader<Vec<u8>>>,
tick: Delay,
/// Set of in-flight random peer id lookups.
///
/// When a lookup returns the entry is dropped and thus the duratation is
/// observed through `<HistogramTimer as Drop>::drop`.
in_flight_lookups: HashMap<PeerId, HistogramTimer>,
tick: Delay,
metrics: Metrics,
}
impl Exporter {
@ -47,14 +50,19 @@ impl Exporter {
let metrics = Metrics::register(registry);
let clients = dhts
.clone()
.into_iter()
.map(|(name, bootnode)| (name, client::Client::new(bootnode).unwrap()))
.collect();
let node_store_metrics = node_store::Metrics::register(registry);
let node_stores = dhts.into_iter().map(|(name, _)| (name.clone(), NodeStore::new(name, node_store_metrics.clone()))).collect();
Ok(Exporter {
clients,
metrics,
ip_db,
node_stores,
tick: futures_timer::Delay::new(TICK_INTERVAL),
@ -64,7 +72,13 @@ impl Exporter {
fn record_event(&mut self, name: String, event: client::Event) {
match event {
client::Event::Ping(_) => {
// TODO: We could also expose the ping latency.
client::Event::Ping(PingEvent { peer, result }) => {
if result.is_ok() {
self.node_stores.get_mut(&name)
.unwrap()
.observed_node(Node::new(peer));
}
self.metrics
.event_counter
.with_label_values(&[&name, "ping", "ping_event"])
@ -83,7 +97,11 @@ impl Exporter {
.with_label_values(&[&name, "identify", "sent"])
.inc();
}
IdentifyEvent::Received { .. } => {
IdentifyEvent::Received { peer_id, .. } => {
self.node_stores.get_mut(&name)
.unwrap()
.observed_node(Node::new(peer_id));
self.metrics
.event_counter
.with_label_values(&[&name, "identify", "received"])
@ -147,28 +165,27 @@ impl Exporter {
.with_label_values(&[&name, "kad", "republish_record"])
.inc();
}
KademliaEvent::Discovered { .. } => {
KademliaEvent::Discovered { peer_id, .. } => {
self.node_stores.get_mut(&name)
.unwrap()
.observed_node(Node::new(peer_id));
self.metrics
.event_counter
.with_label_values(&[&name, "kad", "discovered"])
.inc();
}
KademliaEvent::RoutingUpdated {
old_peer,
peer,
addresses,
..
_old_peer,
} => {
let country = self
.multiaddresses_to_country_code(addresses.iter())
.unwrap_or_else(|| "unknown".to_string());
// Check if it is a new node, or just an update to a node.
if old_peer.is_none() {
self.metrics
.unique_nodes_discovered
.with_label_values(&[&name, &country])
.inc();
let mut node = Node::new(peer);
if let Some(country) = self.multiaddresses_to_country_code(addresses.iter()) {
node = node.with_country(country);
}
self.node_stores.get_mut(&name).unwrap().observed_node(node);
self.metrics
.event_counter
.with_label_values(&[&name, "kad", "routing_updated"])
@ -226,6 +243,10 @@ impl Future for Exporter {
if let Poll::Ready(()) = this.tick.poll_unpin(ctx) {
this.tick = Delay::new(TICK_INTERVAL);
for (_, node_store) in &mut this.node_stores {
node_store.update_metrics();
}
// Trigger a random lookup for each client.
for (name, client) in &mut this.clients {
let random_peer = PeerId::random();
@ -261,7 +282,6 @@ impl Future for Exporter {
struct Metrics {
event_counter: CounterVec,
unique_nodes_discovered: CounterVec,
random_node_lookup_duration: HistogramVec,
}
@ -278,24 +298,12 @@ impl Metrics {
.unwrap();
registry.register(Box::new(event_counter.clone())).unwrap();
let unique_nodes_discovered = CounterVec::new(
Opts::new(
"unique_nodes_discovered",
"Unique nodes discovered through the Dht.",
),
&["dht", "country"],
)
.unwrap();
registry
.register(Box::new(unique_nodes_discovered.clone()))
.unwrap();
let random_node_lookup_duration = HistogramVec::new(
HistogramOpts::new(
"random_node_lookup_duration",
"Duration of random node lookups.",
)
.buckets(dbg!(exponential_buckets(0.1, 2.0, 10).unwrap())),
.buckets(exponential_buckets(0.1, 2.0, 10).unwrap()),
&["dht"],
)
.unwrap();
@ -305,7 +313,6 @@ impl Metrics {
Metrics {
event_counter,
unique_nodes_discovered,
random_node_lookup_duration,
}

113
src/exporter/node_store.rs Normal file
View File

@ -0,0 +1,113 @@
use std::{collections::HashMap, time::{Duration, Instant}};
use libp2p::PeerId;
use prometheus::{
GaugeVec, Opts, Registry,
};
/// Stores information about a set of nodes for a single Dht.
pub struct NodeStore {
dht: String,
nodes: HashMap<PeerId, Node>,
metrics: Metrics,
}
impl NodeStore {
pub fn new(dht: String, metrics: Metrics) -> Self {
NodeStore {
dht,
nodes: HashMap::new(),
metrics,
}
}
/// Record observation of a specific node.
pub fn observed_node(&mut self, node: Node) {
match self.nodes.get_mut(&node.peer_id) {
Some(n) => n.merge(node),
None => {self.nodes.insert(node.peer_id.clone(), node);},
}
}
pub fn update_metrics(&self) {
let now = Instant::now();
let mut nodes_by_time_by_country = HashMap::<Duration, HashMap<String, u64>>::new();
// Insert 3h, 6h, ... buckets.
for factor in &[3, 6, 12, 24] {
nodes_by_time_by_country.insert(Duration::from_secs(60 * 60 * *factor), HashMap::new());
}
for (_peer_id, node) in &self.nodes {
let since_last_seen = now - node.last_seen;
for (time_barrier, countries) in &mut nodes_by_time_by_country {
if since_last_seen < *time_barrier {
countries.entry(node.country.clone().unwrap_or("unknown".to_string())).and_modify(|v| *v += 1).or_insert(1);
}
}
}
for (time_barrier, countries) in nodes_by_time_by_country {
let last_seen_within = format!("{:?}h", time_barrier.as_secs() / 60 / 60);
for (country, count) in countries {
self.metrics.nodes.with_label_values(&[&self.dht, &country, &last_seen_within]).set(count as f64);
}
}
}
}
pub struct Node {
peer_id: PeerId,
country: Option<String>,
last_seen: Instant,
}
impl Node {
pub fn new(peer_id: PeerId) -> Self {
Node {
peer_id,
country: None,
last_seen: Instant::now(),
}
}
pub fn with_country(mut self, country: String) -> Self {
self.country = Some(country);
self
}
fn merge(&mut self, other: Node) {
self.country = self.country.take().or(other.country);
if self.last_seen < other.last_seen {
self.last_seen = other.last_seen;
}
}
}
#[derive(Clone)]
pub struct Metrics {
nodes: GaugeVec,
}
impl Metrics {
pub fn register(registry: &Registry) -> Metrics {
let nodes = GaugeVec::new(
Opts::new(
"nodes",
"Unique nodes discovered through the Dht.",
),
&["dht", "country", "last_seen_within"],
)
.unwrap();
registry
.register(Box::new(nodes.clone()))
.unwrap();
Metrics {
nodes,
}
}
}