*: Track cloud provider

This commit is contained in:
Max Inden 2020-04-24 19:34:27 +02:00
parent 26c1902c3a
commit 94acb400f2
No known key found for this signature in database
GPG Key ID: 5403C5464810BC26
8 changed files with 189 additions and 19 deletions

3
.assets/cidrs.r Normal file
View File

@ -0,0 +1,3 @@
library(cloudcidrs)
write.csv(all_ranges(), file = "./cidrs.csv")

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
/target
**/*.rs.bk
.assets/cidrs.csv

61
Cargo.lock generated
View File

@ -221,6 +221,12 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "bitstring"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e54f7b7a46d7b183eb41e2d82965261fa8a1597c68b50aced268ee1fc70272d"
[[package]]
name = "blake2-rfc"
version = "0.2.18"
@ -303,6 +309,18 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b170cd256a3f9fa6b9edae3e44a7dfdfc77e8124dbc3e2612d75f9c3e2396dae"
[[package]]
name = "bstr"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2889e6d50f394968c8bf4240dc3f2a7eb4680844d27308f798229ac9d4725f41"
dependencies = [
"lazy_static",
"memchr",
"regex-automata",
"serde",
]
[[package]]
name = "bumpalo"
version = "2.6.0"
@ -380,6 +398,16 @@ dependencies = [
"constant_time_eq",
]
[[package]]
name = "cidr"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2da1cf0f275bb8dc1867a7f40cdb3b746951db73a183048e6e37fa89ed81bd01"
dependencies = [
"bitstring",
"serde",
]
[[package]]
name = "clap"
version = "2.33.0"
@ -537,6 +565,28 @@ dependencies = [
"subtle 1.0.0",
]
[[package]]
name = "csv"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00affe7f6ab566df61b4be3ce8cf16bc2576bca0963ceb0955e45d514bf9a279"
dependencies = [
"bstr",
"csv-core",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90"
dependencies = [
"memchr",
]
[[package]]
name = "ctr"
version = "0.3.2"
@ -1183,6 +1233,8 @@ name = "kademlia-exporter"
version = "0.1.0"
dependencies = [
"async-std",
"cidr",
"csv",
"ctrlc",
"env_logger",
"exit-future",
@ -2388,6 +2440,15 @@ dependencies = [
"thread_local",
]
[[package]]
name = "regex-automata"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4"
dependencies = [
"byteorder 1.3.2",
]
[[package]]
name = "regex-syntax"
version = "0.6.12"

View File

@ -8,6 +8,8 @@ edition = "2018"
[dependencies]
async-std = "1.0"
cidr = "0.1"
csv = "1"
env_logger = "0.7.1"
futures = "0.3.1"
libp2p = "0.18.0"

44
src/cloud_provider_db.rs Normal file
View File

@ -0,0 +1,44 @@
use cidr::{Cidr, Ipv4Cidr};
use csv::Reader;
use std::collections::HashMap;
use std::error::Error;
use std::net::Ipv4Addr;
use std::path::PathBuf;
use std::str::FromStr;
pub struct Db {
providers: HashMap<String, Vec<Ipv4Cidr>>,
}
impl Db {
pub fn new(path: PathBuf) -> Result<Self, Box<dyn Error>> {
let mut providers = HashMap::<String, Vec<Ipv4Cidr>>::new();
let mut reader = Reader::from_path(path)?;
for record in reader.records() {
let record = record?;
let provider = &record[1];
let cidr = Ipv4Cidr::from_str(&record[2]).unwrap();
providers
.entry(provider.to_string())
.and_modify(|p| p.push(cidr.clone()))
.or_insert_with(|| vec![cidr]);
}
Ok(Db { providers })
}
pub fn get_provider(&self, addr: Ipv4Addr) -> Option<String> {
for (provider, cidrs) in self.providers.iter() {
for cidr in cidrs.iter() {
if cidr.contains(&addr) {
return Some(provider.clone());
}
}
}
None
}
}

View File

@ -1,3 +1,4 @@
use crate::cloud_provider_db;
use client::Client;
use futures::prelude::*;
use futures_timer::Delay;
@ -31,6 +32,7 @@ pub(crate) struct Exporter {
clients: HashMap<String, Client>,
node_stores: HashMap<String, NodeStore>,
ip_db: Option<Reader<Vec<u8>>>,
cloud_provider_db: Option<cloud_provider_db::Db>,
/// Set of in-flight random peer id lookups.
///
/// When a lookup returns the entry is dropped and thus the duratation is
@ -47,6 +49,7 @@ impl Exporter {
pub(crate) fn new(
dhts: Vec<(String, Multiaddr)>,
ip_db: Option<Reader<Vec<u8>>>,
cloud_provider_db: Option<cloud_provider_db::Db>,
registry: &Registry,
) -> Result<Self, Box<dyn Error>> {
let metrics = Metrics::register(registry);
@ -76,6 +79,7 @@ impl Exporter {
clients,
metrics,
ip_db,
cloud_provider_db,
node_stores,
tick: futures_timer::Delay::new(TICK_INTERVAL),
@ -234,6 +238,10 @@ impl Exporter {
if let Some(country) = self.multiaddresses_to_country_code(addresses.iter()) {
node = node.with_country(country);
}
if let Some(provider) = self.multiaddresses_to_cloud_provider(addresses.iter())
{
node = node.with_cloud_provider(provider);
}
self.node_stores.get_mut(&name).unwrap().observed_node(node);
self.metrics
@ -251,6 +259,33 @@ impl Exporter {
}
}
fn multiaddresses_to_cloud_provider<'a>(
&self,
addresses: impl Iterator<Item = &'a Multiaddr>,
) -> Option<String> {
for address in addresses {
let provider = self.multiaddress_to_cloud_provider(address);
if provider.is_some() {
return provider;
}
}
None
}
fn multiaddress_to_cloud_provider(&self, address: &Multiaddr) -> Option<String> {
let ip_address = match address.iter().next()? {
Protocol::Ip4(addr) => Some(addr),
_ => None,
}?;
if let Some(db) = &self.cloud_provider_db {
return db.get_provider(ip_address);
}
None
}
fn multiaddresses_to_country_code<'a>(
&self,
addresses: impl Iterator<Item = &'a Multiaddr>,

View File

@ -56,36 +56,41 @@ impl NodeStore {
// Seen within
//
let mut nodes_by_time_by_country = HashMap::<Duration, HashMap<String, u64>>::new();
let mut nodes_by_time_by_country_and_provider =
HashMap::<Duration, HashMap<(String, String), u64>>::new();
// Insert 3h, 6h, ... buckets.
for factor in &[3, 6, 12, 24] {
nodes_by_time_by_country.insert(Duration::from_secs(60 * 60 * *factor), HashMap::new());
nodes_by_time_by_country_and_provider
.insert(Duration::from_secs(60 * 60 * *factor), HashMap::new());
}
for node in self.nodes.values() {
let since_last_seen = now - node.last_seen;
for (time_barrier, countries) in &mut nodes_by_time_by_country {
for (time_barrier, countries) in &mut nodes_by_time_by_country_and_provider {
if since_last_seen < *time_barrier {
countries
.entry(
.entry((
node.country
.clone()
.unwrap_or_else(|| "unknown".to_string()),
)
node.provider
.clone()
.unwrap_or_else(|| "unknown".to_string()),
))
.and_modify(|v| *v += 1)
.or_insert(1);
}
}
}
for (time_barrier, countries) in nodes_by_time_by_country {
for (time_barrier, countries) in nodes_by_time_by_country_and_provider {
let last_seen_within = format!("{:?}h", time_barrier.as_secs() / 60 / 60);
for (country, count) in countries {
for ((country, provider), count) in countries {
self.metrics
.nodes_seen_within
.with_label_values(&[&self.dht, &country, &last_seen_within])
.with_label_values(&[&self.dht, &country, &provider, &last_seen_within])
.set(count as f64);
}
}
@ -94,18 +99,20 @@ impl NodeStore {
// Up since
//
let mut nodes_by_time_by_country = HashMap::<Duration, HashMap<String, u64>>::new();
let mut nodes_by_time_by_country_and_provider =
HashMap::<Duration, HashMap<(String, String), u64>>::new();
// Insert 3h, 6h, ... buckets.
for factor in &[3, 6, 12, 24, 48, 96] {
nodes_by_time_by_country.insert(Duration::from_secs(60 * 60 * *factor), HashMap::new());
nodes_by_time_by_country_and_provider
.insert(Duration::from_secs(60 * 60 * *factor), HashMap::new());
}
for node in self.nodes.values() {
// Safeguard in case exporter is behind on probing every nodes
// uptime.
if Instant::now() - node.last_seen > Duration::from_secs(60 * 60) {
continue
continue;
}
let up_since = match node.up_since {
@ -113,27 +120,30 @@ impl NodeStore {
None => continue,
};
for (time_barrier, countries) in &mut nodes_by_time_by_country {
for (time_barrier, countries) in &mut nodes_by_time_by_country_and_provider {
if Instant::now() - up_since > *time_barrier {
countries
.entry(
.entry((
node.country
.clone()
.unwrap_or_else(|| "unknown".to_string()),
)
node.provider
.clone()
.unwrap_or_else(|| "unknown".to_string()),
))
.and_modify(|v| *v += 1)
.or_insert(1);
}
}
}
for (time_barrier, countries) in nodes_by_time_by_country {
for (time_barrier, countries) in nodes_by_time_by_country_and_provider {
let up_since = format!("{:?}h", time_barrier.as_secs() / 60 / 60);
for (country, count) in countries {
for ((country, provider), count) in countries {
self.metrics
.nodes_up_since
.with_label_values(&[&self.dht, &country, &up_since])
.with_label_values(&[&self.dht, &country, &provider, &up_since])
.set(count as f64);
}
}
@ -151,6 +161,7 @@ impl NodeStore {
pub struct Node {
pub peer_id: PeerId,
pub country: Option<String>,
pub provider: Option<String>,
last_seen: Instant,
up_since: Option<Instant>,
}
@ -160,6 +171,7 @@ impl Node {
Node {
peer_id,
country: None,
provider: None,
last_seen: Instant::now(),
up_since: Some(Instant::now()),
}
@ -170,6 +182,11 @@ impl Node {
self
}
pub fn with_cloud_provider(mut self, provider: String) -> Self {
self.provider = Some(provider);
self
}
fn merge(&mut self, other: Node) {
self.country = self.country.take().or(other.country);
self.up_since = self.up_since.take().or(other.up_since);
@ -195,7 +212,7 @@ impl Metrics {
"nodes_seen_within",
"Unique nodes discovered within the time bound through the Dht.",
),
&["dht", "country", "last_seen_within"],
&["dht", "country", "cloud_provider", "last_seen_within"],
)
.unwrap();
registry
@ -207,7 +224,7 @@ impl Metrics {
"nodes_up_since",
"Unique nodes discovered through the Dht and up since timebound.",
),
&["dht", "country", "up_since"],
&["dht", "country", "cloud_provider", "up_since"],
)
.unwrap();
registry.register(Box::new(nodes_up_since.clone())).unwrap();

View File

@ -10,6 +10,7 @@ use std::{
};
use structopt::StructOpt;
mod cloud_provider_db;
mod exporter;
#[derive(Debug, StructOpt)]
@ -25,6 +26,8 @@ struct Opt {
#[structopt(long)]
max_mind_db: Option<PathBuf>,
#[structopt(long)]
cloud_provider_db: Option<PathBuf>,
}
fn main() -> Result<(), Box<dyn Error>> {
@ -49,12 +52,16 @@ fn main() -> Result<(), Box<dyn Error>> {
let ip_db = opt
.max_mind_db
.map(|path| maxminddb::Reader::open_readfile(path).expect("Failed to open max mind db."));
let cloud_provider_db = opt
.cloud_provider_db
.map(|path| cloud_provider_db::Db::new(path).expect("Failed to parse cloud provider db."));
let exporter = exporter::Exporter::new(
opt.dht_name
.into_iter()
.zip(opt.dht_bootnode.into_iter())
.collect(),
ip_db,
cloud_provider_db,
&registry,
)?;