diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 7f66d71d..e800d024 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -4,7 +4,11 @@ - Migrate to Rust edition 2021 (see [PR 2339]). +- Improve bandwidth performance by tracking IWANTs and reducing duplicate sends + (see [PR 2327]). + [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 +[PR 2327]: https://github.com/libp2p/rust-libp2p/pull/2327 # 0.34.0 [2021-11-16] diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index d9373d89..0974466a 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -231,7 +231,7 @@ pub struct Gossipsub< /// duplicates from being propagated to the application and on the network. duplicate_cache: DuplicateCache, - /// A set of connected peers, indexed by their [`PeerId`]. tracking both the [`PeerKind`] and + /// A set of connected peers, indexed by their [`PeerId`] tracking both the [`PeerKind`] and /// the set of [`ConnectionId`]s. connected_peers: HashMap, @@ -291,12 +291,16 @@ pub struct Gossipsub< /// Counts the number of `IWANT` that we sent the each peer since the last heartbeat. count_sent_iwant: HashMap, - /// Short term cache for published messsage ids. This is used for penalizing peers sending + /// Keeps track of IWANT messages that we are awaiting to send. + /// This is used to prevent sending duplicate IWANT messages for the same message. + pending_iwant_msgs: HashSet, + + /// Short term cache for published message ids. This is used for penalizing peers sending /// our own messages back if the messages are anonymous or use a random author. published_message_ids: DuplicateCache, /// Short term cache for fast message ids mapping them to the real message ids - fast_messsage_id_cache: TimeCache, + fast_message_id_cache: TimeCache, /// The filter used to handle message subscriptions. subscription_filter: F, @@ -421,7 +425,7 @@ where control_pool: HashMap::new(), publish_config: privacy.into(), duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()), - fast_messsage_id_cache: TimeCache::new(config.duplicate_cache_time()), + fast_message_id_cache: TimeCache::new(config.duplicate_cache_time()), topic_peers: HashMap::new(), peer_topics: HashMap::new(), explicit_peers: HashSet::new(), @@ -445,6 +449,7 @@ where peer_score: None, count_received_ihave: HashMap::new(), count_sent_iwant: HashMap::new(), + pending_iwant_msgs: HashSet::new(), connected_peers: HashMap::new(), published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()), config, @@ -636,8 +641,8 @@ where let msg_bytes = raw_message.data.len(); // If we are not flood publishing forward the message to mesh peers. - let mesh_peers_sent = - !self.config.flood_publish() && self.forward_msg(&msg_id, raw_message.clone(), None)?; + let mesh_peers_sent = !self.config.flood_publish() + && self.forward_msg(&msg_id, raw_message.clone(), None, HashSet::new())?; let mut recipient_peers = HashSet::new(); if let Some(set) = self.topic_peers.get(&topic_hash) { @@ -767,8 +772,10 @@ where ) -> Result { let reject_reason = match acceptance { MessageAcceptance::Accept => { - let raw_message = match self.mcache.validate(msg_id) { - Some(raw_message) => raw_message.clone(), + let (raw_message, originating_peers) = match self.mcache.validate(msg_id) { + Some((raw_message, originating_peers)) => { + (raw_message.clone(), originating_peers) + } None => { warn!( "Message not in cache. Ignoring forwarding. Message Id: {}", @@ -777,15 +784,21 @@ where return Ok(false); } }; - self.forward_msg(msg_id, raw_message, Some(propagation_source))?; + self.forward_msg( + msg_id, + raw_message, + Some(propagation_source), + originating_peers, + )?; return Ok(true); } MessageAcceptance::Reject => RejectReason::ValidationFailed, MessageAcceptance::Ignore => RejectReason::ValidationIgnored, }; - if let Some(raw_message) = self.mcache.remove(msg_id) { + if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) { // Tell peer_score about reject + // Reject the original source, and any duplicates we've seen from other peers. if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.reject_message( propagation_source, @@ -793,6 +806,9 @@ where &raw_message.topic, reject_reason, ); + for peer in originating_peers.iter() { + peer_score.reject_message(peer, msg_id, &raw_message.topic, reject_reason); + } } Ok(true) } else { @@ -1173,10 +1189,10 @@ where } } - debug!("Handling IHAVE for peer: {:?}", peer_id); + trace!("Handling IHAVE for peer: {:?}", peer_id); // use a hashset to avoid duplicates efficiently - let mut iwant_ids = HashSet::new(); + let mut iwant_ids = HashMap::new(); for (topic, ids) in ihave_msgs { // only process the message if we are subscribed @@ -1189,9 +1205,16 @@ where } for id in ids { - if !self.duplicate_cache.contains(&id) { - // have not seen this message, request it - iwant_ids.insert(id); + if !self.duplicate_cache.contains(&id) && !self.pending_iwant_msgs.contains(&id) { + if self + .peer_score + .as_ref() + .map(|(_, _, _, promises)| !promises.contains(&id)) + .unwrap_or(true) + { + // have not seen this message and are not currently requesting it + iwant_ids.insert(id, topic.clone()); + } } } } @@ -1211,15 +1234,21 @@ where peer_id ); - //ask in random order - let mut iwant_ids_vec: Vec<_> = iwant_ids.iter().collect(); + // Ask in random order + let mut iwant_ids_vec: Vec<_> = iwant_ids.keys().collect(); let mut rng = thread_rng(); iwant_ids_vec.partial_shuffle(&mut rng, iask as usize); iwant_ids_vec.truncate(iask as usize); *iasked += iask; - let message_ids = iwant_ids_vec.into_iter().cloned().collect::>(); + let mut message_ids = Vec::new(); + for message_id in iwant_ids_vec { + // Add all messages to the pending list + self.pending_iwant_msgs.insert(message_id.clone()); + message_ids.push(message_id.clone()); + } + if let Some((_, _, _, gossip_promises)) = &mut self.peer_score { gossip_promises.add_promise( *peer_id, @@ -1227,9 +1256,10 @@ where Instant::now() + self.config.iwant_followup_time(), ); } - debug!( + trace!( "IHAVE: Asking for the following messages from {}: {:?}", - peer_id, message_ids + peer_id, + message_ids ); Self::control_pool_add( @@ -1238,7 +1268,7 @@ where GossipsubControlAction::IWant { message_ids }, ); } - debug!("Completed IHAVE handling for peer: {:?}", peer_id); + trace!("Completed IHAVE handling for peer: {:?}", peer_id); } /// Handles an IWANT control message. Checks our cache of messages. If the message exists it is @@ -1696,13 +1726,26 @@ where propagation_source: &PeerId, ) { let fast_message_id = self.config.fast_message_id(&raw_message); + if let Some(fast_message_id) = fast_message_id.as_ref() { - if let Some(msg_id) = self.fast_messsage_id_cache.get(fast_message_id) { + if let Some(msg_id) = self.fast_message_id_cache.get(fast_message_id) { let msg_id = msg_id.clone(); - self.message_is_valid(&msg_id, &mut raw_message, propagation_source); - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.duplicated_message(propagation_source, &msg_id, &raw_message.topic); + // Report the duplicate + if self.message_is_valid(&msg_id, &mut raw_message, propagation_source) { + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.duplicated_message( + propagation_source, + &msg_id, + &raw_message.topic, + ); + } + // Update the cache, informing that we have received a duplicate from another peer. + // The peers in this cache are used to prevent us forwarding redundant messages onto + // these peers. + self.mcache.observe_duplicate(&msg_id, propagation_source); } + + // This message has been seen previously. Ignore it return; } } @@ -1735,15 +1778,17 @@ where // Add the message to the duplicate caches if let Some(fast_message_id) = fast_message_id { // add id to cache - self.fast_messsage_id_cache + self.fast_message_id_cache .entry(fast_message_id) .or_insert_with(|| msg_id.clone()); } + if !self.duplicate_cache.insert(msg_id.clone()) { debug!("Message already received, ignoring. Message: {}", msg_id); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.duplicated_message(propagation_source, &msg_id, &message.topic); } + self.mcache.observe_duplicate(&msg_id, propagation_source); return; } debug!( @@ -1782,7 +1827,12 @@ where // forward the message to mesh peers, if no validation is required if !self.config.validate_messages() { if self - .forward_msg(&msg_id, raw_message, Some(propagation_source)) + .forward_msg( + &msg_id, + raw_message, + Some(propagation_source), + HashSet::new(), + ) .is_err() { error!("Failed to forward message. Too large"); @@ -1800,7 +1850,7 @@ where ) { if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { let reason = RejectReason::ValidationError(validation_error); - let fast_message_id_cache = &self.fast_messsage_id_cache; + let fast_message_id_cache = &self.fast_message_id_cache; if let Some(msg_id) = self .config .fast_message_id(&raw_message) @@ -2593,6 +2643,7 @@ where msg_id: &MessageId, message: RawGossipsubMessage, propagation_source: Option<&PeerId>, + originating_peers: HashSet, ) -> Result { // message is fully validated inform peer_score if let Some((peer_score, ..)) = &mut self.peer_score { @@ -2604,25 +2655,33 @@ where debug!("Forwarding message: {:?}", msg_id); let mut recipient_peers = HashSet::new(); - // add mesh peers - let topic = &message.topic; - // mesh - if let Some(mesh_peers) = self.mesh.get(topic) { - for peer_id in mesh_peers { - if Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref() { - recipient_peers.insert(*peer_id); + { + // Populate the recipient peers mapping + + // Add explicit peers + for peer_id in &self.explicit_peers { + if let Some(topics) = self.peer_topics.get(peer_id) { + if Some(peer_id) != propagation_source + && !originating_peers.contains(peer_id) + && Some(peer_id) != message.source.as_ref() + && topics.contains(&message.topic) + { + recipient_peers.insert(*peer_id); + } } } - } - // Add explicit peers - for p in &self.explicit_peers { - if let Some(topics) = self.peer_topics.get(p) { - if Some(p) != propagation_source - && Some(p) != message.source.as_ref() - && topics.contains(&message.topic) - { - recipient_peers.insert(*p); + // add mesh peers + let topic = &message.topic; + // mesh + if let Some(mesh_peers) = self.mesh.get(topic) { + for peer_id in mesh_peers { + if Some(peer_id) != propagation_source + && !originating_peers.contains(peer_id) + && Some(peer_id) != message.source.as_ref() + { + recipient_peers.insert(*peer_id); + } } } } @@ -2770,6 +2829,9 @@ where error!("Failed to flush control pool. Message too large"); } } + + // This clears all pending IWANT messages + self.pending_iwant_msgs.clear(); } /// Send a GossipsubRpc message to a peer. This will wrap the message in an arc if it diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index b05df666..7e921c3e 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -4659,6 +4659,8 @@ mod tests { #[test] fn test_iwant_penalties() { + let _ = env_logger::try_init(); + let config = GossipsubConfigBuilder::default() .iwant_followup_time(Duration::from_secs(4)) .build() @@ -4666,9 +4668,9 @@ mod tests { let mut peer_score_params = PeerScoreParams::default(); peer_score_params.behaviour_penalty_weight = -1.0; - //fill the mesh + // fill the mesh let (mut gs, peers, topics) = inject_nodes1() - .peer_no(config.mesh_n_high()) + .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(false) .gs_config(config.clone()) @@ -4677,79 +4679,76 @@ mod tests { .scoring(Some((peer_score_params, PeerScoreThresholds::default()))) .create_network(); - //graft to all peers to really fill the mesh with all the peers + // graft to all peers to really fill the mesh with all the peers for peer in peers { gs.handle_graft(&peer, topics.clone()); } - //add 100 more peers + // add 100 more peers let other_peers: Vec<_> = (0..100) .map(|_| add_peer(&mut gs, &topics, false, false)) .collect(); - //each peer sends us two ihave containing each two message ids + // each peer sends us an ihave containing each two message ids let mut first_messages = Vec::new(); let mut second_messages = Vec::new(); let mut seq = 0; for peer in &other_peers { - for _ in 0..2 { - let msg1 = random_message(&mut seq, &topics); - let msg2 = random_message(&mut seq, &topics); + let msg1 = random_message(&mut seq, &topics); + let msg2 = random_message(&mut seq, &topics); - // Decompress the raw message and calculate the message id. - // Transform the inbound message - let message1 = &gs.data_transform.inbound_transform(msg1.clone()).unwrap(); + // Decompress the raw message and calculate the message id. + // Transform the inbound message + let message1 = &gs.data_transform.inbound_transform(msg1.clone()).unwrap(); - // Transform the inbound message - let message2 = &gs.data_transform.inbound_transform(msg2.clone()).unwrap(); + // Transform the inbound message + let message2 = &gs.data_transform.inbound_transform(msg2.clone()).unwrap(); - first_messages.push(msg1.clone()); - second_messages.push(msg2.clone()); - gs.handle_ihave( - peer, - vec![( - topics[0].clone(), - vec![config.message_id(&message1), config.message_id(&message2)], - )], - ); - } + first_messages.push(msg1.clone()); + second_messages.push(msg2.clone()); + gs.handle_ihave( + peer, + vec![( + topics[0].clone(), + vec![config.message_id(&message1), config.message_id(&message2)], + )], + ); } - //other peers send us all the first message ids in time - for message in first_messages { - gs.handle_received_message(message.clone(), &PeerId::random()); + // the peers send us all the first message ids in time + for (index, peer) in other_peers.iter().enumerate() { + gs.handle_received_message(first_messages[index].clone(), &peer); } - //now we do a heartbeat no penalization should have been applied yet + // now we do a heartbeat no penalization should have been applied yet gs.heartbeat(); for peer in &other_peers { assert_eq!(gs.peer_score.as_ref().unwrap().0.score(peer), 0.0); } - //receive the first twenty of the second messages (that are the messages of the first 10 - // peers) - for message in second_messages.iter().take(20) { - gs.handle_received_message(message.clone(), &PeerId::random()); + // receive the first twenty of the other peers then send their response + for (index, peer) in other_peers.iter().enumerate().take(20) { + gs.handle_received_message(second_messages[index].clone(), &peer); } - //sleep for one second + // sleep for the promise duration sleep(Duration::from_secs(4)); - //now we do a heartbeat to apply penalization + // now we do a heartbeat to apply penalization gs.heartbeat(); - //now we get all the second messages - for message in second_messages { - gs.handle_received_message(message.clone(), &PeerId::random()); + // now we get the second messages from the last 80 peers. + for (index, peer) in other_peers.iter().enumerate() { + if index > 19 { + gs.handle_received_message(second_messages[index].clone(), &peer); + } } - //no further penalizations should get applied + // no further penalizations should get applied gs.heartbeat(); - //now randomly some peers got penalized, some may not, and some may got penalized twice - //with very high probability (> 1 - 10^-50) all three cases are present under the 100 peers - //but the first 10 peers should all not got penalized. + // Only the last 80 peers should be penalized for not responding in time let mut not_penalized = 0; let mut single_penalized = 0; let mut double_penalized = 0; @@ -4765,13 +4764,15 @@ mod tests { assert!(i > 9); double_penalized += 1 } else { + println!("{}", peer); + println!("{}", score); assert!(false, "Invalid score of peer") } } - assert!(not_penalized > 10); - assert!(single_penalized > 0); - assert!(double_penalized > 0); + assert_eq!(not_penalized, 20); + assert_eq!(single_penalized, 80); + assert_eq!(double_penalized, 0); } #[test] @@ -5138,7 +5139,7 @@ mod tests { gs.handle_received_message(message.clone(), &PeerId::random()); } - assert!(counters.fast_counter <= 5); + assert_eq!(counters.fast_counter, 5); assert_eq!(counters.slow_counter, 1); } diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index c5e90bf0..24a8b0af 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -758,12 +758,12 @@ impl GossipsubConfigBuilder { ); } - if !(self.config.mesh_outbound_min < self.config.mesh_n_low + if !(self.config.mesh_outbound_min <= self.config.mesh_n_low && self.config.mesh_n_low <= self.config.mesh_n && self.config.mesh_n <= self.config.mesh_n_high) { return Err("The following inequality doesn't hold \ - mesh_outbound_min < mesh_n_low <= mesh_n <= mesh_n_high"); + mesh_outbound_min <= mesh_n_low <= mesh_n <= mesh_n_high"); } if self.config.mesh_outbound_min * 2 > self.config.mesh_n { diff --git a/protocols/gossipsub/src/gossip_promises.rs b/protocols/gossipsub/src/gossip_promises.rs index 2904b152..4cd692e2 100644 --- a/protocols/gossipsub/src/gossip_promises.rs +++ b/protocols/gossipsub/src/gossip_promises.rs @@ -24,12 +24,9 @@ use crate::MessageId; use instant::Instant; use libp2p_core::PeerId; use log::debug; -use rand::seq::SliceRandom; -use rand::thread_rng; use std::collections::HashMap; -/// Tracks recently sent `IWANT` messages and checks if peers respond to them -/// for each `IWANT` message we track one random requested message id. +/// Tracks recently sent `IWANT` messages and checks if peers respond to them. #[derive(Default)] pub(crate) struct GossipPromises { /// Stores for each tracked message id and peer the instant when this promise expires. @@ -40,12 +37,15 @@ pub(crate) struct GossipPromises { } impl GossipPromises { + /// Returns true if the message id exists in the promises. + pub fn contains(&self, message: &MessageId) -> bool { + self.promises.contains_key(message) + } + /// Track a promise to deliver a message from a list of [`MessageId`]s we are requesting. pub fn add_promise(&mut self, peer: PeerId, messages: &[MessageId], expires: Instant) { - // Randomly select a message id - let mut rng = thread_rng(); - if let Some(message_id) = messages.choose(&mut rng) { - // If a promise for this message id and peer already exists we don't update expires! + for message_id in messages { + // If a promise for this message id and peer already exists we don't update the expiry! self.promises .entry(message_id.clone()) .or_insert_with(HashMap::new) @@ -86,7 +86,7 @@ impl GossipPromises { let count = result.entry(*peer_id).or_insert(0); *count += 1; debug!( - "The peer {} broke the promise to deliver message {} in time!", + "[Penalty] The peer {} broke the promise to deliver message {} in time!", peer_id, msg ); false diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs index d9b903ac..8185f0ea 100644 --- a/protocols/gossipsub/src/mcache.rs +++ b/protocols/gossipsub/src/mcache.rs @@ -21,9 +21,12 @@ use crate::topic::TopicHash; use crate::types::{MessageId, RawGossipsubMessage}; use libp2p_core::PeerId; -use log::debug; +use log::{debug, trace}; use std::fmt::Debug; -use std::{collections::HashMap, fmt}; +use std::{ + collections::{HashMap, HashSet}, + fmt, +}; /// CacheEntry stored in the history. #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -35,12 +38,12 @@ pub struct CacheEntry { /// MessageCache struct holding history of messages. #[derive(Clone)] pub struct MessageCache { - msgs: HashMap, + msgs: HashMap)>, /// For every message and peer the number of times this peer asked for the message iwant_counts: HashMap>, history: Vec>, - /// The number of indices in the cache history used for gossipping. That means that a message - /// won't get gossipped anymore when shift got called `gossip` many times after inserting the + /// The number of indices in the cache history used for gossiping. That means that a message + /// won't get gossiped anymore when shift got called `gossip` many times after inserting the /// message in the cache. gossip: usize, } @@ -68,30 +71,44 @@ impl MessageCache { /// Put a message into the memory cache. /// - /// Returns the message if it already exists. - pub fn put( - &mut self, - message_id: &MessageId, - msg: RawGossipsubMessage, - ) -> Option { - debug!("Put message {:?} in mcache", message_id); + /// Returns true if the message didn't already exist in the cache. + pub fn put(&mut self, message_id: &MessageId, msg: RawGossipsubMessage) -> bool { + trace!("Put message {:?} in mcache", message_id); let cache_entry = CacheEntry { mid: message_id.clone(), topic: msg.topic.clone(), }; - let seen_message = self.msgs.insert(message_id.clone(), msg); - if seen_message.is_none() { + if self + .msgs + .insert(message_id.clone(), (msg, HashSet::new())) + .is_none() + { // Don't add duplicate entries to the cache. self.history[0].push(cache_entry); + return true; + } else { + return false; + } + } + + /// Keeps track of peers we know have received the message to prevent forwarding to said peers. + pub fn observe_duplicate(&mut self, message_id: &MessageId, source: &PeerId) { + if let Some((message, originating_peers)) = self.msgs.get_mut(message_id) { + // if the message is already validated, we don't need to store extra peers sending us + // duplicates as the message has already been forwarded + if message.validated { + return; + } + + originating_peers.insert(*source); } - seen_message } /// Get a message with `message_id` #[cfg(test)] pub fn get(&self, message_id: &MessageId) -> Option<&RawGossipsubMessage> { - self.msgs.get(message_id) + self.msgs.get(message_id).map(|(message, _)| message) } /// Increases the iwant count for the given message by one and returns the message together @@ -102,7 +119,7 @@ impl MessageCache { peer: &PeerId, ) -> Option<(&RawGossipsubMessage, u32)> { let iwant_counts = &mut self.iwant_counts; - self.msgs.get(message_id).and_then(|message| { + self.msgs.get(message_id).and_then(|(message, _)| { if !message.validated { None } else { @@ -120,10 +137,18 @@ impl MessageCache { } /// Gets a message with [`MessageId`] and tags it as validated. - pub fn validate(&mut self, message_id: &MessageId) -> Option<&RawGossipsubMessage> { - self.msgs.get_mut(message_id).map(|message| { + /// This function also returns the known peers that have sent us this message. This is used to + /// prevent us sending redundant messages to peers who have already propagated it. + pub fn validate( + &mut self, + message_id: &MessageId, + ) -> Option<(&RawGossipsubMessage, HashSet)> { + self.msgs.get_mut(message_id).map(|(message, known_peers)| { message.validated = true; - &*message + // Clear the known peers list (after a message is validated, it is forwarded and we no + // longer need to store the originating peers). + let originating_peers = std::mem::replace(known_peers, HashSet::new()); + (&*message, originating_peers) }) } @@ -139,7 +164,7 @@ impl MessageCache { if &entry.topic == topic { let mid = &entry.mid; // Only gossip validated messages - if let Some(true) = self.msgs.get(mid).map(|msg| msg.validated) { + if let Some(true) = self.msgs.get(mid).map(|(msg, _)| msg.validated) { Some(mid.clone()) } else { None @@ -160,7 +185,7 @@ impl MessageCache { /// last entry. pub fn shift(&mut self) { for entry in self.history.pop().expect("history is always > 1") { - if let Some(msg) = self.msgs.remove(&entry.mid) { + if let Some((msg, _)) = self.msgs.remove(&entry.mid) { if !msg.validated { // If GossipsubConfig::validate_messages is true, the implementing // application has to ensure that Gossipsub::validate_message gets called for @@ -171,7 +196,7 @@ impl MessageCache { ); } } - debug!("Remove message from the cache: {}", &entry.mid); + trace!("Remove message from the cache: {}", &entry.mid); self.iwant_counts.remove(&entry.mid); } @@ -181,7 +206,10 @@ impl MessageCache { } /// Removes a message from the cache and returns it if existent - pub fn remove(&mut self, message_id: &MessageId) -> Option { + pub fn remove( + &mut self, + message_id: &MessageId, + ) -> Option<(RawGossipsubMessage, HashSet)> { //We only remove the message from msgs and iwant_count and keep the message_id in the // history vector. Zhe id in the history vector will simply be ignored on popping. diff --git a/protocols/gossipsub/src/peer_score.rs b/protocols/gossipsub/src/peer_score.rs index 09ba48ba..2b21deb6 100644 --- a/protocols/gossipsub/src/peer_score.rs +++ b/protocols/gossipsub/src/peer_score.rs @@ -265,7 +265,7 @@ impl PeerScore { let p3 = deficit * deficit; topic_score += p3 * topic_params.mesh_message_deliveries_weight; debug!( - "The peer {} has a mesh message deliveries deficit of {} in topic\ + "[Penalty] The peer {} has a mesh message deliveries deficit of {} in topic\ {} and will get penalized by {}", peer_id, deficit, @@ -314,7 +314,7 @@ impl PeerScore { let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold; let p6 = surplus * surplus; debug!( - "The peer {} gets penalized because of too many peers with the ip {}. \ + "[Penalty] The peer {} gets penalized because of too many peers with the ip {}. \ The surplus is {}. ", peer_id, ip, surplus ); @@ -335,7 +335,7 @@ impl PeerScore { pub fn add_penalty(&mut self, peer_id: &PeerId, count: usize) { if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { debug!( - "Behavioral penalty for peer {}, count = {}.", + "[Penalty] Behavioral penalty for peer {}, count = {}.", peer_id, count ); peer_stats.behaviour_penalty += count as f64; @@ -597,7 +597,7 @@ impl PeerScore { /// Similar to `reject_message` except does not require the message id or reason for an invalid message. pub fn reject_invalid_message(&mut self, from: &PeerId, topic_hash: &TopicHash) { debug!( - "Message from {} rejected because of ValidationError or SelfOrigin", + "[Penalty] Message from {} rejected because of ValidationError or SelfOrigin", from ); self.mark_invalid_message_delivery(from, topic_hash); @@ -764,7 +764,7 @@ impl PeerScore { peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) { debug!( - "Peer {} delivered an invalid message in topic {} and gets penalized \ + "[Penalty] Peer {} delivered an invalid message in topic {} and gets penalized \ for it", peer_id, topic_hash ); diff --git a/protocols/gossipsub/src/time_cache.rs b/protocols/gossipsub/src/time_cache.rs index 768f2fc7..69ec67d3 100644 --- a/protocols/gossipsub/src/time_cache.rs +++ b/protocols/gossipsub/src/time_cache.rs @@ -179,6 +179,10 @@ where pub fn get(&self, key: &Key) -> Option<&Value> { self.map.get(key).map(|e| &e.element) } + + pub fn get_mut(&mut self, key: &Key) -> Option<&mut Value> { + self.map.get_mut(key).map(|e| &mut e.element) + } } pub struct DuplicateCache(TimeCache);