protocols/gossipsub: Add mesh metrics (#2316)

Enable instrumenting mesh through metrics and add gossipsub to
misc/metrics.

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Divma 2021-11-16 08:59:39 -05:00 committed by GitHub
parent c4f7877853
commit 2066a192ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 529 additions and 27 deletions

View File

@ -39,7 +39,7 @@ dns-tokio = ["libp2p-dns", "libp2p-dns/tokio"]
floodsub = ["libp2p-floodsub"]
identify = ["libp2p-identify", "libp2p-metrics/identify"]
kad = ["libp2p-kad", "libp2p-metrics/kad"]
gossipsub = ["libp2p-gossipsub"]
gossipsub = ["libp2p-gossipsub", "libp2p-metrics/gossipsub"]
metrics = ["libp2p-metrics"]
mdns = ["libp2p-mdns"]
mplex = ["libp2p-mplex"]

View File

@ -1,7 +1,11 @@
## Version 0.2.0 [unreleased]
# 0.2.0 [unreleased]
- Include gossipsub metrics (see [PR 2316]).
- Update dependencies.
## Version 0.1.0 [2021-11-01]
[PR 2316]: https://github.com/libp2p/rust-libp2p/pull/2316
- Add initial version.
# 0.1.0 [2021-11-01]
- Add initial version.

View File

@ -10,12 +10,14 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]
[features]
gossipsub = ["libp2p-gossipsub"]
identify = ["libp2p-identify"]
kad = ["libp2p-kad"]
ping = ["libp2p-ping"]
[dependencies]
libp2p-core = { version = "0.30.0", path = "../../core" }
libp2p-gossipsub = { version = "0.34.0", path = "../../protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.32.0", path = "../../protocols/identify", optional = true }
libp2p-kad = { version = "0.33.0", path = "../../protocols/kad", optional = true }
libp2p-ping = { version = "0.32.0", path = "../../protocols/ping", optional = true }

View File

@ -0,0 +1,52 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use open_metrics_client::metrics::counter::Counter;
use open_metrics_client::registry::Registry;
/// Gossipsub metrics derived from emitted [`libp2p_gossipsub::GossipsubEvent`]s.
pub struct Metrics {
/// Counter registered under the name "messages" with the help text
/// "Number of messages received".
messages: Counter,
}
impl Metrics {
    /// Builds the gossipsub metrics and registers them in `registry` under the
    /// "gossipsub" prefix.
    pub fn new(registry: &mut Registry) -> Self {
        let messages = Counter::default();

        // The registry receives a clone of the counter; the original is kept in `Self`.
        registry.sub_registry_with_prefix("gossipsub").register(
            "messages",
            "Number of messages received",
            Box::new(messages.clone()),
        );

        Self { messages }
    }
}
impl super::Recorder<libp2p_gossipsub::GossipsubEvent> for super::Metrics {
fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) {
match event {
libp2p_gossipsub::GossipsubEvent::Message { .. } => {
self.gossipsub.messages.inc();
}
_ => {}
}
}
}

View File

@ -25,6 +25,8 @@
//!
//! See `examples` directory for more.
#[cfg(feature = "gossipsub")]
mod gossipsub;
#[cfg(feature = "identify")]
mod identify;
#[cfg(feature = "kad")]
@ -37,6 +39,8 @@ use open_metrics_client::registry::Registry;
/// Set of Swarm and protocol metrics derived from emitted events.
pub struct Metrics {
#[cfg(feature = "gossipsub")]
gossipsub: gossipsub::Metrics,
#[cfg(feature = "identify")]
identify: identify::Metrics,
#[cfg(feature = "kad")]
@ -58,6 +62,8 @@ impl Metrics {
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("libp2p");
Self {
#[cfg(feature = "gossipsub")]
gossipsub: gossipsub::Metrics::new(sub_registry),
#[cfg(feature = "identify")]
identify: identify::Metrics::new(sub_registry),
#[cfg(feature = "kad")]

View File

@ -1,5 +1,7 @@
# 0.34.0 [unreleased]
- Add topic and mesh metrics (see [PR 2316]).
- Fix bug in internal peer's topics tracking (see [PR 2325]).
- Use `instant` and `futures-timer` instead of `wasm-timer` (see [PR 2245]).
@ -8,6 +10,7 @@
[PR 2245]: https://github.com/libp2p/rust-libp2p/pull/2245
[PR 2325]: https://github.com/libp2p/rust-libp2p/pull/2325
[PR 2316]: https://github.com/libp2p/rust-libp2p/pull/2316
# 0.33.0 [2021-11-01]

View File

@ -29,6 +29,8 @@ regex = "1.4.0"
futures-timer = "3.0.2"
pin-project = "1.0.8"
instant = "0.1.11"
# Metrics dependencies
open-metrics-client = "0.12"
[dev-dependencies]
async-std = "1.6.3"

View File

@ -32,6 +32,7 @@ use std::{
use futures::StreamExt;
use log::{debug, error, trace, warn};
use open_metrics_client::registry::Registry;
use prost::Message;
use rand::{seq::SliceRandom, thread_rng};
@ -50,6 +51,7 @@ use crate::error::{PublishError, SubscriptionError, ValidationError};
use crate::gossip_promises::GossipPromises;
use crate::handler::{GossipsubHandler, GossipsubHandlerIn, HandlerEvent};
use crate::mcache::MessageCache;
use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics};
use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
use crate::protocol::SIGNING_PREFIX;
use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
@ -302,6 +304,9 @@ pub struct Gossipsub<
/// calculating the message-id and sending to the application. This is designed to allow the
/// user to implement arbitrary topic-based compression algorithms.
data_transform: D,
/// Keep track of a set of internal metrics relating to gossipsub.
metrics: Option<Metrics>,
}
impl<D, F> Gossipsub<D, F>
@ -318,6 +323,25 @@ where
Self::new_with_subscription_filter_and_transform(
privacy,
config,
None,
F::default(),
D::default(),
)
}
/// Creates a [`Gossipsub`] struct given a set of parameters specified via a
/// [`GossipsubConfig`]. This has no subscription filter and uses no compression.
/// Metrics can be evaluated by passing a reference to a [`Registry`].
pub fn new_with_metrics(
privacy: MessageAuthenticity,
config: GossipsubConfig,
metrics_registry: &mut Registry,
metrics_config: MetricsConfig,
) -> Result<Self, &'static str> {
Self::new_with_subscription_filter_and_transform(
privacy,
config,
Some((metrics_registry, metrics_config)),
F::default(),
D::default(),
)
@ -334,11 +358,13 @@ where
pub fn new_with_subscription_filter(
privacy: MessageAuthenticity,
config: GossipsubConfig,
metrics: Option<(&mut Registry, MetricsConfig)>,
subscription_filter: F,
) -> Result<Self, &'static str> {
Self::new_with_subscription_filter_and_transform(
privacy,
config,
metrics,
subscription_filter,
D::default(),
)
@ -355,11 +381,13 @@ where
pub fn new_with_transform(
privacy: MessageAuthenticity,
config: GossipsubConfig,
metrics: Option<(&mut Registry, MetricsConfig)>,
data_transform: D,
) -> Result<Self, &'static str> {
Self::new_with_subscription_filter_and_transform(
privacy,
config,
metrics,
F::default(),
data_transform,
)
@ -376,6 +404,7 @@ where
pub fn new_with_subscription_filter_and_transform(
privacy: MessageAuthenticity,
config: GossipsubConfig,
metrics: Option<(&mut Registry, MetricsConfig)>,
subscription_filter: F,
data_transform: D,
) -> Result<Self, &'static str> {
@ -385,9 +414,8 @@ where
// were received locally.
validate_config(&privacy, config.validation_mode())?;
// Set up message publishing parameters.
Ok(Gossipsub {
metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)),
events: VecDeque::new(),
control_pool: HashMap::new(),
publish_config: privacy.into(),
@ -604,6 +632,7 @@ where
debug!("Publishing message: {:?}", msg_id);
let topic_hash = raw_message.topic.clone();
let msg_bytes = raw_message.data.len();
// If we are not flood publishing forward the message to mesh peers.
let mesh_peers_sent =
@ -700,6 +729,10 @@ where
for peer_id in recipient_peers.iter() {
debug!("Sending message to peer: {:?}", peer_id);
self.send_message(*peer_id, event.clone())?;
if let Some(m) = self.metrics.as_mut() {
m.msg_sent(&topic_hash, msg_bytes);
}
}
debug!("Published message: {:?}", &msg_id);
@ -868,6 +901,10 @@ where
let mut added_peers = HashSet::new();
if let Some(m) = self.metrics.as_mut() {
m.joined(topic_hash)
}
// check if we have mesh_n peers in fanout[topic] and add them to the mesh if we do,
// removing the fanout entry.
if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
@ -899,10 +936,16 @@ where
topic_hash.clone(),
peers.into_iter().take(add_peers).collect(),
);
// remove the last published time
self.fanout_last_pub.remove(topic_hash);
}
let fanaout_added = added_peers.len();
if let Some(m) = self.metrics.as_mut() {
m.peers_included(topic_hash, Inclusion::Fanaout, fanaout_added)
}
// check if we need to get more peers, which we randomly select
if added_peers.len() < self.config.mesh_n() {
// get the peers
@ -931,6 +974,11 @@ where
mesh_peers.extend(new_peers);
}
let random_added = added_peers.len() - fanaout_added;
if let Some(m) = self.metrics.as_mut() {
m.peers_included(topic_hash, Inclusion::Random, random_added)
}
for peer_id in added_peers {
// Send a GRAFT control message
debug!("JOIN: Sending Graft message to peer: {:?}", peer_id);
@ -955,6 +1003,12 @@ where
&self.connected_peers,
);
}
let mesh_peers = self.mesh_peers(topic_hash).count();
if let Some(m) = self.metrics.as_mut() {
m.set_mesh_peers(topic_hash, mesh_peers)
}
debug!("Completed JOIN for topic: {:?}", topic_hash);
}
@ -1020,6 +1074,9 @@ where
// If our mesh contains the topic, send prune to peers and delete it from the mesh
if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
if let Some(m) = self.metrics.as_mut() {
m.left(topic_hash)
}
for peer in peers {
// Send a PRUNE control message
debug!("LEAVE: Sending PRUNE to peer: {:?}", peer);
@ -1218,7 +1275,18 @@ where
if !cached_messages.is_empty() {
debug!("IWANT: Sending cached messages to peer: {:?}", peer_id);
// Send the messages to the peer
let message_list = cached_messages.into_iter().map(|entry| entry.1).collect();
let message_list: Vec<_> = cached_messages.into_iter().map(|entry| entry.1).collect();
let mut topic_msgs = HashMap::<TopicHash, Vec<usize>>::default();
if self.metrics.is_some() {
for msg in message_list.iter() {
topic_msgs
.entry(msg.topic.clone())
.or_default()
.push(msg.data.len());
}
}
if self
.send_message(
*peer_id,
@ -1232,6 +1300,13 @@ where
.is_err()
{
error!("Failed to send cached messages. Messages too large");
} else if let Some(m) = self.metrics.as_mut() {
// Sending of messages succeeded, register them on the internal metrics.
for (topic, msg_bytes_vec) in topic_msgs.into_iter() {
for msg_bytes in msg_bytes_vec {
m.msg_sent(&topic, msg_bytes);
}
}
}
}
debug!("Completed IWANT handling for peer: {}", peer_id);
@ -1338,7 +1413,13 @@ where
"GRAFT: Mesh link added for peer: {:?} in topic: {:?}",
peer_id, &topic_hash
);
peers.insert(*peer_id);
if peers.insert(*peer_id) {
if let Some(m) = self.metrics.as_mut() {
m.peers_included(&topic_hash, Inclusion::Subscribed, 1)
}
}
// If the peer did not previously exist in any mesh, inform the handler
peer_added_to_mesh(
*peer_id,
@ -1401,6 +1482,7 @@ where
topic_hash: &TopicHash,
backoff: Option<u64>,
always_update_backoff: bool,
reason: Churn,
) {
let mut update_backoff = always_update_backoff;
if let Some(peers) = self.mesh.get_mut(topic_hash) {
@ -1411,6 +1493,9 @@ where
peer_id.to_string(),
topic_hash
);
if let Some(m) = self.metrics.as_mut() {
m.peers_removed(topic_hash, reason, 1)
}
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.prune(peer_id, topic_hash.clone());
@ -1450,7 +1535,7 @@ where
let (below_threshold, score) =
self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold);
for (topic_hash, px, backoff) in prune_data {
self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true);
self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true, Churn::Prune);
if self.mesh.contains_key(&topic_hash) {
//connect to px peers
@ -1779,9 +1864,10 @@ where
for subscription in filtered_topics {
// get the peers from the mapping, or insert empty lists if the topic doesn't exist
let topic_hash = &subscription.topic_hash;
let peer_list = self
.topic_peers
.entry(subscription.topic_hash.clone())
.entry(topic_hash.clone())
.or_insert_with(Default::default);
match subscription.action {
@ -1790,12 +1876,12 @@ where
debug!(
"SUBSCRIPTION: Adding gossip peer: {} to topic: {:?}",
propagation_source.to_string(),
subscription.topic_hash
topic_hash
);
}
// add to the peer_topics mapping
subscribed_topics.insert(subscription.topic_hash.clone());
subscribed_topics.insert(topic_hash.clone());
// if the mesh needs peers add the peer to the mesh
if !self.explicit_peers.contains(propagation_source)
@ -1813,28 +1899,30 @@ where
.0
&& !self
.backoffs
.is_backoff_with_slack(&subscription.topic_hash, propagation_source)
.is_backoff_with_slack(topic_hash, propagation_source)
{
if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) {
if let Some(peers) = self.mesh.get_mut(topic_hash) {
if peers.len() < self.config.mesh_n_low()
&& peers.insert(*propagation_source)
{
debug!(
"SUBSCRIPTION: Adding peer {} to the mesh for topic {:?}",
propagation_source.to_string(),
subscription.topic_hash
topic_hash
);
if let Some(m) = self.metrics.as_mut() {
m.peers_included(topic_hash, Inclusion::Subscribed, 1)
}
// send graft to the peer
debug!(
"Sending GRAFT to peer {} for topic {:?}",
propagation_source.to_string(),
subscription.topic_hash
topic_hash
);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score
.graft(propagation_source, subscription.topic_hash.clone());
peer_score.graft(propagation_source, topic_hash.clone());
}
topics_to_graft.push(subscription.topic_hash.clone());
topics_to_graft.push(topic_hash.clone());
}
}
}
@ -1842,7 +1930,7 @@ where
application_event.push(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Subscribed {
peer_id: *propagation_source,
topic: subscription.topic_hash.clone(),
topic: topic_hash.clone(),
},
));
}
@ -1851,26 +1939,31 @@ where
debug!(
"SUBSCRIPTION: Removing gossip peer: {} from topic: {:?}",
propagation_source.to_string(),
subscription.topic_hash
topic_hash
);
}
// remove topic from the peer_topics mapping
subscribed_topics.remove(&subscription.topic_hash);
unsubscribed_peers.push((*propagation_source, subscription.topic_hash.clone()));
subscribed_topics.remove(topic_hash);
unsubscribed_peers.push((*propagation_source, topic_hash.clone()));
// generate an unsubscribe event to be polled
application_event.push(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Unsubscribed {
peer_id: *propagation_source,
topic: subscription.topic_hash.clone(),
topic: topic_hash.clone(),
},
));
}
}
if let Some(m) = self.metrics.as_mut() {
m.set_topic_peers(topic_hash, peer_list.len());
}
}
// remove unsubscribed peers from the mesh if it exists
for (peer_id, topic_hash) in unsubscribed_peers {
self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false);
self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false, Churn::Unsub);
}
// Potentially inform the handler if we have added this peer to a mesh for the first time.
@ -1994,6 +2087,11 @@ where
})
.cloned()
.collect();
if let Some(m) = self.metrics.as_mut() {
m.peers_removed(topic_hash, Churn::BadScore, to_remove.len())
}
for peer in to_remove {
peers.remove(&peer);
}
@ -2026,6 +2124,9 @@ where
}
// update the mesh
debug!("Updating mesh, new mesh: {:?}", peer_list);
if let Some(m) = self.metrics.as_mut() {
m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
}
peers.extend(peer_list);
}
@ -2080,6 +2181,10 @@ where
current_topic.push(topic_hash.clone());
removed += 1;
}
if let Some(m) = self.metrics.as_mut() {
m.peers_removed(topic_hash, Churn::Excess, removed)
}
}
// do we have enough outbound peers?
@ -2109,6 +2214,9 @@ where
}
// update the mesh
debug!("Updating mesh, new mesh: {:?}", peer_list);
if let Some(m) = self.metrics.as_mut() {
m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len())
}
peers.extend(peer_list);
}
}
@ -2168,10 +2276,17 @@ where
"Opportunistically graft in topic {} with peers {:?}",
topic_hash, peer_list
);
if let Some(m) = self.metrics.as_mut() {
m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
}
peers.extend(peer_list);
}
}
}
// Register the final count of peers in the mesh
if let Some(m) = self.metrics.as_mut() {
m.set_mesh_peers(topic_hash, peers.len())
}
}
// remove expired fanout topics
@ -2523,6 +2638,9 @@ where
for peer in recipient_peers.iter() {
debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
self.send_message(*peer, event.clone())?;
if let Some(m) = self.metrics.as_mut() {
m.msg_sent(&message.topic, message.data.len());
}
}
debug!("Completed forwarding message");
Ok(true)
@ -2887,7 +3005,12 @@ where
// check the mesh for the topic
if let Some(mesh_peers) = self.mesh.get_mut(topic) {
// check if the peer is in the mesh and remove it
mesh_peers.remove(peer_id);
if mesh_peers.remove(peer_id) {
if let Some(m) = self.metrics.as_mut() {
m.peers_removed(topic, Churn::Dc, 1);
m.set_mesh_peers(topic, mesh_peers.len());
}
};
}
// remove from topic_peers
@ -2899,6 +3022,9 @@ where
peer_id
);
}
if let Some(m) = self.metrics.as_mut() {
m.set_topic_peers(topic, peer_list.len())
}
} else {
warn!(
"Disconnected node: {} with topic: {:?} not in topic_peers",

View File

@ -72,6 +72,7 @@ mod tests {
let mut gs: Gossipsub<D, F> = Gossipsub::new_with_subscription_filter_and_transform(
MessageAuthenticity::Signed(keypair),
self.gs_config,
None,
self.subscription_filter,
self.data_transform,
)

View File

@ -132,6 +132,7 @@ mod gossip_promises;
mod handler;
mod interval;
mod mcache;
pub mod metrics;
mod peer_score;
pub mod subscription_filter;
pub mod time_cache;

View File

@ -0,0 +1,304 @@
// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! A set of metrics used to help track and diagnose the network behaviour of the gossipsub
//! protocol.
use std::collections::HashMap;
use open_metrics_client::encoding::text::Encode;
use open_metrics_client::metrics::counter::Counter;
use open_metrics_client::metrics::family::Family;
use open_metrics_client::metrics::gauge::Gauge;
use open_metrics_client::registry::Registry;
use crate::topic::TopicHash;
// Default upper bound on the number of topics for which metrics are stored.
const DEFAULT_MAX_TOPICS: usize = 300;
// Default upper bound on the number of stored topics to which there has never been a
// subscription.
const DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS: usize = 50;
/// Limits that bound how many topics the gossipsub metrics keep track of.
///
/// Every tracked topic becomes a label value on the per-topic metrics, so these limits keep
/// the number of distinct labels bounded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Config {
    /// This provides an upper bound to the number of mesh topics we create metrics for. It
    /// prevents unbounded labels being created in the metrics.
    pub max_topics: usize,
    /// Mesh topics are controlled by the user via subscriptions whereas non-mesh topics are
    /// determined by users on the network. This limit permits a fixed amount of topics to
    /// allow, in addition to the mesh topics.
    pub max_never_subscribed_topics: usize,
}
impl Default for Config {
    /// Builds a configuration from the module-level default limits.
    fn default() -> Self {
        let max_topics = DEFAULT_MAX_TOPICS;
        let max_never_subscribed_topics = DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS;

        Self {
            max_topics,
            max_never_subscribed_topics,
        }
    }
}
/// Whether we have ever been subscribed to this topic. Used as the value type of the
/// per-topic bookkeeping map in [`Metrics`].
type EverSubscribed = bool;
/// Reasons why a peer was included in the mesh.
///
/// Used as the `reason` part of the label on the mesh inclusion event counters.
#[derive(PartialEq, Eq, Hash, Encode, Clone)]
pub enum Inclusion {
/// Peer was a fanout peer. The variant keeps its existing spelling because callers
/// reference it by this name.
Fanaout,
/// Included from random selection.
Random,
/// Peer subscribed.
Subscribed,
/// Peer was included to fill the outbound quota.
Outbound,
}
/// Reasons why a peer was removed from the mesh.
///
/// Used as the `reason` part of the label on the mesh churn event counters.
#[derive(PartialEq, Eq, Hash, Encode, Clone)]
pub enum Churn {
/// Peer disconnected.
Dc,
/// Peer had a bad score.
BadScore,
/// Peer sent a PRUNE.
Prune,
/// Peer unsubscribed.
Unsub,
/// Too many peers.
Excess,
}
/// Label for the mesh inclusion event metrics: one counter exists per distinct
/// (topic, reason) pair.
#[derive(PartialEq, Eq, Hash, Encode, Clone)]
struct InclusionLabel {
/// Topic whose mesh the peers were added to.
topic: TopicHash,
/// Why the peers were added.
reason: Inclusion,
}
/// Label for the mesh churn event metrics: one counter exists per distinct
/// (topic, reason) pair.
#[derive(PartialEq, Eq, Hash, Encode, Clone)]
struct ChurnLabel {
/// Topic whose mesh the peers were removed from.
topic: TopicHash,
/// Why the peers were removed.
reason: Churn,
}
/// A collection of metrics used throughout the Gossipsub behaviour.
pub struct Metrics {
/* Configuration parameters */
/// Maximum number of topics for which we store metrics. This helps keep the metrics bounded.
max_topics: usize,
/// Maximum number of topics for which we store metrics, where the topic is not one to which
/// we have subscribed at some point. This helps keep the metrics bounded, since these topics
/// come from received messages and not explicit application subscriptions.
max_never_subscribed_topics: usize,
/* Auxiliary variables */
/// Information needed to decide if a topic is allowed or not. Maps every known topic to
/// whether we have ever been subscribed to it.
topic_info: HashMap<TopicHash, EverSubscribed>,
/* Metrics per known topic */
/// Status of our subscription to this topic. This metric allows analyzing other topic metrics
/// filtered by our current subscription status.
topic_subscription_status: Family<TopicHash, Gauge>,
/// Number of peers subscribed to each topic. This allows us to analyze a topic's behaviour
/// regardless of our subscription status.
topic_peers_count: Family<TopicHash, Gauge>,
/* Metrics regarding mesh state */
/// Number of peers in our mesh. This metric should be updated with the count of peers for a
/// topic in the mesh regardless of inclusion and churn events.
mesh_peer_counts: Family<TopicHash, Gauge>,
/// Number of times we include peers in a topic mesh for different reasons.
mesh_peer_inclusion_events: Family<InclusionLabel, Counter>,
/// Number of times we remove peers in a topic mesh for different reasons.
mesh_peer_churn_events: Family<ChurnLabel, Counter>,
/* Metrics regarding messages sent */
/// Number of gossip messages sent to each topic.
topic_msg_sent_counts: Family<TopicHash, Counter>,
/// Bytes from gossip messages sent to each topic.
topic_msg_sent_bytes: Family<TopicHash, Counter>,
}
impl Metrics {
    /// Creates the gossipsub metrics and registers every metric family in `registry`.
    ///
    /// The limits in `config` bound how many topics are tracked, and therefore how many
    /// distinct topic labels the families can grow to.
    pub fn new(registry: &mut Registry, config: Config) -> Self {
        // Destructure the config to be sure everything is used.
        let Config {
            max_topics,
            max_never_subscribed_topics,
        } = config;

        // Creates a default family, registers a clone of it under the given name and help
        // text, and evaluates to the family so that it can be stored in `Self`.
        macro_rules! register_family {
            ($name:expr, $help:expr) => {{
                let fam = Family::default();
                registry.register($name, $help, Box::new(fam.clone()));
                fam
            }};
        }

        // Help texts are consistently written without a trailing period.
        // NOTE(review): the registry is understood to append the final period itself, so a
        // period here would be doubled in the exposition output; confirm against the
        // open-metrics-client documentation.
        let topic_subscription_status = register_family!(
            "topic_subscription_status",
            "Subscription status per known topic"
        );
        let topic_peers_count = register_family!(
            "topic_peers_counts",
            "Number of peers subscribed to each topic"
        );

        let mesh_peer_counts = register_family!(
            "mesh_peer_counts",
            "Number of peers in each topic in our mesh"
        );
        let mesh_peer_inclusion_events = register_family!(
            "mesh_peer_inclusion_events",
            "Number of times a peer gets added to our mesh for different reasons"
        );
        let mesh_peer_churn_events = register_family!(
            "mesh_peer_churn_events",
            "Number of times a peer gets removed from our mesh for different reasons"
        );

        let topic_msg_sent_counts = register_family!(
            "topic_msg_sent_counts",
            "Number of gossip messages sent to each topic"
        );
        let topic_msg_sent_bytes = register_family!(
            "topic_msg_sent_bytes",
            "Bytes from gossip messages sent to each topic"
        );

        Self {
            max_topics,
            max_never_subscribed_topics,
            topic_info: HashMap::default(),
            topic_subscription_status,
            topic_peers_count,
            mesh_peer_counts,
            mesh_peer_inclusion_events,
            mesh_peer_churn_events,
            topic_msg_sent_counts,
            topic_msg_sent_bytes,
        }
    }

    /// Number of known topics to which we have never been subscribed.
    fn non_subscription_topics_count(&self) -> usize {
        self.topic_info
            .values()
            .filter(|&ever_subscribed| !ever_subscribed)
            .count()
    }

    /// Registers a topic if not already known and if the bounds allow it.
    ///
    /// Returns `Ok(())` when the topic is (now) tracked and `Err(())` when it is unknown and
    /// there is no room left for it.
    fn register_topic(&mut self, topic: &TopicHash) -> Result<(), ()> {
        if self.topic_info.contains_key(topic) {
            Ok(())
        } else if self.topic_info.len() < self.max_topics
            && self.non_subscription_topics_count() < self.max_never_subscribed_topics
        {
            // This is a topic without an explicit subscription and we register it if we are
            // within the configured bounds. The key is known to be absent at this point, so
            // a plain insert is sufficient.
            self.topic_info.insert(topic.clone(), false);
            self.topic_subscription_status.get_or_create(topic).set(0);
            Ok(())
        } else {
            // We don't know this topic and there is no space left to store it.
            Err(())
        }
    }

    /// Registers how many peers we know are subscribed to this topic.
    pub fn set_topic_peers(&mut self, topic: &TopicHash, count: usize) {
        if self.register_topic(topic).is_ok() {
            self.topic_peers_count
                .get_or_create(topic)
                .set(count as u64);
        }
    }

    /* Mesh related methods */

    /// Registers the subscription to a topic if the configured limits allow it.
    /// Sets the registered number of peers in the mesh to 0.
    pub fn joined(&mut self, topic: &TopicHash) {
        // An already known topic is always accepted; an unknown one only while there is
        // still room below `max_topics`.
        if self.topic_info.contains_key(topic) || self.topic_info.len() < self.max_topics {
            self.topic_info.insert(topic.clone(), true);
            // The subscription gauge is expected to have been 0 before this join.
            let was_subscribed = self.topic_subscription_status.get_or_create(topic).set(1);
            debug_assert_eq!(was_subscribed, 0);
            self.mesh_peer_counts.get_or_create(topic).set(0);
        }
    }

    /// Registers the unsubscription to a topic if the topic was previously allowed.
    /// Sets the registered number of peers in the mesh to 0.
    pub fn left(&mut self, topic: &TopicHash) {
        if self.topic_info.contains_key(topic) {
            // Depending on the configured topic bounds we could miss a mesh topic.
            // So, check first if the topic was previously allowed.
            let was_subscribed = self.topic_subscription_status.get_or_create(topic).set(0);
            debug_assert_eq!(was_subscribed, 1);
            self.mesh_peer_counts.get_or_create(topic).set(0);
        }
    }

    /// Registers the inclusion of `count` peers in our mesh for the given reason.
    pub fn peers_included(&mut self, topic: &TopicHash, reason: Inclusion, count: usize) {
        if self.register_topic(topic).is_ok() {
            self.mesh_peer_inclusion_events
                .get_or_create(&InclusionLabel {
                    topic: topic.clone(),
                    reason,
                })
                .inc_by(count as u64);
        }
    }

    /// Registers the removal of `count` peers from our mesh for the given reason.
    pub fn peers_removed(&mut self, topic: &TopicHash, reason: Churn, count: usize) {
        if self.register_topic(topic).is_ok() {
            self.mesh_peer_churn_events
                .get_or_create(&ChurnLabel {
                    topic: topic.clone(),
                    reason,
                })
                .inc_by(count as u64);
        }
    }

    /// Registers the current number of peers in our mesh for this topic.
    pub fn set_mesh_peers(&mut self, topic: &TopicHash, count: usize) {
        if self.register_topic(topic).is_ok() {
            // Due to limits, this topic could have not been allowed, so we check.
            self.mesh_peer_counts.get_or_create(topic).set(count as u64);
        }
    }

    /// Registers sending a message of `bytes` bytes over a topic.
    pub fn msg_sent(&mut self, topic: &TopicHash, bytes: usize) {
        if self.register_topic(topic).is_ok() {
            self.topic_msg_sent_counts.get_or_create(topic).inc();
            self.topic_msg_sent_bytes
                .get_or_create(topic)
                .inc_by(bytes as u64);
        }
    }
}

View File

@ -20,6 +20,7 @@
use crate::rpc_proto;
use base64::encode;
use open_metrics_client::encoding::text::Encode;
use prost::Message;
use sha2::{Digest, Sha256};
use std::fmt;
@ -60,7 +61,7 @@ impl Hasher for Sha256Hash {
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Encode)]
pub struct TopicHash {
/// The topic hash. Stored as a string to align with the protobuf API.
hash: String,