misc/metrics: Track # connected nodes supporting specific protocol (#2734)

* misc/metrics: Explicitly delegate event recording to each recorder

This allows delegating a single event to multiple `Recorder`s. That enables e.g. the
`identify::Metrics` `Recorder` to act both on `IdentifyEvent` and `SwarmEvent`. The latter enables
it to garbage collect per peer data on disconnects.

* protocols/dcutr: Expose PROTOCOL_NAME

* protocols/identify: Expose PROTOCOL_NAME and PUSH_PROTOCOL_NAME

* protocols/ping: Expose PROTOCOL_NAME

* protocols/relay: Expose HOP_PROTOCOL_NAME and STOP_PROTOCOL_NAME

* misc/metrics: Track # connected nodes supporting specific protocol

An example metric exposed with this patch:

```
libp2p_identify_protocols{protocol="/ipfs/ping/1.0.0"} 10
```

This implies that 10 of the currently connected nodes support the ping protocol.
This commit is contained in:
Max Inden
2022-07-15 09:16:03 +02:00
committed by GitHub
parent 7c8a97739f
commit d4f8ec2d48
21 changed files with 259 additions and 78 deletions

View File

@ -18,12 +18,18 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use libp2p_core::PeerId;
use prometheus_client::encoding::text::{EncodeMetric, Encoder};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::metrics::MetricType;
use prometheus_client::registry::Registry;
use std::collections::HashMap;
use std::iter;
use std::sync::{Arc, Mutex};
pub struct Metrics {
protocols: Protocols,
error: Counter,
pushed: Counter,
received: Counter,
@ -36,6 +42,15 @@ impl Metrics {
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("identify");
let protocols = Protocols::default();
sub_registry.register(
"protocols",
"Number of connected nodes supporting a specific protocol, with \
\"unrecognized\" for each peer supporting one or more unrecognized \
protocols",
Box::new(protocols.clone()),
);
let error = Counter::default();
sub_registry.register(
"errors",
@ -86,6 +101,7 @@ impl Metrics {
);
Self {
protocols,
error,
pushed,
received,
@ -96,27 +112,136 @@ impl Metrics {
}
}
impl super::Recorder<libp2p_identify::IdentifyEvent> for super::Metrics {
impl super::Recorder<libp2p_identify::IdentifyEvent> for Metrics {
fn record(&self, event: &libp2p_identify::IdentifyEvent) {
match event {
libp2p_identify::IdentifyEvent::Error { .. } => {
self.identify.error.inc();
self.error.inc();
}
libp2p_identify::IdentifyEvent::Pushed { .. } => {
self.identify.pushed.inc();
self.pushed.inc();
}
libp2p_identify::IdentifyEvent::Received { info, .. } => {
self.identify.received.inc();
self.identify
.received_info_protocols
libp2p_identify::IdentifyEvent::Received { peer_id, info, .. } => {
{
let mut protocols: Vec<String> = info
.protocols
.iter()
.filter(|p| {
let allowed_protocols: &[&[u8]] = &[
#[cfg(feature = "dcutr")]
libp2p_dcutr::PROTOCOL_NAME,
// #[cfg(feature = "gossipsub")]
// #[cfg(not(target_os = "unknown"))]
// TODO: Add Gossipsub protocol name
libp2p_identify::PROTOCOL_NAME,
libp2p_identify::PUSH_PROTOCOL_NAME,
#[cfg(feature = "kad")]
libp2p_kad::protocol::DEFAULT_PROTO_NAME,
#[cfg(feature = "ping")]
libp2p_ping::PROTOCOL_NAME,
#[cfg(feature = "relay")]
libp2p_relay::v2::STOP_PROTOCOL_NAME,
#[cfg(feature = "relay")]
libp2p_relay::v2::HOP_PROTOCOL_NAME,
];
allowed_protocols.contains(&p.as_bytes())
})
.cloned()
.collect();
// Signal via an additional label value that one or more
// protocols of the remote peer have not been recognized.
if protocols.len() < info.protocols.len() {
protocols.push("unrecognized".to_string());
}
protocols.sort_unstable();
protocols.dedup();
self.protocols.add(*peer_id, protocols);
}
self.received.inc();
self.received_info_protocols
.observe(info.protocols.len() as f64);
self.identify
.received_info_listen_addrs
self.received_info_listen_addrs
.observe(info.listen_addrs.len() as f64);
}
libp2p_identify::IdentifyEvent::Sent { .. } => {
self.identify.sent.inc();
self.sent.inc();
}
}
}
}
impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
if let libp2p_swarm::SwarmEvent::ConnectionClosed {
peer_id,
num_established,
..
} = event
{
if *num_established == 0 {
self.protocols.remove(*peer_id)
}
}
}
}
#[derive(Default, Clone)]
struct Protocols {
peers: Arc<Mutex<HashMap<PeerId, Vec<String>>>>,
}
impl Protocols {
fn add(&self, peer: PeerId, protocols: Vec<String>) {
self.peers
.lock()
.expect("Lock not to be poisoned")
.insert(peer, protocols);
}
fn remove(&self, peer: PeerId) {
self.peers
.lock()
.expect("Lock not to be poisoned")
.remove(&peer);
}
}
impl EncodeMetric for Protocols {
fn encode(&self, mut encoder: Encoder) -> Result<(), std::io::Error> {
let count_by_protocol = self
.peers
.lock()
.expect("Lock not to be poisoned")
.iter()
.fold(
HashMap::<String, u64>::default(),
|mut acc, (_, protocols)| {
for protocol in protocols {
let count = acc.entry(protocol.to_string()).or_default();
*count = *count + 1;
}
acc
},
);
for (protocol, count) in count_by_protocol {
encoder
.with_label_set(&("protocol", protocol))
.no_suffix()?
.no_bucket()?
.encode_value(count)?
.no_exemplar()?;
}
Ok(())
}
fn metric_type(&self) -> MetricType {
MetricType::Gauge
}
}