src/exporter: Add random node lookup duration metric

This commit is contained in:
Max Inden 2020-04-13 14:23:41 +02:00
parent 87d76a7357
commit 393c3a8ff0
No known key found for this signature in database
GPG Key ID: 5403C5464810BC26
2 changed files with 83 additions and 25 deletions

View File

@ -1,28 +1,43 @@
use client::Client;
use futures::prelude::*;
use futures_timer::Delay;
use libp2p::{
identify::IdentifyEvent,
kad::KademliaEvent,
kad::{GetClosestPeersOk, KademliaEvent},
multiaddr::{Multiaddr, Protocol},
PeerId,
};
use maxminddb::{geoip2, Reader};
use prometheus::{CounterVec, GaugeVec, Opts, Registry};
use prometheus::{
exponential_buckets, CounterVec, HistogramOpts, HistogramTimer, HistogramVec, Opts, Registry,
};
use std::{
collections::HashMap,
error::Error,
net::IpAddr,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
mod client;
const TICK_INTERVAL: Duration = Duration::from_secs(10);
pub(crate) struct Exporter {
clients: HashMap<String, Client>,
metrics: Metrics,
ip_db: Option<Reader<Vec<u8>>>,
tick: Delay,
/// Set of in-flight random peer id lookups.
///
/// When a lookup returns the entry is dropped and thus the duratation is
/// observed through `<HistogramTimer as Drop>::drop`.
in_flight_lookups: HashMap<PeerId, HistogramTimer>,
}
impl Exporter {
pub(crate) fn new(
dhts: Vec<(String, Multiaddr)>,
@ -40,10 +55,14 @@ impl Exporter {
clients,
metrics,
ip_db,
tick: futures_timer::Delay::new(TICK_INTERVAL),
in_flight_lookups: HashMap::new(),
})
}
fn record_event(&self, name: String, event: client::Event) {
fn record_event(&mut self, name: String, event: client::Event) {
match event {
client::Event::Ping(_) => {
self.metrics
@ -79,11 +98,18 @@ impl Exporter {
.with_label_values(&[&name, "kad", "bootstrap"])
.inc();
}
KademliaEvent::GetClosestPeersResult(_) => {
KademliaEvent::GetClosestPeersResult(res) => {
self.metrics
.event_counter
.with_label_values(&[&name, "kad", "get_closest_peers"])
.inc();
let peer_id = PeerId::from_bytes(match res {
Ok(GetClosestPeersOk { key, .. }) => key,
Err(err) => err.into_key(),
})
.unwrap();
self.in_flight_lookups.remove(&peer_id);
}
KademliaEvent::GetProvidersResult(_) => {
self.metrics
@ -139,7 +165,7 @@ impl Exporter {
// Check if it is a new node, or just an update to a node.
if old_peer.is_none() {
self.metrics
.nodes
.unique_nodes_discovered
.with_label_values(&[&name, &country])
.inc();
}
@ -195,9 +221,27 @@ impl Exporter {
impl Future for Exporter {
type Output = ();
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let this = &mut *self;
if let Poll::Ready(()) = this.tick.poll_unpin(ctx) {
this.tick = Delay::new(TICK_INTERVAL);
// Trigger a random lookup for each client.
for (name, client) in &mut this.clients {
let random_peer = PeerId::random();
let timer = this
.metrics
.random_node_lookup_duration
.with_label_values(&[&name])
.start_timer();
client.get_closest_peers(random_peer.clone());
this.in_flight_lookups.insert(random_peer, timer);
}
}
let mut events = vec![];
for (name, client) in &mut self.clients {
for (name, client) in &mut this.clients {
loop {
match client.poll_next_unpin(ctx) {
Poll::Ready(Some(event)) => events.push((name.clone(), event)),
@ -208,7 +252,7 @@ impl Future for Exporter {
}
for (name, event) in events {
self.record_event(name, event);
this.record_event(name, event);
}
Poll::Pending
@ -217,7 +261,9 @@ impl Future for Exporter {
struct Metrics {
event_counter: CounterVec,
nodes: GaugeVec,
unique_nodes_discovered: CounterVec,
random_node_lookup_duration: HistogramVec,
}
impl Metrics {
@ -232,16 +278,36 @@ impl Metrics {
.unwrap();
registry.register(Box::new(event_counter.clone())).unwrap();
let nodes = GaugeVec::new(
Opts::new("nodes", "Unique nodes discovered through the Dht."),
let unique_nodes_discovered = CounterVec::new(
Opts::new(
"unique_nodes_discovered",
"Unique nodes discovered through the Dht.",
),
&["dht", "country"],
)
.unwrap();
registry.register(Box::new(nodes.clone())).unwrap();
registry
.register(Box::new(unique_nodes_discovered.clone()))
.unwrap();
let random_node_lookup_duration = HistogramVec::new(
HistogramOpts::new(
"random_node_lookup_duration",
"Duration of random node lookups.",
)
.buckets(dbg!(exponential_buckets(0.1, 2.0, 10).unwrap())),
&["dht"],
)
.unwrap();
registry
.register(Box::new(random_node_lookup_duration.clone()))
.unwrap();
Metrics {
event_counter,
nodes,
unique_nodes_discovered,
random_node_lookup_duration,
}
}
}

View File

@ -1,5 +1,4 @@
use futures::prelude::*;
use futures_timer::Delay;
use libp2p::{
core::{
self, either::EitherError, either::EitherOutput, multiaddr::Protocol,
@ -26,13 +25,9 @@ use std::{
mod global_only;
const RANDOM_WALK_INTERVAL: Duration = Duration::from_secs(10);
pub struct Client {
swarm: Swarm<MyBehaviour>,
listening: bool,
random_walk: Delay,
}
impl Client {
@ -63,21 +58,18 @@ impl Client {
Ok(Client {
swarm,
listening: false,
random_walk: futures_timer::Delay::new(RANDOM_WALK_INTERVAL),
})
}
pub fn get_closest_peers(&mut self, peer_id: PeerId) {
self.swarm.kademlia.get_closest_peers(peer_id);
}
}
// TODO: this should be a stream instead.
impl Stream for Client {
type Item = Event;
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
if let Poll::Ready(()) = self.random_walk.poll_unpin(ctx) {
self.random_walk = Delay::new(RANDOM_WALK_INTERVAL);
self.swarm.kademlia.get_closest_peers(PeerId::random());
}
match self.swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(event)) => return Poll::Ready(Some(event)),
Poll::Ready(None) => return Poll::Ready(None),