mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-14 18:41:22 +00:00
protocols/gossipsub: Improve bandwidth (#2327)
This PR adds some bandwidth improvements to gossipsub. After a bit of inspection on live networks a number of improvements have been made that can help reduce unnecessary bandwidth on gossipsub networks. This PR introduces the following: - A 1:1 tracking of all in-flight IWANT requests. This not only ensures that all IWANT requests are answered and peers penalized accordingly, but gossipsub will no no longer create multiple IWANT requests for multiple peers. Previously, gossipsub sampled the in-flight IWANT requests in order to penalize peers for not responding with a high probability that we would detect non-responsive nodes. Futher, it was possible to re-request IWANT messages that are already being requested causing added duplication in messages and wasted unnecessary IWANT control messages. This PR shifts this logic to only request message ids that we are not currently requesting from peers. - Triangle routing naturally gives rise to unnecessary duplicates. Consider a mesh of 4 peers that are interconnected. Peer 1 sends a new message to 2,3,4. 2 propagates to 3,4 and 3 propagates to 2,4 and 4 propagates to 2,3. In this case 3 has received the message 3 times. If we keep track of peers that send us messages, when publishing or forwarding we no longer send to peers that have sent us a duplicate, we can eliminate one of the sends in the scenario above. This only occurs when message validation is async however. This PR adds this logic to remove some elements of triangle-routing duplicates. Co-authored-by: Divma <26765164+divagant-martian@users.noreply.github.com> Co-authored-by: Max Inden <mail@max-inden.de> Co-authored-by: Diva M <divma@protonmail.com>
This commit is contained in:
@ -4,7 +4,11 @@
|
||||
|
||||
- Migrate to Rust edition 2021 (see [PR 2339]).
|
||||
|
||||
- Improve bandwidth performance by tracking IWANTs and reducing duplicate sends
|
||||
(see [PR 2327]).
|
||||
|
||||
[PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339
|
||||
[PR 2327]: https://github.com/libp2p/rust-libp2p/pull/2327
|
||||
|
||||
# 0.34.0 [2021-11-16]
|
||||
|
||||
|
@ -231,7 +231,7 @@ pub struct Gossipsub<
|
||||
/// duplicates from being propagated to the application and on the network.
|
||||
duplicate_cache: DuplicateCache<MessageId>,
|
||||
|
||||
/// A set of connected peers, indexed by their [`PeerId`]. tracking both the [`PeerKind`] and
|
||||
/// A set of connected peers, indexed by their [`PeerId`] tracking both the [`PeerKind`] and
|
||||
/// the set of [`ConnectionId`]s.
|
||||
connected_peers: HashMap<PeerId, PeerConnections>,
|
||||
|
||||
@ -291,12 +291,16 @@ pub struct Gossipsub<
|
||||
/// Counts the number of `IWANT` that we sent the each peer since the last heartbeat.
|
||||
count_sent_iwant: HashMap<PeerId, usize>,
|
||||
|
||||
/// Short term cache for published messsage ids. This is used for penalizing peers sending
|
||||
/// Keeps track of IWANT messages that we are awaiting to send.
|
||||
/// This is used to prevent sending duplicate IWANT messages for the same message.
|
||||
pending_iwant_msgs: HashSet<MessageId>,
|
||||
|
||||
/// Short term cache for published message ids. This is used for penalizing peers sending
|
||||
/// our own messages back if the messages are anonymous or use a random author.
|
||||
published_message_ids: DuplicateCache<MessageId>,
|
||||
|
||||
/// Short term cache for fast message ids mapping them to the real message ids
|
||||
fast_messsage_id_cache: TimeCache<FastMessageId, MessageId>,
|
||||
fast_message_id_cache: TimeCache<FastMessageId, MessageId>,
|
||||
|
||||
/// The filter used to handle message subscriptions.
|
||||
subscription_filter: F,
|
||||
@ -421,7 +425,7 @@ where
|
||||
control_pool: HashMap::new(),
|
||||
publish_config: privacy.into(),
|
||||
duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
|
||||
fast_messsage_id_cache: TimeCache::new(config.duplicate_cache_time()),
|
||||
fast_message_id_cache: TimeCache::new(config.duplicate_cache_time()),
|
||||
topic_peers: HashMap::new(),
|
||||
peer_topics: HashMap::new(),
|
||||
explicit_peers: HashSet::new(),
|
||||
@ -445,6 +449,7 @@ where
|
||||
peer_score: None,
|
||||
count_received_ihave: HashMap::new(),
|
||||
count_sent_iwant: HashMap::new(),
|
||||
pending_iwant_msgs: HashSet::new(),
|
||||
connected_peers: HashMap::new(),
|
||||
published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
|
||||
config,
|
||||
@ -636,8 +641,8 @@ where
|
||||
let msg_bytes = raw_message.data.len();
|
||||
|
||||
// If we are not flood publishing forward the message to mesh peers.
|
||||
let mesh_peers_sent =
|
||||
!self.config.flood_publish() && self.forward_msg(&msg_id, raw_message.clone(), None)?;
|
||||
let mesh_peers_sent = !self.config.flood_publish()
|
||||
&& self.forward_msg(&msg_id, raw_message.clone(), None, HashSet::new())?;
|
||||
|
||||
let mut recipient_peers = HashSet::new();
|
||||
if let Some(set) = self.topic_peers.get(&topic_hash) {
|
||||
@ -767,8 +772,10 @@ where
|
||||
) -> Result<bool, PublishError> {
|
||||
let reject_reason = match acceptance {
|
||||
MessageAcceptance::Accept => {
|
||||
let raw_message = match self.mcache.validate(msg_id) {
|
||||
Some(raw_message) => raw_message.clone(),
|
||||
let (raw_message, originating_peers) = match self.mcache.validate(msg_id) {
|
||||
Some((raw_message, originating_peers)) => {
|
||||
(raw_message.clone(), originating_peers)
|
||||
}
|
||||
None => {
|
||||
warn!(
|
||||
"Message not in cache. Ignoring forwarding. Message Id: {}",
|
||||
@ -777,15 +784,21 @@ where
|
||||
return Ok(false);
|
||||
}
|
||||
};
|
||||
self.forward_msg(msg_id, raw_message, Some(propagation_source))?;
|
||||
self.forward_msg(
|
||||
msg_id,
|
||||
raw_message,
|
||||
Some(propagation_source),
|
||||
originating_peers,
|
||||
)?;
|
||||
return Ok(true);
|
||||
}
|
||||
MessageAcceptance::Reject => RejectReason::ValidationFailed,
|
||||
MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
|
||||
};
|
||||
|
||||
if let Some(raw_message) = self.mcache.remove(msg_id) {
|
||||
if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) {
|
||||
// Tell peer_score about reject
|
||||
// Reject the original source, and any duplicates we've seen from other peers.
|
||||
if let Some((peer_score, ..)) = &mut self.peer_score {
|
||||
peer_score.reject_message(
|
||||
propagation_source,
|
||||
@ -793,6 +806,9 @@ where
|
||||
&raw_message.topic,
|
||||
reject_reason,
|
||||
);
|
||||
for peer in originating_peers.iter() {
|
||||
peer_score.reject_message(peer, msg_id, &raw_message.topic, reject_reason);
|
||||
}
|
||||
}
|
||||
Ok(true)
|
||||
} else {
|
||||
@ -1173,10 +1189,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Handling IHAVE for peer: {:?}", peer_id);
|
||||
trace!("Handling IHAVE for peer: {:?}", peer_id);
|
||||
|
||||
// use a hashset to avoid duplicates efficiently
|
||||
let mut iwant_ids = HashSet::new();
|
||||
let mut iwant_ids = HashMap::new();
|
||||
|
||||
for (topic, ids) in ihave_msgs {
|
||||
// only process the message if we are subscribed
|
||||
@ -1189,9 +1205,16 @@ where
|
||||
}
|
||||
|
||||
for id in ids {
|
||||
if !self.duplicate_cache.contains(&id) {
|
||||
// have not seen this message, request it
|
||||
iwant_ids.insert(id);
|
||||
if !self.duplicate_cache.contains(&id) && !self.pending_iwant_msgs.contains(&id) {
|
||||
if self
|
||||
.peer_score
|
||||
.as_ref()
|
||||
.map(|(_, _, _, promises)| !promises.contains(&id))
|
||||
.unwrap_or(true)
|
||||
{
|
||||
// have not seen this message and are not currently requesting it
|
||||
iwant_ids.insert(id, topic.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1211,15 +1234,21 @@ where
|
||||
peer_id
|
||||
);
|
||||
|
||||
//ask in random order
|
||||
let mut iwant_ids_vec: Vec<_> = iwant_ids.iter().collect();
|
||||
// Ask in random order
|
||||
let mut iwant_ids_vec: Vec<_> = iwant_ids.keys().collect();
|
||||
let mut rng = thread_rng();
|
||||
iwant_ids_vec.partial_shuffle(&mut rng, iask as usize);
|
||||
|
||||
iwant_ids_vec.truncate(iask as usize);
|
||||
*iasked += iask;
|
||||
|
||||
let message_ids = iwant_ids_vec.into_iter().cloned().collect::<Vec<_>>();
|
||||
let mut message_ids = Vec::new();
|
||||
for message_id in iwant_ids_vec {
|
||||
// Add all messages to the pending list
|
||||
self.pending_iwant_msgs.insert(message_id.clone());
|
||||
message_ids.push(message_id.clone());
|
||||
}
|
||||
|
||||
if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
|
||||
gossip_promises.add_promise(
|
||||
*peer_id,
|
||||
@ -1227,9 +1256,10 @@ where
|
||||
Instant::now() + self.config.iwant_followup_time(),
|
||||
);
|
||||
}
|
||||
debug!(
|
||||
trace!(
|
||||
"IHAVE: Asking for the following messages from {}: {:?}",
|
||||
peer_id, message_ids
|
||||
peer_id,
|
||||
message_ids
|
||||
);
|
||||
|
||||
Self::control_pool_add(
|
||||
@ -1238,7 +1268,7 @@ where
|
||||
GossipsubControlAction::IWant { message_ids },
|
||||
);
|
||||
}
|
||||
debug!("Completed IHAVE handling for peer: {:?}", peer_id);
|
||||
trace!("Completed IHAVE handling for peer: {:?}", peer_id);
|
||||
}
|
||||
|
||||
/// Handles an IWANT control message. Checks our cache of messages. If the message exists it is
|
||||
@ -1696,13 +1726,26 @@ where
|
||||
propagation_source: &PeerId,
|
||||
) {
|
||||
let fast_message_id = self.config.fast_message_id(&raw_message);
|
||||
|
||||
if let Some(fast_message_id) = fast_message_id.as_ref() {
|
||||
if let Some(msg_id) = self.fast_messsage_id_cache.get(fast_message_id) {
|
||||
if let Some(msg_id) = self.fast_message_id_cache.get(fast_message_id) {
|
||||
let msg_id = msg_id.clone();
|
||||
self.message_is_valid(&msg_id, &mut raw_message, propagation_source);
|
||||
// Report the duplicate
|
||||
if self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
|
||||
if let Some((peer_score, ..)) = &mut self.peer_score {
|
||||
peer_score.duplicated_message(propagation_source, &msg_id, &raw_message.topic);
|
||||
peer_score.duplicated_message(
|
||||
propagation_source,
|
||||
&msg_id,
|
||||
&raw_message.topic,
|
||||
);
|
||||
}
|
||||
// Update the cache, informing that we have received a duplicate from another peer.
|
||||
// The peers in this cache are used to prevent us forwarding redundant messages onto
|
||||
// these peers.
|
||||
self.mcache.observe_duplicate(&msg_id, propagation_source);
|
||||
}
|
||||
|
||||
// This message has been seen previously. Ignore it
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -1735,15 +1778,17 @@ where
|
||||
// Add the message to the duplicate caches
|
||||
if let Some(fast_message_id) = fast_message_id {
|
||||
// add id to cache
|
||||
self.fast_messsage_id_cache
|
||||
self.fast_message_id_cache
|
||||
.entry(fast_message_id)
|
||||
.or_insert_with(|| msg_id.clone());
|
||||
}
|
||||
|
||||
if !self.duplicate_cache.insert(msg_id.clone()) {
|
||||
debug!("Message already received, ignoring. Message: {}", msg_id);
|
||||
if let Some((peer_score, ..)) = &mut self.peer_score {
|
||||
peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
|
||||
}
|
||||
self.mcache.observe_duplicate(&msg_id, propagation_source);
|
||||
return;
|
||||
}
|
||||
debug!(
|
||||
@ -1782,7 +1827,12 @@ where
|
||||
// forward the message to mesh peers, if no validation is required
|
||||
if !self.config.validate_messages() {
|
||||
if self
|
||||
.forward_msg(&msg_id, raw_message, Some(propagation_source))
|
||||
.forward_msg(
|
||||
&msg_id,
|
||||
raw_message,
|
||||
Some(propagation_source),
|
||||
HashSet::new(),
|
||||
)
|
||||
.is_err()
|
||||
{
|
||||
error!("Failed to forward message. Too large");
|
||||
@ -1800,7 +1850,7 @@ where
|
||||
) {
|
||||
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
|
||||
let reason = RejectReason::ValidationError(validation_error);
|
||||
let fast_message_id_cache = &self.fast_messsage_id_cache;
|
||||
let fast_message_id_cache = &self.fast_message_id_cache;
|
||||
if let Some(msg_id) = self
|
||||
.config
|
||||
.fast_message_id(&raw_message)
|
||||
@ -2593,6 +2643,7 @@ where
|
||||
msg_id: &MessageId,
|
||||
message: RawGossipsubMessage,
|
||||
propagation_source: Option<&PeerId>,
|
||||
originating_peers: HashSet<PeerId>,
|
||||
) -> Result<bool, PublishError> {
|
||||
// message is fully validated inform peer_score
|
||||
if let Some((peer_score, ..)) = &mut self.peer_score {
|
||||
@ -2604,25 +2655,33 @@ where
|
||||
debug!("Forwarding message: {:?}", msg_id);
|
||||
let mut recipient_peers = HashSet::new();
|
||||
|
||||
// add mesh peers
|
||||
let topic = &message.topic;
|
||||
// mesh
|
||||
if let Some(mesh_peers) = self.mesh.get(topic) {
|
||||
for peer_id in mesh_peers {
|
||||
if Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref() {
|
||||
{
|
||||
// Populate the recipient peers mapping
|
||||
|
||||
// Add explicit peers
|
||||
for peer_id in &self.explicit_peers {
|
||||
if let Some(topics) = self.peer_topics.get(peer_id) {
|
||||
if Some(peer_id) != propagation_source
|
||||
&& !originating_peers.contains(peer_id)
|
||||
&& Some(peer_id) != message.source.as_ref()
|
||||
&& topics.contains(&message.topic)
|
||||
{
|
||||
recipient_peers.insert(*peer_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add explicit peers
|
||||
for p in &self.explicit_peers {
|
||||
if let Some(topics) = self.peer_topics.get(p) {
|
||||
if Some(p) != propagation_source
|
||||
&& Some(p) != message.source.as_ref()
|
||||
&& topics.contains(&message.topic)
|
||||
// add mesh peers
|
||||
let topic = &message.topic;
|
||||
// mesh
|
||||
if let Some(mesh_peers) = self.mesh.get(topic) {
|
||||
for peer_id in mesh_peers {
|
||||
if Some(peer_id) != propagation_source
|
||||
&& !originating_peers.contains(peer_id)
|
||||
&& Some(peer_id) != message.source.as_ref()
|
||||
{
|
||||
recipient_peers.insert(*p);
|
||||
recipient_peers.insert(*peer_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2770,6 +2829,9 @@ where
|
||||
error!("Failed to flush control pool. Message too large");
|
||||
}
|
||||
}
|
||||
|
||||
// This clears all pending IWANT messages
|
||||
self.pending_iwant_msgs.clear();
|
||||
}
|
||||
|
||||
/// Send a GossipsubRpc message to a peer. This will wrap the message in an arc if it
|
||||
|
@ -4659,6 +4659,8 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_iwant_penalties() {
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
let config = GossipsubConfigBuilder::default()
|
||||
.iwant_followup_time(Duration::from_secs(4))
|
||||
.build()
|
||||
@ -4668,7 +4670,7 @@ mod tests {
|
||||
|
||||
// fill the mesh
|
||||
let (mut gs, peers, topics) = inject_nodes1()
|
||||
.peer_no(config.mesh_n_high())
|
||||
.peer_no(2)
|
||||
.topics(vec!["test".into()])
|
||||
.to_subscribe(false)
|
||||
.gs_config(config.clone())
|
||||
@ -4687,12 +4689,11 @@ mod tests {
|
||||
.map(|_| add_peer(&mut gs, &topics, false, false))
|
||||
.collect();
|
||||
|
||||
//each peer sends us two ihave containing each two message ids
|
||||
// each peer sends us an ihave containing each two message ids
|
||||
let mut first_messages = Vec::new();
|
||||
let mut second_messages = Vec::new();
|
||||
let mut seq = 0;
|
||||
for peer in &other_peers {
|
||||
for _ in 0..2 {
|
||||
let msg1 = random_message(&mut seq, &topics);
|
||||
let msg2 = random_message(&mut seq, &topics);
|
||||
|
||||
@ -4713,11 +4714,10 @@ mod tests {
|
||||
)],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
//other peers send us all the first message ids in time
|
||||
for message in first_messages {
|
||||
gs.handle_received_message(message.clone(), &PeerId::random());
|
||||
// the peers send us all the first message ids in time
|
||||
for (index, peer) in other_peers.iter().enumerate() {
|
||||
gs.handle_received_message(first_messages[index].clone(), &peer);
|
||||
}
|
||||
|
||||
// now we do a heartbeat no penalization should have been applied yet
|
||||
@ -4727,29 +4727,28 @@ mod tests {
|
||||
assert_eq!(gs.peer_score.as_ref().unwrap().0.score(peer), 0.0);
|
||||
}
|
||||
|
||||
//receive the first twenty of the second messages (that are the messages of the first 10
|
||||
// peers)
|
||||
for message in second_messages.iter().take(20) {
|
||||
gs.handle_received_message(message.clone(), &PeerId::random());
|
||||
// receive the first twenty of the other peers then send their response
|
||||
for (index, peer) in other_peers.iter().enumerate().take(20) {
|
||||
gs.handle_received_message(second_messages[index].clone(), &peer);
|
||||
}
|
||||
|
||||
//sleep for one second
|
||||
// sleep for the promise duration
|
||||
sleep(Duration::from_secs(4));
|
||||
|
||||
// now we do a heartbeat to apply penalization
|
||||
gs.heartbeat();
|
||||
|
||||
//now we get all the second messages
|
||||
for message in second_messages {
|
||||
gs.handle_received_message(message.clone(), &PeerId::random());
|
||||
// now we get the second messages from the last 80 peers.
|
||||
for (index, peer) in other_peers.iter().enumerate() {
|
||||
if index > 19 {
|
||||
gs.handle_received_message(second_messages[index].clone(), &peer);
|
||||
}
|
||||
}
|
||||
|
||||
// no further penalizations should get applied
|
||||
gs.heartbeat();
|
||||
|
||||
//now randomly some peers got penalized, some may not, and some may got penalized twice
|
||||
//with very high probability (> 1 - 10^-50) all three cases are present under the 100 peers
|
||||
//but the first 10 peers should all not got penalized.
|
||||
// Only the last 80 peers should be penalized for not responding in time
|
||||
let mut not_penalized = 0;
|
||||
let mut single_penalized = 0;
|
||||
let mut double_penalized = 0;
|
||||
@ -4765,13 +4764,15 @@ mod tests {
|
||||
assert!(i > 9);
|
||||
double_penalized += 1
|
||||
} else {
|
||||
println!("{}", peer);
|
||||
println!("{}", score);
|
||||
assert!(false, "Invalid score of peer")
|
||||
}
|
||||
}
|
||||
|
||||
assert!(not_penalized > 10);
|
||||
assert!(single_penalized > 0);
|
||||
assert!(double_penalized > 0);
|
||||
assert_eq!(not_penalized, 20);
|
||||
assert_eq!(single_penalized, 80);
|
||||
assert_eq!(double_penalized, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -5138,7 +5139,7 @@ mod tests {
|
||||
gs.handle_received_message(message.clone(), &PeerId::random());
|
||||
}
|
||||
|
||||
assert!(counters.fast_counter <= 5);
|
||||
assert_eq!(counters.fast_counter, 5);
|
||||
assert_eq!(counters.slow_counter, 1);
|
||||
}
|
||||
|
||||
|
@ -758,12 +758,12 @@ impl GossipsubConfigBuilder {
|
||||
);
|
||||
}
|
||||
|
||||
if !(self.config.mesh_outbound_min < self.config.mesh_n_low
|
||||
if !(self.config.mesh_outbound_min <= self.config.mesh_n_low
|
||||
&& self.config.mesh_n_low <= self.config.mesh_n
|
||||
&& self.config.mesh_n <= self.config.mesh_n_high)
|
||||
{
|
||||
return Err("The following inequality doesn't hold \
|
||||
mesh_outbound_min < mesh_n_low <= mesh_n <= mesh_n_high");
|
||||
mesh_outbound_min <= mesh_n_low <= mesh_n <= mesh_n_high");
|
||||
}
|
||||
|
||||
if self.config.mesh_outbound_min * 2 > self.config.mesh_n {
|
||||
|
@ -24,12 +24,9 @@ use crate::MessageId;
|
||||
use instant::Instant;
|
||||
use libp2p_core::PeerId;
|
||||
use log::debug;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use std::collections::HashMap;
|
||||
|
||||
/// Tracks recently sent `IWANT` messages and checks if peers respond to them
|
||||
/// for each `IWANT` message we track one random requested message id.
|
||||
/// Tracks recently sent `IWANT` messages and checks if peers respond to them.
|
||||
#[derive(Default)]
|
||||
pub(crate) struct GossipPromises {
|
||||
/// Stores for each tracked message id and peer the instant when this promise expires.
|
||||
@ -40,12 +37,15 @@ pub(crate) struct GossipPromises {
|
||||
}
|
||||
|
||||
impl GossipPromises {
|
||||
/// Returns true if the message id exists in the promises.
|
||||
pub fn contains(&self, message: &MessageId) -> bool {
|
||||
self.promises.contains_key(message)
|
||||
}
|
||||
|
||||
/// Track a promise to deliver a message from a list of [`MessageId`]s we are requesting.
|
||||
pub fn add_promise(&mut self, peer: PeerId, messages: &[MessageId], expires: Instant) {
|
||||
// Randomly select a message id
|
||||
let mut rng = thread_rng();
|
||||
if let Some(message_id) = messages.choose(&mut rng) {
|
||||
// If a promise for this message id and peer already exists we don't update expires!
|
||||
for message_id in messages {
|
||||
// If a promise for this message id and peer already exists we don't update the expiry!
|
||||
self.promises
|
||||
.entry(message_id.clone())
|
||||
.or_insert_with(HashMap::new)
|
||||
@ -86,7 +86,7 @@ impl GossipPromises {
|
||||
let count = result.entry(*peer_id).or_insert(0);
|
||||
*count += 1;
|
||||
debug!(
|
||||
"The peer {} broke the promise to deliver message {} in time!",
|
||||
"[Penalty] The peer {} broke the promise to deliver message {} in time!",
|
||||
peer_id, msg
|
||||
);
|
||||
false
|
||||
|
@ -21,9 +21,12 @@
|
||||
use crate::topic::TopicHash;
|
||||
use crate::types::{MessageId, RawGossipsubMessage};
|
||||
use libp2p_core::PeerId;
|
||||
use log::debug;
|
||||
use log::{debug, trace};
|
||||
use std::fmt::Debug;
|
||||
use std::{collections::HashMap, fmt};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
fmt,
|
||||
};
|
||||
|
||||
/// CacheEntry stored in the history.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
@ -35,12 +38,12 @@ pub struct CacheEntry {
|
||||
/// MessageCache struct holding history of messages.
|
||||
#[derive(Clone)]
|
||||
pub struct MessageCache {
|
||||
msgs: HashMap<MessageId, RawGossipsubMessage>,
|
||||
msgs: HashMap<MessageId, (RawGossipsubMessage, HashSet<PeerId>)>,
|
||||
/// For every message and peer the number of times this peer asked for the message
|
||||
iwant_counts: HashMap<MessageId, HashMap<PeerId, u32>>,
|
||||
history: Vec<Vec<CacheEntry>>,
|
||||
/// The number of indices in the cache history used for gossipping. That means that a message
|
||||
/// won't get gossipped anymore when shift got called `gossip` many times after inserting the
|
||||
/// The number of indices in the cache history used for gossiping. That means that a message
|
||||
/// won't get gossiped anymore when shift got called `gossip` many times after inserting the
|
||||
/// message in the cache.
|
||||
gossip: usize,
|
||||
}
|
||||
@ -68,30 +71,44 @@ impl MessageCache {
|
||||
|
||||
/// Put a message into the memory cache.
|
||||
///
|
||||
/// Returns the message if it already exists.
|
||||
pub fn put(
|
||||
&mut self,
|
||||
message_id: &MessageId,
|
||||
msg: RawGossipsubMessage,
|
||||
) -> Option<RawGossipsubMessage> {
|
||||
debug!("Put message {:?} in mcache", message_id);
|
||||
/// Returns true if the message didn't already exist in the cache.
|
||||
pub fn put(&mut self, message_id: &MessageId, msg: RawGossipsubMessage) -> bool {
|
||||
trace!("Put message {:?} in mcache", message_id);
|
||||
let cache_entry = CacheEntry {
|
||||
mid: message_id.clone(),
|
||||
topic: msg.topic.clone(),
|
||||
};
|
||||
|
||||
let seen_message = self.msgs.insert(message_id.clone(), msg);
|
||||
if seen_message.is_none() {
|
||||
if self
|
||||
.msgs
|
||||
.insert(message_id.clone(), (msg, HashSet::new()))
|
||||
.is_none()
|
||||
{
|
||||
// Don't add duplicate entries to the cache.
|
||||
self.history[0].push(cache_entry);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/// Keeps track of peers we know have received the message to prevent forwarding to said peers.
|
||||
pub fn observe_duplicate(&mut self, message_id: &MessageId, source: &PeerId) {
|
||||
if let Some((message, originating_peers)) = self.msgs.get_mut(message_id) {
|
||||
// if the message is already validated, we don't need to store extra peers sending us
|
||||
// duplicates as the message has already been forwarded
|
||||
if message.validated {
|
||||
return;
|
||||
}
|
||||
|
||||
originating_peers.insert(*source);
|
||||
}
|
||||
seen_message
|
||||
}
|
||||
|
||||
/// Get a message with `message_id`
|
||||
#[cfg(test)]
|
||||
pub fn get(&self, message_id: &MessageId) -> Option<&RawGossipsubMessage> {
|
||||
self.msgs.get(message_id)
|
||||
self.msgs.get(message_id).map(|(message, _)| message)
|
||||
}
|
||||
|
||||
/// Increases the iwant count for the given message by one and returns the message together
|
||||
@ -102,7 +119,7 @@ impl MessageCache {
|
||||
peer: &PeerId,
|
||||
) -> Option<(&RawGossipsubMessage, u32)> {
|
||||
let iwant_counts = &mut self.iwant_counts;
|
||||
self.msgs.get(message_id).and_then(|message| {
|
||||
self.msgs.get(message_id).and_then(|(message, _)| {
|
||||
if !message.validated {
|
||||
None
|
||||
} else {
|
||||
@ -120,10 +137,18 @@ impl MessageCache {
|
||||
}
|
||||
|
||||
/// Gets a message with [`MessageId`] and tags it as validated.
|
||||
pub fn validate(&mut self, message_id: &MessageId) -> Option<&RawGossipsubMessage> {
|
||||
self.msgs.get_mut(message_id).map(|message| {
|
||||
/// This function also returns the known peers that have sent us this message. This is used to
|
||||
/// prevent us sending redundant messages to peers who have already propagated it.
|
||||
pub fn validate(
|
||||
&mut self,
|
||||
message_id: &MessageId,
|
||||
) -> Option<(&RawGossipsubMessage, HashSet<PeerId>)> {
|
||||
self.msgs.get_mut(message_id).map(|(message, known_peers)| {
|
||||
message.validated = true;
|
||||
&*message
|
||||
// Clear the known peers list (after a message is validated, it is forwarded and we no
|
||||
// longer need to store the originating peers).
|
||||
let originating_peers = std::mem::replace(known_peers, HashSet::new());
|
||||
(&*message, originating_peers)
|
||||
})
|
||||
}
|
||||
|
||||
@ -139,7 +164,7 @@ impl MessageCache {
|
||||
if &entry.topic == topic {
|
||||
let mid = &entry.mid;
|
||||
// Only gossip validated messages
|
||||
if let Some(true) = self.msgs.get(mid).map(|msg| msg.validated) {
|
||||
if let Some(true) = self.msgs.get(mid).map(|(msg, _)| msg.validated) {
|
||||
Some(mid.clone())
|
||||
} else {
|
||||
None
|
||||
@ -160,7 +185,7 @@ impl MessageCache {
|
||||
/// last entry.
|
||||
pub fn shift(&mut self) {
|
||||
for entry in self.history.pop().expect("history is always > 1") {
|
||||
if let Some(msg) = self.msgs.remove(&entry.mid) {
|
||||
if let Some((msg, _)) = self.msgs.remove(&entry.mid) {
|
||||
if !msg.validated {
|
||||
// If GossipsubConfig::validate_messages is true, the implementing
|
||||
// application has to ensure that Gossipsub::validate_message gets called for
|
||||
@ -171,7 +196,7 @@ impl MessageCache {
|
||||
);
|
||||
}
|
||||
}
|
||||
debug!("Remove message from the cache: {}", &entry.mid);
|
||||
trace!("Remove message from the cache: {}", &entry.mid);
|
||||
|
||||
self.iwant_counts.remove(&entry.mid);
|
||||
}
|
||||
@ -181,7 +206,10 @@ impl MessageCache {
|
||||
}
|
||||
|
||||
/// Removes a message from the cache and returns it if existent
|
||||
pub fn remove(&mut self, message_id: &MessageId) -> Option<RawGossipsubMessage> {
|
||||
pub fn remove(
|
||||
&mut self,
|
||||
message_id: &MessageId,
|
||||
) -> Option<(RawGossipsubMessage, HashSet<PeerId>)> {
|
||||
//We only remove the message from msgs and iwant_count and keep the message_id in the
|
||||
// history vector. Zhe id in the history vector will simply be ignored on popping.
|
||||
|
||||
|
@ -265,7 +265,7 @@ impl PeerScore {
|
||||
let p3 = deficit * deficit;
|
||||
topic_score += p3 * topic_params.mesh_message_deliveries_weight;
|
||||
debug!(
|
||||
"The peer {} has a mesh message deliveries deficit of {} in topic\
|
||||
"[Penalty] The peer {} has a mesh message deliveries deficit of {} in topic\
|
||||
{} and will get penalized by {}",
|
||||
peer_id,
|
||||
deficit,
|
||||
@ -314,7 +314,7 @@ impl PeerScore {
|
||||
let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold;
|
||||
let p6 = surplus * surplus;
|
||||
debug!(
|
||||
"The peer {} gets penalized because of too many peers with the ip {}. \
|
||||
"[Penalty] The peer {} gets penalized because of too many peers with the ip {}. \
|
||||
The surplus is {}. ",
|
||||
peer_id, ip, surplus
|
||||
);
|
||||
@ -335,7 +335,7 @@ impl PeerScore {
|
||||
pub fn add_penalty(&mut self, peer_id: &PeerId, count: usize) {
|
||||
if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
|
||||
debug!(
|
||||
"Behavioral penalty for peer {}, count = {}.",
|
||||
"[Penalty] Behavioral penalty for peer {}, count = {}.",
|
||||
peer_id, count
|
||||
);
|
||||
peer_stats.behaviour_penalty += count as f64;
|
||||
@ -597,7 +597,7 @@ impl PeerScore {
|
||||
/// Similar to `reject_message` except does not require the message id or reason for an invalid message.
|
||||
pub fn reject_invalid_message(&mut self, from: &PeerId, topic_hash: &TopicHash) {
|
||||
debug!(
|
||||
"Message from {} rejected because of ValidationError or SelfOrigin",
|
||||
"[Penalty] Message from {} rejected because of ValidationError or SelfOrigin",
|
||||
from
|
||||
);
|
||||
self.mark_invalid_message_delivery(from, topic_hash);
|
||||
@ -764,7 +764,7 @@ impl PeerScore {
|
||||
peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
|
||||
{
|
||||
debug!(
|
||||
"Peer {} delivered an invalid message in topic {} and gets penalized \
|
||||
"[Penalty] Peer {} delivered an invalid message in topic {} and gets penalized \
|
||||
for it",
|
||||
peer_id, topic_hash
|
||||
);
|
||||
|
@ -179,6 +179,10 @@ where
|
||||
pub fn get(&self, key: &Key) -> Option<&Value> {
|
||||
self.map.get(key).map(|e| &e.element)
|
||||
}
|
||||
|
||||
pub fn get_mut(&mut self, key: &Key) -> Option<&mut Value> {
|
||||
self.map.get_mut(key).map(|e| &mut e.element)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DuplicateCache<Key>(TimeCache<Key, ()>);
|
||||
|
Reference in New Issue
Block a user