From eac43d144fca66c9a7c22d828048423c69f24afa Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 20 Jan 2021 20:47:05 +1100 Subject: [PATCH] protocols/gossipsub: Prevent non-published messages being added to caches (#1930) Currently, Gossipsub messages may not be published on a topic if there are no subscribed peers on that topic, however they are being added to a duplicate cache. If the messages are content-addressed, this prevents the user from attempting to republish the message in the future. This commit modifies the logic such that messages are only added to the caches (duplicate, memcache and published_messages) if the message could be published to at least one peer. Previously, a failed publish would be added the memcache. This meant that the router would gossip about a failed publish, such that if a peer joined the topic after the failed publish (but within the gossip history time window) the message could still be propagated to the newly connected peers. With this PR, a failed publish now definitively indicates the message will not be published on the network and the user is required to re-submit at a later time, once sufficient peers exist on the topic. --- protocols/gossipsub/src/behaviour.rs | 34 +++++++++++++--------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 8deae695..b0328cac 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -591,15 +591,11 @@ where // check that the size doesn't exceed the max transmission size if event.encoded_len() > self.config.max_transmit_size() { - // NOTE: The size limit can be reached by excessive topics or an excessive message. - // This is an estimate that should be within 10% of the true encoded value. It is - // possible to have a message that exceeds the RPC limit and is not caught here. A - // warning log will be emitted in this case. return Err(PublishError::MessageTooLarge); } - // Add published message to the duplicate cache. - if !self.duplicate_cache.insert(msg_id.clone()) { + // Check the if the message has been published before + if self.duplicate_cache.contains(&msg_id) { // This message has already been seen. We don't re-publish messages that have already // been published on the network. warn!( @@ -609,24 +605,13 @@ where return Err(PublishError::Duplicate); } - // If the message isn't a duplicate add it to the memcache. - self.mcache.put(&msg_id, raw_message.clone()); - debug!("Publishing message: {:?}", msg_id); - // If the message is anonymous or has a random author add it to the published message ids - // cache. - if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config { - if !self.config.allow_self_origin() { - self.published_message_ids.insert(msg_id.clone()); - } - } - let topic_hash = raw_message.topic.clone(); // If we are not flood publishing forward the message to mesh peers. let mesh_peers_sent = - !self.config.flood_publish() && self.forward_msg(&msg_id, raw_message, None)?; + !self.config.flood_publish() && self.forward_msg(&msg_id, raw_message.clone(), None)?; let mut recipient_peers = HashSet::new(); if let Some(set) = self.topic_peers.get(&topic_hash) { @@ -702,6 +687,19 @@ where return Err(PublishError::InsufficientPeers); } + // If the message isn't a duplicate and we have sent it to some peers add it to the + // duplicate cache and memcache. + self.duplicate_cache.insert(msg_id.clone()); + self.mcache.put(&msg_id, raw_message); + + // If the message is anonymous or has a random author add it to the published message ids + // cache. + if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config { + if !self.config.allow_self_origin() { + self.published_message_ids.insert(msg_id.clone()); + } + } + // Send to peers we know are subscribed to the topic. for peer_id in recipient_peers.iter() { debug!("Sending message to peer: {:?}", peer_id);