src/exporter: Use max mind ip db to localize nodes

This commit is contained in:
Max Inden 2020-04-12 23:15:05 +02:00
parent 8a72f1db3a
commit 4588dbcb42
No known key found for this signature in database
GPG Key ID: 5403C5464810BC26
4 changed files with 72 additions and 12 deletions

23
Cargo.lock generated
View File

@ -1191,6 +1191,7 @@ dependencies = [
"libp2p",
"libp2p-kad",
"log",
"maxminddb",
"prometheus",
"structopt",
"tide",
@ -1709,6 +1710,17 @@ version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
[[package]]
name = "maxminddb"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9412a854bf1355d1ff92ef6ffe557dcc4a866e20cdffc7d3fc082174dba7436e"
dependencies = [
"log",
"serde",
"serde_derive",
]
[[package]]
name = "maybe-uninit"
version = "2.0.0"
@ -2526,6 +2538,17 @@ version = "1.0.103"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1217f97ab8e8904b57dd22eb61cde455fa7446a9c1cf43966066da047c1f3702"
[[package]]
name = "serde_derive"
version = "1.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e549e3abf4fb8621bd1609f11dfc9f5e50320802273b12f3811a67e6716ea6c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.42"

View File

@ -20,4 +20,5 @@ exit-future = "0.2"
ctrlc = "3"
structopt = "0.3"
futures-timer = "3"
maxminddb = "*"

View File

@ -1,6 +1,7 @@
use client::Client;
use futures::prelude::*;
use libp2p::{core::Multiaddr, identify::IdentifyEvent, kad::KademliaEvent};
use libp2p::{multiaddr::{Multiaddr, Protocol}, identify::IdentifyEvent, kad::KademliaEvent};
use prometheus::{CounterVec, GaugeVec, Opts, Registry};
use std::{
collections::HashMap,
@ -8,15 +9,17 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use maxminddb::{geoip2, Reader};
mod client;
pub(crate) struct Exporter {
clients: HashMap<String, Client>,
metrics: Metrics,
ip_db: Option<Reader<Vec<u8>>>,
}
impl Exporter {
pub(crate) fn new(dhts: Vec<Multiaddr>, registry: &Registry) -> Result<Self, Box<dyn Error>> {
pub(crate) fn new(dhts: Vec<Multiaddr>, ip_db: Option<Reader<Vec<u8>>>, registry: &Registry) -> Result<Self, Box<dyn Error>> {
let metrics = Metrics::register(registry);
let clients = dhts
@ -29,7 +32,8 @@ impl Exporter {
})
.collect();
Ok(Exporter { clients, metrics })
Ok(Exporter { clients, metrics, ip_db })
}
fn record_event(&self, name: String, event: client::Event) {
@ -116,10 +120,12 @@ impl Exporter {
.with_label_values(&[&name, "kad", "discovered"])
.inc();
}
KademliaEvent::RoutingUpdated { old_peer, .. } => {
KademliaEvent::RoutingUpdated { old_peer, addresses, .. } => {
let country = self.multiaddresses_to_country_code(addresses.iter()).unwrap_or("unknown".to_string());
// Check if it is a new node, or just an update to a node.
if old_peer.is_none() {
self.metrics.bucket_size.with_label_values(&[&name]).inc();
self.metrics.nodes.with_label_values(&[&name, &country]).inc();
}
self.metrics
.event_counter
@ -136,6 +142,30 @@ impl Exporter {
}
}
}
fn multiaddresses_to_country_code<'a>(&self, addresses: impl Iterator<Item = &'a Multiaddr>) -> Option<String> {
for address in addresses {
let country = self.multiaddress_to_country_code(address);
if country.is_some() {
return country
}
}
None
}
fn multiaddress_to_country_code(&self, address: &Multiaddr) -> Option<String> {
match address.iter().next()? {
Protocol::Ip4(addr) => {
if let Some(ip_db) = &self.ip_db {
return ip_db.lookup::<geoip2::City>(std::net::IpAddr::V4(addr)).ok()?.country?.iso_code;
}
}
_ => {},
}
None
}
}
impl Future for Exporter {
@ -163,7 +193,7 @@ impl Future for Exporter {
struct Metrics {
event_counter: CounterVec,
bucket_size: GaugeVec,
nodes: GaugeVec,
}
impl Metrics {
@ -178,15 +208,16 @@ impl Metrics {
.unwrap();
registry.register(Box::new(event_counter.clone())).unwrap();
let bucket_size = GaugeVec::new(
Opts::new("kad_kbuckets_size", "Libp2p Kademlia K-Buckets size."),
&["dht"],
let nodes = GaugeVec::new(
Opts::new("nodes", "Unique nodes discovered through the Dht."),
&["dht", "country"],
)
.unwrap();
registry.register(Box::new(bucket_size.clone())).unwrap();
registry.register(Box::new(nodes.clone())).unwrap();
Metrics {
event_counter,
bucket_size,
nodes,
}
}
}

View File

@ -5,6 +5,7 @@ use libp2p::core::Multiaddr;
use prometheus::{Encoder, Registry, TextEncoder};
use std::{
error::Error,
path::PathBuf,
sync::{Arc, Mutex},
};
use structopt::StructOpt;
@ -19,6 +20,9 @@ mod exporter;
struct Opt {
#[structopt(long)]
dht: Vec<Multiaddr>,
#[structopt(long)]
max_mind_db: Option<PathBuf>,
}
fn main() -> Result<(), Box<dyn Error>> {
@ -37,7 +41,8 @@ fn main() -> Result<(), Box<dyn Error>> {
.unwrap();
let registry = Registry::new();
let exporter = exporter::Exporter::new(opt.dht, &registry)?;
let ip_db = opt.max_mind_db.map(|path| maxminddb::Reader::open_readfile(path).expect("Failed to open max mind db."));
let exporter = exporter::Exporter::new(opt.dht, ip_db, &registry)?;
let exit_clone = exit.clone();
let metrics_server = std::thread::spawn(move || {