// Copyright 2020 Sigma Prime Pty Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. //! A set of metrics used to help track and diagnose the network behaviour of the gossipsub //! protocol. use std::collections::HashMap; use prometheus_client::encoding::text::Encode; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::{Family, MetricConstructor}; use prometheus_client::metrics::gauge::Gauge; use prometheus_client::metrics::histogram::{linear_buckets, Histogram}; use prometheus_client::registry::Registry; use crate::topic::TopicHash; use crate::types::{MessageAcceptance, PeerKind}; // Default value that limits for how many topics do we store metrics. const DEFAULT_MAX_TOPICS: usize = 300; // Default value that limits how many topics for which there has never been a subscription do we // store metrics. const DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS: usize = 50; pub struct Config { /// This provides an upper bound to the number of mesh topics we create metrics for. It /// prevents unbounded labels being created in the metrics. pub max_topics: usize, /// Mesh topics are controlled by the user via subscriptions whereas non-mesh topics are /// determined by users on the network. This limit permits a fixed amount of topics to allow, /// in-addition to the mesh topics. pub max_never_subscribed_topics: usize, } impl Default for Config { fn default() -> Self { Config { max_topics: DEFAULT_MAX_TOPICS, max_never_subscribed_topics: DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS, } } } /// Whether we have ever been subscribed to this topic. type EverSubscribed = bool; /// A collection of metrics used throughout the Gossipsub behaviour. pub struct Metrics { /* Configuration parameters */ /// Maximum number of topics for which we store metrics. This helps keep the metrics bounded. max_topics: usize, /// Maximum number of topics for which we store metrics, where the topic in not one to which we /// have subscribed at some point. This helps keep the metrics bounded, since these topics come /// from received messages and not explicit application subscriptions. max_never_subscribed_topics: usize, /* Auxiliary variables */ /// Information needed to decide if a topic is allowed or not. topic_info: HashMap, /* Metrics per known topic */ /// Status of our subscription to this topic. This metric allows analyzing other topic metrics /// filtered by our current subscription status. topic_subscription_status: Family, /// Number of peers subscribed to each topic. This allows us to analyze a topic's behaviour /// regardless of our subscription status. topic_peers_count: Family, /// The number of invalid messages received for a given topic. invalid_messages: Family, /// The number of messages accepted by the application (validation result). accepted_messages: Family, /// The number of messages ignored by the application (validation result). ignored_messages: Family, /// The number of messages rejected by the application (validation result). rejected_messages: Family, /* Metrics regarding mesh state */ /// Number of peers in our mesh. This metric should be updated with the count of peers for a /// topic in the mesh regardless of inclusion and churn events. mesh_peer_counts: Family, /// Number of times we include peers in a topic mesh for different reasons. mesh_peer_inclusion_events: Family, /// Number of times we remove peers in a topic mesh for different reasons. mesh_peer_churn_events: Family, /* Metrics regarding messages sent/received */ /// Number of gossip messages sent to each topic. topic_msg_sent_counts: Family, /// Bytes from gossip messages sent to each topic. topic_msg_sent_bytes: Family, /// Number of gossipsub messages published to each topic. topic_msg_published: Family, /// Number of gossipsub messages received on each topic (without filtering duplicates). topic_msg_recv_counts_unfiltered: Family, /// Number of gossipsub messages received on each topic (after filtering duplicates). topic_msg_recv_counts: Family, /// Bytes received from gossip messages for each topic. topic_msg_recv_bytes: Family, /* Metrics related to scoring */ /// Histogram of the scores for each mesh topic. score_per_mesh: Family, /// A counter of the kind of penalties being applied to peers. scoring_penalties: Family, /* General Metrics */ /// Gossipsub supports floodsub, gossipsub v1.0 and gossipsub v1.1. Peers are classified based /// on which protocol they support. This metric keeps track of the number of peers that are /// connected of each type. peers_per_protocol: Family, /// The time it takes to complete one iteration of the heartbeat. heartbeat_duration: Histogram, /* Performance metrics */ /// When the user validates a message, it tries to re propagate it to its mesh peers. If the /// message expires from the memcache before it can be validated, we count this a cache miss /// and it is an indicator that the memcache size should be increased. memcache_misses: Counter, /// The number of times we have decided that an IWANT control message is required for this /// topic. A very high metric might indicate an underperforming network. topic_iwant_msgs: Family, } impl Metrics { pub fn new(registry: &mut Registry, config: Config) -> Self { // Destructure the config to be sure everything is used. let Config { max_topics, max_never_subscribed_topics, } = config; macro_rules! register_family { ($name:expr, $help:expr) => {{ let fam = Family::default(); registry.register($name, $help, Box::new(fam.clone())); fam }}; } let topic_subscription_status = register_family!( "topic_subscription_status", "Subscription status per known topic" ); let topic_peers_count = register_family!( "topic_peers_counts", "Number of peers subscribed to each topic" ); let invalid_messages = register_family!( "invalid_messages_per_topic", "Number of invalid messages received for each topic" ); let accepted_messages = register_family!( "accepted_messages_per_topic", "Number of accepted messages received for each topic" ); let ignored_messages = register_family!( "ignored_messages_per_topic", "Number of ignored messages received for each topic" ); let rejected_messages = register_family!( "accepted_messages_per_topic", "Number of rejected messages received for each topic" ); let mesh_peer_counts = register_family!( "mesh_peer_counts", "Number of peers in each topic in our mesh" ); let mesh_peer_inclusion_events = register_family!( "mesh_peer_inclusion_events", "Number of times a peer gets added to our mesh for different reasons" ); let mesh_peer_churn_events = register_family!( "mesh_peer_churn_events", "Number of times a peer gets removed from our mesh for different reasons" ); let topic_msg_sent_counts = register_family!( "topic_msg_sent_counts", "Number of gossip messages sent to each topic" ); let topic_msg_published = register_family!( "topic_msg_published", "Number of gossip messages published to each topic" ); let topic_msg_sent_bytes = register_family!( "topic_msg_sent_bytes", "Bytes from gossip messages sent to each topic" ); let topic_msg_recv_counts_unfiltered = register_family!( "topic_msg_recv_counts_unfiltered", "Number of gossip messages received on each topic (without duplicates being filtered)" ); let topic_msg_recv_counts = register_family!( "topic_msg_recv_counts", "Number of gossip messages received on each topic (after duplicates have been filtered)" ); let topic_msg_recv_bytes = register_family!( "topic_msg_recv_bytes", "Bytes received from gossip messages for each topic" ); // TODO: Update default variables once a builder pattern is used. let gossip_threshold = -4000.0; let publish_threshold = -8000.0; let greylist_threshold = -16000.0; let histogram_buckets: Vec = vec![ greylist_threshold, publish_threshold, gossip_threshold, gossip_threshold / 2.0, gossip_threshold / 4.0, 0.0, 1.0, 10.0, 100.0, ]; let hist_builder = HistBuilder { buckets: histogram_buckets, }; let score_per_mesh: Family<_, _, HistBuilder> = Family::new_with_constructor(hist_builder); registry.register( "score_per_mesh", "Histogram of scores per mesh topic", Box::new(score_per_mesh.clone()), ); let scoring_penalties = register_family!( "scoring_penalties", "Counter of types of scoring penalties given to peers" ); let peers_per_protocol = register_family!( "peers_per_protocol", "Number of connected peers by protocol type" ); let heartbeat_duration = Histogram::new(linear_buckets(0.0, 50.0, 10)); registry.register( "heartbeat_duration", "Histogram of observed heartbeat durations", Box::new(heartbeat_duration.clone()), ); let topic_iwant_msgs = register_family!( "topic_iwant_msgs", "Number of times we have decided an IWANT is required for this topic" ); let memcache_misses = { let metric = Counter::default(); registry.register( "memcache_misses", "Number of times a message is not found in the duplicate cache when validating", Box::new(metric.clone()), ); metric }; Self { max_topics, max_never_subscribed_topics, topic_info: HashMap::default(), topic_subscription_status, topic_peers_count, invalid_messages, accepted_messages, ignored_messages, rejected_messages, mesh_peer_counts, mesh_peer_inclusion_events, mesh_peer_churn_events, topic_msg_sent_counts, topic_msg_sent_bytes, topic_msg_published, topic_msg_recv_counts_unfiltered, topic_msg_recv_counts, topic_msg_recv_bytes, score_per_mesh, scoring_penalties, peers_per_protocol, heartbeat_duration, memcache_misses, topic_iwant_msgs, } } fn non_subscription_topics_count(&self) -> usize { self.topic_info .values() .filter(|&ever_subscribed| !ever_subscribed) .count() } /// Registers a topic if not already known and if the bounds allow it. fn register_topic(&mut self, topic: &TopicHash) -> Result<(), ()> { if self.topic_info.contains_key(topic) { Ok(()) } else if self.topic_info.len() < self.max_topics && self.non_subscription_topics_count() < self.max_never_subscribed_topics { // This is a topic without an explicit subscription and we register it if we are within // the configured bounds. self.topic_info.entry(topic.clone()).or_insert(false); self.topic_subscription_status.get_or_create(topic).set(0); Ok(()) } else { // We don't know this topic and there is no space left to store it Err(()) } } /// Register how many peers do we known are subscribed to this topic. pub fn set_topic_peers(&mut self, topic: &TopicHash, count: usize) { if self.register_topic(topic).is_ok() { self.topic_peers_count .get_or_create(topic) .set(count as u64); } } /* Mesh related methods */ /// Registers the subscription to a topic if the configured limits allow it. /// Sets the registered number of peers in the mesh to 0. pub fn joined(&mut self, topic: &TopicHash) { if self.topic_info.contains_key(topic) || self.topic_info.len() < self.max_topics { self.topic_info.insert(topic.clone(), true); let was_subscribed = self.topic_subscription_status.get_or_create(topic).set(1); debug_assert_eq!(was_subscribed, 0); self.mesh_peer_counts.get_or_create(topic).set(0); } } /// Registers the unsubscription to a topic if the topic was previously allowed. /// Sets the registered number of peers in the mesh to 0. pub fn left(&mut self, topic: &TopicHash) { if self.topic_info.contains_key(topic) { // Depending on the configured topic bounds we could miss a mesh topic. // So, check first if the topic was previously allowed. let was_subscribed = self.topic_subscription_status.get_or_create(topic).set(0); debug_assert_eq!(was_subscribed, 1); self.mesh_peer_counts.get_or_create(topic).set(0); } } /// Register the inclusion of peers in our mesh due to some reason. pub fn peers_included(&mut self, topic: &TopicHash, reason: Inclusion, count: usize) { if self.register_topic(topic).is_ok() { self.mesh_peer_inclusion_events .get_or_create(&InclusionLabel { hash: topic.to_string(), reason, }) .inc_by(count as u64); } } /// Register the removal of peers in our mesh due to some reason. pub fn peers_removed(&mut self, topic: &TopicHash, reason: Churn, count: usize) { if self.register_topic(topic).is_ok() { self.mesh_peer_churn_events .get_or_create(&ChurnLabel { hash: topic.to_string(), reason, }) .inc_by(count as u64); } } /// Register the current number of peers in our mesh for this topic. pub fn set_mesh_peers(&mut self, topic: &TopicHash, count: usize) { if self.register_topic(topic).is_ok() { // Due to limits, this topic could have not been allowed, so we check. self.mesh_peer_counts.get_or_create(topic).set(count as u64); } } /// Register that an invalid message was received on a specific topic. pub fn register_invalid_message(&mut self, topic: &TopicHash) { if self.register_topic(topic).is_ok() { self.invalid_messages.get_or_create(topic).inc(); } } /// Register a score penalty. pub fn register_score_penalty(&mut self, penalty: Penalty) { self.scoring_penalties .get_or_create(&PenaltyLabel { penalty }) .inc(); } /// Registers that a message was published on a specific topic. pub fn register_published_message(&mut self, topic: &TopicHash) { if self.register_topic(topic).is_ok() { self.topic_msg_published.get_or_create(topic).inc(); } } /// Register sending a message over a topic. pub fn msg_sent(&mut self, topic: &TopicHash, bytes: usize) { if self.register_topic(topic).is_ok() { self.topic_msg_sent_counts.get_or_create(topic).inc(); self.topic_msg_sent_bytes .get_or_create(topic) .inc_by(bytes as u64); } } /// Register that a message was received (and was not a duplicate). pub fn msg_recvd(&mut self, topic: &TopicHash) { if self.register_topic(topic).is_ok() { self.topic_msg_recv_counts.get_or_create(topic).inc(); } } /// Register that a message was received (could have been a duplicate). pub fn msg_recvd_unfiltered(&mut self, topic: &TopicHash, bytes: usize) { if self.register_topic(topic).is_ok() { self.topic_msg_recv_counts_unfiltered .get_or_create(topic) .inc(); self.topic_msg_recv_bytes .get_or_create(topic) .inc_by(bytes as u64); } } pub fn register_msg_validation(&mut self, topic: &TopicHash, validation: &MessageAcceptance) { if self.register_topic(topic).is_ok() { match validation { MessageAcceptance::Accept => self.accepted_messages.get_or_create(topic).inc(), MessageAcceptance::Ignore => self.ignored_messages.get_or_create(topic).inc(), MessageAcceptance::Reject => self.rejected_messages.get_or_create(topic).inc(), }; } } /// Register a memcache miss. pub fn memcache_miss(&mut self) { self.memcache_misses.inc(); } /// Register sending an IWANT msg for this topic. pub fn register_iwant(&mut self, topic: &TopicHash) { if self.register_topic(topic).is_ok() { self.topic_iwant_msgs.get_or_create(topic).inc(); } } /// Observes a heartbeat duration. pub fn observe_heartbeat_duration(&mut self, millis: u64) { self.heartbeat_duration.observe(millis as f64); } /// Observe a score of a mesh peer. pub fn observe_mesh_peers_score(&mut self, topic: &TopicHash, score: f64) { if self.register_topic(topic).is_ok() { self.score_per_mesh.get_or_create(topic).observe(score); } } /// Register a new peers connection based on its protocol. pub fn peer_protocol_connected(&mut self, kind: PeerKind) { self.peers_per_protocol .get_or_create(&ProtocolLabel { protocol: kind }) .inc(); } /// Removes a peer from the counter based on its protocol when it disconnects. pub fn peer_protocol_disconnected(&mut self, kind: PeerKind) { let metric = self .peers_per_protocol .get_or_create(&ProtocolLabel { protocol: kind }); if metric.get() == 0 { return; } else { // decrement the counter metric.set(metric.get() - 1); } } } /// Reasons why a peer was included in the mesh. #[derive(PartialEq, Eq, Hash, Encode, Clone)] pub enum Inclusion { /// Peer was a fanaout peer. Fanout, /// Included from random selection. Random, /// Peer subscribed. Subscribed, /// Peer was included to fill the outbound quota. Outbound, } /// Reasons why a peer was removed from the mesh. #[derive(PartialEq, Eq, Hash, Encode, Clone)] pub enum Churn { /// Peer disconnected. Dc, /// Peer had a bad score. BadScore, /// Peer sent a PRUNE. Prune, /// Peer unsubscribed. Unsub, /// Too many peers. Excess, } /// Kinds of reasons a peer's score has been penalized #[derive(PartialEq, Eq, Hash, Encode, Clone)] pub enum Penalty { /// A peer grafted before waiting the back-off time. GraftBackoff, /// A Peer did not respond to an IWANT request in time. BrokenPromise, /// A Peer did not send enough messages as expected. MessageDeficit, /// Too many peers under one IP address. IPColocation, } /// Label for the mesh inclusion event metrics. #[derive(PartialEq, Eq, Hash, Encode, Clone)] struct InclusionLabel { hash: String, reason: Inclusion, } /// Label for the mesh churn event metrics. #[derive(PartialEq, Eq, Hash, Encode, Clone)] struct ChurnLabel { hash: String, reason: Churn, } /// Label for the kinds of protocols peers can connect as. #[derive(PartialEq, Eq, Hash, Encode, Clone)] struct ProtocolLabel { protocol: PeerKind, } /// Label for the kinds of scoring penalties that can occur #[derive(PartialEq, Eq, Hash, Encode, Clone)] struct PenaltyLabel { penalty: Penalty, } #[derive(Clone)] struct HistBuilder { buckets: Vec, } impl MetricConstructor for HistBuilder { fn new_metric(&self) -> Histogram { Histogram::new(self.buckets.clone().into_iter()) } }