From 2066a192ad936a8dd531c9bb93b7fa75cda9e5a7 Mon Sep 17 00:00:00 2001 From: Divma <26765164+divagant-martian@users.noreply.github.com> Date: Tue, 16 Nov 2021 08:59:39 -0500 Subject: [PATCH] protocols/gossipsub: Add mesh metrics (#2316) Enable instrumenting mesh through metrics and add gossipsub to misc/metrics. Co-authored-by: Max Inden --- Cargo.toml | 2 +- misc/metrics/CHANGELOG.md | 10 +- misc/metrics/Cargo.toml | 2 + misc/metrics/src/gossipsub.rs | 52 ++++ misc/metrics/src/lib.rs | 6 + protocols/gossipsub/CHANGELOG.md | 3 + protocols/gossipsub/Cargo.toml | 2 + protocols/gossipsub/src/behaviour.rs | 170 ++++++++++-- protocols/gossipsub/src/behaviour/tests.rs | 1 + protocols/gossipsub/src/lib.rs | 1 + protocols/gossipsub/src/metrics.rs | 304 +++++++++++++++++++++ protocols/gossipsub/src/topic.rs | 3 +- 12 files changed, 529 insertions(+), 27 deletions(-) create mode 100644 misc/metrics/src/gossipsub.rs create mode 100644 protocols/gossipsub/src/metrics.rs diff --git a/Cargo.toml b/Cargo.toml index 806e92ae..a8d93817 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ dns-tokio = ["libp2p-dns", "libp2p-dns/tokio"] floodsub = ["libp2p-floodsub"] identify = ["libp2p-identify", "libp2p-metrics/identify"] kad = ["libp2p-kad", "libp2p-metrics/kad"] -gossipsub = ["libp2p-gossipsub"] +gossipsub = ["libp2p-gossipsub", "libp2p-metrics/gossipsub"] metrics = ["libp2p-metrics"] mdns = ["libp2p-mdns"] mplex = ["libp2p-mplex"] diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index f25070ee..3694c75b 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -1,7 +1,11 @@ -## Version 0.2.0 [unreleased] +# 0.2.0 [unreleased] + +- Include gossipsub metrics (see [PR 2316]). - Update dependencies. -## Version 0.1.0 [2021-11-01] +[PR 2316]: https://github.com/libp2p/rust-libp2p/pull/2316 -- Add initial version. \ No newline at end of file +# 0.1.0 [2021-11-01] + +- Add initial version. diff --git a/misc/metrics/Cargo.toml b/misc/metrics/Cargo.toml index a2674de5..86e5d042 100644 --- a/misc/metrics/Cargo.toml +++ b/misc/metrics/Cargo.toml @@ -10,12 +10,14 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [features] +gossipsub = ["libp2p-gossipsub"] identify = ["libp2p-identify"] kad = ["libp2p-kad"] ping = ["libp2p-ping"] [dependencies] libp2p-core = { version = "0.30.0", path = "../../core" } +libp2p-gossipsub = { version = "0.34.0", path = "../../protocols/gossipsub", optional = true } libp2p-identify = { version = "0.32.0", path = "../../protocols/identify", optional = true } libp2p-kad = { version = "0.33.0", path = "../../protocols/kad", optional = true } libp2p-ping = { version = "0.32.0", path = "../../protocols/ping", optional = true } diff --git a/misc/metrics/src/gossipsub.rs b/misc/metrics/src/gossipsub.rs new file mode 100644 index 00000000..1e71416a --- /dev/null +++ b/misc/metrics/src/gossipsub.rs @@ -0,0 +1,52 @@ +// Copyright 2021 Protocol Labs. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use open_metrics_client::metrics::counter::Counter; +use open_metrics_client::registry::Registry; + +pub struct Metrics { + messages: Counter, +} + +impl Metrics { + pub fn new(registry: &mut Registry) -> Self { + let sub_registry = registry.sub_registry_with_prefix("gossipsub"); + + let messages = Counter::default(); + sub_registry.register( + "messages", + "Number of messages received", + Box::new(messages.clone()), + ); + + Self { messages } + } +} + +impl super::Recorder for super::Metrics { + fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) { + match event { + libp2p_gossipsub::GossipsubEvent::Message { .. } => { + self.gossipsub.messages.inc(); + } + _ => {} + } + } +} diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index 4582904c..c3aa376b 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -25,6 +25,8 @@ //! //! See `examples` directory for more. +#[cfg(feature = "gossipsub")] +mod gossipsub; #[cfg(feature = "identify")] mod identify; #[cfg(feature = "kad")] @@ -37,6 +39,8 @@ use open_metrics_client::registry::Registry; /// Set of Swarm and protocol metrics derived from emitted events. pub struct Metrics { + #[cfg(feature = "gossipsub")] + gossipsub: gossipsub::Metrics, #[cfg(feature = "identify")] identify: identify::Metrics, #[cfg(feature = "kad")] @@ -58,6 +62,8 @@ impl Metrics { pub fn new(registry: &mut Registry) -> Self { let sub_registry = registry.sub_registry_with_prefix("libp2p"); Self { + #[cfg(feature = "gossipsub")] + gossipsub: gossipsub::Metrics::new(sub_registry), #[cfg(feature = "identify")] identify: identify::Metrics::new(sub_registry), #[cfg(feature = "kad")] diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 46c830d2..78f13a74 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,5 +1,7 @@ # 0.34.0 [unreleased] +- Add topic and mesh metrics (see [PR 2316]). + - Fix bug in internal peer's topics tracking (see [PR 2325]). - Use `instant` and `futures-timer` instead of `wasm-timer` (see [PR 2245]). 
@@ -8,6 +10,7 @@ [PR 2245]: https://github.com/libp2p/rust-libp2p/pull/2245 [PR 2325]: https://github.com/libp2p/rust-libp2p/pull/2325 +[PR 2316]: https://github.com/libp2p/rust-libp2p/pull/2316 # 0.33.0 [2021-11-01] diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 66c00067..1d496f2c 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -29,6 +29,8 @@ regex = "1.4.0" futures-timer = "3.0.2" pin-project = "1.0.8" instant = "0.1.11" +# Metrics dependencies +open-metrics-client = "0.12" [dev-dependencies] async-std = "1.6.3" diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 7bb186d1..9daddc5d 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -32,6 +32,7 @@ use std::{ use futures::StreamExt; use log::{debug, error, trace, warn}; +use open_metrics_client::registry::Registry; use prost::Message; use rand::{seq::SliceRandom, thread_rng}; @@ -50,6 +51,7 @@ use crate::error::{PublishError, SubscriptionError, ValidationError}; use crate::gossip_promises::GossipPromises; use crate::handler::{GossipsubHandler, GossipsubHandlerIn, HandlerEvent}; use crate::mcache::MessageCache; +use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics}; use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}; use crate::protocol::SIGNING_PREFIX; use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter}; @@ -302,6 +304,9 @@ pub struct Gossipsub< /// calculating the message-id and sending to the application. This is designed to allow the /// user to implement arbitrary topic-based compression algorithms. data_transform: D, + + /// Keep track of a set of internal metrics relating to gossipsub. + metrics: Option, } impl Gossipsub @@ -318,6 +323,25 @@ where Self::new_with_subscription_filter_and_transform( privacy, config, + None, + F::default(), + D::default(), + ) + } + + /// Creates a [`Gossipsub`] struct given a set of parameters specified via a + /// [`GossipsubConfig`]. This has no subscription filter and uses no compression. + /// Metrics can be evaluated by passing a reference to a [`Registry`]. + pub fn new_with_metrics( + privacy: MessageAuthenticity, + config: GossipsubConfig, + metrics_registry: &mut Registry, + metrics_config: MetricsConfig, + ) -> Result { + Self::new_with_subscription_filter_and_transform( + privacy, + config, + Some((metrics_registry, metrics_config)), F::default(), D::default(), ) @@ -334,11 +358,13 @@ where pub fn new_with_subscription_filter( privacy: MessageAuthenticity, config: GossipsubConfig, + metrics: Option<(&mut Registry, MetricsConfig)>, subscription_filter: F, ) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, + metrics, subscription_filter, D::default(), ) @@ -355,11 +381,13 @@ where pub fn new_with_transform( privacy: MessageAuthenticity, config: GossipsubConfig, + metrics: Option<(&mut Registry, MetricsConfig)>, data_transform: D, ) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, + metrics, F::default(), data_transform, ) @@ -376,6 +404,7 @@ where pub fn new_with_subscription_filter_and_transform( privacy: MessageAuthenticity, config: GossipsubConfig, + metrics: Option<(&mut Registry, MetricsConfig)>, subscription_filter: F, data_transform: D, ) -> Result { @@ -385,9 +414,8 @@ where // were received locally. 
validate_config(&privacy, config.validation_mode())?; - // Set up message publishing parameters. - Ok(Gossipsub { + metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)), events: VecDeque::new(), control_pool: HashMap::new(), publish_config: privacy.into(), @@ -604,6 +632,7 @@ where debug!("Publishing message: {:?}", msg_id); let topic_hash = raw_message.topic.clone(); + let msg_bytes = raw_message.data.len(); // If we are not flood publishing forward the message to mesh peers. let mesh_peers_sent = @@ -700,6 +729,10 @@ where for peer_id in recipient_peers.iter() { debug!("Sending message to peer: {:?}", peer_id); self.send_message(*peer_id, event.clone())?; + + if let Some(m) = self.metrics.as_mut() { + m.msg_sent(&topic_hash, msg_bytes); + } } debug!("Published message: {:?}", &msg_id); @@ -868,6 +901,10 @@ where let mut added_peers = HashSet::new(); + if let Some(m) = self.metrics.as_mut() { + m.joined(topic_hash) + } + // check if we have mesh_n peers in fanout[topic] and add them to the mesh if we do, // removing the fanout entry. if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) { @@ -899,10 +936,16 @@ where topic_hash.clone(), peers.into_iter().take(add_peers).collect(), ); + // remove the last published time self.fanout_last_pub.remove(topic_hash); } + let fanaout_added = added_peers.len(); + if let Some(m) = self.metrics.as_mut() { + m.peers_included(topic_hash, Inclusion::Fanaout, fanaout_added) + } + // check if we need to get more peers, which we randomly select if added_peers.len() < self.config.mesh_n() { // get the peers @@ -931,6 +974,11 @@ where mesh_peers.extend(new_peers); } + let random_added = added_peers.len() - fanaout_added; + if let Some(m) = self.metrics.as_mut() { + m.peers_included(topic_hash, Inclusion::Random, random_added) + } + for peer_id in added_peers { // Send a GRAFT control message debug!("JOIN: Sending Graft message to peer: {:?}", peer_id); @@ -955,6 +1003,12 @@ where &self.connected_peers, ); } + + let mesh_peers = self.mesh_peers(topic_hash).count(); + if let Some(m) = self.metrics.as_mut() { + m.set_mesh_peers(topic_hash, mesh_peers) + } + debug!("Completed JOIN for topic: {:?}", topic_hash); } @@ -1020,6 +1074,9 @@ where // If our mesh contains the topic, send prune to peers and delete it from the mesh if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) { + if let Some(m) = self.metrics.as_mut() { + m.left(topic_hash) + } for peer in peers { // Send a PRUNE control message debug!("LEAVE: Sending PRUNE to peer: {:?}", peer); @@ -1218,7 +1275,18 @@ where if !cached_messages.is_empty() { debug!("IWANT: Sending cached messages to peer: {:?}", peer_id); // Send the messages to the peer - let message_list = cached_messages.into_iter().map(|entry| entry.1).collect(); + let message_list: Vec<_> = cached_messages.into_iter().map(|entry| entry.1).collect(); + + let mut topic_msgs = HashMap::>::default(); + if self.metrics.is_some() { + for msg in message_list.iter() { + topic_msgs + .entry(msg.topic.clone()) + .or_default() + .push(msg.data.len()); + } + } + if self .send_message( *peer_id, @@ -1232,6 +1300,13 @@ where .is_err() { error!("Failed to send cached messages. Messages too large"); + } else if let Some(m) = self.metrics.as_mut() { + // Sending of messages succeeded, register them on the internal metrics. 
+ for (topic, msg_bytes_vec) in topic_msgs.into_iter() { + for msg_bytes in msg_bytes_vec { + m.msg_sent(&topic, msg_bytes); + } + } } } debug!("Completed IWANT handling for peer: {}", peer_id); @@ -1338,7 +1413,13 @@ where "GRAFT: Mesh link added for peer: {:?} in topic: {:?}", peer_id, &topic_hash ); - peers.insert(*peer_id); + + if peers.insert(*peer_id) { + if let Some(m) = self.metrics.as_mut() { + m.peers_included(&topic_hash, Inclusion::Subscribed, 1) + } + } + // If the peer did not previously exist in any mesh, inform the handler peer_added_to_mesh( *peer_id, @@ -1401,6 +1482,7 @@ where topic_hash: &TopicHash, backoff: Option, always_update_backoff: bool, + reason: Churn, ) { let mut update_backoff = always_update_backoff; if let Some(peers) = self.mesh.get_mut(topic_hash) { @@ -1411,6 +1493,9 @@ where peer_id.to_string(), topic_hash ); + if let Some(m) = self.metrics.as_mut() { + m.peers_removed(topic_hash, reason, 1) + } if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.prune(peer_id, topic_hash.clone()); @@ -1450,7 +1535,7 @@ where let (below_threshold, score) = self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold); for (topic_hash, px, backoff) in prune_data { - self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true); + self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true, Churn::Prune); if self.mesh.contains_key(&topic_hash) { //connect to px peers @@ -1779,9 +1864,10 @@ where for subscription in filtered_topics { // get the peers from the mapping, or insert empty lists if the topic doesn't exist + let topic_hash = &subscription.topic_hash; let peer_list = self .topic_peers - .entry(subscription.topic_hash.clone()) + .entry(topic_hash.clone()) .or_insert_with(Default::default); match subscription.action { @@ -1790,12 +1876,12 @@ where debug!( "SUBSCRIPTION: Adding gossip peer: {} to topic: {:?}", propagation_source.to_string(), - subscription.topic_hash + topic_hash ); } // add to the peer_topics mapping - subscribed_topics.insert(subscription.topic_hash.clone()); + subscribed_topics.insert(topic_hash.clone()); // if the mesh needs peers add the peer to the mesh if !self.explicit_peers.contains(propagation_source) @@ -1813,28 +1899,30 @@ where .0 && !self .backoffs - .is_backoff_with_slack(&subscription.topic_hash, propagation_source) + .is_backoff_with_slack(topic_hash, propagation_source) { - if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) { + if let Some(peers) = self.mesh.get_mut(topic_hash) { if peers.len() < self.config.mesh_n_low() && peers.insert(*propagation_source) { debug!( "SUBSCRIPTION: Adding peer {} to the mesh for topic {:?}", propagation_source.to_string(), - subscription.topic_hash + topic_hash ); + if let Some(m) = self.metrics.as_mut() { + m.peers_included(topic_hash, Inclusion::Subscribed, 1) + } // send graft to the peer debug!( "Sending GRAFT to peer {} for topic {:?}", propagation_source.to_string(), - subscription.topic_hash + topic_hash ); if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score - .graft(propagation_source, subscription.topic_hash.clone()); + peer_score.graft(propagation_source, topic_hash.clone()); } - topics_to_graft.push(subscription.topic_hash.clone()); + topics_to_graft.push(topic_hash.clone()); } } } @@ -1842,7 +1930,7 @@ where application_event.push(NetworkBehaviourAction::GenerateEvent( GossipsubEvent::Subscribed { peer_id: *propagation_source, - topic: subscription.topic_hash.clone(), + topic: topic_hash.clone(), }, )); } @@ -1851,26 +1939,31 @@ where 
debug!( "SUBSCRIPTION: Removing gossip peer: {} from topic: {:?}", propagation_source.to_string(), - subscription.topic_hash + topic_hash ); } + // remove topic from the peer_topics mapping - subscribed_topics.remove(&subscription.topic_hash); - unsubscribed_peers.push((*propagation_source, subscription.topic_hash.clone())); + subscribed_topics.remove(topic_hash); + unsubscribed_peers.push((*propagation_source, topic_hash.clone())); // generate an unsubscribe event to be polled application_event.push(NetworkBehaviourAction::GenerateEvent( GossipsubEvent::Unsubscribed { peer_id: *propagation_source, - topic: subscription.topic_hash.clone(), + topic: topic_hash.clone(), }, )); } } + + if let Some(m) = self.metrics.as_mut() { + m.set_topic_peers(topic_hash, peer_list.len()); + } } // remove unsubscribed peers from the mesh if it exists for (peer_id, topic_hash) in unsubscribed_peers { - self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false); + self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false, Churn::Unsub); } // Potentially inform the handler if we have added this peer to a mesh for the first time. @@ -1994,6 +2087,11 @@ where }) .cloned() .collect(); + + if let Some(m) = self.metrics.as_mut() { + m.peers_removed(topic_hash, Churn::BadScore, to_remove.len()) + } + for peer in to_remove { peers.remove(&peer); } @@ -2026,6 +2124,9 @@ where } // update the mesh debug!("Updating mesh, new mesh: {:?}", peer_list); + if let Some(m) = self.metrics.as_mut() { + m.peers_included(topic_hash, Inclusion::Random, peer_list.len()) + } peers.extend(peer_list); } @@ -2080,6 +2181,10 @@ where current_topic.push(topic_hash.clone()); removed += 1; } + + if let Some(m) = self.metrics.as_mut() { + m.peers_removed(topic_hash, Churn::Excess, removed) + } } // do we have enough outbound peers? 
@@ -2109,6 +2214,9 @@ where } // update the mesh debug!("Updating mesh, new mesh: {:?}", peer_list); + if let Some(m) = self.metrics.as_mut() { + m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len()) + } peers.extend(peer_list); } } @@ -2168,10 +2276,17 @@ where "Opportunistically graft in topic {} with peers {:?}", topic_hash, peer_list ); + if let Some(m) = self.metrics.as_mut() { + m.peers_included(topic_hash, Inclusion::Random, peer_list.len()) + } peers.extend(peer_list); } } } + // Register the final count of peers in the mesh + if let Some(m) = self.metrics.as_mut() { + m.set_mesh_peers(topic_hash, peers.len()) + } } // remove expired fanout topics @@ -2523,6 +2638,9 @@ where for peer in recipient_peers.iter() { debug!("Sending message: {:?} to peer {:?}", msg_id, peer); self.send_message(*peer, event.clone())?; + if let Some(m) = self.metrics.as_mut() { + m.msg_sent(&message.topic, message.data.len()); + } } debug!("Completed forwarding message"); Ok(true) @@ -2887,7 +3005,12 @@ where // check the mesh for the topic if let Some(mesh_peers) = self.mesh.get_mut(topic) { // check if the peer is in the mesh and remove it - mesh_peers.remove(peer_id); + if mesh_peers.remove(peer_id) { + if let Some(m) = self.metrics.as_mut() { + m.peers_removed(topic, Churn::Dc, 1); + m.set_mesh_peers(topic, mesh_peers.len()); + } + }; } // remove from topic_peers @@ -2899,6 +3022,9 @@ where peer_id ); } + if let Some(m) = self.metrics.as_mut() { + m.set_topic_peers(topic, peer_list.len()) + } } else { warn!( "Disconnected node: {} with topic: {:?} not in topic_peers", diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 55d5284a..b05df666 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -72,6 +72,7 @@ mod tests { let mut gs: Gossipsub = Gossipsub::new_with_subscription_filter_and_transform( MessageAuthenticity::Signed(keypair), self.gs_config, + None, self.subscription_filter, self.data_transform, ) diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 5a426d51..a05c8180 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -132,6 +132,7 @@ mod gossip_promises; mod handler; mod interval; mod mcache; +pub mod metrics; mod peer_score; pub mod subscription_filter; pub mod time_cache; diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs new file mode 100644 index 00000000..8e4e16f0 --- /dev/null +++ b/protocols/gossipsub/src/metrics.rs @@ -0,0 +1,304 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! A set of metrics used to help track and diagnose the network behaviour of the gossipsub +//! protocol. + +use std::collections::HashMap; + +use open_metrics_client::encoding::text::Encode; +use open_metrics_client::metrics::counter::Counter; +use open_metrics_client::metrics::family::Family; +use open_metrics_client::metrics::gauge::Gauge; +use open_metrics_client::registry::Registry; + +use crate::topic::TopicHash; + +// Default value that limits for how many topics do we store metrics. +const DEFAULT_MAX_TOPICS: usize = 300; + +// Default value that limits how many topics for which there has never been a subscription do we +// store metrics. +const DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS: usize = 50; + +pub struct Config { + /// This provides an upper bound to the number of mesh topics we create metrics for. It + /// prevents unbounded labels being created in the metrics. + pub max_topics: usize, + /// Mesh topics are controlled by the user via subscriptions whereas non-mesh topics are + /// determined by users on the network. This limit permits a fixed amount of topics to allow, + /// in-addition to the mesh topics. + pub max_never_subscribed_topics: usize, +} + +impl Default for Config { + fn default() -> Self { + Config { + max_topics: DEFAULT_MAX_TOPICS, + max_never_subscribed_topics: DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS, + } + } +} + +/// Whether we have ever been subscribed to this topic. +type EverSubscribed = bool; + +/// Reasons why a peer was included in the mesh. +#[derive(PartialEq, Eq, Hash, Encode, Clone)] +pub enum Inclusion { + /// Peer was a fanaout peer. + Fanaout, + /// Included from random selection. + Random, + /// Peer subscribed. + Subscribed, + /// Peer was included to fill the outbound quota. + Outbound, +} + +/// Reasons why a peer was removed from the mesh. +#[derive(PartialEq, Eq, Hash, Encode, Clone)] +pub enum Churn { + /// Peer disconnected. + Dc, + /// Peer had a bad score. + BadScore, + /// Peer sent a PRUNE. + Prune, + /// Peer unsubscribed. + Unsub, + /// Too many peers. + Excess, +} + +/// Label for the mesh inclusion event metrics. +#[derive(PartialEq, Eq, Hash, Encode, Clone)] +struct InclusionLabel { + topic: TopicHash, + reason: Inclusion, +} + +/// Label for the mesh churn event metrics. +#[derive(PartialEq, Eq, Hash, Encode, Clone)] +struct ChurnLabel { + topic: TopicHash, + reason: Churn, +} + +/// A collection of metrics used throughout the Gossipsub behaviour. +pub struct Metrics { + /* Configuration parameters */ + /// Maximum number of topics for which we store metrics. This helps keep the metrics bounded. + max_topics: usize, + /// Maximum number of topics for which we store metrics, where the topic in not one to which we + /// have subscribed at some point. This helps keep the metrics bounded, since these topics come + /// from received messages and not explicit application subscriptions. + max_never_subscribed_topics: usize, + + /* Auxiliary variables */ + /// Information needed to decide if a topic is allowed or not. + topic_info: HashMap, + + /* Metrics per known topic */ + /// Status of our subscription to this topic. This metric allows analyzing other topic metrics + /// filtered by our current subscription status. 
+ topic_subscription_status: Family, + /// Number of peers subscribed to each topic. This allows us to analyze a topic's behaviour + /// regardless of our subscription status. + topic_peers_count: Family, + + /* Metrics regarding mesh state */ + /// Number of peers in our mesh. This metric should be updated with the count of peers for a + /// topic in the mesh regardless of inclusion and churn events. + mesh_peer_counts: Family, + /// Number of times we include peers in a topic mesh for different reasons. + mesh_peer_inclusion_events: Family, + /// Number of times we remove peers in a topic mesh for different reasons. + mesh_peer_churn_events: Family, + + /* Metrics regarding messages sent */ + /// Number of gossip messages sent to each topic. + topic_msg_sent_counts: Family, + /// Bytes from gossip messages sent to each topic . + topic_msg_sent_bytes: Family, +} + +impl Metrics { + pub fn new(registry: &mut Registry, config: Config) -> Self { + // Destructure the config to be sure everything is used. + let Config { + max_topics, + max_never_subscribed_topics, + } = config; + + macro_rules! register_family { + ($name:expr, $help:expr) => {{ + let fam = Family::default(); + registry.register($name, $help, Box::new(fam.clone())); + fam + }}; + } + + let topic_subscription_status = register_family!( + "topic_subscription_status", + "Subscription status per known topic" + ); + let topic_peers_count = register_family!( + "topic_peers_counts", + "Number of peers subscribed to each topic" + ); + + let mesh_peer_counts = register_family!( + "mesh_peer_counts", + "Number of peers in each topic in our mesh" + ); + let mesh_peer_inclusion_events = register_family!( + "mesh_peer_inclusion_events", + "Number of times a peer gets added to our mesh for different reasons" + ); + let mesh_peer_churn_events = register_family!( + "mesh_peer_churn_events", + "Number of times a peer gets removed from our mesh for different reasons" + ); + + let topic_msg_sent_counts = register_family!( + "topic_msg_sent_counts", + "Number of gossip messages sent to each topic." + ); + let topic_msg_sent_bytes = register_family!( + "topic_msg_sent_bytes", + "Bytes from gossip messages sent to each topic." + ); + + Self { + max_topics, + max_never_subscribed_topics, + topic_info: HashMap::default(), + topic_subscription_status, + topic_peers_count, + mesh_peer_counts, + mesh_peer_inclusion_events, + mesh_peer_churn_events, + topic_msg_sent_counts, + topic_msg_sent_bytes, + } + } + + fn non_subscription_topics_count(&self) -> usize { + self.topic_info + .values() + .filter(|&ever_subscribed| !ever_subscribed) + .count() + } + + /// Registers a topic if not already known and if the bounds allow it. + fn register_topic(&mut self, topic: &TopicHash) -> Result<(), ()> { + if self.topic_info.contains_key(topic) { + Ok(()) + } else if self.topic_info.len() < self.max_topics + && self.non_subscription_topics_count() < self.max_never_subscribed_topics + { + // This is a topic without an explicit subscription and we register it if we are within + // the configured bounds. + self.topic_info.entry(topic.clone()).or_insert(false); + self.topic_subscription_status.get_or_create(topic).set(0); + Ok(()) + } else { + // We don't know this topic and there is no space left to store it + Err(()) + } + } + + /// Register how many peers do we known are subscribed to this topic. 
+ pub fn set_topic_peers(&mut self, topic: &TopicHash, count: usize) { + if self.register_topic(topic).is_ok() { + self.topic_peers_count + .get_or_create(topic) + .set(count as u64); + } + } + + /* Mesh related methods */ + + /// Registers the subscription to a topic if the configured limits allow it. + /// Sets the registered number of peers in the mesh to 0. + pub fn joined(&mut self, topic: &TopicHash) { + if self.topic_info.contains_key(topic) || self.topic_info.len() < self.max_topics { + self.topic_info.insert(topic.clone(), true); + let was_subscribed = self.topic_subscription_status.get_or_create(topic).set(1); + debug_assert_eq!(was_subscribed, 0); + self.mesh_peer_counts.get_or_create(topic).set(0); + } + } + + /// Registers the unsubscription to a topic if the topic was previously allowed. + /// Sets the registered number of peers in the mesh to 0. + pub fn left(&mut self, topic: &TopicHash) { + if self.topic_info.contains_key(topic) { + // Depending on the configured topic bounds we could miss a mesh topic. + // So, check first if the topic was previously allowed. + let was_subscribed = self.topic_subscription_status.get_or_create(topic).set(0); + debug_assert_eq!(was_subscribed, 1); + self.mesh_peer_counts.get_or_create(topic).set(0); + } + } + + /// Register the inclusion of peers in our mesh due to some reason. + pub fn peers_included(&mut self, topic: &TopicHash, reason: Inclusion, count: usize) { + if self.register_topic(topic).is_ok() { + self.mesh_peer_inclusion_events + .get_or_create(&InclusionLabel { + topic: topic.clone(), + reason, + }) + .inc_by(count as u64); + } + } + + /// Register the removal of peers in our mesh due to some reason. + pub fn peers_removed(&mut self, topic: &TopicHash, reason: Churn, count: usize) { + if self.register_topic(topic).is_ok() { + self.mesh_peer_churn_events + .get_or_create(&ChurnLabel { + topic: topic.clone(), + reason, + }) + .inc_by(count as u64); + } + } + + /// Register the current number of peers in our mesh for this topic. + pub fn set_mesh_peers(&mut self, topic: &TopicHash, count: usize) { + if self.register_topic(topic).is_ok() { + // Due to limits, this topic could have not been allowed, so we check. + self.mesh_peer_counts.get_or_create(topic).set(count as u64); + } + } + + /// Register sending a message over a topic. + pub fn msg_sent(&mut self, topic: &TopicHash, bytes: usize) { + if self.register_topic(topic).is_ok() { + self.topic_msg_sent_counts.get_or_create(topic).inc(); + self.topic_msg_sent_bytes + .get_or_create(topic) + .inc_by(bytes as u64); + } + } +} diff --git a/protocols/gossipsub/src/topic.rs b/protocols/gossipsub/src/topic.rs index 7e8afca2..f737056f 100644 --- a/protocols/gossipsub/src/topic.rs +++ b/protocols/gossipsub/src/topic.rs @@ -20,6 +20,7 @@ use crate::rpc_proto; use base64::encode; +use open_metrics_client::encoding::text::Encode; use prost::Message; use sha2::{Digest, Sha256}; use std::fmt; @@ -60,7 +61,7 @@ impl Hasher for Sha256Hash { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Encode)] pub struct TopicHash { /// The topic hash. Stored as a string to align with the protobuf API. hash: String,
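
Reviewer note: below is a minimal usage sketch of how the constructor added here and the new `libp2p-metrics` `gossipsub` feature fit together, assuming the crate versions pinned in this patch (`open-metrics-client 0.12`, `libp2p-metrics 0.2.0`, `libp2p-gossipsub 0.34.0`). The `main` wrapper, the generated keypair, and the topic string are illustrative only, not part of the change.

```rust
use libp2p_core::identity::Keypair;
use libp2p_gossipsub::metrics::Config as MetricsConfig;
use libp2p_gossipsub::{Gossipsub, GossipsubConfig, IdentTopic, MessageAuthenticity};
use libp2p_metrics::{Metrics, Recorder};
use open_metrics_client::registry::Registry;

fn main() {
    // One registry backs both the behaviour-internal gossipsub metrics and the
    // event counters from misc/metrics.
    let mut registry = Registry::default();

    // Event-level metrics (requires the new `gossipsub` feature of libp2p-metrics).
    let event_metrics = Metrics::new(&mut registry);

    // Mesh and topic metrics registered by the behaviour itself. The default
    // `MetricsConfig` bounds how many topics get their own label sets
    // (max_topics / max_never_subscribed_topics).
    let keypair = Keypair::generate_ed25519();
    let mut gossipsub: Gossipsub = Gossipsub::new_with_metrics(
        MessageAuthenticity::Signed(keypair),
        GossipsubConfig::default(),
        &mut registry,
        MetricsConfig::default(),
    )
    .expect("valid gossipsub configuration");

    // Subscribing joins the mesh for the topic, which sets
    // topic_subscription_status to 1 and initializes mesh_peer_counts to 0.
    gossipsub
        .subscribe(&IdentTopic::new("example-topic"))
        .expect("accepted by the subscription filter");

    // In the swarm event loop, feed received events to the recorder, e.g.
    // `event_metrics.record(&gossipsub_event)`, which increments the gossipsub
    // `messages` counter for `GossipsubEvent::Message`.
    let _ = &event_metrics;
}
```

The populated registry can then be exposed the same way as in the existing `misc/metrics` examples, e.g. encoded via `open_metrics_client::encoding::text::encode` behind an HTTP `/metrics` endpoint.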