mirror of
https://github.com/fluencelabs/kademlia-exporter
synced 2025-04-24 22:02:13 +00:00
src/exporter: Periodically reconnect to each discovered node
This commit is contained in:
parent
b7fbc96ba3
commit
5f75e3b222
@ -8,6 +8,7 @@ use libp2p::{
|
||||
ping::{PingEvent, PingSuccess},
|
||||
PeerId,
|
||||
};
|
||||
use log::info;
|
||||
use maxminddb::{geoip2, Reader};
|
||||
use node_store::{Node, NodeStore};
|
||||
use prometheus::{
|
||||
@ -39,6 +40,9 @@ pub(crate) struct Exporter {
|
||||
in_flight_lookups: HashMap<PeerId, HistogramTimer>,
|
||||
tick: Delay,
|
||||
metrics: Metrics,
|
||||
/// An exporter periodically reconnects to each discovered node to probe
|
||||
/// whether it is still online.
|
||||
nodes_to_probe_periodically: HashMap<String, Vec<PeerId>>,
|
||||
}
|
||||
|
||||
impl Exporter {
|
||||
@ -57,6 +61,7 @@ impl Exporter {
|
||||
|
||||
let node_store_metrics = node_store::Metrics::register(registry);
|
||||
let node_stores = dhts
|
||||
.clone()
|
||||
.into_iter()
|
||||
.map(|(name, _)| {
|
||||
(
|
||||
@ -66,6 +71,9 @@ impl Exporter {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let nodes_to_probe_periodically =
|
||||
dhts.into_iter().map(|(name, _)| (name, vec![])).collect();
|
||||
|
||||
Ok(Exporter {
|
||||
clients,
|
||||
metrics,
|
||||
@ -75,6 +83,7 @@ impl Exporter {
|
||||
tick: futures_timer::Delay::new(TICK_INTERVAL),
|
||||
|
||||
in_flight_lookups: HashMap::new(),
|
||||
nodes_to_probe_periodically,
|
||||
})
|
||||
}
|
||||
|
||||
@ -186,7 +195,7 @@ impl Exporter {
|
||||
}
|
||||
// Note: Do not interpret Discovered event as a proof of a node
|
||||
// being online.
|
||||
KademliaEvent::Discovered { peer_id, .. } => {
|
||||
KademliaEvent::Discovered { .. } => {
|
||||
self.metrics
|
||||
.event_counter
|
||||
.with_label_values(&[&name, "kad", "discovered"])
|
||||
@ -261,6 +270,31 @@ impl Future for Exporter {
|
||||
node_store.update_metrics();
|
||||
}
|
||||
|
||||
for (dht, nodes) in &mut this.nodes_to_probe_periodically {
|
||||
match nodes.pop() {
|
||||
Some(peer_id) => {
|
||||
info!("Checking if {:?} is still online.", &peer_id);
|
||||
if this.clients.get_mut(dht).unwrap().dial(&peer_id).is_err() {
|
||||
// Connection limit reached. Retry later.
|
||||
nodes.insert(0, peer_id);
|
||||
}
|
||||
}
|
||||
// List is empty. Reconnected to every peer. Refill the
|
||||
// list.
|
||||
None => {
|
||||
nodes.append(
|
||||
&mut this
|
||||
.node_stores
|
||||
.get(dht)
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|n| n.peer_id.clone())
|
||||
.collect(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Trigger a random lookup for each client.
|
||||
for (name, client) in &mut this.clients {
|
||||
let random_peer = PeerId::random();
|
||||
|
@ -1,6 +1,7 @@
|
||||
use futures::prelude::*;
|
||||
use libp2p::{
|
||||
core::{
|
||||
connection::ConnectionLimit,
|
||||
self, either::EitherError, either::EitherOutput, multiaddr::Protocol,
|
||||
muxing::StreamMuxerBox, transport::boxed::Boxed, transport::Transport, upgrade, Multiaddr,
|
||||
},
|
||||
@ -64,6 +65,10 @@ impl Client {
|
||||
pub fn get_closest_peers(&mut self, peer_id: PeerId) {
|
||||
self.swarm.kademlia.get_closest_peers(peer_id);
|
||||
}
|
||||
|
||||
pub fn dial(&mut self, peer_id: &PeerId) -> Result<bool, ConnectionLimit> {
|
||||
Swarm::dial(&mut self.swarm, peer_id)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: this should be a stream instead.
|
||||
|
@ -69,10 +69,14 @@ impl NodeStore {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> impl Iterator<Item = &Node> {
|
||||
self.nodes.values()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Node {
|
||||
peer_id: PeerId,
|
||||
pub peer_id: PeerId,
|
||||
country: Option<String>,
|
||||
last_seen: Instant,
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user