mirror of
https://github.com/fluencelabs/kademlia-exporter
synced 2025-04-24 13:52:13 +00:00
src: Run fmt and clippy
This commit is contained in:
parent
2efff13ee7
commit
6da53e4520
@ -1,5 +1,5 @@
|
||||
use client::Client;
|
||||
use futures::{prelude::*};
|
||||
use futures::prelude::*;
|
||||
use libp2p::{core::Multiaddr, identify::IdentifyEvent, kad::KademliaEvent};
|
||||
use prometheus::{CounterVec, GaugeVec, Opts, Registry};
|
||||
use std::{
|
||||
@ -19,14 +19,17 @@ impl Exporter {
|
||||
pub(crate) fn new(dhts: Vec<Multiaddr>, registry: &Registry) -> Result<Self, Box<dyn Error>> {
|
||||
let metrics = Metrics::register(registry);
|
||||
|
||||
let clients = dhts.into_iter().map(|addr| {
|
||||
(addr.iter().next().unwrap().to_string(), client::Client::new(addr).unwrap())
|
||||
}).collect();
|
||||
let clients = dhts
|
||||
.into_iter()
|
||||
.map(|addr| {
|
||||
(
|
||||
addr.iter().next().unwrap().to_string(),
|
||||
client::Client::new(addr).unwrap(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Exporter {
|
||||
clients,
|
||||
metrics,
|
||||
})
|
||||
Ok(Exporter { clients, metrics })
|
||||
}
|
||||
|
||||
fn record_event(&self, name: String, event: client::Event) {
|
||||
@ -58,7 +61,7 @@ impl Exporter {
|
||||
}
|
||||
},
|
||||
client::Event::Kademlia(event) => {
|
||||
match event {
|
||||
match *event {
|
||||
KademliaEvent::BootstrapResult(_) => {
|
||||
self.metrics
|
||||
.event_counter
|
||||
@ -175,10 +178,10 @@ impl Metrics {
|
||||
.unwrap();
|
||||
registry.register(Box::new(event_counter.clone())).unwrap();
|
||||
|
||||
let bucket_size = GaugeVec::new(Opts::new(
|
||||
"kad_kbuckets_size",
|
||||
"Libp2p Kademlia K-Buckets size.",
|
||||
), &["dht"])
|
||||
let bucket_size = GaugeVec::new(
|
||||
Opts::new("kad_kbuckets_size", "Libp2p Kademlia K-Buckets size."),
|
||||
&["dht"],
|
||||
)
|
||||
.unwrap();
|
||||
registry.register(Box::new(bucket_size.clone())).unwrap();
|
||||
Metrics {
|
||||
|
@ -1,31 +1,29 @@
|
||||
use futures::prelude::*;
|
||||
use libp2p::{
|
||||
core::{
|
||||
self, muxing::StreamMuxerBox, transport::boxed::Boxed, transport::Transport, Multiaddr, multiaddr::Protocol,
|
||||
either::EitherError, either::EitherOutput, upgrade,
|
||||
self, either::EitherError, either::EitherOutput, multiaddr::Protocol,
|
||||
muxing::StreamMuxerBox, transport::boxed::Boxed, transport::Transport, upgrade, Multiaddr,
|
||||
},
|
||||
dns,
|
||||
mplex,
|
||||
identify::{Identify, IdentifyEvent},
|
||||
identity::Keypair,
|
||||
kad::{record::store::MemoryStore, Kademlia, KademliaConfig, KademliaEvent, protocol::KademliaProtocolConfig},
|
||||
noise,
|
||||
kad::{
|
||||
record::store::MemoryStore, Kademlia, KademliaConfig,
|
||||
KademliaEvent,
|
||||
},
|
||||
mplex, noise,
|
||||
ping::{Ping, PingConfig, PingEvent},
|
||||
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
|
||||
tcp, yamux, NetworkBehaviour, PeerId, Swarm,
|
||||
secio,
|
||||
InboundUpgradeExt,
|
||||
OutboundUpgradeExt,
|
||||
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
|
||||
tcp, yamux, InboundUpgradeExt, NetworkBehaviour, OutboundUpgradeExt, PeerId, Swarm,
|
||||
};
|
||||
use std::{
|
||||
io,
|
||||
usize,
|
||||
str::FromStr,
|
||||
convert::TryInto,
|
||||
error::Error,
|
||||
io,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
usize,
|
||||
};
|
||||
|
||||
pub struct Client {
|
||||
@ -39,8 +37,6 @@ impl Client {
|
||||
let local_key = Keypair::generate_ed25519();
|
||||
let local_peer_id = PeerId::from(local_key.public());
|
||||
|
||||
&local_peer_id;
|
||||
|
||||
let behaviour = MyBehaviour::new(local_key.clone())?;
|
||||
let transport = build_transport(local_key);
|
||||
let mut swarm = Swarm::new(transport, behaviour, local_peer_id);
|
||||
@ -48,15 +44,12 @@ impl Client {
|
||||
// Listen on all interfaces and whatever port the OS assigns.
|
||||
Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?;
|
||||
|
||||
|
||||
let bootnode_peer_id = if let Protocol::P2p(hash) = bootnode.pop().unwrap() {
|
||||
PeerId::from_multihash(hash).unwrap()
|
||||
} else {
|
||||
panic!("expected peer id");
|
||||
};
|
||||
|
||||
|
||||
|
||||
swarm.kademlia.add_address(&bootnode_peer_id, bootnode);
|
||||
swarm.kademlia.bootstrap();
|
||||
|
||||
@ -103,7 +96,7 @@ pub(crate) struct MyBehaviour {
|
||||
pub enum Event {
|
||||
Ping(PingEvent),
|
||||
Identify(Box<IdentifyEvent>),
|
||||
Kademlia(KademliaEvent),
|
||||
Kademlia(Box<KademliaEvent>),
|
||||
}
|
||||
|
||||
impl MyBehaviour {
|
||||
@ -165,7 +158,7 @@ impl NetworkBehaviourEventProcess<IdentifyEvent> for MyBehaviour {
|
||||
|
||||
impl NetworkBehaviourEventProcess<KademliaEvent> for MyBehaviour {
|
||||
fn inject_event(&mut self, event: KademliaEvent) {
|
||||
self.event_buffer.push(Event::Kademlia(event));
|
||||
self.event_buffer.push(Event::Kademlia(Box::new(event)));
|
||||
}
|
||||
}
|
||||
|
||||
@ -175,45 +168,49 @@ fn build_transport(keypair: Keypair) -> Boxed<(PeerId, StreamMuxerBox), impl Err
|
||||
|
||||
let noise_keypair = noise::Keypair::new().into_authentic(&keypair).unwrap();
|
||||
let noise_config = noise::NoiseConfig::ix(noise_keypair);
|
||||
let secio_config = secio::SecioConfig::new(keypair).max_frame_len(1024 * 1024);
|
||||
let secio_config = secio::SecioConfig::new(keypair).max_frame_len(1024 * 1024);
|
||||
|
||||
let transport = transport.and_then(move |stream, endpoint| {
|
||||
let upgrade = core::upgrade::SelectUpgrade::new(noise_config, secio_config);
|
||||
core::upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1)
|
||||
.map(|out| match out? {
|
||||
// We negotiated noise
|
||||
EitherOutput::First((remote_id, out)) => {
|
||||
let remote_key = match remote_id {
|
||||
noise::RemoteIdentity::IdentityKey(key) => key,
|
||||
_ => return Err(upgrade::UpgradeError::Apply(EitherError::A(noise::NoiseError::InvalidKey)))
|
||||
};
|
||||
Ok((EitherOutput::First(out), remote_key.into_peer_id()))
|
||||
}
|
||||
// We negotiated secio
|
||||
EitherOutput::Second((remote_id, out)) =>
|
||||
Ok((EitherOutput::Second(out), remote_id))
|
||||
})
|
||||
});
|
||||
let upgrade = core::upgrade::SelectUpgrade::new(noise_config, secio_config);
|
||||
core::upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1).map(
|
||||
|out| match out? {
|
||||
// We negotiated noise
|
||||
EitherOutput::First((remote_id, out)) => {
|
||||
let remote_key = match remote_id {
|
||||
noise::RemoteIdentity::IdentityKey(key) => key,
|
||||
_ => {
|
||||
return Err(upgrade::UpgradeError::Apply(EitherError::A(
|
||||
noise::NoiseError::InvalidKey,
|
||||
)))
|
||||
}
|
||||
};
|
||||
Ok((EitherOutput::First(out), remote_key.into_peer_id()))
|
||||
}
|
||||
// We negotiated secio
|
||||
EitherOutput::Second((remote_id, out)) => {
|
||||
Ok((EitherOutput::Second(out), remote_id))
|
||||
}
|
||||
},
|
||||
)
|
||||
});
|
||||
|
||||
let mut mplex_config = mplex::MplexConfig::new();
|
||||
mplex_config.max_buffer_len_behaviour(mplex::MaxBufferBehaviour::Block);
|
||||
mplex_config.max_buffer_len(usize::MAX);
|
||||
let yamux_config = yamux::Config::default();
|
||||
|
||||
// Multiplexing
|
||||
let transport = transport.and_then(move |(stream, peer_id), endpoint| {
|
||||
let peer_id2 = peer_id.clone();
|
||||
let upgrade = core::upgrade::SelectUpgrade::new(yamux_config, mplex_config)
|
||||
.map_inbound(move |muxer| (peer_id, muxer))
|
||||
.map_outbound(move |muxer| (peer_id2, muxer));
|
||||
|
||||
core::upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1)
|
||||
.map_ok(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer)))
|
||||
})
|
||||
|
||||
.timeout(Duration::from_secs(20))
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
|
||||
.boxed();
|
||||
let mut mplex_config = mplex::MplexConfig::new();
|
||||
mplex_config.max_buffer_len_behaviour(mplex::MaxBufferBehaviour::Block);
|
||||
mplex_config.max_buffer_len(usize::MAX);
|
||||
let yamux_config = yamux::Config::default();
|
||||
|
||||
// Multiplexing
|
||||
transport
|
||||
.and_then(move |(stream, peer_id), endpoint| {
|
||||
let peer_id2 = peer_id.clone();
|
||||
let upgrade = core::upgrade::SelectUpgrade::new(yamux_config, mplex_config)
|
||||
.map_inbound(move |muxer| (peer_id, muxer))
|
||||
.map_outbound(move |muxer| (peer_id2, muxer));
|
||||
|
||||
core::upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1)
|
||||
.map_ok(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer)))
|
||||
})
|
||||
.timeout(Duration::from_secs(20))
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
|
||||
.boxed()
|
||||
}
|
||||
|
@ -10,7 +10,10 @@ use structopt::StructOpt;
|
||||
mod exporter;
|
||||
|
||||
#[derive(Debug, StructOpt)]
|
||||
#[structopt(name = "Kademlia exporter", about = "Monitor the state of a Kademlia Dht.")]
|
||||
#[structopt(
|
||||
name = "Kademlia exporter",
|
||||
about = "Monitor the state of a Kademlia Dht."
|
||||
)]
|
||||
struct Opt {
|
||||
#[structopt(long)]
|
||||
dht: Vec<Multiaddr>,
|
||||
|
Loading…
x
Reference in New Issue
Block a user