From 94acb400f25366092f1306ca2036183a9b14d92b Mon Sep 17 00:00:00 2001
From: Max Inden
Date: Fri, 24 Apr 2020 19:34:27 +0200
Subject: [PATCH] *: Track cloud provider

---
 .assets/cidrs.r            |  3 ++
 .gitignore                 |  1 +
 Cargo.lock                 | 61 ++++++++++++++++++++++++++++++++++++++
 Cargo.toml                 |  2 ++
 src/cloud_provider_db.rs   | 44 +++++++++++++++++++++++++++
 src/exporter.rs            | 35 ++++++++++++++++++++++
 src/exporter/node_store.rs | 55 ++++++++++++++++++++++------------
 src/main.rs                |  7 +++++
 8 files changed, 189 insertions(+), 19 deletions(-)
 create mode 100644 .assets/cidrs.r
 create mode 100644 src/cloud_provider_db.rs

diff --git a/.assets/cidrs.r b/.assets/cidrs.r
new file mode 100644
index 0000000..75d0d3c
--- /dev/null
+++ b/.assets/cidrs.r
@@ -0,0 +1,3 @@
+library(cloudcidrs)
+
+write.csv(all_ranges(), file = "./cidrs.csv")
diff --git a/.gitignore b/.gitignore
index 53eaa21..9eb96d7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
 /target
 **/*.rs.bk
+.assets/cidrs.csv
diff --git a/Cargo.lock b/Cargo.lock
index fc8a7a5..9f70471 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -221,6 +221,12 @@ version = "1.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
 
+[[package]]
+name = "bitstring"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e54f7b7a46d7b183eb41e2d82965261fa8a1597c68b50aced268ee1fc70272d"
+
 [[package]]
 name = "blake2-rfc"
 version = "0.2.18"
@@ -303,6 +309,18 @@ version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b170cd256a3f9fa6b9edae3e44a7dfdfc77e8124dbc3e2612d75f9c3e2396dae"
 
+[[package]]
+name = "bstr"
+version = "0.2.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2889e6d50f394968c8bf4240dc3f2a7eb4680844d27308f798229ac9d4725f41"
+dependencies = [
+ "lazy_static",
+ "memchr",
+ "regex-automata",
+ "serde",
+]
+
 [[package]]
 name = "bumpalo"
 version = "2.6.0"
@@ -380,6 +398,16 @@ dependencies = [
  "constant_time_eq",
 ]
 
+[[package]]
+name = "cidr"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2da1cf0f275bb8dc1867a7f40cdb3b746951db73a183048e6e37fa89ed81bd01"
+dependencies = [
+ "bitstring",
+ "serde",
+]
+
 [[package]]
 name = "clap"
 version = "2.33.0"
@@ -537,6 +565,28 @@ dependencies = [
  "subtle 1.0.0",
 ]
 
+[[package]]
+name = "csv"
+version = "1.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "00affe7f6ab566df61b4be3ce8cf16bc2576bca0963ceb0955e45d514bf9a279"
+dependencies = [
+ "bstr",
+ "csv-core",
+ "itoa",
+ "ryu",
+ "serde",
+]
+
+[[package]]
+name = "csv-core"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90"
+dependencies = [
+ "memchr",
+]
+
 [[package]]
 name = "ctr"
 version = "0.3.2"
@@ -1183,6 +1233,8 @@ name = "kademlia-exporter"
 version = "0.1.0"
 dependencies = [
  "async-std",
+ "cidr",
+ "csv",
  "ctrlc",
  "env_logger",
  "exit-future",
@@ -2388,6 +2440,15 @@ dependencies = [
  "thread_local",
 ]
 
+[[package]]
+name = "regex-automata"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4"
+dependencies = [
+ "byteorder 1.3.2",
+]
+
 [[package]]
 name = "regex-syntax"
 version = "0.6.12"
diff --git a/Cargo.toml b/Cargo.toml
index 2a5c725..c0a80c8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,6 +8,8 @@ edition = "2018"
 
 [dependencies]
 async-std = "1.0"
+cidr = "0.1"
+csv = "1"
 env_logger = "0.7.1"
 futures = "0.3.1"
 libp2p = "0.18.0"
diff --git a/src/cloud_provider_db.rs b/src/cloud_provider_db.rs
new file mode 100644
index 0000000..3ff3687
--- /dev/null
+++ b/src/cloud_provider_db.rs
@@ -0,0 +1,44 @@
+use cidr::{Cidr, Ipv4Cidr};
+use csv::Reader;
+use std::collections::HashMap;
+use std::error::Error;
+use std::net::Ipv4Addr;
+use std::path::PathBuf;
+use std::str::FromStr;
+
+pub struct Db {
+    providers: HashMap<String, Vec<Ipv4Cidr>>,
+}
+
+impl Db {
+    pub fn new(path: PathBuf) -> Result<Self, Box<dyn Error>> {
+        let mut providers = HashMap::<String, Vec<Ipv4Cidr>>::new();
+
+        let mut reader = Reader::from_path(path)?;
+        for record in reader.records() {
+            let record = record?;
+
+            let provider = &record[1];
+            let cidr = Ipv4Cidr::from_str(&record[2]).unwrap();
+
+            providers
+                .entry(provider.to_string())
+                .and_modify(|p| p.push(cidr.clone()))
+                .or_insert_with(|| vec![cidr]);
+        }
+
+        Ok(Db { providers })
+    }
+
+    pub fn get_provider(&self, addr: Ipv4Addr) -> Option<String> {
+        for (provider, cidrs) in self.providers.iter() {
+            for cidr in cidrs.iter() {
+                if cidr.contains(&addr) {
+                    return Some(provider.clone());
+                }
+            }
+        }
+
+        None
+    }
+}
diff --git a/src/exporter.rs b/src/exporter.rs
index 03e4f34..2d6ede9 100644
--- a/src/exporter.rs
+++ b/src/exporter.rs
@@ -1,3 +1,4 @@
+use crate::cloud_provider_db;
 use client::Client;
 use futures::prelude::*;
 use futures_timer::Delay;
@@ -31,6 +32,7 @@ pub(crate) struct Exporter {
     clients: HashMap<String, Client>,
     node_stores: HashMap<String, NodeStore>,
     ip_db: Option<maxminddb::Reader<Vec<u8>>>,
+    cloud_provider_db: Option<cloud_provider_db::Db>,
     /// Set of in-flight random peer id lookups.
     ///
     /// When a lookup returns the entry is dropped and thus the duratation is
@@ -47,6 +49,7 @@ impl Exporter {
     pub(crate) fn new(
         dhts: Vec<(String, Multiaddr)>,
         ip_db: Option<maxminddb::Reader<Vec<u8>>>,
+        cloud_provider_db: Option<cloud_provider_db::Db>,
         registry: &Registry,
     ) -> Result<Exporter, Box<dyn Error>> {
         let metrics = Metrics::register(registry);
@@ -76,6 +79,7 @@
             clients,
             metrics,
             ip_db,
+            cloud_provider_db,
             node_stores,
 
             tick: futures_timer::Delay::new(TICK_INTERVAL),
@@ -234,6 +238,10 @@ impl Exporter {
                    if let Some(country) = self.multiaddresses_to_country_code(addresses.iter()) {
                         node = node.with_country(country);
                     }
+                    if let Some(provider) = self.multiaddresses_to_cloud_provider(addresses.iter())
+                    {
+                        node = node.with_cloud_provider(provider);
+                    }
                     self.node_stores.get_mut(&name).unwrap().observed_node(node);
 
                     self.metrics
@@ -251,6 +259,33 @@
         }
     }
 
+    fn multiaddresses_to_cloud_provider<'a>(
+        &self,
+        addresses: impl Iterator<Item = &'a Multiaddr>,
+    ) -> Option<String> {
+        for address in addresses {
+            let provider = self.multiaddress_to_cloud_provider(address);
+            if provider.is_some() {
+                return provider;
+            }
+        }
+
+        None
+    }
+
+    fn multiaddress_to_cloud_provider(&self, address: &Multiaddr) -> Option<String> {
+        let ip_address = match address.iter().next()? {
+            Protocol::Ip4(addr) => Some(addr),
+            _ => None,
+        }?;
+
+        if let Some(db) = &self.cloud_provider_db {
+            return db.get_provider(ip_address);
+        }
+
+        None
+    }
+
     fn multiaddresses_to_country_code<'a>(
         &self,
         addresses: impl Iterator<Item = &'a Multiaddr>,
diff --git a/src/exporter/node_store.rs b/src/exporter/node_store.rs
index f44ea42..45c2282 100644
--- a/src/exporter/node_store.rs
+++ b/src/exporter/node_store.rs
@@ -56,36 +56,41 @@ impl NodeStore {
         // Seen within
         //
 
-        let mut nodes_by_time_by_country = HashMap::<Duration, HashMap<String, u64>>::new();
+        let mut nodes_by_time_by_country_and_provider =
+            HashMap::<Duration, HashMap<(String, String), u64>>::new();
 
         // Insert 3h, 6h, ... buckets.
         for factor in &[3, 6, 12, 24] {
-            nodes_by_time_by_country.insert(Duration::from_secs(60 * 60 * *factor), HashMap::new());
+            nodes_by_time_by_country_and_provider
+                .insert(Duration::from_secs(60 * 60 * *factor), HashMap::new());
         }
 
         for node in self.nodes.values() {
             let since_last_seen = now - node.last_seen;
 
-            for (time_barrier, countries) in &mut nodes_by_time_by_country {
+            for (time_barrier, countries) in &mut nodes_by_time_by_country_and_provider {
                 if since_last_seen < *time_barrier {
                     countries
-                        .entry(
+                        .entry((
                             node.country
                                 .clone()
                                 .unwrap_or_else(|| "unknown".to_string()),
-                        )
+                            node.provider
+                                .clone()
+                                .unwrap_or_else(|| "unknown".to_string()),
+                        ))
                         .and_modify(|v| *v += 1)
                         .or_insert(1);
                 }
             }
         }
 
-        for (time_barrier, countries) in nodes_by_time_by_country {
+        for (time_barrier, countries) in nodes_by_time_by_country_and_provider {
             let last_seen_within = format!("{:?}h", time_barrier.as_secs() / 60 / 60);
-            for (country, count) in countries {
+            for ((country, provider), count) in countries {
                 self.metrics
                     .nodes_seen_within
-                    .with_label_values(&[&self.dht, &country, &last_seen_within])
+                    .with_label_values(&[&self.dht, &country, &provider, &last_seen_within])
                     .set(count as f64);
             }
         }
@@ -94,18 +99,20 @@ impl NodeStore {
         // Up since
         //
 
-        let mut nodes_by_time_by_country = HashMap::<Duration, HashMap<String, u64>>::new();
+        let mut nodes_by_time_by_country_and_provider =
+            HashMap::<Duration, HashMap<(String, String), u64>>::new();
 
         // Insert 3h, 6h, ... buckets.
         for factor in &[3, 6, 12, 24, 48, 96] {
-            nodes_by_time_by_country.insert(Duration::from_secs(60 * 60 * *factor), HashMap::new());
+            nodes_by_time_by_country_and_provider
+                .insert(Duration::from_secs(60 * 60 * *factor), HashMap::new());
         }
 
         for node in self.nodes.values() {
             // Safeguard in case exporter is behind on probing every nodes
             // uptime.
             if Instant::now() - node.last_seen > Duration::from_secs(60 * 60) {
-                continue
+                continue;
             }
 
             let up_since = match node.up_since {
@@ -113,26 +120,29 @@ impl NodeStore {
                 None => continue,
             };
 
-            for (time_barrier, countries) in &mut nodes_by_time_by_country {
+            for (time_barrier, countries) in &mut nodes_by_time_by_country_and_provider {
                 if Instant::now() - up_since > *time_barrier {
                     countries
-                        .entry(
+                        .entry((
                             node.country
                                 .clone()
                                 .unwrap_or_else(|| "unknown".to_string()),
-                        )
+                            node.provider
+                                .clone()
+                                .unwrap_or_else(|| "unknown".to_string()),
+                        ))
                         .and_modify(|v| *v += 1)
                         .or_insert(1);
                 }
             }
         }
 
-        for (time_barrier, countries) in nodes_by_time_by_country {
+        for (time_barrier, countries) in nodes_by_time_by_country_and_provider {
             let up_since = format!("{:?}h", time_barrier.as_secs() / 60 / 60);
-            for (country, count) in countries {
+            for ((country, provider), count) in countries {
                 self.metrics
                     .nodes_up_since
-                    .with_label_values(&[&self.dht, &country, &up_since])
+                    .with_label_values(&[&self.dht, &country, &provider, &up_since])
                     .set(count as f64);
             }
         }
@@ -151,6 +161,7 @@
 pub struct Node {
     pub peer_id: PeerId,
     pub country: Option<String>,
+    pub provider: Option<String>,
     last_seen: Instant,
     up_since: Option<Instant>,
 }
@@ -160,6 +171,7 @@ impl Node {
         Node {
             peer_id,
             country: None,
+            provider: None,
             last_seen: Instant::now(),
             up_since: Some(Instant::now()),
         }
@@ -170,6 +182,11 @@ impl Node {
         self
     }
 
+    pub fn with_cloud_provider(mut self, provider: String) -> Self {
+        self.provider = Some(provider);
+        self
+    }
+
     fn merge(&mut self, other: Node) {
         self.country = self.country.take().or(other.country);
         self.up_since = self.up_since.take().or(other.up_since);
@@ -195,7 +212,7 @@ impl Metrics {
                 "nodes_seen_within",
                 "Unique nodes discovered within the time bound through the Dht.",
             ),
-            &["dht", "country", "last_seen_within"],
+            &["dht", "country", "cloud_provider", "last_seen_within"],
         )
         .unwrap();
         registry
@@ -207,7 +224,7 @@ impl Metrics {
                 "nodes_up_since",
                 "Unique nodes discovered through the Dht and up since timebound.",
             ),
-            &["dht", "country", "up_since"],
+            &["dht", "country", "cloud_provider", "up_since"],
         )
         .unwrap();
         registry.register(Box::new(nodes_up_since.clone())).unwrap();
diff --git a/src/main.rs b/src/main.rs
index a3db5c4..465e796 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -10,6 +10,7 @@ use std::{
 };
 use structopt::StructOpt;
 
+mod cloud_provider_db;
 mod exporter;
 
 #[derive(Debug, StructOpt)]
@@ -25,6 +26,8 @@ struct Opt {
 
     #[structopt(long)]
     max_mind_db: Option<PathBuf>,
+    #[structopt(long)]
+    cloud_provider_db: Option<PathBuf>,
 }
 
 fn main() -> Result<(), Box<dyn Error>> {
@@ -49,12 +52,16 @@ fn main() -> Result<(), Box<dyn Error>> {
     let ip_db = opt
         .max_mind_db
         .map(|path| maxminddb::Reader::open_readfile(path).expect("Failed to open max mind db."));
+    let cloud_provider_db = opt
+        .cloud_provider_db
+        .map(|path| cloud_provider_db::Db::new(path).expect("Failed to parse cloud provider db."));
     let exporter = exporter::Exporter::new(
         opt.dht_name
             .into_iter()
             .zip(opt.dht_bootnode.into_iter())
             .collect(),
         ip_db,
+        cloud_provider_db,
         &registry,
     )?;
 
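For reference, Db::new in src/cloud_provider_db.rs reads the CSV purely by position: record[1] is taken as the provider name and record[2] as an IPv4 CIDR, presumably matching the column order that write.csv(all_ranges(), ...) in .assets/cidrs.r emits (write.csv prepends a row-name column, and csv::Reader skips the header row by default). A file of roughly the following shape would parse; the header names, provider labels and ranges here are illustrative, not taken from the actual cloudcidrs output:

    "","provider","cidr"
    "1","aws","3.5.140.0/22"
    "2","gcp","8.34.208.0/20"

Pointing the new cloud_provider_db option (structopt exposes it as --cloud-provider-db) at such a file, next to the existing max_mind_db, dht_name and dht_bootnode options, adds the cloud_provider label, populated via Db::get_provider, to the nodes_seen_within and nodes_up_since metrics.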