add performance metrics to gossipsub (#2346)

* add performance metrics to gossipsub

* add changelog entry

* Re-export open-metrics-client crate

* Added some extra metrics

* More metrics

* Additional metrics

* Correct topic encoding

* More manual encoding of metric labels

* Clean up metric labelling

* Improve heartbeat duration buckets

* Remove re-export

Co-authored-by: Age Manning <Age@AgeManning.com>
Author: Divma
Date: 2021-12-21 17:31:19 -05:00
Committed by: GitHub
Parent: 379001a1d0
Commit: 17861d9cac
7 changed files with 539 additions and 177 deletions
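For context on how these metrics are consumed: the behaviour only records anything when it is handed a metrics registry at construction time; otherwise `self.metrics` is `None` throughout the diff below. The following is a minimal wiring sketch, not code from this commit: the constructor name, the config types, and the use of the `prometheus-client` crate (the later name of `open-metrics-client`) are assumptions for illustration.

```rust
// Hypothetical wiring sketch. The constructor name, config types and the use of
// prometheus-client (the later name of open-metrics-client) are assumptions.
use libp2p_core::identity::Keypair;
use libp2p_gossipsub::{Gossipsub, GossipsubConfig, MessageAuthenticity, MetricsConfig};
use prometheus_client::{encoding::text::encode, registry::Registry};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut registry = Registry::default();
    let keypair = Keypair::generate_ed25519();

    // Passing a registry is what makes `self.metrics` `Some(..)` in the diff below;
    // without it the behaviour records nothing.
    let _gossipsub: Gossipsub = Gossipsub::new_with_metrics(
        MessageAuthenticity::Signed(keypair),
        GossipsubConfig::default(),
        &mut registry,
        MetricsConfig::default(),
    )?;

    // Serve this from a `/metrics` endpoint; `encode` writes the OpenMetrics text format.
    let mut body = String::new();
    encode(&mut body, &registry)?;
    println!("{body}");
    Ok(())
}
```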


@@ -51,7 +51,7 @@ use crate::error::{PublishError, SubscriptionError, ValidationError};
use crate::gossip_promises::GossipPromises;
use crate::handler::{GossipsubHandler, GossipsubHandlerIn, HandlerEvent};
use crate::mcache::MessageCache;
use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics};
use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty};
use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
use crate::protocol::SIGNING_PREFIX;
use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
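The `Churn`, `Inclusion`, `Penalty` and `Metrics` types imported above live in the new `metrics.rs`, one of the seven changed files, which is not shown in this excerpt; `Churn` and `Inclusion` label why peers leave or join a mesh, and `Penalty` labels score penalties. As a rough illustration of the shape of that module, here is a sketch of a per-topic counter family; the names, label encoding, and the `prometheus-client` API (standing in for `open-metrics-client`) are assumptions, not the commit's actual definitions.

```rust
// Illustrative sketch only. The real metrics.rs added by this commit is not shown in
// this excerpt; the names, labels and API below are assumptions.
use prometheus_client::metrics::{counter::Counter, family::Family};
use prometheus_client::registry::Registry;

// The commit encodes topic labels manually; a plain key/value pair stands in here.
type TopicLabel = Vec<(String, String)>;

pub struct Metrics {
    // Messages sent per topic; the diff calls this through `msg_sent(&topic, msg_bytes)`.
    topic_msg_sent: Family<TopicLabel, Counter>,
}

impl Metrics {
    pub fn new(registry: &mut Registry) -> Self {
        let topic_msg_sent = Family::<TopicLabel, Counter>::default();
        registry.register(
            "topic_msg_sent_counts",
            "Number of gossip messages sent to each topic",
            topic_msg_sent.clone(),
        );
        Self { topic_msg_sent }
    }

    pub fn msg_sent(&mut self, topic: &str, _msg_bytes: usize) {
        self.topic_msg_sent
            .get_or_create(&vec![("topic".to_string(), topic.to_string())])
            .inc();
    }
}
```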
@@ -635,10 +635,9 @@ where
return Err(PublishError::Duplicate);
}
debug!("Publishing message: {:?}", msg_id);
trace!("Publishing message: {:?}", msg_id);
let topic_hash = raw_message.topic.clone();
let msg_bytes = raw_message.data.len();
// If we are not flood publishing forward the message to mesh peers.
let mesh_peers_sent = !self.config.flood_publish()
@@ -732,8 +731,9 @@ where
}
// Send to peers we know are subscribed to the topic.
let msg_bytes = event.encoded_len();
for peer_id in recipient_peers.iter() {
debug!("Sending message to peer: {:?}", peer_id);
trace!("Sending message to peer: {:?}", peer_id);
self.send_message(*peer_id, event.clone())?;
if let Some(m) = self.metrics.as_mut() {
@@ -742,6 +742,11 @@ where
}
debug!("Published message: {:?}", &msg_id);
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_published_message(&topic_hash);
}
Ok(msg_id)
}
@@ -784,6 +789,11 @@ where
return Ok(false);
}
};
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_msg_validation(&raw_message.topic, &acceptance);
}
self.forward_msg(
msg_id,
raw_message,
@@ -797,6 +807,10 @@ where
};
if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) {
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_msg_validation(&raw_message.topic, &acceptance);
}
// Tell peer_score about reject
// Reject the original source, and any duplicates we've seen from other peers.
if let Some((peer_score, ..)) = &mut self.peer_score {
@@ -960,7 +974,7 @@ where
let fanaout_added = added_peers.len();
if let Some(m) = self.metrics.as_mut() {
m.peers_included(topic_hash, Inclusion::Fanaout, fanaout_added)
m.peers_included(topic_hash, Inclusion::Fanout, fanaout_added)
}
// check if we need to get more peers, which we randomly select
@@ -1191,7 +1205,7 @@ where
trace!("Handling IHAVE for peer: {:?}", peer_id);
// use a hashset to avoid duplicates efficiently
// use a hashmap to avoid duplicates efficiently
let mut iwant_ids = HashMap::new();
for (topic, ids) in ihave_msgs {
@@ -1308,35 +1322,26 @@ where
// Send the messages to the peer
let message_list: Vec<_> = cached_messages.into_iter().map(|entry| entry.1).collect();
let mut topic_msgs = HashMap::<TopicHash, Vec<usize>>::default();
if self.metrics.is_some() {
for msg in message_list.iter() {
topic_msgs
.entry(msg.topic.clone())
.or_default()
.push(msg.data.len());
}
}
let topics = message_list
.iter()
.map(|message| message.topic.clone())
.collect::<HashSet<TopicHash>>();
if self
.send_message(
*peer_id,
GossipsubRpc {
subscriptions: Vec::new(),
messages: message_list,
control_msgs: Vec::new(),
}
.into_protobuf(),
)
.is_err()
{
let message = GossipsubRpc {
subscriptions: Vec::new(),
messages: message_list,
control_msgs: Vec::new(),
}
.into_protobuf();
let msg_bytes = message.encoded_len();
if self.send_message(*peer_id, message).is_err() {
error!("Failed to send cached messages. Messages too large");
} else if let Some(m) = self.metrics.as_mut() {
// Sending of messages succeeded, register them on the internal metrics.
for (topic, msg_bytes_vec) in topic_msgs.into_iter() {
for msg_bytes in msg_bytes_vec {
m.msg_sent(&topic, msg_bytes);
}
for topic in topics.iter() {
m.msg_sent(&topic, msg_bytes);
}
}
}
@@ -1391,11 +1396,14 @@ where
{
if backoff_time > now {
warn!(
"GRAFT: peer attempted graft within backoff time, penalizing {}",
"[Penalty] Peer attempted graft within backoff time, penalizing {}",
peer_id
);
// add behavioural penalty
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_score_penalty(Penalty::GraftBackoff);
}
peer_score.add_penalty(peer_id, 1);
// check the flood cutoff
@@ -1668,15 +1676,11 @@ where
"Rejecting message from peer {} because of blacklisted source: {}",
propagation_source, source
);
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
peer_score.reject_message(
propagation_source,
msg_id,
&raw_message.topic,
RejectReason::BlackListedSource,
);
gossip_promises.reject_message(msg_id, &RejectReason::BlackListedSource);
}
self.handle_invalid_message(
propagation_source,
raw_message,
RejectReason::BlackListedSource,
);
return false;
}
}
@@ -1702,15 +1706,7 @@ where
"Dropping message {} claiming to be from self but forwarded from {}",
msg_id, propagation_source
);
if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score {
peer_score.reject_message(
propagation_source,
msg_id,
&raw_message.topic,
RejectReason::SelfOrigin,
);
gossip_promises.reject_message(msg_id, &RejectReason::SelfOrigin);
}
self.handle_invalid_message(propagation_source, raw_message, RejectReason::SelfOrigin);
return false;
}
@@ -1725,6 +1721,11 @@ where
mut raw_message: RawGossipsubMessage,
propagation_source: &PeerId,
) {
// Record the received metric
if let Some(metrics) = self.metrics.as_mut() {
metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
}
let fast_message_id = self.config.fast_message_id(&raw_message);
if let Some(fast_message_id) = fast_message_id.as_ref() {
@@ -1758,8 +1759,8 @@ where
// Reject the message and return
self.handle_invalid_message(
propagation_source,
raw_message,
ValidationError::TransformFailed,
&raw_message,
RejectReason::ValidationError(ValidationError::TransformFailed),
);
return;
}
@@ -1796,6 +1797,11 @@ where
msg_id
);
// Record the received message with the metrics
if let Some(metrics) = self.metrics.as_mut() {
metrics.msg_recvd(&message.topic);
}
// Tells score that message arrived (but is maybe not fully validated yet).
// Consider the message as delivered for gossip promises.
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
@@ -1845,19 +1851,28 @@ where
fn handle_invalid_message(
&mut self,
propagation_source: &PeerId,
raw_message: RawGossipsubMessage,
validation_error: ValidationError,
raw_message: &RawGossipsubMessage,
reject_reason: RejectReason,
) {
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
let reason = RejectReason::ValidationError(validation_error);
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_invalid_message(&raw_message.topic);
}
let fast_message_id_cache = &self.fast_message_id_cache;
if let Some(msg_id) = self
.config
.fast_message_id(&raw_message)
.fast_message_id(raw_message)
.and_then(|id| fast_message_id_cache.get(&id))
{
peer_score.reject_message(propagation_source, msg_id, &raw_message.topic, reason);
gossip_promises.reject_message(msg_id, &reason);
peer_score.reject_message(
propagation_source,
msg_id,
&raw_message.topic,
reject_reason,
);
gossip_promises.reject_message(msg_id, &reject_reason);
} else {
// The message is invalid, we reject it ignoring any gossip promises. If a peer is
// advertising this message via an IHAVE and it's invalid it will be double
@@ -2067,6 +2082,9 @@ where
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
for (peer, count) in gossip_promises.get_broken_promises() {
peer_score.add_penalty(&peer, count);
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_score_penalty(Penalty::BrokenPromise);
}
}
}
}
@@ -2074,6 +2092,7 @@ where
/// Heartbeat function which shifts the memcache and updates the mesh.
fn heartbeat(&mut self) {
debug!("Starting heartbeat");
let start = Instant::now();
self.heartbeat_ticks += 1;
@@ -2098,13 +2117,15 @@ where
}
}
// cache scores throughout the heartbeat
let mut scores = HashMap::new();
let peer_score = &self.peer_score;
let mut score = |p: &PeerId| match peer_score {
Some((peer_score, ..)) => *scores.entry(*p).or_insert_with(|| peer_score.score(p)),
_ => 0.0,
};
// Cache the scores of all connected peers, and record metrics for current penalties.
let mut scores = HashMap::with_capacity(self.connected_peers.len());
if let Some((peer_score, ..)) = &self.peer_score {
for peer_id in self.connected_peers.keys() {
scores
.entry(peer_id)
.or_insert_with(|| peer_score.metric_score(&peer_id, self.metrics.as_mut()));
}
}
// maintain the mesh for each topic
for (topic_hash, peers) in self.mesh.iter_mut() {
@@ -2116,35 +2137,35 @@ where
// drop all peers with negative score, without PX
// if there is at some point a stable retain method for BTreeSet the following can be
// written more efficiently with retain.
let to_remove: Vec<_> = peers
.iter()
.filter(|&p| {
if score(p) < 0.0 {
debug!(
"HEARTBEAT: Prune peer {:?} with negative score [score = {}, topic = \
let mut to_remove_peers = Vec::new();
for peer_id in peers.iter() {
let peer_score = *scores.get(peer_id).unwrap_or(&0.0);
// Record the score per mesh
if let Some(metrics) = self.metrics.as_mut() {
metrics.observe_mesh_peers_score(topic_hash, peer_score);
}
if peer_score < 0.0 {
debug!(
"HEARTBEAT: Prune peer {:?} with negative score [score = {}, topic = \
{}]",
p,
score(p),
topic_hash
);
peer_id, peer_score, topic_hash
);
let current_topic = to_prune.entry(*p).or_insert_with(Vec::new);
current_topic.push(topic_hash.clone());
no_px.insert(*p);
true
} else {
false
}
})
.cloned()
.collect();
if let Some(m) = self.metrics.as_mut() {
m.peers_removed(topic_hash, Churn::BadScore, to_remove.len())
let current_topic = to_prune.entry(*peer_id).or_insert_with(Vec::new);
current_topic.push(topic_hash.clone());
no_px.insert(*peer_id);
to_remove_peers.push(*peer_id);
}
}
for peer in to_remove {
peers.remove(&peer);
if let Some(m) = self.metrics.as_mut() {
m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len())
}
for peer_id in to_remove_peers {
peers.remove(&peer_id);
}
// too little peers - add some
@@ -2166,7 +2187,7 @@ where
!peers.contains(peer)
&& !explicit_peers.contains(peer)
&& !backoffs.is_backoff_with_slack(topic_hash, peer)
&& score(peer) >= 0.0
&& *scores.get(peer).unwrap_or(&0.0) >= 0.0
},
);
for peer in &peer_list {
@@ -2195,8 +2216,12 @@ where
let mut rng = thread_rng();
let mut shuffled = peers.iter().cloned().collect::<Vec<_>>();
shuffled.shuffle(&mut rng);
shuffled
.sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Ordering::Equal));
shuffled.sort_by(|p1, p2| {
let score_p1 = *scores.get(p1).unwrap_or(&0.0);
let score_p2 = *scores.get(p2).unwrap_or(&0.0);
score_p1.partial_cmp(&score_p2).unwrap_or(Ordering::Equal)
});
// shuffle everything except the last retain_scores many peers (the best ones)
shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
@@ -2255,7 +2280,7 @@ where
!peers.contains(peer)
&& !explicit_peers.contains(peer)
&& !backoffs.is_backoff_with_slack(topic_hash, peer)
&& score(peer) >= 0.0
&& *scores.get(peer).unwrap_or(&0.0) >= 0.0
&& outbound_peers.contains(peer)
},
);
@@ -2288,19 +2313,27 @@ where
// now compute the median peer score in the mesh
let mut peers_by_score: Vec<_> = peers.iter().collect();
peers_by_score
.sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Equal));
peers_by_score.sort_by(|p1, p2| {
let p1_score = *scores.get(p1).unwrap_or(&0.0);
let p2_score = *scores.get(p2).unwrap_or(&0.0);
p1_score.partial_cmp(&p2_score).unwrap_or(Equal)
});
let middle = peers_by_score.len() / 2;
let median = if peers_by_score.len() % 2 == 0 {
(score(
*peers_by_score.get(middle - 1).expect(
"middle < vector length and middle > 0 since peers.len() > 0",
),
) + score(*peers_by_score.get(middle).expect("middle < vector length")))
* 0.5
let sub_middle_peer = *peers_by_score
.get(middle - 1)
.expect("middle < vector length and middle > 0 since peers.len() > 0");
let sub_middle_score = *scores.get(sub_middle_peer).unwrap_or(&0.0);
let middle_peer =
*peers_by_score.get(middle).expect("middle < vector length");
let middle_score = *scores.get(middle_peer).unwrap_or(&0.0);
(sub_middle_score + middle_score) * 0.5
} else {
score(*peers_by_score.get(middle).expect("middle < vector length"))
*scores
.get(*peers_by_score.get(middle).expect("middle < vector length"))
.unwrap_or(&0.0)
};
// if the median score is below the threshold, select a better peer (if any) and
@@ -2311,11 +2344,11 @@ where
&self.connected_peers,
topic_hash,
self.config.opportunistic_graft_peers(),
|peer| {
!peers.contains(peer)
&& !explicit_peers.contains(peer)
&& !backoffs.is_backoff_with_slack(topic_hash, peer)
&& score(peer) > median
|peer_id| {
!peers.contains(peer_id)
&& !explicit_peers.contains(peer_id)
&& !backoffs.is_backoff_with_slack(topic_hash, peer_id)
&& *scores.get(peer_id).unwrap_or(&0.0) > median
},
);
for peer in &peer_list {
@@ -2367,9 +2400,10 @@ where
};
for peer in peers.iter() {
// is the peer still subscribed to the topic?
let peer_score = *scores.get(peer).unwrap_or(&0.0);
match self.peer_topics.get(peer) {
Some(topics) => {
if !topics.contains(topic_hash) || score(peer) < publish_threshold {
if !topics.contains(topic_hash) || peer_score < publish_threshold {
debug!(
"HEARTBEAT: Peer removed from fanout for topic: {:?}",
topic_hash
@@ -2401,10 +2435,10 @@ where
&self.connected_peers,
topic_hash,
needed_peers,
|peer| {
!peers.contains(peer)
&& !explicit_peers.contains(peer)
&& score(peer) < publish_threshold
|peer_id| {
!peers.contains(peer_id)
&& !explicit_peers.contains(peer_id)
&& *scores.get(peer_id).unwrap_or(&0.0) < publish_threshold
},
);
peers.extend(new_peers);
@@ -2412,12 +2446,6 @@ where
}
if self.peer_score.is_some() {
trace!("Peer_scores: {:?}", {
for peer in self.peer_topics.keys() {
score(peer);
}
scores
});
trace!("Mesh message deliveries: {:?}", {
self.mesh
.iter()
@@ -2429,7 +2457,7 @@ where
.map(|p| {
(
*p,
peer_score
self.peer_score
.as_ref()
.expect("peer_score.is_some()")
.0
@@ -2458,6 +2486,10 @@ where
self.mcache.shift();
debug!("Completed Heartbeat");
if let Some(metrics) = self.metrics.as_mut() {
let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
metrics.observe_heartbeat_duration(duration);
}
}
/// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh
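The heartbeat duration observed just above is reported in milliseconds, and the "Improve heartbeat duration buckets" bullet in the commit message suggests the backing histogram's buckets were tuned for that range. A sketch of such a histogram follows; the bucket boundaries, metric name, and the `prometheus-client` API are guesses, not the values in this commit's `metrics.rs`.

```rust
// Sketch only: the metric name and bucket boundaries are guesses, not the values
// chosen in this commit's metrics.rs.
use prometheus_client::metrics::histogram::{linear_buckets, Histogram};
use prometheus_client::registry::Registry;

pub fn register_heartbeat_histogram(registry: &mut Registry) -> Histogram {
    // Ten 50ms-wide buckets cover 0..500ms; slower heartbeats fall into the +Inf bucket.
    let heartbeat_duration = Histogram::new(linear_buckets(0.0, 50.0, 10));
    registry.register(
        "heartbeat_duration",
        "Time it takes to complete one iteration of the heartbeat, in milliseconds",
        heartbeat_duration.clone(),
    );
    heartbeat_duration
}

pub fn observe_heartbeat_duration(histogram: &Histogram, millis: u64) {
    // Mirrors the diff above: elapsed milliseconds, saturated into a u64, observed as f64.
    histogram.observe(millis as f64);
}
```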
@@ -2695,11 +2727,12 @@ where
}
.into_protobuf();
let msg_bytes = event.encoded_len();
for peer in recipient_peers.iter() {
debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
self.send_message(*peer, event.clone())?;
if let Some(m) = self.metrics.as_mut() {
m.msg_sent(&message.topic, message.data.len());
m.msg_sent(&message.topic, msg_bytes);
}
}
debug!("Completed forwarding message");
@@ -3110,6 +3143,17 @@ where
// NOTE: It is possible the peer has already been removed from all mappings if it does not
// support the protocol.
self.peer_topics.remove(peer_id);
// If metrics are enabled, register the disconnection of a peer based on its protocol.
if let Some(metrics) = self.metrics.as_mut() {
let peer_kind = &self
.connected_peers
.get(peer_id)
.expect("Connected peer must be registered")
.kind;
metrics.peer_protocol_disconnected(peer_kind.clone());
}
self.connected_peers.remove(peer_id);
if let Some((peer_score, ..)) = &mut self.peer_score {
@@ -3257,6 +3301,11 @@ where
match handler_event {
HandlerEvent::PeerKind(kind) => {
// We have identified the protocol this peer is using
if let Some(metrics) = self.metrics.as_mut() {
metrics.peer_protocol_connected(kind.clone());
}
if let PeerKind::NotSupported = kind {
debug!(
"Peer does not support gossipsub protocols. {}",
@@ -3304,8 +3353,8 @@ where
for (raw_message, validation_error) in invalid_messages {
self.handle_invalid_message(
&propagation_source,
raw_message,
validation_error,
&raw_message,
RejectReason::ValidationError(validation_error),
)
}
} else {