misc/metrics: Add auxiliary crate to record events as OpenMetrics (#2063)

This commit adds an auxiliary crate recording protocol and Swarm events
and exposing them as metrics in the OpenMetrics format.
This commit is contained in:
Max Inden
2021-08-13 22:51:54 +02:00
committed by GitHub
parent ce23cbe76a
commit 98bc5e6486
11 changed files with 1152 additions and 4 deletions

View File

@ -36,6 +36,7 @@
## Utilities
- [`libp2p-metrics` CHANGELOG](misc/metrics/CHANGELOG.md)
- [`multistream-select` CHANGELOG](misc/multistream-select/CHANGELOG.md)
# `libp2p` facade crate
@ -67,6 +68,8 @@
- Re-export the `wasm-bindgen` feature from `parking_lot`, so
`libp2p` users can opt-in to that crate's Wasm support. See [PR 2180].
- Add `libp2p-metrics`.
[PR 2180]: https://github.com/libp2p/rust-libp2p/pull/2180/
## Version 0.39.1 [2021-07-12]

View File

@ -36,13 +36,14 @@ deflate = ["libp2p-deflate"]
dns-async-std = ["libp2p-dns", "libp2p-dns/async-std"]
dns-tokio = ["libp2p-dns", "libp2p-dns/tokio"]
floodsub = ["libp2p-floodsub"]
identify = ["libp2p-identify"]
kad = ["libp2p-kad"]
identify = ["libp2p-identify", "libp2p-metrics/identify"]
kad = ["libp2p-kad", "libp2p-metrics/kad"]
gossipsub = ["libp2p-gossipsub"]
metrics = ["libp2p-metrics"]
mdns = ["libp2p-mdns"]
mplex = ["libp2p-mplex"]
noise = ["libp2p-noise"]
ping = ["libp2p-ping"]
ping = ["libp2p-ping", "libp2p-metrics/ping"]
plaintext = ["libp2p-plaintext"]
pnet = ["libp2p-pnet"]
relay = ["libp2p-relay"]
@ -70,6 +71,7 @@ libp2p-floodsub = { version = "0.31.0", path = "protocols/floodsub", optional =
libp2p-gossipsub = { version = "0.33.0", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.31.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.32.0", path = "protocols/kad", optional = true }
libp2p-metrics = { version = "0.1.0", path = "misc/metrics", optional = true }
libp2p-mplex = { version = "0.30.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.33.0", path = "transports/noise", optional = true }
libp2p-ping = { version = "0.31.0", path = "protocols/ping", optional = true }
@ -104,6 +106,7 @@ tokio = { version = "1.0.1", features = ["io-util", "io-std", "macros", "rt", "r
resolver = "2"
members = [
"core",
"misc/metrics",
"misc/multistream-select",
"misc/peer-id-generator",
"muxers/mplex",

View File

@ -0,0 +1,3 @@
## Version 0.1.0 [unreleased]
- Add initial version.

29
misc/metrics/Cargo.toml Normal file
View File

@ -0,0 +1,29 @@
[package]
name = "libp2p-metrics"
edition = "2018"
description = "Metrics for libp2p"
version = "0.1.0"
authors = ["Max Inden <mail@max-inden.de>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]
[features]
identify = ["libp2p-identify"]
kad = ["libp2p-kad"]
ping = ["libp2p-ping"]
[dependencies]
libp2p-core= { version = "0.30.0", path = "../../core" }
libp2p-identify = { version = "0.31.0", path = "../../protocols/identify", optional = true }
libp2p-kad = { version = "0.32.0", path = "../../protocols/kad", optional = true }
libp2p-ping = { version = "0.31.0", path = "../../protocols/ping", optional = true }
libp2p-swarm = { version = "0.31.0", path = "../../swarm" }
open-metrics-client = "0.12.0"
[dev-dependencies]
env_logger = "0.8.1"
futures = "0.3.1"
libp2p = { path = "../../", features = ["metrics"] }
tide = "0.16"

View File

@ -0,0 +1,115 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Example demonstrating `libp2p-metrics`.
//!
//! In one terminal run:
//!
//! ```
//! cargo run --example metrics
//! ```
//!
//! In a second terminal run:
//!
//! ```
//! cargo run --example metrics -- <listen-addr-of-first-node>
//! ```
//!
//! Where `<listen-addr-of-first-node>` is replaced by the listen address of the
//! first node reported in the first terminal. Look for `NewListenAddr`.
//!
//! In a third terminal run:
//!
//! ```
//! curl localhost:<metrics-port-of-first-or-second-node>/metrics
//! ```
//!
//! Where `<metrics-port-of-first-or-second-node>` is replaced by the listen
//! port of the metrics server of the first or the second node. Look for
//! `tide::server Server listening on`.
//!
//! You should see a long list of metrics printed to the terminal. Check the
//! `libp2p_ping` metrics, they should be `>0`.
use futures::executor::block_on;
use futures::stream::StreamExt;
use libp2p::metrics::{Metrics, Recorder};
use libp2p::ping::{Ping, PingConfig};
use libp2p::swarm::SwarmEvent;
use libp2p::{identity, PeerId, Swarm};
use open_metrics_client::encoding::text::encode;
use open_metrics_client::registry::Registry;
use std::error::Error;
use std::sync::{Arc, Mutex};
use std::thread;
fn main() -> Result<(), Box<dyn Error>> {
tide::log::start();
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
tide::log::info!("Local peer id: {:?}", local_peer_id);
let mut swarm = Swarm::new(
block_on(libp2p::development_transport(local_key))?,
Ping::new(PingConfig::new().with_keep_alive(true)),
local_peer_id,
);
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
if let Some(addr) = std::env::args().nth(1) {
let remote = addr.parse()?;
swarm.dial_addr(remote)?;
tide::log::info!("Dialed {}", addr)
}
let mut metric_registry = Registry::default();
let metrics = Metrics::new(&mut metric_registry);
thread::spawn(move || block_on(metrics_server(metric_registry)));
block_on(async {
loop {
match swarm.select_next_some().await {
SwarmEvent::Behaviour(ping_event) => {
tide::log::info!("{:?}", ping_event);
metrics.record(&ping_event);
}
swarm_event => {
tide::log::info!("{:?}", swarm_event);
metrics.record(&swarm_event);
}
}
}
})
}
pub async fn metrics_server(registry: Registry) -> std::result::Result<(), std::io::Error> {
let mut app = tide::with_state(Arc::new(Mutex::new(registry)));
app.at("/metrics")
.get(|req: tide::Request<Arc<Mutex<Registry>>>| async move {
let mut encoded = Vec::new();
encode(&mut encoded, &req.state().lock().unwrap()).unwrap();
Ok(String::from_utf8(encoded).unwrap())
});
app.listen("0.0.0.0:0").await?;
Ok(())
}

View File

@ -0,0 +1,122 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use open_metrics_client::metrics::counter::Counter;
use open_metrics_client::metrics::histogram::{exponential_buckets, Histogram};
use open_metrics_client::registry::Registry;
use std::iter;
pub struct Metrics {
error: Counter,
pushed: Counter,
received: Counter,
received_info_listen_addrs: Histogram,
received_info_protocols: Histogram,
sent: Counter,
}
impl Metrics {
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("identify");
let error = Counter::default();
sub_registry.register(
"errors",
"Number of errors while attempting to identify the remote",
Box::new(error.clone()),
);
let pushed = Counter::default();
sub_registry.register(
"pushed",
"Number of times identification information of the local node has \
been actively pushed to a peer.",
Box::new(pushed.clone()),
);
let received = Counter::default();
sub_registry.register(
"received",
"Number of times identification information has been received from \
a peer",
Box::new(received.clone()),
);
let received_info_listen_addrs =
Histogram::new(iter::once(0.0).chain(exponential_buckets(1.0, 2.0, 9)));
sub_registry.register(
"received_info_listen_addrs",
"Number of listen addresses for remote peer received in \
identification information",
Box::new(received_info_listen_addrs.clone()),
);
let received_info_protocols =
Histogram::new(iter::once(0.0).chain(exponential_buckets(1.0, 2.0, 9)));
sub_registry.register(
"received_info_protocols",
"Number of protocols supported by the remote peer received in \
identification information",
Box::new(received_info_protocols.clone()),
);
let sent = Counter::default();
sub_registry.register(
"sent",
"Number of times identification information of the local node has \
been sent to a peer in response to an identification request",
Box::new(sent.clone()),
);
Self {
error,
pushed,
received,
received_info_listen_addrs,
received_info_protocols,
sent,
}
}
}
impl super::Recorder<libp2p_identify::IdentifyEvent> for super::Metrics {
fn record(&self, event: &libp2p_identify::IdentifyEvent) {
match event {
libp2p_identify::IdentifyEvent::Error { .. } => {
self.identify.error.inc();
}
libp2p_identify::IdentifyEvent::Pushed { .. } => {
self.identify.pushed.inc();
}
libp2p_identify::IdentifyEvent::Received { info, .. } => {
self.identify.received.inc();
self.identify
.received_info_protocols
.observe(info.protocols.len() as f64);
self.identify
.received_info_listen_addrs
.observe(info.listen_addrs.len() as f64);
}
libp2p_identify::IdentifyEvent::Sent { .. } => {
self.identify.sent.inc();
}
}
}
}

427
misc/metrics/src/kad.rs Normal file
View File

@ -0,0 +1,427 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use open_metrics_client::encoding::text::Encode;
use open_metrics_client::metrics::counter::Counter;
use open_metrics_client::metrics::family::Family;
use open_metrics_client::metrics::histogram::{exponential_buckets, Histogram};
use open_metrics_client::registry::{Registry, Unit};
pub struct Metrics {
query_result_get_record_ok: Histogram,
query_result_get_record_error: Family<GetRecordResult, Counter>,
query_result_get_closest_peers_ok: Histogram,
query_result_get_closest_peers_error: Family<GetClosestPeersResult, Counter>,
query_result_get_providers_ok: Histogram,
query_result_get_providers_error: Family<GetProvidersResult, Counter>,
query_result_num_requests: Family<QueryResult, Histogram>,
query_result_num_success: Family<QueryResult, Histogram>,
query_result_num_failure: Family<QueryResult, Histogram>,
query_result_duration: Family<QueryResult, Histogram>,
routing_updated: Family<RoutingUpdated, Counter>,
inbound_requests: Family<InboundRequest, Counter>,
}
impl Metrics {
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("kad");
let query_result_get_record_ok = Histogram::new(exponential_buckets(1.0, 2.0, 10));
sub_registry.register(
"query_result_get_record_ok",
"Number of records returned by a successful Kademlia get record query.",
Box::new(query_result_get_record_ok.clone()),
);
let query_result_get_record_error = Family::default();
sub_registry.register(
"query_result_get_record_error",
"Number of failed Kademlia get record queries.",
Box::new(query_result_get_record_error.clone()),
);
let query_result_get_closest_peers_ok = Histogram::new(exponential_buckets(1.0, 2.0, 10));
sub_registry.register(
"query_result_get_closest_peers_ok",
"Number of closest peers returned by a successful Kademlia get closest peers query.",
Box::new(query_result_get_closest_peers_ok.clone()),
);
let query_result_get_closest_peers_error = Family::default();
sub_registry.register(
"query_result_get_closest_peers_error",
"Number of failed Kademlia get closest peers queries.",
Box::new(query_result_get_closest_peers_error.clone()),
);
let query_result_get_providers_ok = Histogram::new(exponential_buckets(1.0, 2.0, 10));
sub_registry.register(
"query_result_get_providers_ok",
"Number of providers returned by a successful Kademlia get providers query.",
Box::new(query_result_get_providers_ok.clone()),
);
let query_result_get_providers_error = Family::default();
sub_registry.register(
"query_result_get_providers_error",
"Number of failed Kademlia get providers queries.",
Box::new(query_result_get_providers_error.clone()),
);
let query_result_num_requests =
Family::new_with_constructor(|| Histogram::new(exponential_buckets(1.0, 2.0, 10)));
sub_registry.register(
"query_result_num_requests",
"Number of requests started for a Kademlia query.",
Box::new(query_result_num_requests.clone()),
);
let query_result_num_success =
Family::new_with_constructor(|| Histogram::new(exponential_buckets(1.0, 2.0, 10)));
sub_registry.register(
"query_result_num_success",
"Number of successful requests of a Kademlia query.",
Box::new(query_result_num_success.clone()),
);
let query_result_num_failure =
Family::new_with_constructor(|| Histogram::new(exponential_buckets(1.0, 2.0, 10)));
sub_registry.register(
"query_result_num_failure",
"Number of failed requests of a Kademlia query.",
Box::new(query_result_num_failure.clone()),
);
let query_result_duration =
Family::new_with_constructor(|| Histogram::new(exponential_buckets(0.001, 2.0, 12)));
sub_registry.register_with_unit(
"query_result_duration",
"Duration of a Kademlia query.",
Unit::Seconds,
Box::new(query_result_duration.clone()),
);
let routing_updated = Family::default();
sub_registry.register(
"routing_updated",
"Number of peers added, updated or evicted to, in or from a specific kbucket in the routing table",
Box::new(routing_updated.clone()),
);
let inbound_requests = Family::default();
sub_registry.register(
"inbound_requests",
"Number of inbound requests",
Box::new(inbound_requests.clone()),
);
Self {
query_result_get_record_ok,
query_result_get_record_error,
query_result_get_closest_peers_ok,
query_result_get_closest_peers_error,
query_result_get_providers_ok,
query_result_get_providers_error,
query_result_num_requests,
query_result_num_success,
query_result_num_failure,
query_result_duration,
routing_updated,
inbound_requests,
}
}
}
impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
fn record(&self, event: &libp2p_kad::KademliaEvent) {
match event {
libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => {
self.kad
.query_result_num_requests
.get_or_create(&result.into())
.observe(stats.num_requests().into());
self.kad
.query_result_num_success
.get_or_create(&result.into())
.observe(stats.num_successes().into());
self.kad
.query_result_num_failure
.get_or_create(&result.into())
.observe(stats.num_failures().into());
if let Some(duration) = stats.duration() {
self.kad
.query_result_duration
.get_or_create(&result.into())
.observe(duration.as_secs_f64());
}
match result {
libp2p_kad::QueryResult::GetRecord(result) => match result {
Ok(ok) => self
.kad
.query_result_get_record_ok
.observe(ok.records.len() as f64),
Err(error) => {
self.kad
.query_result_get_record_error
.get_or_create(&error.into())
.inc();
}
},
libp2p_kad::QueryResult::GetClosestPeers(result) => match result {
Ok(ok) => self
.kad
.query_result_get_closest_peers_ok
.observe(ok.peers.len() as f64),
Err(error) => {
self.kad
.query_result_get_closest_peers_error
.get_or_create(&error.into())
.inc();
}
},
libp2p_kad::QueryResult::GetProviders(result) => match result {
Ok(ok) => self
.kad
.query_result_get_providers_ok
.observe(ok.providers.len() as f64),
Err(error) => {
self.kad
.query_result_get_providers_error
.get_or_create(&error.into())
.inc();
}
},
_ => {}
}
}
libp2p_kad::KademliaEvent::RoutingUpdated {
is_new_peer,
old_peer,
bucket_range: (low, _high),
..
} => {
let bucket = low.ilog2().unwrap_or(0);
if *is_new_peer {
self.kad
.routing_updated
.get_or_create(&RoutingUpdated {
action: RoutingAction::Added,
bucket,
})
.inc();
} else {
self.kad
.routing_updated
.get_or_create(&RoutingUpdated {
action: RoutingAction::Updated,
bucket,
})
.inc();
}
if old_peer.is_some() {
self.kad
.routing_updated
.get_or_create(&RoutingUpdated {
action: RoutingAction::Evicted,
bucket,
})
.inc();
}
}
libp2p_kad::KademliaEvent::InboundRequestServed { request } => {
self.kad
.inbound_requests
.get_or_create(&request.into())
.inc();
}
_ => {}
}
}
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
struct QueryResult {
r#type: QueryType,
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
enum QueryType {
Bootstrap,
GetClosestPeers,
GetProviders,
StartProviding,
RepublishProvider,
GetRecord,
PutRecord,
RepublishRecord,
}
impl From<&libp2p_kad::QueryResult> for QueryResult {
fn from(result: &libp2p_kad::QueryResult) -> Self {
match result {
libp2p_kad::QueryResult::Bootstrap(_) => QueryResult {
r#type: QueryType::Bootstrap,
},
libp2p_kad::QueryResult::GetClosestPeers(_) => QueryResult {
r#type: QueryType::GetClosestPeers,
},
libp2p_kad::QueryResult::GetProviders(_) => QueryResult {
r#type: QueryType::GetProviders,
},
libp2p_kad::QueryResult::StartProviding(_) => QueryResult {
r#type: QueryType::StartProviding,
},
libp2p_kad::QueryResult::RepublishProvider(_) => QueryResult {
r#type: QueryType::RepublishProvider,
},
libp2p_kad::QueryResult::GetRecord(_) => QueryResult {
r#type: QueryType::GetRecord,
},
libp2p_kad::QueryResult::PutRecord(_) => QueryResult {
r#type: QueryType::PutRecord,
},
libp2p_kad::QueryResult::RepublishRecord(_) => QueryResult {
r#type: QueryType::RepublishRecord,
},
}
}
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
struct GetRecordResult {
error: GetRecordError,
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
enum GetRecordError {
NotFound,
QuorumFailed,
Timeout,
}
impl From<&libp2p_kad::GetRecordError> for GetRecordResult {
fn from(error: &libp2p_kad::GetRecordError) -> Self {
match error {
libp2p_kad::GetRecordError::NotFound { .. } => GetRecordResult {
error: GetRecordError::NotFound,
},
libp2p_kad::GetRecordError::QuorumFailed { .. } => GetRecordResult {
error: GetRecordError::QuorumFailed,
},
libp2p_kad::GetRecordError::Timeout { .. } => GetRecordResult {
error: GetRecordError::Timeout,
},
}
}
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
struct GetClosestPeersResult {
error: GetClosestPeersError,
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
enum GetClosestPeersError {
Timeout,
}
impl From<&libp2p_kad::GetClosestPeersError> for GetClosestPeersResult {
fn from(error: &libp2p_kad::GetClosestPeersError) -> Self {
match error {
libp2p_kad::GetClosestPeersError::Timeout { .. } => GetClosestPeersResult {
error: GetClosestPeersError::Timeout,
},
}
}
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
struct GetProvidersResult {
error: GetProvidersError,
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
enum GetProvidersError {
Timeout,
}
impl From<&libp2p_kad::GetProvidersError> for GetProvidersResult {
fn from(error: &libp2p_kad::GetProvidersError) -> Self {
match error {
libp2p_kad::GetProvidersError::Timeout { .. } => GetProvidersResult {
error: GetProvidersError::Timeout,
},
}
}
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
struct RoutingUpdated {
action: RoutingAction,
bucket: u32,
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
enum RoutingAction {
Added,
Updated,
Evicted,
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
struct InboundRequest {
request: Request,
}
impl From<&libp2p_kad::InboundRequest> for InboundRequest {
fn from(request: &libp2p_kad::InboundRequest) -> Self {
Self {
request: match request {
libp2p_kad::InboundRequest::FindNode { .. } => Request::FindNode,
libp2p_kad::InboundRequest::GetProvider { .. } => Request::GetProvider,
libp2p_kad::InboundRequest::AddProvider { .. } => Request::AddProvider,
libp2p_kad::InboundRequest::GetRecord { .. } => Request::GetRecord,
libp2p_kad::InboundRequest::PutRecord { .. } => Request::PutRecord,
},
}
}
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
enum Request {
FindNode,
GetProvider,
AddProvider,
GetRecord,
PutRecord,
}

76
misc/metrics/src/lib.rs Normal file
View File

@ -0,0 +1,76 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Auxiliary crate recording protocol and Swarm events and exposing them as
//! metrics in the [OpenMetrics] format.
//!
//! [OpenMetrics]: https://github.com/OpenObservability/OpenMetrics/
//!
//! See `examples` directory for more.
#[cfg(feature = "identify")]
mod identify;
#[cfg(feature = "kad")]
mod kad;
#[cfg(feature = "ping")]
mod ping;
mod swarm;
use open_metrics_client::registry::Registry;
/// Set of Swarm and protocol metrics derived from emitted events.
pub struct Metrics {
#[cfg(feature = "identify")]
identify: identify::Metrics,
#[cfg(feature = "kad")]
kad: kad::Metrics,
#[cfg(feature = "ping")]
ping: ping::Metrics,
swarm: swarm::Metrics,
}
impl Metrics {
/// Create a new set of Swarm and protocol [`Metrics`].
///
/// ```
/// use open_metrics_client::registry::Registry;
/// use libp2p_metrics::Metrics;
/// let mut registry = Registry::default();
/// let metrics = Metrics::new(&mut registry);
/// ```
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("libp2p");
Self {
#[cfg(feature = "identify")]
identify: identify::Metrics::new(sub_registry),
#[cfg(feature = "kad")]
kad: kad::Metrics::new(sub_registry),
#[cfg(feature = "ping")]
ping: ping::Metrics::new(sub_registry),
swarm: swarm::Metrics::new(sub_registry),
}
}
}
/// Recorder that can record Swarm and protocol events.
pub trait Recorder<Event> {
/// Record the given event.
fn record(&self, event: &Event);
}

109
misc/metrics/src/ping.rs Normal file
View File

@ -0,0 +1,109 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use open_metrics_client::encoding::text::Encode;
use open_metrics_client::metrics::counter::Counter;
use open_metrics_client::metrics::family::Family;
use open_metrics_client::metrics::histogram::{exponential_buckets, Histogram};
use open_metrics_client::registry::{Registry, Unit};
#[derive(Clone, Hash, PartialEq, Eq, Encode)]
struct FailureLabels {
reason: Failure,
}
impl From<&libp2p_ping::PingFailure> for FailureLabels {
fn from(failure: &libp2p_ping::PingFailure) -> Self {
match failure {
libp2p_ping::PingFailure::Timeout => FailureLabels {
reason: Failure::Timeout,
},
libp2p_ping::PingFailure::Unsupported => FailureLabels {
reason: Failure::Unsupported,
},
libp2p_ping::PingFailure::Other { .. } => FailureLabels {
reason: Failure::Other,
},
}
}
}
#[derive(Clone, Hash, PartialEq, Eq, Encode)]
enum Failure {
Timeout,
Unsupported,
Other,
}
pub struct Metrics {
rtt: Histogram,
failure: Family<FailureLabels, Counter>,
pong_received: Counter,
}
impl Metrics {
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("ping");
let rtt = Histogram::new(exponential_buckets(0.001, 2.0, 12));
sub_registry.register_with_unit(
"rtt",
"Round-trip time sending a 'ping' and receiving a 'pong'",
Unit::Seconds,
Box::new(rtt.clone()),
);
let failure = Family::default();
sub_registry.register(
"failure",
"Failure while sending a 'ping' or receiving a 'pong'",
Box::new(failure.clone()),
);
let pong_received = Counter::default();
sub_registry.register(
"pong_received",
"Number of 'pong's received",
Box::new(pong_received.clone()),
);
Self {
rtt,
failure,
pong_received,
}
}
}
impl super::Recorder<libp2p_ping::PingEvent> for super::Metrics {
fn record(&self, event: &libp2p_ping::PingEvent) {
match &event.result {
Ok(libp2p_ping::PingSuccess::Pong) => {
self.ping.pong_received.inc();
}
Ok(libp2p_ping::PingSuccess::Ping { rtt }) => {
self.ping.rtt.observe(rtt.as_secs_f64());
}
Err(failure) => {
self.ping.failure.get_or_create(&failure.into()).inc();
}
}
}
}

257
misc/metrics/src/swarm.rs Normal file
View File

@ -0,0 +1,257 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use open_metrics_client::encoding::text::Encode;
use open_metrics_client::metrics::counter::Counter;
use open_metrics_client::metrics::family::Family;
use open_metrics_client::registry::Registry;
pub struct Metrics {
connections_incoming: Counter,
connections_incoming_error: Family<IncomingConnectionErrorLabels, Counter>,
connections_established: Family<ConnectionEstablishedLabeles, Counter>,
connections_closed: Counter,
new_listen_addr: Counter,
expired_listen_addr: Counter,
listener_closed: Counter,
listener_error: Counter,
dial_attempt: Counter,
dial_unreachable_addr: Family<Vec<(String, String)>, Counter>,
connected_to_banned_peer: Counter,
}
impl Metrics {
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("swarm");
let connections_incoming = Counter::default();
sub_registry.register(
"connections_incoming",
"Number of incoming connections",
Box::new(connections_incoming.clone()),
);
let connections_incoming_error = Family::default();
sub_registry.register(
"connections_incoming_error",
"Number of incoming connection errors",
Box::new(connections_incoming_error.clone()),
);
let new_listen_addr = Counter::default();
sub_registry.register(
"new_listen_addr",
"Number of new listen addresses",
Box::new(new_listen_addr.clone()),
);
let expired_listen_addr = Counter::default();
sub_registry.register(
"expired_listen_addr",
"Number of expired listen addresses",
Box::new(expired_listen_addr.clone()),
);
let listener_closed = Counter::default();
sub_registry.register(
"listener_closed",
"Number of listeners closed",
Box::new(listener_closed.clone()),
);
let listener_error = Counter::default();
sub_registry.register(
"listener_error",
"Number of listener errors",
Box::new(listener_error.clone()),
);
let dial_attempt = Counter::default();
sub_registry.register(
"dial_attempt",
"Number of dial attempts",
Box::new(dial_attempt.clone()),
);
let dial_unreachable_addr = Family::default();
sub_registry.register(
"dial_unreachable_addr",
"Number of unreachable addresses dialed",
Box::new(dial_unreachable_addr.clone()),
);
let connected_to_banned_peer = Counter::default();
sub_registry.register(
"connected_to_banned_peer",
"Number of connection attempts to banned peer",
Box::new(connected_to_banned_peer.clone()),
);
let connections_established = Family::default();
sub_registry.register(
"connections_established",
"Number of connections established",
Box::new(connections_established.clone()),
);
let connections_closed = Counter::default();
sub_registry.register(
"connections_closed",
"Number of connections closed",
Box::new(connections_closed.clone()),
);
Self {
connections_incoming,
connections_incoming_error,
connections_established,
connections_closed,
new_listen_addr,
expired_listen_addr,
listener_closed,
listener_error,
dial_attempt,
dial_unreachable_addr,
connected_to_banned_peer,
}
}
}
impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>>
for super::Metrics
{
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
match event {
libp2p_swarm::SwarmEvent::Behaviour(_) => {}
libp2p_swarm::SwarmEvent::ConnectionEstablished { endpoint, .. } => {
self.swarm
.connections_established
.get_or_create(&ConnectionEstablishedLabeles {
role: endpoint.into(),
})
.inc();
}
libp2p_swarm::SwarmEvent::ConnectionClosed { .. } => {
self.swarm.connections_closed.inc();
}
libp2p_swarm::SwarmEvent::IncomingConnection { .. } => {
self.swarm.connections_incoming.inc();
}
libp2p_swarm::SwarmEvent::IncomingConnectionError { error, .. } => {
self.swarm
.connections_incoming_error
.get_or_create(&IncomingConnectionErrorLabels {
error: error.into(),
})
.inc();
}
libp2p_swarm::SwarmEvent::BannedPeer { .. } => {
self.swarm.connected_to_banned_peer.inc();
}
libp2p_swarm::SwarmEvent::UnreachableAddr { .. } => {
self.swarm
.dial_unreachable_addr
.get_or_create(&vec![("peer".into(), "known".into())])
.inc();
}
libp2p_swarm::SwarmEvent::UnknownPeerUnreachableAddr { .. } => {
self.swarm
.dial_unreachable_addr
.get_or_create(&vec![("peer".into(), "unknown".into())])
.inc();
}
libp2p_swarm::SwarmEvent::NewListenAddr { .. } => {
self.swarm.new_listen_addr.inc();
}
libp2p_swarm::SwarmEvent::ExpiredListenAddr { .. } => {
self.swarm.expired_listen_addr.inc();
}
libp2p_swarm::SwarmEvent::ListenerClosed { .. } => {
self.swarm.listener_closed.inc();
}
libp2p_swarm::SwarmEvent::ListenerError { .. } => {
self.swarm.listener_error.inc();
}
libp2p_swarm::SwarmEvent::Dialing(_) => {
self.swarm.dial_attempt.inc();
}
}
}
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
struct ConnectionEstablishedLabeles {
role: Role,
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
enum Role {
Dialer,
Listener,
}
impl From<&libp2p_core::ConnectedPoint> for Role {
fn from(point: &libp2p_core::ConnectedPoint) -> Self {
match point {
libp2p_core::ConnectedPoint::Dialer { .. } => Role::Dialer,
libp2p_core::ConnectedPoint::Listener { .. } => Role::Listener,
}
}
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
struct IncomingConnectionErrorLabels {
error: PendingConnectionError,
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
enum PendingConnectionError {
InvalidPeerId,
TransportErrorMultiaddrNotSupported,
TransportErrorOther,
ConnectionLimit,
Io,
}
impl<TTransErr> From<&libp2p_core::connection::PendingConnectionError<TTransErr>>
for PendingConnectionError
{
fn from(point: &libp2p_core::connection::PendingConnectionError<TTransErr>) -> Self {
match point {
libp2p_core::connection::PendingConnectionError::InvalidPeerId => {
PendingConnectionError::InvalidPeerId
}
libp2p_core::connection::PendingConnectionError::Transport(
libp2p_core::transport::TransportError::MultiaddrNotSupported(_),
) => PendingConnectionError::TransportErrorMultiaddrNotSupported,
libp2p_core::connection::PendingConnectionError::Transport(
libp2p_core::transport::TransportError::Other(_),
) => PendingConnectionError::TransportErrorOther,
libp2p_core::connection::PendingConnectionError::ConnectionLimit(_) => {
PendingConnectionError::ConnectionLimit
}
libp2p_core::connection::PendingConnectionError::IO(_) => PendingConnectionError::Io,
}
}
}

View File

@ -76,6 +76,10 @@ pub use libp2p_kad as kad;
#[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))]
#[doc(inline)]
pub use libp2p_mdns as mdns;
#[cfg(feature = "metrics")]
#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
#[doc(inline)]
pub use libp2p_metrics as metrics;
#[cfg(feature = "mplex")]
#[cfg_attr(docsrs, doc(cfg(feature = "mplex")))]
#[doc(inline)]