misc/metrics: Add protocols label to address-specific metrics (#2982)

Previously, we only tracked aggregate metrics such as the number of open connections. With this patch, we extend these metrics with a `protocols` label that contains a "protocol stack". A protocol stack is a multi-address with all variable parts removed. For example, `/ip4/127.0.0.1/tcp/1234` turns into `/ip4/tcp`.

Resolves https://github.com/libp2p/rust-libp2p/issues/2758.
This commit is contained in:
John Turpish
2022-11-15 15:45:14 -05:00
committed by GitHub
parent d5ea93dd71
commit 69efe63229
6 changed files with 138 additions and 26 deletions

View File

@ -16,6 +16,10 @@
- Update to `libp2p-gossipsub` `v0.43.0`. - Update to `libp2p-gossipsub` `v0.43.0`.
- Add `protocol_stack` metrics. See [PR 2982].
[PR 2982]: https://github.com/libp2p/rust-libp2p/pull/2982/
# 0.10.0 # 0.10.0
- Update to `libp2p-swarm` `v0.40.0`. - Update to `libp2p-swarm` `v0.40.0`.

View File

@ -54,7 +54,7 @@ use futures::stream::StreamExt;
use libp2p::core::Multiaddr; use libp2p::core::Multiaddr;
use libp2p::metrics::{Metrics, Recorder}; use libp2p::metrics::{Metrics, Recorder};
use libp2p::swarm::{NetworkBehaviour, SwarmEvent}; use libp2p::swarm::{NetworkBehaviour, SwarmEvent};
use libp2p::{identity, ping, PeerId, Swarm}; use libp2p::{identify, identity, ping, PeerId, Swarm};
use libp2p_swarm::keep_alive; use libp2p_swarm::keep_alive;
use log::info; use log::info;
use prometheus_client::registry::Registry; use prometheus_client::registry::Registry;
@ -68,11 +68,12 @@ fn main() -> Result<(), Box<dyn Error>> {
let local_key = identity::Keypair::generate_ed25519(); let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public()); let local_peer_id = PeerId::from(local_key.public());
let local_pub_key = local_key.public();
info!("Local peer id: {:?}", local_peer_id); info!("Local peer id: {:?}", local_peer_id);
let mut swarm = Swarm::without_executor( let mut swarm = Swarm::without_executor(
block_on(libp2p::development_transport(local_key))?, block_on(libp2p::development_transport(local_key))?,
Behaviour::default(), Behaviour::new(local_pub_key),
local_peer_id, local_peer_id,
); );
@ -95,6 +96,10 @@ fn main() -> Result<(), Box<dyn Error>> {
info!("{:?}", ping_event); info!("{:?}", ping_event);
metrics.record(&ping_event); metrics.record(&ping_event);
} }
SwarmEvent::Behaviour(BehaviourEvent::Identify(identify_event)) => {
info!("{:?}", identify_event);
metrics.record(&identify_event);
}
swarm_event => { swarm_event => {
info!("{:?}", swarm_event); info!("{:?}", swarm_event);
metrics.record(&swarm_event); metrics.record(&swarm_event);
@ -109,8 +114,22 @@ fn main() -> Result<(), Box<dyn Error>> {
/// ///
/// For illustrative purposes, this includes the [`keep_alive::Behaviour`] behaviour so the pings actually happen /// For illustrative purposes, this includes the [`keep_alive::Behaviour`] behaviour so the pings actually happen
/// and can be observed via the metrics. /// and can be observed via the metrics.
#[derive(NetworkBehaviour, Default)] #[derive(NetworkBehaviour)]
struct Behaviour { struct Behaviour {
identify: identify::Behaviour,
keep_alive: keep_alive::Behaviour, keep_alive: keep_alive::Behaviour,
ping: ping::Behaviour, ping: ping::Behaviour,
} }
impl Behaviour {
    /// Assembles the example behaviour from its three sub-behaviours.
    ///
    /// `local_pub_key` is handed to the identify configuration together with
    /// the protocol version string `"/ipfs/0.1.0"`; ping and keep-alive use
    /// their `Default` settings.
    fn new(local_pub_key: libp2p::identity::PublicKey) -> Self {
        let identify_config = identify::Config::new("/ipfs/0.1.0".into(), local_pub_key);

        Self {
            identify: identify::Behaviour::new(identify_config),
            keep_alive: keep_alive::Behaviour::default(),
            ping: ping::Behaviour::default(),
        }
    }
}

View File

@ -18,9 +18,11 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::protocol_stack;
use libp2p_core::PeerId; use libp2p_core::PeerId;
use prometheus_client::encoding::text::{EncodeMetric, Encoder}; use prometheus_client::encoding::text::{Encode, EncodeMetric, Encoder};
use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram}; use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::metrics::MetricType; use prometheus_client::metrics::MetricType;
use prometheus_client::registry::Registry; use prometheus_client::registry::Registry;
@ -36,6 +38,7 @@ pub struct Metrics {
received_info_listen_addrs: Histogram, received_info_listen_addrs: Histogram,
received_info_protocols: Histogram, received_info_protocols: Histogram,
sent: Counter, sent: Counter,
listen_addresses: Family<AddressLabels, Counter>,
} }
impl Metrics { impl Metrics {
@ -100,6 +103,13 @@ impl Metrics {
Box::new(sent.clone()), Box::new(sent.clone()),
); );
let listen_addresses = Family::default();
sub_registry.register(
"listen_addresses",
"Number of listen addresses for remote peer per protocol stack",
Box::new(listen_addresses.clone()),
);
Self { Self {
protocols, protocols,
error, error,
@ -108,6 +118,7 @@ impl Metrics {
received_info_listen_addrs, received_info_listen_addrs,
received_info_protocols, received_info_protocols,
sent, sent,
listen_addresses,
} }
} }
} }
@ -167,6 +178,13 @@ impl super::Recorder<libp2p_identify::Event> for Metrics {
.observe(info.protocols.len() as f64); .observe(info.protocols.len() as f64);
self.received_info_listen_addrs self.received_info_listen_addrs
.observe(info.listen_addrs.len() as f64); .observe(info.listen_addrs.len() as f64);
for listen_addr in &info.listen_addrs {
self.listen_addresses
.get_or_create(&AddressLabels {
protocols: protocol_stack::as_string(listen_addr),
})
.inc();
}
} }
libp2p_identify::Event::Sent { .. } => { libp2p_identify::Event::Sent { .. } => {
self.sent.inc(); self.sent.inc();
@ -190,6 +208,11 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
} }
} }
/// Label set for the `listen_addresses` counter family.
#[derive(Clone, Hash, PartialEq, Eq, Encode)]
struct AddressLabels {
    /// Protocol stack of the address, as rendered by `protocol_stack::as_string`.
    protocols: String,
}
#[derive(Default, Clone)] #[derive(Default, Clone)]
struct Protocols { struct Protocols {
peers: Arc<Mutex<HashMap<PeerId, Vec<String>>>>, peers: Arc<Mutex<HashMap<PeerId, Vec<String>>>>,

View File

@ -38,6 +38,7 @@ mod identify;
mod kad; mod kad;
#[cfg(feature = "ping")] #[cfg(feature = "ping")]
mod ping; mod ping;
mod protocol_stack;
#[cfg(feature = "relay")] #[cfg(feature = "relay")]
mod relay; mod relay;
mod swarm; mod swarm;

View File

@ -0,0 +1,27 @@
use libp2p_core::multiaddr::Multiaddr;
/// Renders the protocol stack of `ma` as a single string.
///
/// Every protocol tag yielded by [`Multiaddr::protocol_stack`] is prefixed
/// with `/` and appended in order, so an address such as
/// `/ip6/<addr>/tcp/8000/wss/p2p/<peer>` becomes `/ip6/tcp/wss/p2p`.
pub fn as_string(ma: &Multiaddr) -> String {
    // One extra byte per tag accounts for its leading '/', so the buffer is
    // sized exactly once up front.
    let capacity: usize = ma.protocol_stack().map(|tag| tag.len() + 1).sum();

    let mut stack = String::with_capacity(capacity);
    ma.protocol_stack().for_each(|tag| {
        stack.push('/');
        stack.push_str(tag);
    });
    stack
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An IPv6 / TCP / secure-websocket / p2p address keeps only its protocol
    /// names; the IP, port and peer id are dropped.
    #[test]
    fn ip6_tcp_wss_p2p() {
        let input = "/ip6/2001:8a0:7ac5:4201:3ac9:86ff:fe31:7095/tcp/8000/wss/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC";
        let addr = Multiaddr::try_from(input).expect("testbad");

        assert_eq!(as_string(&addr), "/ip6/tcp/wss/p2p");
    }
}

View File

@ -18,37 +18,38 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::protocol_stack;
use prometheus_client::encoding::text::Encode; use prometheus_client::encoding::text::Encode;
use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family; use prometheus_client::metrics::family::Family;
use prometheus_client::registry::Registry; use prometheus_client::registry::Registry;
pub struct Metrics { pub struct Metrics {
connections_incoming: Counter, connections_incoming: Family<AddressLabels, Counter>,
connections_incoming_error: Family<IncomingConnectionErrorLabels, Counter>, connections_incoming_error: Family<IncomingConnectionErrorLabels, Counter>,
connections_established: Family<ConnectionEstablishedLabels, Counter>, connections_established: Family<ConnectionEstablishedLabels, Counter>,
connections_closed: Family<ConnectionClosedLabels, Counter>, connections_closed: Family<ConnectionClosedLabels, Counter>,
new_listen_addr: Counter, new_listen_addr: Family<AddressLabels, Counter>,
expired_listen_addr: Counter, expired_listen_addr: Family<AddressLabels, Counter>,
listener_closed: Counter, listener_closed: Family<AddressLabels, Counter>,
listener_error: Counter, listener_error: Counter,
dial_attempt: Counter, dial_attempt: Counter,
outgoing_connection_error: Family<OutgoingConnectionErrorLabels, Counter>, outgoing_connection_error: Family<OutgoingConnectionErrorLabels, Counter>,
connected_to_banned_peer: Counter, connected_to_banned_peer: Family<AddressLabels, Counter>,
} }
impl Metrics { impl Metrics {
pub fn new(registry: &mut Registry) -> Self { pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("swarm"); let sub_registry = registry.sub_registry_with_prefix("swarm");
let connections_incoming = Counter::default(); let connections_incoming = Family::default();
sub_registry.register( sub_registry.register(
"connections_incoming", "connections_incoming",
"Number of incoming connections", "Number of incoming connections per address stack",
Box::new(connections_incoming.clone()), Box::new(connections_incoming.clone()),
); );
@ -59,21 +60,21 @@ impl Metrics {
Box::new(connections_incoming_error.clone()), Box::new(connections_incoming_error.clone()),
); );
let new_listen_addr = Counter::default(); let new_listen_addr = Family::default();
sub_registry.register( sub_registry.register(
"new_listen_addr", "new_listen_addr",
"Number of new listen addresses", "Number of new listen addresses",
Box::new(new_listen_addr.clone()), Box::new(new_listen_addr.clone()),
); );
let expired_listen_addr = Counter::default(); let expired_listen_addr = Family::default();
sub_registry.register( sub_registry.register(
"expired_listen_addr", "expired_listen_addr",
"Number of expired listen addresses", "Number of expired listen addresses",
Box::new(expired_listen_addr.clone()), Box::new(expired_listen_addr.clone()),
); );
let listener_closed = Counter::default(); let listener_closed = Family::default();
sub_registry.register( sub_registry.register(
"listener_closed", "listener_closed",
"Number of listeners closed", "Number of listeners closed",
@ -101,7 +102,7 @@ impl Metrics {
Box::new(outgoing_connection_error.clone()), Box::new(outgoing_connection_error.clone()),
); );
let connected_to_banned_peer = Counter::default(); let connected_to_banned_peer = Family::default();
sub_registry.register( sub_registry.register(
"connected_to_banned_peer", "connected_to_banned_peer",
"Number of connection attempts to banned peer", "Number of connection attempts to banned peer",
@ -146,6 +147,7 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
self.connections_established self.connections_established
.get_or_create(&ConnectionEstablishedLabels { .get_or_create(&ConnectionEstablishedLabels {
role: endpoint.into(), role: endpoint.into(),
protocols: protocol_stack::as_string(endpoint.get_remote_address()),
}) })
.inc(); .inc();
} }
@ -153,16 +155,26 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
self.connections_closed self.connections_closed
.get_or_create(&ConnectionClosedLabels { .get_or_create(&ConnectionClosedLabels {
role: endpoint.into(), role: endpoint.into(),
protocols: protocol_stack::as_string(endpoint.get_remote_address()),
}) })
.inc(); .inc();
} }
libp2p_swarm::SwarmEvent::IncomingConnection { .. } => { libp2p_swarm::SwarmEvent::IncomingConnection { send_back_addr, .. } => {
self.connections_incoming.inc(); self.connections_incoming
.get_or_create(&AddressLabels {
protocols: protocol_stack::as_string(send_back_addr),
})
.inc();
} }
libp2p_swarm::SwarmEvent::IncomingConnectionError { error, .. } => { libp2p_swarm::SwarmEvent::IncomingConnectionError {
error,
send_back_addr,
..
} => {
self.connections_incoming_error self.connections_incoming_error
.get_or_create(&IncomingConnectionErrorLabels { .get_or_create(&IncomingConnectionErrorLabels {
error: error.into(), error: error.into(),
protocols: protocol_stack::as_string(send_back_addr),
}) })
.inc(); .inc();
} }
@ -221,17 +233,35 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
} }
}; };
} }
libp2p_swarm::SwarmEvent::BannedPeer { .. } => { libp2p_swarm::SwarmEvent::BannedPeer { endpoint, .. } => {
self.connected_to_banned_peer.inc(); self.connected_to_banned_peer
.get_or_create(&AddressLabels {
protocols: protocol_stack::as_string(endpoint.get_remote_address()),
})
.inc();
} }
libp2p_swarm::SwarmEvent::NewListenAddr { .. } => { libp2p_swarm::SwarmEvent::NewListenAddr { address, .. } => {
self.new_listen_addr.inc(); self.new_listen_addr
.get_or_create(&AddressLabels {
protocols: protocol_stack::as_string(address),
})
.inc();
} }
libp2p_swarm::SwarmEvent::ExpiredListenAddr { .. } => { libp2p_swarm::SwarmEvent::ExpiredListenAddr { address, .. } => {
self.expired_listen_addr.inc(); self.expired_listen_addr
.get_or_create(&AddressLabels {
protocols: protocol_stack::as_string(address),
})
.inc();
} }
libp2p_swarm::SwarmEvent::ListenerClosed { .. } => { libp2p_swarm::SwarmEvent::ListenerClosed { addresses, .. } => {
self.listener_closed.inc(); for address in addresses {
self.listener_closed
.get_or_create(&AddressLabels {
protocols: protocol_stack::as_string(address),
})
.inc();
}
} }
libp2p_swarm::SwarmEvent::ListenerError { .. } => { libp2p_swarm::SwarmEvent::ListenerError { .. } => {
self.listener_error.inc(); self.listener_error.inc();
@ -246,11 +276,18 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
#[derive(Encode, Hash, Clone, Eq, PartialEq)] #[derive(Encode, Hash, Clone, Eq, PartialEq)]
struct ConnectionEstablishedLabels { struct ConnectionEstablishedLabels {
role: Role, role: Role,
protocols: String,
} }
#[derive(Encode, Hash, Clone, Eq, PartialEq)] #[derive(Encode, Hash, Clone, Eq, PartialEq)]
struct ConnectionClosedLabels { struct ConnectionClosedLabels {
role: Role, role: Role,
protocols: String,
}
#[derive(Encode, Hash, Clone, Eq, PartialEq)]
struct AddressLabels {
protocols: String,
} }
#[derive(Encode, Hash, Clone, Eq, PartialEq)] #[derive(Encode, Hash, Clone, Eq, PartialEq)]
@ -298,6 +335,7 @@ enum OutgoingConnectionErrorError {
#[derive(Encode, Hash, Clone, Eq, PartialEq)] #[derive(Encode, Hash, Clone, Eq, PartialEq)]
struct IncomingConnectionErrorLabels { struct IncomingConnectionErrorLabels {
error: PendingInboundConnectionError, error: PendingInboundConnectionError,
protocols: String,
} }
#[derive(Encode, Hash, Clone, Eq, PartialEq)] #[derive(Encode, Hash, Clone, Eq, PartialEq)]