mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-18 20:41:25 +00:00
feat(metrics)!: expose identify metrics for connected peers only
Previously we would increase a counter / gauge / histogram on each received identify information. These metrics are misleading, as e.g. they depend on the identify interval and don't represent the set of currently connected peers. With this commit, identify information is tracked for the currently connected peers only. Instead of an increase on each received identify information, metrics represent the status quo (Gauge). Example: ``` \# HELP libp2p_libp2p_identify_remote_protocols Number of connected nodes supporting a specific protocol, with "unrecognized" for each peer supporting one or more unrecognized protocols... \# TYPE libp2p_libp2p_identify_remote_protocols gauge libp2p_libp2p_identify_remote_protocols_total{protocol="/ipfs/id/push/1.0.0"} 1 libp2p_libp2p_identify_remote_protocols_total{protocol="/ipfs/id/1.0.0"} 1 libp2p_libp2p_identify_remote_protocols_total{protocol="/ipfs/ping/1.0.0"} 1 libp2p_libp2p_identify_remote_protocols_total{protocol="unrecognized"} 1 \# HELP libp2p_libp2p_identify_remote_listen_addresses Number of connected nodes advertising a specific listen address... \# TYPE libp2p_libp2p_identify_remote_listen_addresses gauge libp2p_libp2p_identify_remote_listen_addresses_total{listen_address="/ip4/tcp"} 1 libp2p_libp2p_identify_remote_listen_addresses_total{listen_address="/ip4/udp/quic"} 1 \# HELP libp2p_libp2p_identify_local_observed_addresses Number of connected nodes observing the local node at a specific address... \# TYPE libp2p_libp2p_identify_local_observed_addresses gauge libp2p_libp2p_identify_local_observed_addresses_total{observed_address="/ip4/tcp"} 1 ``` Pull-Request: #3325.
This commit is contained in:
5
Cargo.lock
generated
5
Cargo.lock
generated
@ -2732,6 +2732,7 @@ dependencies = [
|
||||
"libp2p-ping",
|
||||
"libp2p-relay",
|
||||
"libp2p-swarm",
|
||||
"once_cell",
|
||||
"prometheus-client",
|
||||
]
|
||||
|
||||
@ -4019,9 +4020,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "prometheus-client"
|
||||
version = "0.20.0"
|
||||
version = "0.21.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e227aeb6c2cfec819e999c4773b35f8c7fa37298a203ff46420095458eee567e"
|
||||
checksum = "38974b1966bd5b6c7c823a20c1e07d5b84b171db20bac601e9b529720f7299f8"
|
||||
dependencies = [
|
||||
"dtoa",
|
||||
"itoa",
|
||||
|
@ -12,4 +12,4 @@ hyper = { version = "0.14", features = ["server", "tcp", "http1"] }
|
||||
libp2p = { path = "../../libp2p", features = ["async-std", "metrics", "ping", "noise", "identify", "tcp", "yamux", "macros"] }
|
||||
log = "0.4.0"
|
||||
tokio = { version = "1", features = ["rt-multi-thread"] }
|
||||
prometheus-client = "0.20.0"
|
||||
prometheus-client = "0.21.0"
|
||||
|
@ -33,7 +33,7 @@ const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;v
|
||||
|
||||
pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Error> {
|
||||
// Serve on localhost.
|
||||
let addr = ([127, 0, 0, 1], 0).into();
|
||||
let addr = ([127, 0, 0, 1], 8080).into();
|
||||
|
||||
// Use the tokio runtime to run the hyper server.
|
||||
let rt = tokio::runtime::Runtime::new()?;
|
||||
|
@ -1,9 +1,29 @@
|
||||
## 0.13.0 - unreleased
|
||||
|
||||
- Previously `libp2p-metrics::identify` would increase a counter / gauge / histogram on each
|
||||
received identify information. These metrics are misleading, as e.g. they depend on the identify
|
||||
interval and don't represent the set of currently connected peers. With this change, identify
|
||||
information is tracked for the currently connected peers only. Instead of an increase on each
|
||||
received identify information, metrics represent the status quo (Gauge).
|
||||
|
||||
Metrics removed:
|
||||
- `libp2p_identify_protocols`
|
||||
- `libp2p_identify_received_info_listen_addrs`
|
||||
- `libp2p_identify_received_info_protocols`
|
||||
- `libp2p_identify_listen_addresses`
|
||||
|
||||
Metrics added:
|
||||
- `libp2p_identify_remote_protocols`
|
||||
- `libp2p_identify_remote_listen_addresses`
|
||||
- `libp2p_identify_local_observed_addresses`
|
||||
|
||||
See [PR 3325].
|
||||
|
||||
- Raise MSRV to 1.65.
|
||||
See [PR 3715].
|
||||
|
||||
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
|
||||
[PR 3325]: https://github.com/libp2p/rust-libp2p/pull/3325
|
||||
|
||||
## 0.12.0
|
||||
|
||||
|
@ -27,7 +27,8 @@ libp2p-ping = { workspace = true, optional = true }
|
||||
libp2p-relay = { workspace = true, optional = true }
|
||||
libp2p-swarm = { workspace = true }
|
||||
libp2p-identity = { workspace = true }
|
||||
prometheus-client = "0.20.0"
|
||||
prometheus-client = { version = "0.21.0" }
|
||||
once_cell = "1.16.0"
|
||||
|
||||
[target.'cfg(not(target_os = "unknown"))'.dependencies]
|
||||
libp2p-gossipsub = { workspace = true, optional = true }
|
||||
|
@ -21,39 +21,78 @@
|
||||
use crate::protocol_stack;
|
||||
use libp2p_identity::PeerId;
|
||||
use libp2p_swarm::StreamProtocol;
|
||||
use prometheus_client::encoding::{EncodeLabelSet, EncodeMetric, MetricEncoder};
|
||||
use once_cell::sync::Lazy;
|
||||
use prometheus_client::collector::Collector;
|
||||
use prometheus_client::encoding::EncodeLabelSet;
|
||||
use prometheus_client::metrics::counter::Counter;
|
||||
use prometheus_client::metrics::family::Family;
|
||||
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
|
||||
use prometheus_client::metrics::MetricType;
|
||||
use prometheus_client::registry::Registry;
|
||||
use prometheus_client::metrics::family::ConstFamily;
|
||||
use prometheus_client::metrics::gauge::ConstGauge;
|
||||
use prometheus_client::registry::{Descriptor, LocalMetric, Registry};
|
||||
use prometheus_client::MaybeOwned;
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::iter;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
static PROTOCOLS_DESCRIPTOR: Lazy<Descriptor> = Lazy::new(|| {
|
||||
Descriptor::new(
|
||||
"remote_protocols",
|
||||
r#"Number of connected nodes supporting a specific protocol, with "unrecognized" for each
|
||||
peer supporting one or more unrecognized protocols"#,
|
||||
None,
|
||||
None,
|
||||
vec![],
|
||||
)
|
||||
});
|
||||
static LISTEN_ADDRESSES_DESCRIPTOR: Lazy<Descriptor> = Lazy::new(|| {
|
||||
Descriptor::new(
|
||||
"remote_listen_addresses",
|
||||
"Number of connected nodes advertising a specific listen address",
|
||||
None,
|
||||
None,
|
||||
vec![],
|
||||
)
|
||||
});
|
||||
static OBSERVED_ADDRESSES_DESCRIPTOR: Lazy<Descriptor> = Lazy::new(|| {
|
||||
Descriptor::new(
|
||||
"local_observed_addresses",
|
||||
"Number of connected nodes observing the local node at a specific address",
|
||||
None,
|
||||
None,
|
||||
vec![],
|
||||
)
|
||||
});
|
||||
const ALLOWED_PROTOCOLS: &[StreamProtocol] = &[
|
||||
#[cfg(feature = "dcutr")]
|
||||
libp2p_dcutr::PROTOCOL_NAME,
|
||||
// #[cfg(feature = "gossipsub")]
|
||||
// #[cfg(not(target_os = "unknown"))]
|
||||
// TODO: Add Gossipsub protocol name
|
||||
libp2p_identify::PROTOCOL_NAME,
|
||||
libp2p_identify::PUSH_PROTOCOL_NAME,
|
||||
#[cfg(feature = "kad")]
|
||||
libp2p_kad::PROTOCOL_NAME,
|
||||
#[cfg(feature = "ping")]
|
||||
libp2p_ping::PROTOCOL_NAME,
|
||||
#[cfg(feature = "relay")]
|
||||
libp2p_relay::STOP_PROTOCOL_NAME,
|
||||
#[cfg(feature = "relay")]
|
||||
libp2p_relay::HOP_PROTOCOL_NAME,
|
||||
];
|
||||
|
||||
pub(crate) struct Metrics {
|
||||
protocols: Protocols,
|
||||
peers: Peers,
|
||||
error: Counter,
|
||||
pushed: Counter,
|
||||
received: Counter,
|
||||
received_info_listen_addrs: Histogram,
|
||||
received_info_protocols: Histogram,
|
||||
sent: Counter,
|
||||
listen_addresses: Family<AddressLabels, Counter>,
|
||||
}
|
||||
|
||||
impl Metrics {
|
||||
pub(crate) fn new(registry: &mut Registry) -> Self {
|
||||
let sub_registry = registry.sub_registry_with_prefix("identify");
|
||||
|
||||
let protocols = Protocols::default();
|
||||
sub_registry.register(
|
||||
"protocols",
|
||||
"Number of connected nodes supporting a specific protocol, with \
|
||||
\"unrecognized\" for each peer supporting one or more unrecognized \
|
||||
protocols",
|
||||
protocols.clone(),
|
||||
);
|
||||
let peers = Peers::default();
|
||||
sub_registry.register_collector(Box::new(peers.clone()));
|
||||
|
||||
let error = Counter::default();
|
||||
sub_registry.register(
|
||||
@ -78,24 +117,6 @@ impl Metrics {
|
||||
received.clone(),
|
||||
);
|
||||
|
||||
let received_info_listen_addrs =
|
||||
Histogram::new(iter::once(0.0).chain(exponential_buckets(1.0, 2.0, 9)));
|
||||
sub_registry.register(
|
||||
"received_info_listen_addrs",
|
||||
"Number of listen addresses for remote peer received in \
|
||||
identification information",
|
||||
received_info_listen_addrs.clone(),
|
||||
);
|
||||
|
||||
let received_info_protocols =
|
||||
Histogram::new(iter::once(0.0).chain(exponential_buckets(1.0, 2.0, 9)));
|
||||
sub_registry.register(
|
||||
"received_info_protocols",
|
||||
"Number of protocols supported by the remote peer received in \
|
||||
identification information",
|
||||
received_info_protocols.clone(),
|
||||
);
|
||||
|
||||
let sent = Counter::default();
|
||||
sub_registry.register(
|
||||
"sent",
|
||||
@ -104,22 +125,12 @@ impl Metrics {
|
||||
sent.clone(),
|
||||
);
|
||||
|
||||
let listen_addresses = Family::default();
|
||||
sub_registry.register(
|
||||
"listen_addresses",
|
||||
"Number of listen addresses for remote peer per protocol stack",
|
||||
listen_addresses.clone(),
|
||||
);
|
||||
|
||||
Self {
|
||||
protocols,
|
||||
peers,
|
||||
error,
|
||||
pushed,
|
||||
received,
|
||||
received_info_listen_addrs,
|
||||
received_info_protocols,
|
||||
sent,
|
||||
listen_addresses,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -134,58 +145,8 @@ impl super::Recorder<libp2p_identify::Event> for Metrics {
|
||||
self.pushed.inc();
|
||||
}
|
||||
libp2p_identify::Event::Received { peer_id, info, .. } => {
|
||||
{
|
||||
let mut protocols = info
|
||||
.protocols
|
||||
.iter()
|
||||
.filter(|p| {
|
||||
let allowed_protocols: &[StreamProtocol] = &[
|
||||
#[cfg(feature = "dcutr")]
|
||||
libp2p_dcutr::PROTOCOL_NAME,
|
||||
// #[cfg(feature = "gossipsub")]
|
||||
// #[cfg(not(target_os = "unknown"))]
|
||||
// TODO: Add Gossipsub protocol name
|
||||
libp2p_identify::PROTOCOL_NAME,
|
||||
libp2p_identify::PUSH_PROTOCOL_NAME,
|
||||
#[cfg(feature = "kad")]
|
||||
libp2p_kad::PROTOCOL_NAME,
|
||||
#[cfg(feature = "ping")]
|
||||
libp2p_ping::PROTOCOL_NAME,
|
||||
#[cfg(feature = "relay")]
|
||||
libp2p_relay::STOP_PROTOCOL_NAME,
|
||||
#[cfg(feature = "relay")]
|
||||
libp2p_relay::HOP_PROTOCOL_NAME,
|
||||
];
|
||||
|
||||
allowed_protocols.contains(p)
|
||||
})
|
||||
.map(|p| p.to_string())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Signal via an additional label value that one or more
|
||||
// protocols of the remote peer have not been recognized.
|
||||
if protocols.len() < info.protocols.len() {
|
||||
protocols.push("unrecognized".to_string());
|
||||
}
|
||||
|
||||
protocols.sort_unstable();
|
||||
protocols.dedup();
|
||||
|
||||
self.protocols.add(*peer_id, protocols);
|
||||
}
|
||||
|
||||
self.received.inc();
|
||||
self.received_info_protocols
|
||||
.observe(info.protocols.len() as f64);
|
||||
self.received_info_listen_addrs
|
||||
.observe(info.listen_addrs.len() as f64);
|
||||
for listen_addr in &info.listen_addrs {
|
||||
self.listen_addresses
|
||||
.get_or_create(&AddressLabels {
|
||||
protocols: protocol_stack::as_string(listen_addr),
|
||||
})
|
||||
.inc();
|
||||
}
|
||||
self.peers.record(*peer_id, info.clone());
|
||||
}
|
||||
libp2p_identify::Event::Sent { .. } => {
|
||||
self.sent.inc();
|
||||
@ -203,7 +164,7 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
|
||||
} = event
|
||||
{
|
||||
if *num_established == 0 {
|
||||
self.protocols.remove(*peer_id)
|
||||
self.peers.remove(*peer_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -214,55 +175,107 @@ struct AddressLabels {
|
||||
protocols: String,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone, Debug)]
|
||||
struct Protocols {
|
||||
peers: Arc<Mutex<HashMap<PeerId, Vec<String>>>>,
|
||||
}
|
||||
#[derive(Default, Debug, Clone)]
|
||||
struct Peers(Arc<Mutex<HashMap<PeerId, libp2p_identify::Info>>>);
|
||||
|
||||
impl Protocols {
|
||||
fn add(&self, peer: PeerId, protocols: Vec<String>) {
|
||||
self.peers
|
||||
.lock()
|
||||
.expect("Lock not to be poisoned")
|
||||
.insert(peer, protocols);
|
||||
impl Peers {
|
||||
fn record(&self, peer_id: PeerId, info: libp2p_identify::Info) {
|
||||
self.0.lock().unwrap().insert(peer_id, info);
|
||||
}
|
||||
|
||||
fn remove(&self, peer: PeerId) {
|
||||
self.peers
|
||||
.lock()
|
||||
.expect("Lock not to be poisoned")
|
||||
.remove(&peer);
|
||||
fn remove(&self, peer_id: PeerId) {
|
||||
self.0.lock().unwrap().remove(&peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
impl EncodeMetric for Protocols {
|
||||
fn encode(&self, mut encoder: MetricEncoder) -> Result<(), std::fmt::Error> {
|
||||
let count_by_protocol = self
|
||||
.peers
|
||||
.lock()
|
||||
.expect("Lock not to be poisoned")
|
||||
.iter()
|
||||
.fold(
|
||||
HashMap::<String, i64>::default(),
|
||||
|mut acc, (_, protocols)| {
|
||||
for protocol in protocols {
|
||||
let count = acc.entry(protocol.to_string()).or_default();
|
||||
*count += 1;
|
||||
}
|
||||
acc
|
||||
},
|
||||
);
|
||||
impl Collector for Peers {
|
||||
fn collect<'a>(
|
||||
&'a self,
|
||||
) -> Box<dyn Iterator<Item = (Cow<'a, Descriptor>, MaybeOwned<'a, Box<dyn LocalMetric>>)> + 'a>
|
||||
{
|
||||
let mut count_by_protocols: HashMap<String, i64> = Default::default();
|
||||
let mut count_by_listen_addresses: HashMap<String, i64> = Default::default();
|
||||
let mut count_by_observed_addresses: HashMap<String, i64> = Default::default();
|
||||
|
||||
for (protocol, count) in count_by_protocol {
|
||||
encoder
|
||||
.encode_family(&[("protocol", protocol)])?
|
||||
.encode_gauge(&count)?;
|
||||
for (_, peer_info) in self.0.lock().unwrap().iter() {
|
||||
{
|
||||
let mut protocols: Vec<_> = peer_info
|
||||
.protocols
|
||||
.iter()
|
||||
.map(|p| {
|
||||
if ALLOWED_PROTOCOLS.contains(&p) {
|
||||
p.to_string()
|
||||
} else {
|
||||
"unrecognized".to_string()
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
protocols.sort();
|
||||
protocols.dedup();
|
||||
|
||||
for protocol in protocols.into_iter() {
|
||||
let count = count_by_protocols.entry(protocol).or_default();
|
||||
*count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
let mut addrs: Vec<_> = peer_info
|
||||
.listen_addrs
|
||||
.iter()
|
||||
.map(protocol_stack::as_string)
|
||||
.collect();
|
||||
addrs.sort();
|
||||
addrs.dedup();
|
||||
|
||||
for addr in addrs {
|
||||
let count = count_by_listen_addresses.entry(addr).or_default();
|
||||
*count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
let count = count_by_observed_addresses
|
||||
.entry(protocol_stack::as_string(&peer_info.observed_addr))
|
||||
.or_default();
|
||||
*count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
let count_by_protocols: Box<dyn LocalMetric> =
|
||||
Box::new(ConstFamily::new(count_by_protocols.into_iter().map(
|
||||
|(protocol, count)| ([("protocol", protocol)], ConstGauge::new(count)),
|
||||
)));
|
||||
|
||||
fn metric_type(&self) -> MetricType {
|
||||
MetricType::Gauge
|
||||
let count_by_listen_addresses: Box<dyn LocalMetric> =
|
||||
Box::new(ConstFamily::new(count_by_listen_addresses.into_iter().map(
|
||||
|(protocol, count)| ([("listen_address", protocol)], ConstGauge::new(count)),
|
||||
)));
|
||||
|
||||
let count_by_observed_addresses: Box<dyn LocalMetric> = Box::new(ConstFamily::new(
|
||||
count_by_observed_addresses
|
||||
.into_iter()
|
||||
.map(|(protocol, count)| {
|
||||
([("observed_address", protocol)], ConstGauge::new(count))
|
||||
}),
|
||||
));
|
||||
|
||||
Box::new(
|
||||
[
|
||||
(
|
||||
Cow::Borrowed(&*PROTOCOLS_DESCRIPTOR),
|
||||
MaybeOwned::Owned(count_by_protocols),
|
||||
),
|
||||
(
|
||||
Cow::Borrowed(&*LISTEN_ADDRESSES_DESCRIPTOR),
|
||||
MaybeOwned::Owned(count_by_listen_addresses),
|
||||
),
|
||||
(
|
||||
Cow::Borrowed(&*OBSERVED_ADDRESSES_DESCRIPTOR),
|
||||
MaybeOwned::Owned(count_by_observed_addresses),
|
||||
),
|
||||
]
|
||||
.into_iter(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ wasm-timer = "0.2.5"
|
||||
instant = "0.1.11"
|
||||
void = "1.0.2"
|
||||
# Metrics dependencies
|
||||
prometheus-client = "0.20.0"
|
||||
prometheus-client = "0.21.0"
|
||||
|
||||
[dev-dependencies]
|
||||
async-std = { version = "1.6.3", features = ["unstable"] }
|
||||
|
Reference in New Issue
Block a user