mirror of
https://github.com/fluencelabs/kademlia-exporter
synced 2025-04-24 13:52:13 +00:00
src: Run fmt
This commit is contained in:
parent
f69244d5dd
commit
975b1db733
@ -1,7 +1,12 @@
|
||||
use client::Client;
|
||||
use futures::prelude::*;
|
||||
use libp2p::{multiaddr::{Multiaddr, Protocol}, identify::IdentifyEvent, kad::KademliaEvent};
|
||||
use libp2p::{
|
||||
identify::IdentifyEvent,
|
||||
kad::KademliaEvent,
|
||||
multiaddr::{Multiaddr, Protocol},
|
||||
};
|
||||
|
||||
use maxminddb::{geoip2, Reader};
|
||||
use prometheus::{CounterVec, GaugeVec, Opts, Registry};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
@ -9,7 +14,6 @@ use std::{
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use maxminddb::{geoip2, Reader};
|
||||
|
||||
mod client;
|
||||
|
||||
@ -19,21 +23,23 @@ pub(crate) struct Exporter {
|
||||
ip_db: Option<Reader<Vec<u8>>>,
|
||||
}
|
||||
impl Exporter {
|
||||
pub(crate) fn new(dhts: Vec<(String, Multiaddr)>, ip_db: Option<Reader<Vec<u8>>>, registry: &Registry) -> Result<Self, Box<dyn Error>> {
|
||||
pub(crate) fn new(
|
||||
dhts: Vec<(String, Multiaddr)>,
|
||||
ip_db: Option<Reader<Vec<u8>>>,
|
||||
registry: &Registry,
|
||||
) -> Result<Self, Box<dyn Error>> {
|
||||
let metrics = Metrics::register(registry);
|
||||
|
||||
let clients = dhts
|
||||
.into_iter()
|
||||
.map(|(name, bootnode)| {
|
||||
(
|
||||
name,
|
||||
client::Client::new(bootnode).unwrap(),
|
||||
)
|
||||
})
|
||||
.map(|(name, bootnode)| (name, client::Client::new(bootnode).unwrap()))
|
||||
.collect();
|
||||
|
||||
|
||||
Ok(Exporter { clients, metrics, ip_db })
|
||||
Ok(Exporter {
|
||||
clients,
|
||||
metrics,
|
||||
ip_db,
|
||||
})
|
||||
}
|
||||
|
||||
fn record_event(&self, name: String, event: client::Event) {
|
||||
@ -120,12 +126,21 @@ impl Exporter {
|
||||
.with_label_values(&[&name, "kad", "discovered"])
|
||||
.inc();
|
||||
}
|
||||
KademliaEvent::RoutingUpdated { old_peer, addresses, .. } => {
|
||||
let country = self.multiaddresses_to_country_code(addresses.iter()).unwrap_or("unknown".to_string());
|
||||
KademliaEvent::RoutingUpdated {
|
||||
old_peer,
|
||||
addresses,
|
||||
..
|
||||
} => {
|
||||
let country = self
|
||||
.multiaddresses_to_country_code(addresses.iter())
|
||||
.unwrap_or("unknown".to_string());
|
||||
|
||||
// Check if it is a new node, or just an update to a node.
|
||||
if old_peer.is_none() {
|
||||
self.metrics.nodes.with_label_values(&[&name, &country]).inc();
|
||||
self.metrics
|
||||
.nodes
|
||||
.with_label_values(&[&name, &country])
|
||||
.inc();
|
||||
}
|
||||
self.metrics
|
||||
.event_counter
|
||||
@ -143,11 +158,14 @@ impl Exporter {
|
||||
}
|
||||
}
|
||||
|
||||
fn multiaddresses_to_country_code<'a>(&self, addresses: impl Iterator<Item = &'a Multiaddr>) -> Option<String> {
|
||||
fn multiaddresses_to_country_code<'a>(
|
||||
&self,
|
||||
addresses: impl Iterator<Item = &'a Multiaddr>,
|
||||
) -> Option<String> {
|
||||
for address in addresses {
|
||||
let country = self.multiaddress_to_country_code(address);
|
||||
if country.is_some() {
|
||||
return country
|
||||
return country;
|
||||
}
|
||||
}
|
||||
|
||||
@ -158,10 +176,14 @@ impl Exporter {
|
||||
match address.iter().next()? {
|
||||
Protocol::Ip4(addr) => {
|
||||
if let Some(ip_db) = &self.ip_db {
|
||||
return ip_db.lookup::<geoip2::City>(std::net::IpAddr::V4(addr)).ok()?.country?.iso_code;
|
||||
return ip_db
|
||||
.lookup::<geoip2::City>(std::net::IpAddr::V4(addr))
|
||||
.ok()?
|
||||
.country?
|
||||
.iso_code;
|
||||
}
|
||||
}
|
||||
_ => {},
|
||||
_ => {}
|
||||
}
|
||||
|
||||
None
|
||||
|
@ -43,7 +43,10 @@ impl Client {
|
||||
|
||||
let behaviour = MyBehaviour::new(local_key.clone())?;
|
||||
let transport = build_transport(local_key);
|
||||
let mut swarm = SwarmBuilder::new(transport, behaviour, local_peer_id).incoming_connection_limit(10).outgoing_connection_limit(10).build();
|
||||
let mut swarm = SwarmBuilder::new(transport, behaviour, local_peer_id)
|
||||
.incoming_connection_limit(10)
|
||||
.outgoing_connection_limit(10)
|
||||
.build();
|
||||
|
||||
// Listen on all interfaces and whatever port the OS assigns.
|
||||
Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?;
|
||||
|
13
src/main.rs
13
src/main.rs
@ -46,8 +46,17 @@ fn main() -> Result<(), Box<dyn Error>> {
|
||||
.unwrap();
|
||||
|
||||
let registry = Registry::new();
|
||||
let ip_db = opt.max_mind_db.map(|path| maxminddb::Reader::open_readfile(path).expect("Failed to open max mind db."));
|
||||
let exporter = exporter::Exporter::new(opt.dht_name.into_iter().zip(opt.dht_bootnode.into_iter()).collect(), ip_db, ®istry)?;
|
||||
let ip_db = opt
|
||||
.max_mind_db
|
||||
.map(|path| maxminddb::Reader::open_readfile(path).expect("Failed to open max mind db."));
|
||||
let exporter = exporter::Exporter::new(
|
||||
opt.dht_name
|
||||
.into_iter()
|
||||
.zip(opt.dht_bootnode.into_iter())
|
||||
.collect(),
|
||||
ip_db,
|
||||
®istry,
|
||||
)?;
|
||||
|
||||
let exit_clone = exit.clone();
|
||||
let metrics_server = std::thread::spawn(move || {
|
||||
|
Loading…
x
Reference in New Issue
Block a user