mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-12 17:41:22 +00:00
protocols/gossipsub: Prevent non-published messages being added to caches (#1930)
Currently, Gossipsub messages may not be published on a topic if there are no subscribed peers on that topic, however they are being added to a duplicate cache. If the messages are content-addressed, this prevents the user from attempting to republish the message in the future. This commit modifies the logic such that messages are only added to the caches (duplicate, memcache and published_messages) if the message could be published to at least one peer. Previously, a failed publish would be added the memcache. This meant that the router would gossip about a failed publish, such that if a peer joined the topic after the failed publish (but within the gossip history time window) the message could still be propagated to the newly connected peers. With this PR, a failed publish now definitively indicates the message will not be published on the network and the user is required to re-submit at a later time, once sufficient peers exist on the topic.
This commit is contained in:
@ -591,15 +591,11 @@ where
|
||||
|
||||
// check that the size doesn't exceed the max transmission size
|
||||
if event.encoded_len() > self.config.max_transmit_size() {
|
||||
// NOTE: The size limit can be reached by excessive topics or an excessive message.
|
||||
// This is an estimate that should be within 10% of the true encoded value. It is
|
||||
// possible to have a message that exceeds the RPC limit and is not caught here. A
|
||||
// warning log will be emitted in this case.
|
||||
return Err(PublishError::MessageTooLarge);
|
||||
}
|
||||
|
||||
// Add published message to the duplicate cache.
|
||||
if !self.duplicate_cache.insert(msg_id.clone()) {
|
||||
// Check the if the message has been published before
|
||||
if self.duplicate_cache.contains(&msg_id) {
|
||||
// This message has already been seen. We don't re-publish messages that have already
|
||||
// been published on the network.
|
||||
warn!(
|
||||
@ -609,24 +605,13 @@ where
|
||||
return Err(PublishError::Duplicate);
|
||||
}
|
||||
|
||||
// If the message isn't a duplicate add it to the memcache.
|
||||
self.mcache.put(&msg_id, raw_message.clone());
|
||||
|
||||
debug!("Publishing message: {:?}", msg_id);
|
||||
|
||||
// If the message is anonymous or has a random author add it to the published message ids
|
||||
// cache.
|
||||
if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
|
||||
if !self.config.allow_self_origin() {
|
||||
self.published_message_ids.insert(msg_id.clone());
|
||||
}
|
||||
}
|
||||
|
||||
let topic_hash = raw_message.topic.clone();
|
||||
|
||||
// If we are not flood publishing forward the message to mesh peers.
|
||||
let mesh_peers_sent =
|
||||
!self.config.flood_publish() && self.forward_msg(&msg_id, raw_message, None)?;
|
||||
!self.config.flood_publish() && self.forward_msg(&msg_id, raw_message.clone(), None)?;
|
||||
|
||||
let mut recipient_peers = HashSet::new();
|
||||
if let Some(set) = self.topic_peers.get(&topic_hash) {
|
||||
@ -702,6 +687,19 @@ where
|
||||
return Err(PublishError::InsufficientPeers);
|
||||
}
|
||||
|
||||
// If the message isn't a duplicate and we have sent it to some peers add it to the
|
||||
// duplicate cache and memcache.
|
||||
self.duplicate_cache.insert(msg_id.clone());
|
||||
self.mcache.put(&msg_id, raw_message);
|
||||
|
||||
// If the message is anonymous or has a random author add it to the published message ids
|
||||
// cache.
|
||||
if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
|
||||
if !self.config.allow_self_origin() {
|
||||
self.published_message_ids.insert(msg_id.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Send to peers we know are subscribed to the topic.
|
||||
for peer_id in recipient_peers.iter() {
|
||||
debug!("Sending message to peer: {:?}", peer_id);
|
||||
|
Reference in New Issue
Block a user