// Copyright 2020 Sigma Prime Pty Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. use std::{ cmp::{max, Ordering}, collections::HashSet, collections::VecDeque, collections::{BTreeSet, HashMap}, fmt, net::IpAddr, sync::Arc, task::{Context, Poll}, time::Duration, }; use futures::StreamExt; use log::{debug, error, trace, warn}; use prost::Message; use rand::{seq::SliceRandom, thread_rng}; use instant::Instant; use libp2p_core::{ connection::ConnectionId, identity::Keypair, multiaddr::Protocol::Ip4, multiaddr::Protocol::Ip6, ConnectedPoint, Multiaddr, PeerId, }; use libp2p_swarm::{ DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; use crate::config::{GossipsubConfig, ValidationMode}; use crate::error::{PublishError, SubscriptionError, ValidationError}; use crate::gossip_promises::GossipPromises; use crate::handler::{GossipsubHandler, GossipsubHandlerIn, HandlerEvent}; use crate::mcache::MessageCache; use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}; use crate::protocol::SIGNING_PREFIX; use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter}; use crate::time_cache::{DuplicateCache, TimeCache}; use crate::topic::{Hasher, Topic, TopicHash}; use crate::transform::{DataTransform, IdentityTransform}; use crate::types::{ FastMessageId, GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction, MessageAcceptance, MessageId, PeerInfo, RawGossipsubMessage, }; use crate::types::{GossipsubRpc, PeerConnections, PeerKind}; use crate::{backoff::BackoffStorage, interval::Interval}; use crate::{rpc_proto, TopicScoreParams}; use std::{cmp::Ordering::Equal, fmt::Debug}; #[cfg(test)] mod tests; /// Determines if published messages should be signed or not. /// /// Without signing, a number of privacy preserving modes can be selected. /// /// NOTE: The default validation settings are to require signatures. The [`ValidationMode`] /// should be updated in the [`GossipsubConfig`] to allow for unsigned messages. #[derive(Clone)] pub enum MessageAuthenticity { /// Message signing is enabled. The author will be the owner of the key and the sequence number /// will be a random number. Signed(Keypair), /// Message signing is disabled. /// /// The specified [`PeerId`] will be used as the author of all published messages. The sequence /// number will be randomized. Author(PeerId), /// Message signing is disabled. /// /// A random [`PeerId`] will be used when publishing each message. The sequence number will be /// randomized. RandomAuthor, /// Message signing is disabled. /// /// The author of the message and the sequence numbers are excluded from the message. /// /// NOTE: Excluding these fields may make these messages invalid by other nodes who /// enforce validation of these fields. See [`ValidationMode`] in the [`GossipsubConfig`] /// for how to customise this for rust-libp2p gossipsub. A custom `message_id` /// function will need to be set to prevent all messages from a peer being filtered /// as duplicates. Anonymous, } impl MessageAuthenticity { /// Returns true if signing is enabled. pub fn is_signing(&self) -> bool { matches!(self, MessageAuthenticity::Signed(_)) } pub fn is_anonymous(&self) -> bool { matches!(self, MessageAuthenticity::Anonymous) } } /// Event that can be emitted by the gossipsub behaviour. #[derive(Debug)] pub enum GossipsubEvent { /// A message has been received. Message { /// The peer that forwarded us this message. propagation_source: PeerId, /// The [`MessageId`] of the message. This should be referenced by the application when /// validating a message (if required). message_id: MessageId, /// The decompressed message itself. message: GossipsubMessage, }, /// A remote subscribed to a topic. Subscribed { /// Remote that has subscribed. peer_id: PeerId, /// The topic it has subscribed to. topic: TopicHash, }, /// A remote unsubscribed from a topic. Unsubscribed { /// Remote that has unsubscribed. peer_id: PeerId, /// The topic it has subscribed from. topic: TopicHash, }, /// A peer that does not support gossipsub has connected. GossipsubNotSupported { peer_id: PeerId }, } /// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`] /// for further details. enum PublishConfig { Signing { keypair: Keypair, author: PeerId, inline_key: Option>, }, Author(PeerId), RandomAuthor, Anonymous, } impl PublishConfig { pub fn get_own_id(&self) -> Option<&PeerId> { match self { Self::Signing { author, .. } => Some(author), Self::Author(author) => Some(author), _ => None, } } } impl From for PublishConfig { fn from(authenticity: MessageAuthenticity) -> Self { match authenticity { MessageAuthenticity::Signed(keypair) => { let public_key = keypair.public(); let key_enc = public_key.to_protobuf_encoding(); let key = if key_enc.len() <= 42 { // The public key can be inlined in [`rpc_proto::Message::from`], so we don't include it // specifically in the [`rpc_proto::Message::key`] field. None } else { // Include the protobuf encoding of the public key in the message. Some(key_enc) }; PublishConfig::Signing { keypair, author: public_key.to_peer_id(), inline_key: key, } } MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id), MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor, MessageAuthenticity::Anonymous => PublishConfig::Anonymous, } } } type GossipsubNetworkBehaviourAction = NetworkBehaviourAction>; /// Network behaviour that handles the gossipsub protocol. /// /// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If /// message signing is disabled, the [`ValidationMode`] in the config should be adjusted to an /// appropriate level to accept unsigned messages. /// /// The DataTransform trait allows applications to optionally add extra encoding/decoding /// functionality to the underlying messages. This is intended for custom compression algorithms. /// /// The TopicSubscriptionFilter allows applications to implement specific filters on topics to /// prevent unwanted messages being propagated and evaluated. pub struct Gossipsub< D: DataTransform = IdentityTransform, F: TopicSubscriptionFilter = AllowAllSubscriptionFilter, > { /// Configuration providing gossipsub performance parameters. config: GossipsubConfig, /// Events that need to be yielded to the outside when polling. events: VecDeque, /// Pools non-urgent control messages between heartbeats. control_pool: HashMap>, /// Information used for publishing messages. publish_config: PublishConfig, /// An LRU Time cache for storing seen messages (based on their ID). This cache prevents /// duplicates from being propagated to the application and on the network. duplicate_cache: DuplicateCache, /// A set of connected peers, indexed by their [`PeerId`]. tracking both the [`PeerKind`] and /// the set of [`ConnectionId`]s. connected_peers: HashMap, /// A map of all connected peers - A map of topic hash to a list of gossipsub peer Ids. topic_peers: HashMap>, /// A map of all connected peers to their subscribed topics. peer_topics: HashMap>, /// A set of all explicit peers. These are peers that remain connected and we unconditionally /// forward messages to, outside of the scoring system. explicit_peers: HashSet, /// A list of peers that have been blacklisted by the user. /// Messages are not sent to and are rejected from these peers. blacklisted_peers: HashSet, /// Overlay network of connected peers - Maps topics to connected gossipsub peers. mesh: HashMap>, /// Map of topics to list of peers that we publish to, but don't subscribe to. fanout: HashMap>, /// The last publish time for fanout topics. fanout_last_pub: HashMap, ///Storage for backoffs backoffs: BackoffStorage, /// Message cache for the last few heartbeats. mcache: MessageCache, /// Heartbeat interval stream. heartbeat: Interval, /// Number of heartbeats since the beginning of time; this allows us to amortize some resource /// clean up -- eg backoff clean up. heartbeat_ticks: u64, /// We remember all peers we found through peer exchange, since those peers are not considered /// as safe as randomly discovered outbound peers. This behaviour diverges from the go /// implementation to avoid possible love bombing attacks in PX. When disconnecting peers will /// be removed from this list which may result in a true outbound rediscovery. px_peers: HashSet, /// Set of connected outbound peers (we only consider true outbound peers found through /// discovery and not by PX). outbound_peers: HashSet, /// Stores optional peer score data together with thresholds, decay interval and gossip /// promises. peer_score: Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>, /// Counts the number of `IHAVE` received from each peer since the last heartbeat. count_received_ihave: HashMap, /// Counts the number of `IWANT` that we sent the each peer since the last heartbeat. count_sent_iwant: HashMap, /// Short term cache for published messsage ids. This is used for penalizing peers sending /// our own messages back if the messages are anonymous or use a random author. published_message_ids: DuplicateCache, /// Short term cache for fast message ids mapping them to the real message ids fast_messsage_id_cache: TimeCache, /// The filter used to handle message subscriptions. subscription_filter: F, /// A general transformation function that can be applied to data received from the wire before /// calculating the message-id and sending to the application. This is designed to allow the /// user to implement arbitrary topic-based compression algorithms. data_transform: D, } impl Gossipsub where D: DataTransform + Default, F: TopicSubscriptionFilter + Default, { /// Creates a [`Gossipsub`] struct given a set of parameters specified via a /// [`GossipsubConfig`]. This has no subscription filter and uses no compression. pub fn new( privacy: MessageAuthenticity, config: GossipsubConfig, ) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, F::default(), D::default(), ) } } impl Gossipsub where D: DataTransform + Default, F: TopicSubscriptionFilter, { /// Creates a [`Gossipsub`] struct given a set of parameters specified via a /// [`GossipsubConfig`] and a custom subscription filter. pub fn new_with_subscription_filter( privacy: MessageAuthenticity, config: GossipsubConfig, subscription_filter: F, ) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, subscription_filter, D::default(), ) } } impl Gossipsub where D: DataTransform, F: TopicSubscriptionFilter + Default, { /// Creates a [`Gossipsub`] struct given a set of parameters specified via a /// [`GossipsubConfig`] and a custom data transform. pub fn new_with_transform( privacy: MessageAuthenticity, config: GossipsubConfig, data_transform: D, ) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, F::default(), data_transform, ) } } impl Gossipsub where D: DataTransform, F: TopicSubscriptionFilter, { /// Creates a [`Gossipsub`] struct given a set of parameters specified via a /// [`GossipsubConfig`] and a custom subscription filter and data transform. pub fn new_with_subscription_filter_and_transform( privacy: MessageAuthenticity, config: GossipsubConfig, subscription_filter: F, data_transform: D, ) -> Result { // Set up the router given the configuration settings. // We do not allow configurations where a published message would also be rejected if it // were received locally. validate_config(&privacy, config.validation_mode())?; // Set up message publishing parameters. Ok(Gossipsub { events: VecDeque::new(), control_pool: HashMap::new(), publish_config: privacy.into(), duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()), fast_messsage_id_cache: TimeCache::new(config.duplicate_cache_time()), topic_peers: HashMap::new(), peer_topics: HashMap::new(), explicit_peers: HashSet::new(), blacklisted_peers: HashSet::new(), mesh: HashMap::new(), fanout: HashMap::new(), fanout_last_pub: HashMap::new(), backoffs: BackoffStorage::new( &config.prune_backoff(), config.heartbeat_interval(), config.backoff_slack(), ), mcache: MessageCache::new(config.history_gossip(), config.history_length()), heartbeat: Interval::new_initial( config.heartbeat_initial_delay(), config.heartbeat_interval(), ), heartbeat_ticks: 0, px_peers: HashSet::new(), outbound_peers: HashSet::new(), peer_score: None, count_received_ihave: HashMap::new(), count_sent_iwant: HashMap::new(), connected_peers: HashMap::new(), published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()), config, subscription_filter, data_transform, }) } } impl Gossipsub where D: DataTransform + Send + 'static, F: TopicSubscriptionFilter + Send + 'static, { /// Lists the hashes of the topics we are currently subscribed to. pub fn topics(&self) -> impl Iterator { self.mesh.keys() } /// Lists all mesh peers for a certain topic hash. pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator { self.mesh .get(topic_hash) .into_iter() .map(|x| x.iter()) .flatten() } /// Lists all mesh peers for all topics. pub fn all_mesh_peers(&self) -> impl Iterator { let mut res = BTreeSet::new(); for peers in self.mesh.values() { res.extend(peers); } res.into_iter() } /// Lists all known peers and their associated subscribed topics. pub fn all_peers(&self) -> impl Iterator)> { self.peer_topics .iter() .map(|(peer_id, topic_set)| (peer_id, topic_set.iter().collect())) } /// Lists all known peers and their associated protocol. pub fn peer_protocol(&self) -> impl Iterator { self.connected_peers.iter().map(|(k, v)| (k, &v.kind)) } /// Returns the gossipsub score for a given peer, if one exists. pub fn peer_score(&self, peer_id: &PeerId) -> Option { self.peer_score .as_ref() .map(|(score, ..)| score.score(peer_id)) } /// Subscribe to a topic. /// /// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already /// subscribed. pub fn subscribe(&mut self, topic: &Topic) -> Result { debug!("Subscribing to topic: {}", topic); let topic_hash = topic.hash(); if !self.subscription_filter.can_subscribe(&topic_hash) { return Err(SubscriptionError::NotAllowed); } if self.mesh.get(&topic_hash).is_some() { debug!("Topic: {} is already in the mesh.", topic); return Ok(false); } // send subscription request to all peers let peer_list = self.peer_topics.keys().cloned().collect::>(); if !peer_list.is_empty() { let event = GossipsubRpc { messages: Vec::new(), subscriptions: vec![GossipsubSubscription { topic_hash: topic_hash.clone(), action: GossipsubSubscriptionAction::Subscribe, }], control_msgs: Vec::new(), } .into_protobuf(); for peer in peer_list { debug!("Sending SUBSCRIBE to peer: {:?}", peer); self.send_message(peer, event.clone()) .map_err(SubscriptionError::PublishError)?; } } // call JOIN(topic) // this will add new peers to the mesh for the topic self.join(&topic_hash); debug!("Subscribed to topic: {}", topic); Ok(true) } /// Unsubscribes from a topic. /// /// Returns [`Ok(true)`] if we were subscribed to this topic. pub fn unsubscribe(&mut self, topic: &Topic) -> Result { debug!("Unsubscribing from topic: {}", topic); let topic_hash = topic.hash(); if self.mesh.get(&topic_hash).is_none() { debug!("Already unsubscribed from topic: {:?}", topic_hash); // we are not subscribed return Ok(false); } // announce to all peers let peer_list = self.peer_topics.keys().cloned().collect::>(); if !peer_list.is_empty() { let event = GossipsubRpc { messages: Vec::new(), subscriptions: vec![GossipsubSubscription { topic_hash: topic_hash.clone(), action: GossipsubSubscriptionAction::Unsubscribe, }], control_msgs: Vec::new(), } .into_protobuf(); for peer in peer_list { debug!("Sending UNSUBSCRIBE to peer: {}", peer.to_string()); self.send_message(peer, event.clone())?; } } // call LEAVE(topic) // this will remove the topic from the mesh self.leave(&topic_hash); debug!("Unsubscribed from topic: {:?}", topic_hash); Ok(true) } /// Publishes a message with multiple topics to the network. pub fn publish( &mut self, topic: Topic, data: impl Into>, ) -> Result { let data = data.into(); // Transform the data before building a raw_message. let transformed_data = self .data_transform .outbound_transform(&topic.hash(), data.clone())?; let raw_message = self.build_raw_message(topic.into(), transformed_data)?; // calculate the message id from the un-transformed data let msg_id = self.config.message_id(&GossipsubMessage { source: raw_message.source, data, // the uncompressed form sequence_number: raw_message.sequence_number, topic: raw_message.topic.clone(), }); let event = GossipsubRpc { subscriptions: Vec::new(), messages: vec![raw_message.clone()], control_msgs: Vec::new(), } .into_protobuf(); // check that the size doesn't exceed the max transmission size if event.encoded_len() > self.config.max_transmit_size() { return Err(PublishError::MessageTooLarge); } // Check the if the message has been published before if self.duplicate_cache.contains(&msg_id) { // This message has already been seen. We don't re-publish messages that have already // been published on the network. warn!( "Not publishing a message that has already been published. Msg-id {}", msg_id ); return Err(PublishError::Duplicate); } debug!("Publishing message: {:?}", msg_id); let topic_hash = raw_message.topic.clone(); // If we are not flood publishing forward the message to mesh peers. let mesh_peers_sent = !self.config.flood_publish() && self.forward_msg(&msg_id, raw_message.clone(), None)?; let mut recipient_peers = HashSet::new(); if let Some(set) = self.topic_peers.get(&topic_hash) { if self.config.flood_publish() { // Forward to all peers above score and all explicit peers recipient_peers.extend( set.iter() .filter(|p| { self.explicit_peers.contains(*p) || !self.score_below_threshold(*p, |ts| ts.publish_threshold).0 }) .cloned(), ); } else { // Explicit peers for peer in &self.explicit_peers { if set.contains(peer) { recipient_peers.insert(*peer); } } // Floodsub peers for (peer, connections) in &self.connected_peers { if connections.kind == PeerKind::Floodsub && !self .score_below_threshold(peer, |ts| ts.publish_threshold) .0 { recipient_peers.insert(*peer); } } // Gossipsub peers if self.mesh.get(&topic_hash).is_none() { debug!("Topic: {:?} not in the mesh", topic_hash); // If we have fanout peers add them to the map. if self.fanout.contains_key(&topic_hash) { for peer in self.fanout.get(&topic_hash).expect("Topic must exist") { recipient_peers.insert(*peer); } } else { // We have no fanout peers, select mesh_n of them and add them to the fanout let mesh_n = self.config.mesh_n(); let new_peers = get_random_peers( &self.topic_peers, &self.connected_peers, &topic_hash, mesh_n, { |p| { !self.explicit_peers.contains(p) && !self .score_below_threshold(p, |pst| pst.publish_threshold) .0 } }, ); // Add the new peers to the fanout and recipient peers self.fanout.insert(topic_hash.clone(), new_peers.clone()); for peer in new_peers { debug!("Peer added to fanout: {:?}", peer); recipient_peers.insert(peer); } } // We are publishing to fanout peers - update the time we published self.fanout_last_pub .insert(topic_hash.clone(), Instant::now()); } } } if recipient_peers.is_empty() && !mesh_peers_sent { return Err(PublishError::InsufficientPeers); } // If the message isn't a duplicate and we have sent it to some peers add it to the // duplicate cache and memcache. self.duplicate_cache.insert(msg_id.clone()); self.mcache.put(&msg_id, raw_message); // If the message is anonymous or has a random author add it to the published message ids // cache. if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config { if !self.config.allow_self_origin() { self.published_message_ids.insert(msg_id.clone()); } } // Send to peers we know are subscribed to the topic. for peer_id in recipient_peers.iter() { debug!("Sending message to peer: {:?}", peer_id); self.send_message(*peer_id, event.clone())?; } debug!("Published message: {:?}", &msg_id); Ok(msg_id) } /// This function should be called when [`GossipsubConfig::validate_messages()`] is `true` after /// the message got validated by the caller. Messages are stored in the ['Memcache'] and /// validation is expected to be fast enough that the messages should still exist in the cache. /// There are three possible validation outcomes and the outcome is given in acceptance. /// /// If acceptance = [`MessageAcceptance::Accept`] the message will get propagated to the /// network. The `propagation_source` parameter indicates who the message was received by and /// will not be forwarded back to that peer. /// /// If acceptance = [`MessageAcceptance::Reject`] the message will be deleted from the memcache /// and the P₄ penalty will be applied to the `propagation_source`. // /// If acceptance = [`MessageAcceptance::Ignore`] the message will be deleted from the memcache /// but no P₄ penalty will be applied. /// /// This function will return true if the message was found in the cache and false if was not /// in the cache anymore. /// /// This should only be called once per message. pub fn report_message_validation_result( &mut self, msg_id: &MessageId, propagation_source: &PeerId, acceptance: MessageAcceptance, ) -> Result { let reject_reason = match acceptance { MessageAcceptance::Accept => { let raw_message = match self.mcache.validate(msg_id) { Some(raw_message) => raw_message.clone(), None => { warn!( "Message not in cache. Ignoring forwarding. Message Id: {}", msg_id ); return Ok(false); } }; self.forward_msg(msg_id, raw_message, Some(propagation_source))?; return Ok(true); } MessageAcceptance::Reject => RejectReason::ValidationFailed, MessageAcceptance::Ignore => RejectReason::ValidationIgnored, }; if let Some(raw_message) = self.mcache.remove(msg_id) { // Tell peer_score about reject if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.reject_message( propagation_source, msg_id, &raw_message.topic, reject_reason, ); } Ok(true) } else { warn!("Rejected message not in cache. Message Id: {}", msg_id); Ok(false) } } /// Adds a new peer to the list of explicitly connected peers. pub fn add_explicit_peer(&mut self, peer_id: &PeerId) { debug!("Adding explicit peer {}", peer_id); self.explicit_peers.insert(*peer_id); self.check_explicit_peer_connection(peer_id); } /// This removes the peer from explicitly connected peers, note that this does not disconnect /// the peer. pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) { debug!("Removing explicit peer {}", peer_id); self.explicit_peers.remove(peer_id); } /// Blacklists a peer. All messages from this peer will be rejected and any message that was /// created by this peer will be rejected. pub fn blacklist_peer(&mut self, peer_id: &PeerId) { if self.blacklisted_peers.insert(*peer_id) { debug!("Peer has been blacklisted: {}", peer_id); } } /// Removes a peer from the blacklist if it has previously been blacklisted. pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) { if self.blacklisted_peers.remove(peer_id) { debug!("Peer has been removed from the blacklist: {}", peer_id); } } /// Activates the peer scoring system with the given parameters. This will reset all scores /// if there was already another peer scoring system activated. Returns an error if the /// params are not valid or if they got already set. pub fn with_peer_score( &mut self, params: PeerScoreParams, threshold: PeerScoreThresholds, ) -> Result<(), String> { self.with_peer_score_and_message_delivery_time_callback(params, threshold, None) } /// Activates the peer scoring system with the given parameters and a message delivery time /// callback. Returns an error if the parameters got already set. pub fn with_peer_score_and_message_delivery_time_callback( &mut self, params: PeerScoreParams, threshold: PeerScoreThresholds, callback: Option, ) -> Result<(), String> { params.validate()?; threshold.validate()?; if self.peer_score.is_some() { return Err("Peer score set twice".into()); } let interval = Interval::new(params.decay_interval); let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback); self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default())); Ok(()) } /// Sets scoring parameters for a topic. /// /// The [`Self::with_peer_score()`] must first be called to initialise peer scoring. pub fn set_topic_params( &mut self, topic: Topic, params: TopicScoreParams, ) -> Result<(), &'static str> { if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.set_topic_params(topic.hash(), params); Ok(()) } else { Err("Peer score must be initialised with `with_peer_score()`") } } /// Sets the application specific score for a peer. Returns true if scoring is active and /// the peer is connected or if the score of the peer is not yet expired, false otherwise. pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool { if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.set_application_score(peer_id, new_score) } else { false } } /// Gossipsub JOIN(topic) - adds topic peers to mesh and sends them GRAFT messages. fn join(&mut self, topic_hash: &TopicHash) { debug!("Running JOIN for topic: {:?}", topic_hash); // if we are already in the mesh, return if self.mesh.contains_key(topic_hash) { debug!("JOIN: The topic is already in the mesh, ignoring JOIN"); return; } let mut added_peers = HashSet::new(); // check if we have mesh_n peers in fanout[topic] and add them to the mesh if we do, // removing the fanout entry. if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) { debug!( "JOIN: Removing peers from the fanout for topic: {:?}", topic_hash ); // remove explicit peers, peers with negative scores, and backoffed peers peers = peers .into_iter() .filter(|p| { !self.explicit_peers.contains(p) && !self.score_below_threshold(p, |_| 0.0).0 && !self.backoffs.is_backoff_with_slack(topic_hash, p) }) .collect(); // Add up to mesh_n of them them to the mesh // NOTE: These aren't randomly added, currently FIFO let add_peers = std::cmp::min(peers.len(), self.config.mesh_n()); debug!( "JOIN: Adding {:?} peers from the fanout for topic: {:?}", add_peers, topic_hash ); added_peers.extend(peers.iter().cloned().take(add_peers)); self.mesh.insert( topic_hash.clone(), peers.into_iter().take(add_peers).collect(), ); // remove the last published time self.fanout_last_pub.remove(topic_hash); } // check if we need to get more peers, which we randomly select if added_peers.len() < self.config.mesh_n() { // get the peers let new_peers = get_random_peers( &self.topic_peers, &self.connected_peers, topic_hash, self.config.mesh_n() - added_peers.len(), |peer| { !added_peers.contains(peer) && !self.explicit_peers.contains(peer) && !self.score_below_threshold(peer, |_| 0.0).0 && !self.backoffs.is_backoff_with_slack(topic_hash, peer) }, ); added_peers.extend(new_peers.clone()); // add them to the mesh debug!( "JOIN: Inserting {:?} random peers into the mesh", new_peers.len() ); let mesh_peers = self .mesh .entry(topic_hash.clone()) .or_insert_with(Default::default); mesh_peers.extend(new_peers); } for peer_id in added_peers { // Send a GRAFT control message debug!("JOIN: Sending Graft message to peer: {:?}", peer_id); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.graft(&peer_id, topic_hash.clone()); } Self::control_pool_add( &mut self.control_pool, peer_id, GossipsubControlAction::Graft { topic_hash: topic_hash.clone(), }, ); // If the peer did not previously exist in any mesh, inform the handler peer_added_to_mesh( peer_id, vec![topic_hash], &self.mesh, self.peer_topics.get(&peer_id), &mut self.events, &self.connected_peers, ); } debug!("Completed JOIN for topic: {:?}", topic_hash); } /// Creates a PRUNE gossipsub action. fn make_prune( &mut self, topic_hash: &TopicHash, peer: &PeerId, do_px: bool, ) -> GossipsubControlAction { if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.prune(peer, topic_hash.clone()); } match self.connected_peers.get(peer).map(|v| &v.kind) { Some(PeerKind::Floodsub) => { error!("Attempted to prune a Floodsub peer"); } Some(PeerKind::Gossipsub) => { // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway return GossipsubControlAction::Prune { topic_hash: topic_hash.clone(), peers: Vec::new(), backoff: None, }; } None => { error!("Attempted to Prune an unknown peer"); } _ => {} // Gossipsub 1.1 peer perform the `Prune` } // Select peers for peer exchange let peers = if do_px { get_random_peers( &self.topic_peers, &self.connected_peers, topic_hash, self.config.prune_peers(), |p| p != peer && !self.score_below_threshold(p, |_| 0.0).0, ) .into_iter() .map(|p| PeerInfo { peer_id: Some(p) }) .collect() } else { Vec::new() }; // update backoff self.backoffs .update_backoff(topic_hash, peer, self.config.prune_backoff()); GossipsubControlAction::Prune { topic_hash: topic_hash.clone(), peers, backoff: Some(self.config.prune_backoff().as_secs()), } } /// Gossipsub LEAVE(topic) - Notifies mesh\[topic\] peers with PRUNE messages. fn leave(&mut self, topic_hash: &TopicHash) { debug!("Running LEAVE for topic {:?}", topic_hash); // If our mesh contains the topic, send prune to peers and delete it from the mesh if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) { for peer in peers { // Send a PRUNE control message debug!("LEAVE: Sending PRUNE to peer: {:?}", peer); let control = self.make_prune(topic_hash, &peer, self.config.do_px()); Self::control_pool_add(&mut self.control_pool, peer, control); // If the peer did not previously exist in any mesh, inform the handler peer_removed_from_mesh( peer, topic_hash, &self.mesh, self.peer_topics.get(&peer), &mut self.events, &self.connected_peers, ); } } debug!("Completed LEAVE for topic: {:?}", topic_hash); } /// Checks if the given peer is still connected and if not dials the peer again. fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) { if !self.peer_topics.contains_key(peer_id) { // Connect to peer debug!("Connecting to explicit peer {:?}", peer_id); let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id: *peer_id, condition: DialPeerCondition::Disconnected, handler, }); } } /// Determines if a peer's score is below a given `PeerScoreThreshold` chosen via the /// `threshold` parameter. fn score_below_threshold( &self, peer_id: &PeerId, threshold: impl Fn(&PeerScoreThresholds) -> f64, ) -> (bool, f64) { Self::score_below_threshold_from_scores(&self.peer_score, peer_id, threshold) } fn score_below_threshold_from_scores( peer_score: &Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>, peer_id: &PeerId, threshold: impl Fn(&PeerScoreThresholds) -> f64, ) -> (bool, f64) { if let Some((peer_score, thresholds, ..)) = peer_score { let score = peer_score.score(peer_id); if score < threshold(thresholds) { return (true, score); } (false, score) } else { (false, 0.0) } } /// Handles an IHAVE control message. Checks our cache of messages. If the message is unknown, /// requests it with an IWANT control message. fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec)>) { // We ignore IHAVE gossip from any peer whose score is below the gossip threshold if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) { debug!( "IHAVE: ignoring peer {:?} with score below threshold [score = {}]", peer_id, score ); return; } // IHAVE flood protection let peer_have = self.count_received_ihave.entry(*peer_id).or_insert(0); *peer_have += 1; if *peer_have > self.config.max_ihave_messages() { debug!( "IHAVE: peer {} has advertised too many times ({}) within this heartbeat \ interval; ignoring", peer_id, *peer_have ); return; } if let Some(iasked) = self.count_sent_iwant.get(peer_id) { if *iasked >= self.config.max_ihave_length() { debug!( "IHAVE: peer {} has already advertised too many messages ({}); ignoring", peer_id, *iasked ); return; } } debug!("Handling IHAVE for peer: {:?}", peer_id); // use a hashset to avoid duplicates efficiently let mut iwant_ids = HashSet::new(); for (topic, ids) in ihave_msgs { // only process the message if we are subscribed if !self.mesh.contains_key(&topic) { debug!( "IHAVE: Ignoring IHAVE - Not subscribed to topic: {:?}", topic ); continue; } for id in ids { if !self.duplicate_cache.contains(&id) { // have not seen this message, request it iwant_ids.insert(id); } } } if !iwant_ids.is_empty() { let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0); let mut iask = iwant_ids.len(); if *iasked + iask > self.config.max_ihave_length() { iask = self.config.max_ihave_length().saturating_sub(*iasked); } // Send the list of IWANT control messages debug!( "IHAVE: Asking for {} out of {} messages from {}", iask, iwant_ids.len(), peer_id ); //ask in random order let mut iwant_ids_vec: Vec<_> = iwant_ids.iter().collect(); let mut rng = thread_rng(); iwant_ids_vec.partial_shuffle(&mut rng, iask as usize); iwant_ids_vec.truncate(iask as usize); *iasked += iask; let message_ids = iwant_ids_vec.into_iter().cloned().collect::>(); if let Some((_, _, _, gossip_promises)) = &mut self.peer_score { gossip_promises.add_promise( *peer_id, &message_ids, Instant::now() + self.config.iwant_followup_time(), ); } debug!( "IHAVE: Asking for the following messages from {}: {:?}", peer_id, message_ids ); Self::control_pool_add( &mut self.control_pool, *peer_id, GossipsubControlAction::IWant { message_ids }, ); } debug!("Completed IHAVE handling for peer: {:?}", peer_id); } /// Handles an IWANT control message. Checks our cache of messages. If the message exists it is /// forwarded to the requesting peer. fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec) { // We ignore IWANT gossip from any peer whose score is below the gossip threshold if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) { debug!( "IWANT: ignoring peer {:?} with score below threshold [score = {}]", peer_id, score ); return; } debug!("Handling IWANT for peer: {:?}", peer_id); // build a hashmap of available messages let mut cached_messages = HashMap::new(); for id in iwant_msgs { // If we have it and the IHAVE count is not above the threshold, add it do the // cached_messages mapping if let Some((msg, count)) = self.mcache.get_with_iwant_counts(&id, peer_id) { if count > self.config.gossip_retransimission() { debug!( "IWANT: Peer {} has asked for message {} too many times; ignoring \ request", peer_id, &id ); } else { cached_messages.insert(id.clone(), msg.clone()); } } } if !cached_messages.is_empty() { debug!("IWANT: Sending cached messages to peer: {:?}", peer_id); // Send the messages to the peer let message_list = cached_messages.into_iter().map(|entry| entry.1).collect(); if self .send_message( *peer_id, GossipsubRpc { subscriptions: Vec::new(), messages: message_list, control_msgs: Vec::new(), } .into_protobuf(), ) .is_err() { error!("Failed to send cached messages. Messages too large"); } } debug!("Completed IWANT handling for peer: {}", peer_id); } /// Handles GRAFT control messages. If subscribed to the topic, adds the peer to mesh, if not, /// responds with PRUNE messages. fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec) { debug!("Handling GRAFT message for peer: {}", peer_id); let mut to_prune_topics = HashSet::new(); let mut do_px = self.config.do_px(); // For each topic, if a peer has grafted us, then we necessarily must be in their mesh // and they must be subscribed to the topic. Ensure we have recorded the mapping. for topic in &topics { self.peer_topics .entry(*peer_id) .or_default() .insert(topic.clone()); self.topic_peers .entry(topic.clone()) .or_default() .insert(*peer_id); } // we don't GRAFT to/from explicit peers; complain loudly if this happens if self.explicit_peers.contains(peer_id) { warn!("GRAFT: ignoring request from direct peer {}", peer_id); // this is possibly a bug from non-reciprocal configuration; send a PRUNE for all topics to_prune_topics = topics.into_iter().collect(); // but don't PX do_px = false } else { let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0); let now = Instant::now(); for topic_hash in topics { if let Some(peers) = self.mesh.get_mut(&topic_hash) { // if the peer is already in the mesh ignore the graft if peers.contains(peer_id) { debug!( "GRAFT: Received graft for peer {:?} that is already in topic {:?}", peer_id, &topic_hash ); continue; } // make sure we are not backing off that peer if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id) { if backoff_time > now { warn!( "GRAFT: peer attempted graft within backoff time, penalizing {}", peer_id ); // add behavioural penalty if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.add_penalty(peer_id, 1); // check the flood cutoff let flood_cutoff = (backoff_time + self.config.graft_flood_threshold()) - self.config.prune_backoff(); if flood_cutoff > now { //extra penalty peer_score.add_penalty(peer_id, 1); } } // no PX do_px = false; to_prune_topics.insert(topic_hash.clone()); continue; } } // check the score if below_zero { // we don't GRAFT peers with negative score debug!( "GRAFT: ignoring peer {:?} with negative score [score = {}, \ topic = {}]", peer_id, score, topic_hash ); // we do send them PRUNE however, because it's a matter of protocol correctness to_prune_topics.insert(topic_hash.clone()); // but we won't PX to them do_px = false; continue; } // check mesh upper bound and only allow graft if the upper bound is not reached or // if it is an outbound peer if peers.len() >= self.config.mesh_n_high() && !self.outbound_peers.contains(peer_id) { to_prune_topics.insert(topic_hash.clone()); continue; } // add peer to the mesh debug!( "GRAFT: Mesh link added for peer: {:?} in topic: {:?}", peer_id, &topic_hash ); peers.insert(*peer_id); // If the peer did not previously exist in any mesh, inform the handler peer_added_to_mesh( *peer_id, vec![&topic_hash], &self.mesh, self.peer_topics.get(peer_id), &mut self.events, &self.connected_peers, ); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.graft(peer_id, topic_hash); } } else { // don't do PX when there is an unknown topic to avoid leaking our peers do_px = false; debug!( "GRAFT: Received graft for unknown topic {:?} from peer {:?}", &topic_hash, peer_id ); // spam hardening: ignore GRAFTs for unknown topics continue; } } } if !to_prune_topics.is_empty() { // build the prune messages to send let prune_messages = to_prune_topics .iter() .map(|t| self.make_prune(t, peer_id, do_px)) .collect(); // Send the prune messages to the peer debug!( "GRAFT: Not subscribed to topics - Sending PRUNE to peer: {}", peer_id ); if self .send_message( *peer_id, GossipsubRpc { subscriptions: Vec::new(), messages: Vec::new(), control_msgs: prune_messages, } .into_protobuf(), ) .is_err() { error!("Failed to send graft. Message too large"); } } debug!("Completed GRAFT handling for peer: {}", peer_id); } fn remove_peer_from_mesh( &mut self, peer_id: &PeerId, topic_hash: &TopicHash, backoff: Option, always_update_backoff: bool, ) { let mut update_backoff = always_update_backoff; if let Some(peers) = self.mesh.get_mut(topic_hash) { // remove the peer if it exists in the mesh if peers.remove(peer_id) { debug!( "PRUNE: Removing peer: {} from the mesh for topic: {}", peer_id.to_string(), topic_hash ); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.prune(peer_id, topic_hash.clone()); } update_backoff = true; // inform the handler peer_removed_from_mesh( *peer_id, topic_hash, &self.mesh, self.peer_topics.get(peer_id), &mut self.events, &self.connected_peers, ); } } if update_backoff { let time = if let Some(backoff) = backoff { Duration::from_secs(backoff) } else { self.config.prune_backoff() }; // is there a backoff specified by the peer? if so obey it. self.backoffs.update_backoff(topic_hash, peer_id, time); } } /// Handles PRUNE control messages. Removes peer from the mesh. fn handle_prune( &mut self, peer_id: &PeerId, prune_data: Vec<(TopicHash, Vec, Option)>, ) { debug!("Handling PRUNE message for peer: {}", peer_id); let (below_threshold, score) = self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold); for (topic_hash, px, backoff) in prune_data { self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true); if self.mesh.contains_key(&topic_hash) { //connect to px peers if !px.is_empty() { // we ignore PX from peers with insufficient score if below_threshold { debug!( "PRUNE: ignoring PX from peer {:?} with insufficient score \ [score ={} topic = {}]", peer_id, score, topic_hash ); continue; } // NOTE: We cannot dial any peers from PX currently as we typically will not // know their multiaddr. Until SignedRecords are spec'd this // remains a stub. By default `config.prune_peers()` is set to zero and // this is skipped. If the user modifies this, this will only be able to // dial already known peers (from an external discovery mechanism for // example). if self.config.prune_peers() > 0 { self.px_connect(px); } } } } debug!("Completed PRUNE handling for peer: {}", peer_id.to_string()); } fn px_connect(&mut self, mut px: Vec) { let n = self.config.prune_peers(); // Ignore peerInfo with no ID // //TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a // signed peer record? px = px.into_iter().filter(|p| p.peer_id.is_some()).collect(); if px.len() > n { // only use at most prune_peers many random peers let mut rng = thread_rng(); px.partial_shuffle(&mut rng, n); px = px.into_iter().take(n).collect(); } for p in px { // TODO: Once signed records are spec'd: extract signed peer record if given and handle // it, see https://github.com/libp2p/specs/pull/217 if let Some(peer_id) = p.peer_id { // mark as px peer self.px_peers.insert(peer_id); // dial peer let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id, condition: DialPeerCondition::Disconnected, handler, }); } } } /// Applies some basic checks to whether this message is valid. Does not apply user validation /// checks. fn message_is_valid( &mut self, msg_id: &MessageId, raw_message: &mut RawGossipsubMessage, propagation_source: &PeerId, ) -> bool { debug!( "Handling message: {:?} from peer: {}", msg_id, propagation_source.to_string() ); // Reject any message from a blacklisted peer if self.blacklisted_peers.contains(propagation_source) { debug!( "Rejecting message from blacklisted peer: {}", propagation_source ); if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { peer_score.reject_message( propagation_source, msg_id, &raw_message.topic, RejectReason::BlackListedPeer, ); gossip_promises.reject_message(msg_id, &RejectReason::BlackListedPeer); } return false; } // Also reject any message that originated from a blacklisted peer if let Some(source) = raw_message.source.as_ref() { if self.blacklisted_peers.contains(source) { debug!( "Rejecting message from peer {} because of blacklisted source: {}", propagation_source, source ); if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { peer_score.reject_message( propagation_source, msg_id, &raw_message.topic, RejectReason::BlackListedSource, ); gossip_promises.reject_message(msg_id, &RejectReason::BlackListedSource); } return false; } } // If we are not validating messages, assume this message is validated // This will allow the message to be gossiped without explicitly calling // `validate_message`. if !self.config.validate_messages() { raw_message.validated = true; } // reject messages claiming to be from ourselves but not locally published let self_published = !self.config.allow_self_origin() && if let Some(own_id) = self.publish_config.get_own_id() { own_id != propagation_source && raw_message.source.as_ref().map_or(false, |s| s == own_id) } else { self.published_message_ids.contains(msg_id) }; if self_published { debug!( "Dropping message {} claiming to be from self but forwarded from {}", msg_id, propagation_source ); if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score { peer_score.reject_message( propagation_source, msg_id, &raw_message.topic, RejectReason::SelfOrigin, ); gossip_promises.reject_message(msg_id, &RejectReason::SelfOrigin); } return false; } true } /// Handles a newly received [`RawGossipsubMessage`]. /// /// Forwards the message to all peers in the mesh. fn handle_received_message( &mut self, mut raw_message: RawGossipsubMessage, propagation_source: &PeerId, ) { let fast_message_id = self.config.fast_message_id(&raw_message); if let Some(fast_message_id) = fast_message_id.as_ref() { if let Some(msg_id) = self.fast_messsage_id_cache.get(fast_message_id) { let msg_id = msg_id.clone(); self.message_is_valid(&msg_id, &mut raw_message, propagation_source); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.duplicated_message(propagation_source, &msg_id, &raw_message.topic); } return; } } // Try and perform the data transform to the message. If it fails, consider it invalid. let message = match self.data_transform.inbound_transform(raw_message.clone()) { Ok(message) => message, Err(e) => { debug!("Invalid message. Transform error: {:?}", e); // Reject the message and return self.handle_invalid_message( propagation_source, raw_message, ValidationError::TransformFailed, ); return; } }; // Calculate the message id on the transformed data. let msg_id = self.config.message_id(&message); // Check the validity of the message // Peers get penalized if this message is invalid. We don't add it to the duplicate cache // and instead continually penalize peers that repeatedly send this message. if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) { return; } // Add the message to the duplicate caches if let Some(fast_message_id) = fast_message_id { // add id to cache self.fast_messsage_id_cache .entry(fast_message_id) .or_insert_with(|| msg_id.clone()); } if !self.duplicate_cache.insert(msg_id.clone()) { debug!("Message already received, ignoring. Message: {}", msg_id); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.duplicated_message(propagation_source, &msg_id, &message.topic); } return; } debug!( "Put message {:?} in duplicate_cache and resolve promises", msg_id ); // Tells score that message arrived (but is maybe not fully validated yet). // Consider the message as delivered for gossip promises. if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { peer_score.validate_message(propagation_source, &msg_id, &message.topic); gossip_promises.message_delivered(&msg_id); } // Add the message to our memcache self.mcache.put(&msg_id, raw_message.clone()); // Dispatch the message to the user if we are subscribed to any of the topics if self.mesh.contains_key(&message.topic) { debug!("Sending received message to user"); self.events.push_back(NetworkBehaviourAction::GenerateEvent( GossipsubEvent::Message { propagation_source: *propagation_source, message_id: msg_id.clone(), message, }, )); } else { debug!( "Received message on a topic we are not subscribed to: {:?}", message.topic ); return; } // forward the message to mesh peers, if no validation is required if !self.config.validate_messages() { if self .forward_msg(&msg_id, raw_message, Some(propagation_source)) .is_err() { error!("Failed to forward message. Too large"); } debug!("Completed message handling for message: {:?}", msg_id); } } // Handles invalid messages received. fn handle_invalid_message( &mut self, propagation_source: &PeerId, raw_message: RawGossipsubMessage, validation_error: ValidationError, ) { if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { let reason = RejectReason::ValidationError(validation_error); let fast_message_id_cache = &self.fast_messsage_id_cache; if let Some(msg_id) = self .config .fast_message_id(&raw_message) .and_then(|id| fast_message_id_cache.get(&id)) { peer_score.reject_message(propagation_source, msg_id, &raw_message.topic, reason); gossip_promises.reject_message(msg_id, &reason); } else { // The message is invalid, we reject it ignoring any gossip promises. If a peer is // advertising this message via an IHAVE and it's invalid it will be double // penalized, one for sending us an invalid and again for breaking a promise. peer_score.reject_invalid_message(propagation_source, &raw_message.topic); } } } /// Handles received subscriptions. fn handle_received_subscriptions( &mut self, subscriptions: &[GossipsubSubscription], propagation_source: &PeerId, ) { debug!( "Handling subscriptions: {:?}, from source: {}", subscriptions, propagation_source.to_string() ); let mut unsubscribed_peers = Vec::new(); let subscribed_topics = match self.peer_topics.get_mut(propagation_source) { Some(topics) => topics, None => { error!( "Subscription by unknown peer: {}", propagation_source.to_string() ); return; } }; // Collect potential graft topics for the peer. let mut topics_to_graft = Vec::new(); // Notify the application about the subscription, after the grafts are sent. let mut application_event = Vec::new(); let filtered_topics = match self .subscription_filter .filter_incoming_subscriptions(subscriptions, subscribed_topics) { Ok(topics) => topics, Err(s) => { error!( "Subscription filter error: {}; ignoring RPC from peer {}", s, propagation_source.to_string() ); return; } }; for subscription in filtered_topics { // get the peers from the mapping, or insert empty lists if the topic doesn't exist let peer_list = self .topic_peers .entry(subscription.topic_hash.clone()) .or_insert_with(Default::default); match subscription.action { GossipsubSubscriptionAction::Subscribe => { if peer_list.insert(*propagation_source) { debug!( "SUBSCRIPTION: Adding gossip peer: {} to topic: {:?}", propagation_source.to_string(), subscription.topic_hash ); } // add to the peer_topics mapping subscribed_topics.insert(subscription.topic_hash.clone()); // if the mesh needs peers add the peer to the mesh if !self.explicit_peers.contains(propagation_source) && matches!( self.connected_peers .get(propagation_source) .map(|v| &v.kind), Some(PeerKind::Gossipsubv1_1) | Some(PeerKind::Gossipsub) ) && !Self::score_below_threshold_from_scores( &self.peer_score, propagation_source, |_| 0.0, ) .0 && !self .backoffs .is_backoff_with_slack(&subscription.topic_hash, propagation_source) { if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) { if peers.len() < self.config.mesh_n_low() && peers.insert(*propagation_source) { debug!( "SUBSCRIPTION: Adding peer {} to the mesh for topic {:?}", propagation_source.to_string(), subscription.topic_hash ); // send graft to the peer debug!( "Sending GRAFT to peer {} for topic {:?}", propagation_source.to_string(), subscription.topic_hash ); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score .graft(propagation_source, subscription.topic_hash.clone()); } topics_to_graft.push(subscription.topic_hash.clone()); } } } // generates a subscription event to be polled application_event.push(NetworkBehaviourAction::GenerateEvent( GossipsubEvent::Subscribed { peer_id: *propagation_source, topic: subscription.topic_hash.clone(), }, )); } GossipsubSubscriptionAction::Unsubscribe => { if peer_list.remove(propagation_source) { debug!( "SUBSCRIPTION: Removing gossip peer: {} from topic: {:?}", propagation_source.to_string(), subscription.topic_hash ); } // remove topic from the peer_topics mapping subscribed_topics.remove(&subscription.topic_hash); unsubscribed_peers.push((*propagation_source, subscription.topic_hash.clone())); // generate an unsubscribe event to be polled application_event.push(NetworkBehaviourAction::GenerateEvent( GossipsubEvent::Unsubscribed { peer_id: *propagation_source, topic: subscription.topic_hash.clone(), }, )); } } } // remove unsubscribed peers from the mesh if it exists for (peer_id, topic_hash) in unsubscribed_peers { self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false); } // Potentially inform the handler if we have added this peer to a mesh for the first time. let topics_joined = topics_to_graft.iter().collect::>(); if !topics_joined.is_empty() { peer_added_to_mesh( *propagation_source, topics_joined, &self.mesh, self.peer_topics.get(propagation_source), &mut self.events, &self.connected_peers, ); } // If we need to send grafts to peer, do so immediately, rather than waiting for the // heartbeat. if !topics_to_graft.is_empty() && self .send_message( *propagation_source, GossipsubRpc { subscriptions: Vec::new(), messages: Vec::new(), control_msgs: topics_to_graft .into_iter() .map(|topic_hash| GossipsubControlAction::Graft { topic_hash }) .collect(), } .into_protobuf(), ) .is_err() { error!("Failed sending grafts. Message too large"); } // Notify the application of the subscriptions for event in application_event { self.events.push_back(event); } trace!( "Completed handling subscriptions from source: {:?}", propagation_source ); } /// Applies penalties to peers that did not respond to our IWANT requests. fn apply_iwant_penalties(&mut self) { if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { for (peer, count) in gossip_promises.get_broken_promises() { peer_score.add_penalty(&peer, count); } } } /// Heartbeat function which shifts the memcache and updates the mesh. fn heartbeat(&mut self) { debug!("Starting heartbeat"); self.heartbeat_ticks += 1; let mut to_graft = HashMap::new(); let mut to_prune = HashMap::new(); let mut no_px = HashSet::new(); // clean up expired backoffs self.backoffs.heartbeat(); // clean up ihave counters self.count_sent_iwant.clear(); self.count_received_ihave.clear(); // apply iwant penalties self.apply_iwant_penalties(); // check connections to explicit peers if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 { for p in self.explicit_peers.clone() { self.check_explicit_peer_connection(&p); } } // cache scores throughout the heartbeat let mut scores = HashMap::new(); let peer_score = &self.peer_score; let mut score = |p: &PeerId| match peer_score { Some((peer_score, ..)) => *scores.entry(*p).or_insert_with(|| peer_score.score(p)), _ => 0.0, }; // maintain the mesh for each topic for (topic_hash, peers) in self.mesh.iter_mut() { let explicit_peers = &self.explicit_peers; let backoffs = &self.backoffs; let topic_peers = &self.topic_peers; let outbound_peers = &self.outbound_peers; // drop all peers with negative score, without PX // if there is at some point a stable retain method for BTreeSet the following can be // written more efficiently with retain. let to_remove: Vec<_> = peers .iter() .filter(|&p| { if score(p) < 0.0 { debug!( "HEARTBEAT: Prune peer {:?} with negative score [score = {}, topic = \ {}]", p, score(p), topic_hash ); let current_topic = to_prune.entry(*p).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); no_px.insert(*p); true } else { false } }) .cloned() .collect(); for peer in to_remove { peers.remove(&peer); } // too little peers - add some if peers.len() < self.config.mesh_n_low() { debug!( "HEARTBEAT: Mesh low. Topic: {} Contains: {} needs: {}", topic_hash, peers.len(), self.config.mesh_n_low() ); // not enough peers - get mesh_n - current_length more let desired_peers = self.config.mesh_n() - peers.len(); let peer_list = get_random_peers( topic_peers, &self.connected_peers, topic_hash, desired_peers, |peer| { !peers.contains(peer) && !explicit_peers.contains(peer) && !backoffs.is_backoff_with_slack(topic_hash, peer) && score(peer) >= 0.0 }, ); for peer in &peer_list { let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); } // update the mesh debug!("Updating mesh, new mesh: {:?}", peer_list); peers.extend(peer_list); } // too many peers - remove some if peers.len() > self.config.mesh_n_high() { debug!( "HEARTBEAT: Mesh high. Topic: {} Contains: {} needs: {}", topic_hash, peers.len(), self.config.mesh_n_high() ); let excess_peer_no = peers.len() - self.config.mesh_n(); // shuffle the peers and then sort by score ascending beginning with the worst let mut rng = thread_rng(); let mut shuffled = peers.iter().cloned().collect::>(); shuffled.shuffle(&mut rng); shuffled .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Ordering::Equal)); // shuffle everything except the last retain_scores many peers (the best ones) shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng); // count total number of outbound peers let mut outbound = { let outbound_peers = &self.outbound_peers; shuffled .iter() .filter(|p| outbound_peers.contains(*p)) .count() }; // remove the first excess_peer_no allowed (by outbound restrictions) peers adding // them to to_prune let mut removed = 0; for peer in shuffled { if removed == excess_peer_no { break; } if self.outbound_peers.contains(&peer) { if outbound <= self.config.mesh_outbound_min() { // do not remove anymore outbound peers continue; } else { // an outbound peer gets removed outbound -= 1; } } // remove the peer peers.remove(&peer); let current_topic = to_prune.entry(peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); removed += 1; } } // do we have enough outbound peers? if peers.len() >= self.config.mesh_n_low() { // count number of outbound peers we have let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() }; // if we have not enough outbound peers, graft to some new outbound peers if outbound < self.config.mesh_outbound_min() { let needed = self.config.mesh_outbound_min() - outbound; let peer_list = get_random_peers( topic_peers, &self.connected_peers, topic_hash, needed, |peer| { !peers.contains(peer) && !explicit_peers.contains(peer) && !backoffs.is_backoff_with_slack(topic_hash, peer) && score(peer) >= 0.0 && outbound_peers.contains(peer) }, ); for peer in &peer_list { let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); } // update the mesh debug!("Updating mesh, new mesh: {:?}", peer_list); peers.extend(peer_list); } } // should we try to improve the mesh with opportunistic grafting? if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0 && peers.len() > 1 && self.peer_score.is_some() { if let Some((_, thresholds, _, _)) = &self.peer_score { // Opportunistic grafting works as follows: we check the median score of peers // in the mesh; if this score is below the opportunisticGraftThreshold, we // select a few peers at random with score over the median. // The intention is to (slowly) improve an underperforming mesh by introducing // good scoring peers that may have been gossiping at us. This allows us to // get out of sticky situations where we are stuck with poor peers and also // recover from churn of good peers. // now compute the median peer score in the mesh let mut peers_by_score: Vec<_> = peers.iter().collect(); peers_by_score .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Equal)); let middle = peers_by_score.len() / 2; let median = if peers_by_score.len() % 2 == 0 { (score( *peers_by_score.get(middle - 1).expect( "middle < vector length and middle > 0 since peers.len() > 0", ), ) + score(*peers_by_score.get(middle).expect("middle < vector length"))) * 0.5 } else { score(*peers_by_score.get(middle).expect("middle < vector length")) }; // if the median score is below the threshold, select a better peer (if any) and // GRAFT if median < thresholds.opportunistic_graft_threshold { let peer_list = get_random_peers( topic_peers, &self.connected_peers, topic_hash, self.config.opportunistic_graft_peers(), |peer| { !peers.contains(peer) && !explicit_peers.contains(peer) && !backoffs.is_backoff_with_slack(topic_hash, peer) && score(peer) > median }, ); for peer in &peer_list { let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); } // update the mesh debug!( "Opportunistically graft in topic {} with peers {:?}", topic_hash, peer_list ); peers.extend(peer_list); } } } } // remove expired fanout topics { let fanout = &mut self.fanout; // help the borrow checker let fanout_ttl = self.config.fanout_ttl(); self.fanout_last_pub.retain(|topic_hash, last_pub_time| { if *last_pub_time + fanout_ttl < Instant::now() { debug!( "HEARTBEAT: Fanout topic removed due to timeout. Topic: {:?}", topic_hash ); fanout.remove(topic_hash); return false; } true }); } // maintain fanout // check if our peers are still a part of the topic for (topic_hash, peers) in self.fanout.iter_mut() { let mut to_remove_peers = Vec::new(); let publish_threshold = match &self.peer_score { Some((_, thresholds, _, _)) => thresholds.publish_threshold, _ => 0.0, }; for peer in peers.iter() { // is the peer still subscribed to the topic? match self.peer_topics.get(peer) { Some(topics) => { if !topics.contains(topic_hash) || score(peer) < publish_threshold { debug!( "HEARTBEAT: Peer removed from fanout for topic: {:?}", topic_hash ); to_remove_peers.push(*peer); } } None => { // remove if the peer has disconnected to_remove_peers.push(*peer); } } } for to_remove in to_remove_peers { peers.remove(&to_remove); } // not enough peers if peers.len() < self.config.mesh_n() { debug!( "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}", peers.len(), self.config.mesh_n() ); let needed_peers = self.config.mesh_n() - peers.len(); let explicit_peers = &self.explicit_peers; let new_peers = get_random_peers( &self.topic_peers, &self.connected_peers, topic_hash, needed_peers, |peer| { !peers.contains(peer) && !explicit_peers.contains(peer) && score(peer) < publish_threshold }, ); peers.extend(new_peers); } } if self.peer_score.is_some() { trace!("Peer_scores: {:?}", { for peer in self.peer_topics.keys() { score(peer); } scores }); trace!("Mesh message deliveries: {:?}", { self.mesh .iter() .map(|(t, peers)| { ( t.clone(), peers .iter() .map(|p| { ( *p, peer_score .as_ref() .expect("peer_score.is_some()") .0 .mesh_message_deliveries(p, t) .unwrap_or(0.0), ) }) .collect::>(), ) }) .collect::>>() }) } self.emit_gossip(); // send graft/prunes if !to_graft.is_empty() | !to_prune.is_empty() { self.send_graft_prune(to_graft, to_prune, no_px); } // piggyback pooled control messages self.flush_control_pool(); // shift the memcache self.mcache.shift(); debug!("Completed Heartbeat"); } /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh /// and fanout peers fn emit_gossip(&mut self) { let mut rng = thread_rng(); for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) { let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash); if message_ids.is_empty() { return; } // if we are emitting more than GossipSubMaxIHaveLength message_ids, truncate the list if message_ids.len() > self.config.max_ihave_length() { // we do the truncation (with shuffling) per peer below debug!( "too many messages for gossip; will truncate IHAVE list ({} messages)", message_ids.len() ); } else { // shuffle to emit in random order message_ids.shuffle(&mut rng); } // dynamic number of peers to gossip based on `gossip_factor` with minimum `gossip_lazy` let n_map = |m| { max( self.config.gossip_lazy(), (self.config.gossip_factor() * m as f64) as usize, ) }; // get gossip_lazy random peers let to_msg_peers = get_random_peers_dynamic( &self.topic_peers, &self.connected_peers, topic_hash, n_map, |peer| { !peers.contains(peer) && !self.explicit_peers.contains(peer) && !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0 }, ); debug!("Gossiping IHAVE to {} peers.", to_msg_peers.len()); for peer in to_msg_peers { let mut peer_message_ids = message_ids.clone(); if peer_message_ids.len() > self.config.max_ihave_length() { // We do this per peer so that we emit a different set for each peer. // we have enough redundancy in the system that this will significantly increase // the message coverage when we do truncate. peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length()); peer_message_ids.truncate(self.config.max_ihave_length()); } // send an IHAVE message Self::control_pool_add( &mut self.control_pool, peer, GossipsubControlAction::IHave { topic_hash: topic_hash.clone(), message_ids: peer_message_ids, }, ); } } } /// Handles multiple GRAFT/PRUNE messages and coalesces them into chunked gossip control /// messages. fn send_graft_prune( &mut self, to_graft: HashMap>, mut to_prune: HashMap>, no_px: HashSet, ) { // handle the grafts and overlapping prunes per peer for (peer, topics) in to_graft.into_iter() { for topic in &topics { // inform scoring of graft if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.graft(&peer, topic.clone()); } // inform the handler of the peer being added to the mesh // If the peer did not previously exist in any mesh, inform the handler peer_added_to_mesh( peer, vec![topic], &self.mesh, self.peer_topics.get(&peer), &mut self.events, &self.connected_peers, ); } let mut control_msgs: Vec = topics .iter() .map(|topic_hash| GossipsubControlAction::Graft { topic_hash: topic_hash.clone(), }) .collect(); // If there are prunes associated with the same peer add them. // NOTE: In this case a peer has been added to a topic mesh, and removed from another. // It therefore must be in at least one mesh and we do not need to inform the handler // of its removal from another. if let Some(topics) = to_prune.remove(&peer) { let mut prunes = topics .iter() .map(|topic_hash| { self.make_prune( topic_hash, &peer, self.config.do_px() && !no_px.contains(&peer), ) }) .collect::>(); control_msgs.append(&mut prunes); } // send the control messages if self .send_message( peer, GossipsubRpc { subscriptions: Vec::new(), messages: Vec::new(), control_msgs, } .into_protobuf(), ) .is_err() { error!("Failed to send control messages. Message too large"); } } // handle the remaining prunes for (peer, topics) in to_prune.iter() { let mut remaining_prunes = Vec::new(); for topic_hash in topics { let prune = self.make_prune( topic_hash, peer, self.config.do_px() && !no_px.contains(peer), ); remaining_prunes.push(prune); // inform the handler peer_removed_from_mesh( *peer, topic_hash, &self.mesh, self.peer_topics.get(peer), &mut self.events, &self.connected_peers, ); } if self .send_message( *peer, GossipsubRpc { subscriptions: Vec::new(), messages: Vec::new(), control_msgs: remaining_prunes, } .into_protobuf(), ) .is_err() { error!("Failed to send prune messages. Message too large"); } } } /// Helper function which forwards a message to mesh\[topic\] peers. /// /// Returns true if at least one peer was messaged. fn forward_msg( &mut self, msg_id: &MessageId, message: RawGossipsubMessage, propagation_source: Option<&PeerId>, ) -> Result { // message is fully validated inform peer_score if let Some((peer_score, ..)) = &mut self.peer_score { if let Some(peer) = propagation_source { peer_score.deliver_message(peer, msg_id, &message.topic); } } debug!("Forwarding message: {:?}", msg_id); let mut recipient_peers = HashSet::new(); // add mesh peers let topic = &message.topic; // mesh if let Some(mesh_peers) = self.mesh.get(topic) { for peer_id in mesh_peers { if Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref() { recipient_peers.insert(*peer_id); } } } // Add explicit peers for p in &self.explicit_peers { if let Some(topics) = self.peer_topics.get(p) { if Some(p) != propagation_source && Some(p) != message.source.as_ref() && topics.contains(&message.topic) { recipient_peers.insert(*p); } } } // forward the message to peers if !recipient_peers.is_empty() { let event = GossipsubRpc { subscriptions: Vec::new(), messages: vec![message.clone()], control_msgs: Vec::new(), } .into_protobuf(); for peer in recipient_peers.iter() { debug!("Sending message: {:?} to peer {:?}", msg_id, peer); self.send_message(*peer, event.clone())?; } debug!("Completed forwarding message"); Ok(true) } else { Ok(false) } } /// Constructs a [`RawGossipsubMessage`] performing message signing if required. pub(crate) fn build_raw_message( &self, topic: TopicHash, data: Vec, ) -> Result { match &self.publish_config { PublishConfig::Signing { ref keypair, author, inline_key, } => { // Build and sign the message let sequence_number: u64 = rand::random(); let signature = { let message = rpc_proto::Message { from: Some(author.clone().to_bytes()), data: Some(data.clone()), seqno: Some(sequence_number.to_be_bytes().to_vec()), topic: topic.clone().into_string(), signature: None, key: None, }; let mut buf = Vec::with_capacity(message.encoded_len()); message .encode(&mut buf) .expect("Buffer has sufficient capacity"); // the signature is over the bytes "libp2p-pubsub:" let mut signature_bytes = SIGNING_PREFIX.to_vec(); signature_bytes.extend_from_slice(&buf); Some(keypair.sign(&signature_bytes)?) }; Ok(RawGossipsubMessage { source: Some(*author), data, // To be interoperable with the go-implementation this is treated as a 64-bit // big-endian uint. sequence_number: Some(sequence_number), topic, signature, key: inline_key.clone(), validated: true, // all published messages are valid }) } PublishConfig::Author(peer_id) => { Ok(RawGossipsubMessage { source: Some(*peer_id), data, // To be interoperable with the go-implementation this is treated as a 64-bit // big-endian uint. sequence_number: Some(rand::random()), topic, signature: None, key: None, validated: true, // all published messages are valid }) } PublishConfig::RandomAuthor => { Ok(RawGossipsubMessage { source: Some(PeerId::random()), data, // To be interoperable with the go-implementation this is treated as a 64-bit // big-endian uint. sequence_number: Some(rand::random()), topic, signature: None, key: None, validated: true, // all published messages are valid }) } PublishConfig::Anonymous => { Ok(RawGossipsubMessage { source: None, data, // To be interoperable with the go-implementation this is treated as a 64-bit // big-endian uint. sequence_number: None, topic, signature: None, key: None, validated: true, // all published messages are valid }) } } } // adds a control action to control_pool fn control_pool_add( control_pool: &mut HashMap>, peer: PeerId, control: GossipsubControlAction, ) { control_pool .entry(peer) .or_insert_with(Vec::new) .push(control); } /// Takes each control action mapping and turns it into a message fn flush_control_pool(&mut self) { for (peer, controls) in self.control_pool.drain().collect::>() { if self .send_message( peer, GossipsubRpc { subscriptions: Vec::new(), messages: Vec::new(), control_msgs: controls, } .into_protobuf(), ) .is_err() { error!("Failed to flush control pool. Message too large"); } } } /// Send a GossipsubRpc message to a peer. This will wrap the message in an arc if it /// is not already an arc. fn send_message( &mut self, peer_id: PeerId, message: rpc_proto::Rpc, ) -> Result<(), PublishError> { // If the message is oversized, try and fragment it. If it cannot be fragmented, log an // error and drop the message (all individual messages should be small enough to fit in the // max_transmit_size) let messages = self.fragment_message(message)?; for message in messages { self.events .push_back(NetworkBehaviourAction::NotifyHandler { peer_id, event: Arc::new(GossipsubHandlerIn::Message(message)), handler: NotifyHandler::Any, }) } Ok(()) } // If a message is too large to be sent as-is, this attempts to fragment it into smaller RPC // messages to be sent. fn fragment_message(&self, rpc: rpc_proto::Rpc) -> Result, PublishError> { if rpc.encoded_len() < self.config.max_transmit_size() { return Ok(vec![rpc]); } let new_rpc = rpc_proto::Rpc { subscriptions: Vec::new(), publish: Vec::new(), control: None, }; let mut rpc_list = vec![new_rpc.clone()]; // Gets an RPC if the object size will fit, otherwise create a new RPC. The last element // will be the RPC to add an object. macro_rules! create_or_add_rpc { ($object_size: ident ) => { let list_index = rpc_list.len() - 1; // the list is never empty // create a new RPC if the new object plus 5% of its size (for length prefix // buffers) exceeds the max transmit size. if rpc_list[list_index].encoded_len() + (($object_size as f64) * 1.05) as usize > self.config.max_transmit_size() && rpc_list[list_index] != new_rpc { // create a new rpc and use this as the current rpc_list.push(new_rpc.clone()); } }; } macro_rules! add_item { ($object: ident, $type: ident ) => { let object_size = $object.encoded_len(); if object_size + 2 > self.config.max_transmit_size() { // This should not be possible. All received and published messages have already // been vetted to fit within the size. error!("Individual message too large to fragment"); return Err(PublishError::MessageTooLarge); } create_or_add_rpc!(object_size); rpc_list .last_mut() .expect("Must have at least one element") .$type .push($object.clone()); }; } // Add messages until the limit for message in &rpc.publish { add_item!(message, publish); } for subscription in &rpc.subscriptions { add_item!(subscription, subscriptions); } // handle the control messages. If all are within the max_transmit_size, send them without // fragmenting, otherwise, fragment the control messages let empty_control = rpc_proto::ControlMessage::default(); if let Some(control) = rpc.control.as_ref() { if control.encoded_len() + 2 > self.config.max_transmit_size() { // fragment the RPC for ihave in &control.ihave { let len = ihave.encoded_len(); create_or_add_rpc!(len); rpc_list .last_mut() .expect("Always an element") .control .get_or_insert_with(|| empty_control.clone()) .ihave .push(ihave.clone()); } for iwant in &control.iwant { let len = iwant.encoded_len(); create_or_add_rpc!(len); rpc_list .last_mut() .expect("Always an element") .control .get_or_insert_with(|| empty_control.clone()) .iwant .push(iwant.clone()); } for graft in &control.graft { let len = graft.encoded_len(); create_or_add_rpc!(len); rpc_list .last_mut() .expect("Always an element") .control .get_or_insert_with(|| empty_control.clone()) .graft .push(graft.clone()); } for prune in &control.prune { let len = prune.encoded_len(); create_or_add_rpc!(len); rpc_list .last_mut() .expect("Always an element") .control .get_or_insert_with(|| empty_control.clone()) .prune .push(prune.clone()); } } else { let len = control.encoded_len(); create_or_add_rpc!(len); rpc_list.last_mut().expect("Always an element").control = Some(control.clone()); } } Ok(rpc_list) } } fn get_ip_addr(addr: &Multiaddr) -> Option { addr.iter().find_map(|p| match p { Ip4(addr) => Some(IpAddr::V4(addr)), Ip6(addr) => Some(IpAddr::V6(addr)), _ => None, }) } impl NetworkBehaviour for Gossipsub where C: Send + 'static + DataTransform, F: Send + 'static + TopicSubscriptionFilter, { type ProtocolsHandler = GossipsubHandler; type OutEvent = GossipsubEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { GossipsubHandler::new( self.config.protocol_id_prefix().clone(), self.config.max_transmit_size(), self.config.validation_mode().clone(), self.config.idle_timeout(), self.config.support_floodsub(), ) } fn inject_connected(&mut self, peer_id: &PeerId) { // Ignore connections from blacklisted peers. if self.blacklisted_peers.contains(peer_id) { debug!("Ignoring connection from blacklisted peer: {}", peer_id); } else { debug!("New peer connected: {}", peer_id); // We need to send our subscriptions to the newly-connected node. let mut subscriptions = vec![]; for topic_hash in self.mesh.keys() { subscriptions.push(GossipsubSubscription { topic_hash: topic_hash.clone(), action: GossipsubSubscriptionAction::Subscribe, }); } if !subscriptions.is_empty() { // send our subscriptions to the peer if self .send_message( *peer_id, GossipsubRpc { messages: Vec::new(), subscriptions, control_msgs: Vec::new(), } .into_protobuf(), ) .is_err() { error!("Failed to send subscriptions, message too large"); } } } // Insert an empty set of the topics of this peer until known. self.peer_topics.insert(*peer_id, Default::default()); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.add_peer(*peer_id); } } fn inject_disconnected(&mut self, peer_id: &PeerId) { // remove from mesh, topic_peers, peer_topic and the fanout debug!("Peer disconnected: {}", peer_id); { let topics = match self.peer_topics.get(peer_id) { Some(topics) => (topics), None => { debug_assert!( self.blacklisted_peers.contains(peer_id), "Disconnected node not in connected list" ); return; } }; // remove peer from all mappings for topic in topics { // check the mesh for the topic if let Some(mesh_peers) = self.mesh.get_mut(topic) { // check if the peer is in the mesh and remove it mesh_peers.remove(peer_id); } // remove from topic_peers if let Some(peer_list) = self.topic_peers.get_mut(topic) { if !peer_list.remove(peer_id) { // debugging purposes warn!( "Disconnected node: {} not in topic_peers peer list", peer_id ); } } else { warn!( "Disconnected node: {} with topic: {:?} not in topic_peers", &peer_id, &topic ); } // remove from fanout self.fanout .get_mut(topic) .map(|peers| peers.remove(peer_id)); } } // Forget px and outbound status for this peer self.px_peers.remove(peer_id); self.outbound_peers.remove(peer_id); // Remove peer from peer_topics and connected_peers // NOTE: It is possible the peer has already been removed from all mappings if it does not // support the protocol. self.peer_topics.remove(peer_id); self.connected_peers.remove(peer_id); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.remove_peer(peer_id); } } fn inject_connection_established( &mut self, peer_id: &PeerId, connection_id: &ConnectionId, endpoint: &ConnectedPoint, _: Option<&Vec>, ) { // Check if the peer is an outbound peer if let ConnectedPoint::Dialer { .. } = endpoint { // Diverging from the go implementation we only want to consider a peer as outbound peer // if its first connection is outbound. To check if this connection is the first we // check if the peer isn't connected yet. This only works because the // `inject_connection_established` event for the first connection gets called immediately // before `inject_connected` gets called. if !self.peer_topics.contains_key(peer_id) && !self.px_peers.contains(peer_id) { // The first connection is outbound and it is not a peer from peer exchange => mark // it as outbound peer self.outbound_peers.insert(*peer_id); } } // Add the IP to the peer scoring system if let Some((peer_score, ..)) = &mut self.peer_score { if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) { peer_score.add_ip(peer_id, ip); } else { trace!( "Couldn't extract ip from endpoint of peer {} with endpoint {:?}", peer_id, endpoint ) } } // By default we assume a peer is only a floodsub peer. // // The protocol negotiation occurs once a message is sent/received. Once this happens we // update the type of peer that this is in order to determine which kind of routing should // occur. self.connected_peers .entry(*peer_id) .or_insert(PeerConnections { kind: PeerKind::Floodsub, connections: vec![*connection_id], }) .connections .push(*connection_id); } fn inject_connection_closed( &mut self, peer_id: &PeerId, connection_id: &ConnectionId, endpoint: &ConnectedPoint, _: ::Handler, ) { // Remove IP from peer scoring system if let Some((peer_score, ..)) = &mut self.peer_score { if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) { peer_score.remove_ip(peer_id, &ip); } else { trace!( "Couldn't extract ip from endpoint of peer {} with endpoint {:?}", peer_id, endpoint ) } } // Remove the connection from the list // If there are no connections left, inject_disconnected will remove the mapping entirely. if let Some(connections) = self.connected_peers.get_mut(peer_id) { let index = connections .connections .iter() .position(|v| v == connection_id) .expect("Previously established connection to peer must be present"); connections.connections.remove(index); // If there are more connections and this peer is in a mesh, inform the first connection // handler. if !connections.connections.is_empty() { if let Some(topics) = self.peer_topics.get(peer_id) { for topic in topics { if let Some(mesh_peers) = self.mesh.get(topic) { if mesh_peers.contains(peer_id) { self.events .push_back(NetworkBehaviourAction::NotifyHandler { peer_id: *peer_id, event: Arc::new(GossipsubHandlerIn::JoinedMesh), handler: NotifyHandler::One(connections.connections[0]), }); break; } } } } } } } fn inject_address_change( &mut self, peer: &PeerId, _: &ConnectionId, endpoint_old: &ConnectedPoint, endpoint_new: &ConnectedPoint, ) { // Exchange IP in peer scoring system if let Some((peer_score, ..)) = &mut self.peer_score { if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) { peer_score.remove_ip(peer, &ip); } else { trace!( "Couldn't extract ip from endpoint of peer {} with endpoint {:?}", peer, endpoint_old ) } if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) { peer_score.add_ip(peer, ip); } else { trace!( "Couldn't extract ip from endpoint of peer {} with endpoint {:?}", peer, endpoint_new ) } } } fn inject_event( &mut self, propagation_source: PeerId, _: ConnectionId, handler_event: HandlerEvent, ) { match handler_event { HandlerEvent::PeerKind(kind) => { // We have identified the protocol this peer is using if let PeerKind::NotSupported = kind { debug!( "Peer does not support gossipsub protocols. {}", propagation_source ); self.events.push_back(NetworkBehaviourAction::GenerateEvent( GossipsubEvent::GossipsubNotSupported { peer_id: propagation_source, }, )); } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) { // Only change the value if the old value is Floodsub (the default set in // inject_connected). All other PeerKind changes are ignored. debug!( "New peer type found: {} for peer: {}", kind, propagation_source ); if let PeerKind::Floodsub = conn.kind { conn.kind = kind; } } } HandlerEvent::Message { rpc, invalid_messages, } => { // Handle the gossipsub RPC // Handle subscriptions // Update connected peers topics if !rpc.subscriptions.is_empty() { self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source); } // Check if peer is graylisted in which case we ignore the event if let (true, _) = self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold) { debug!("RPC Dropped from greylisted peer {}", propagation_source); return; } // Handle any invalid messages from this peer if self.peer_score.is_some() { for (raw_message, validation_error) in invalid_messages { self.handle_invalid_message( &propagation_source, raw_message, validation_error, ) } } else { // log the invalid messages for (message, validation_error) in invalid_messages { warn!( "Invalid message. Reason: {:?} propagation_peer {} source {:?}", validation_error, propagation_source.to_string(), message.source ); } } // Handle messages for (count, raw_message) in rpc.messages.into_iter().enumerate() { // Only process the amount of messages the configuration allows. if self.config.max_messages_per_rpc().is_some() && Some(count) >= self.config.max_messages_per_rpc() { warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count); break; } self.handle_received_message(raw_message, &propagation_source); } // Handle control messages // group some control messages, this minimises SendEvents (code is simplified to handle each event at a time however) let mut ihave_msgs = vec![]; let mut graft_msgs = vec![]; let mut prune_msgs = vec![]; for control_msg in rpc.control_msgs { match control_msg { GossipsubControlAction::IHave { topic_hash, message_ids, } => { ihave_msgs.push((topic_hash, message_ids)); } GossipsubControlAction::IWant { message_ids } => { self.handle_iwant(&propagation_source, message_ids) } GossipsubControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash), GossipsubControlAction::Prune { topic_hash, peers, backoff, } => prune_msgs.push((topic_hash, peers, backoff)), } } if !ihave_msgs.is_empty() { self.handle_ihave(&propagation_source, ihave_msgs); } if !graft_msgs.is_empty() { self.handle_graft(&propagation_source, graft_msgs); } if !prune_msgs.is_empty() { self.handle_prune(&propagation_source, prune_msgs); } } } } fn poll( &mut self, cx: &mut Context<'_>, _: &mut impl PollParameters, ) -> Poll> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event.map_in(|e: Arc| { // clone send event reference if others references are present Arc::try_unwrap(e).unwrap_or_else(|e| (*e).clone()) })); } // update scores if let Some((peer_score, _, interval, _)) = &mut self.peer_score { while let Poll::Ready(Some(())) = interval.poll_next_unpin(cx) { peer_score.refresh_scores(); } } while let Poll::Ready(Some(())) = self.heartbeat.poll_next_unpin(cx) { self.heartbeat(); } Poll::Pending } } /// This is called when peers are added to any mesh. It checks if the peer existed /// in any other mesh. If this is the first mesh they have joined, it queues a message to notify /// the appropriate connection handler to maintain a connection. fn peer_added_to_mesh( peer_id: PeerId, new_topics: Vec<&TopicHash>, mesh: &HashMap>, known_topics: Option<&BTreeSet>, events: &mut VecDeque, connections: &HashMap, ) { // Ensure there is an active connection let connection_id = { let conn = connections.get(&peer_id).expect("To be connected to peer."); assert!( !conn.connections.is_empty(), "Must have at least one connection" ); conn.connections[0] }; if let Some(topics) = known_topics { for topic in topics { if !new_topics.contains(&topic) { if let Some(mesh_peers) = mesh.get(topic) { if mesh_peers.contains(&peer_id) { // the peer is already in a mesh for another topic return; } } } } } // This is the first mesh the peer has joined, inform the handler events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id, event: Arc::new(GossipsubHandlerIn::JoinedMesh), handler: NotifyHandler::One(connection_id), }); } /// This is called when peers are removed from a mesh. It checks if the peer exists /// in any other mesh. If this is the last mesh they have joined, we return true, in order to /// notify the handler to no longer maintain a connection. fn peer_removed_from_mesh( peer_id: PeerId, old_topic: &TopicHash, mesh: &HashMap>, known_topics: Option<&BTreeSet>, events: &mut VecDeque, connections: &HashMap, ) { // Ensure there is an active connection let connection_id = connections .get(&peer_id) .expect("To be connected to peer.") .connections .get(0) .expect("There should be at least one connection to a peer."); if let Some(topics) = known_topics { for topic in topics { if topic != old_topic { if let Some(mesh_peers) = mesh.get(topic) { if mesh_peers.contains(&peer_id) { // the peer exists in another mesh still return; } } } } } // The peer is not in any other mesh, inform the handler events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id, event: Arc::new(GossipsubHandlerIn::LeftMesh), handler: NotifyHandler::One(*connection_id), }); } /// Helper function to get a subset of random gossipsub peers for a `topic_hash` /// filtered by the function `f`. The number of peers to get equals the output of `n_map` /// that gets as input the number of filtered peers. fn get_random_peers_dynamic( topic_peers: &HashMap>, connected_peers: &HashMap, topic_hash: &TopicHash, // maps the number of total peers to the number of selected peers n_map: impl Fn(usize) -> usize, mut f: impl FnMut(&PeerId) -> bool, ) -> BTreeSet { let mut gossip_peers = match topic_peers.get(topic_hash) { // if they exist, filter the peers by `f` Some(peer_list) => peer_list .iter() .cloned() .filter(|p| { f(p) && match connected_peers.get(p) { Some(connections) if connections.kind == PeerKind::Gossipsub => true, Some(connections) if connections.kind == PeerKind::Gossipsubv1_1 => true, _ => false, } }) .collect(), None => Vec::new(), }; // if we have less than needed, return them let n = n_map(gossip_peers.len()); if gossip_peers.len() <= n { debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len()); return gossip_peers.into_iter().collect(); } // we have more peers than needed, shuffle them and return n of them let mut rng = thread_rng(); gossip_peers.partial_shuffle(&mut rng, n); debug!("RANDOM PEERS: Got {:?} peers", n); gossip_peers.into_iter().take(n).collect() } /// Helper function to get a set of `n` random gossipsub peers for a `topic_hash` /// filtered by the function `f`. fn get_random_peers( topic_peers: &HashMap>, connected_peers: &HashMap, topic_hash: &TopicHash, n: usize, f: impl FnMut(&PeerId) -> bool, ) -> BTreeSet { get_random_peers_dynamic(topic_peers, connected_peers, topic_hash, |_| n, f) } /// Validates the combination of signing, privacy and message validation to ensure the /// configuration will not reject published messages. fn validate_config( authenticity: &MessageAuthenticity, validation_mode: &ValidationMode, ) -> Result<(), &'static str> { match validation_mode { ValidationMode::Anonymous => { if authenticity.is_signing() { return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity"); } if !authenticity.is_anonymous() { return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config"); } } ValidationMode::Strict => { if !authenticity.is_signing() { return Err( "Messages will be published unsigned and incoming unsigned messages will be rejected. Consider adjusting the validation or privacy settings in the config" ); } } _ => {} } Ok(()) } impl fmt::Debug for Gossipsub { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Gossipsub") .field("config", &self.config) .field("events", &self.events.len()) .field("control_pool", &self.control_pool) .field("publish_config", &self.publish_config) .field("topic_peers", &self.topic_peers) .field("peer_topics", &self.peer_topics) .field("mesh", &self.mesh) .field("fanout", &self.fanout) .field("fanout_last_pub", &self.fanout_last_pub) .field("mcache", &self.mcache) .field("heartbeat", &self.heartbeat) .finish() } } impl fmt::Debug for PublishConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { PublishConfig::Signing { author, .. } => { f.write_fmt(format_args!("PublishConfig::Signing({})", author)) } PublishConfig::Author(author) => { f.write_fmt(format_args!("PublishConfig::Author({})", author)) } PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")), PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")), } } } #[cfg(test)] mod local_test { use super::*; use crate::IdentTopic; use asynchronous_codec::Encoder; use quickcheck::*; use rand::Rng; fn empty_rpc() -> GossipsubRpc { GossipsubRpc { subscriptions: Vec::new(), messages: Vec::new(), control_msgs: Vec::new(), } } fn test_message() -> RawGossipsubMessage { RawGossipsubMessage { source: Some(PeerId::random()), data: vec![0; 100], sequence_number: None, topic: TopicHash::from_raw("test_topic"), signature: None, key: None, validated: false, } } fn test_subscription() -> GossipsubSubscription { GossipsubSubscription { action: GossipsubSubscriptionAction::Subscribe, topic_hash: IdentTopic::new("TestTopic").hash(), } } fn test_control() -> GossipsubControlAction { GossipsubControlAction::IHave { topic_hash: IdentTopic::new("TestTopic").hash(), message_ids: vec![MessageId(vec![12u8]); 5], } } impl Arbitrary for GossipsubRpc { fn arbitrary(g: &mut G) -> Self { let mut rpc = empty_rpc(); for _ in 0..g.gen_range(0, 10) { rpc.subscriptions.push(test_subscription()); } for _ in 0..g.gen_range(0, 10) { rpc.messages.push(test_message()); } for _ in 0..g.gen_range(0, 10) { rpc.control_msgs.push(test_control()); } rpc } } #[test] /// Tests RPC message fragmentation fn test_message_fragmentation_deterministic() { let max_transmit_size = 500; let config = crate::GossipsubConfigBuilder::default() .max_transmit_size(max_transmit_size) .validation_mode(ValidationMode::Permissive) .build() .unwrap(); let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap(); // Message under the limit should be fine. let mut rpc = empty_rpc(); rpc.messages.push(test_message()); let mut rpc_proto = rpc.clone().into_protobuf(); let fragmented_messages = gs.fragment_message(rpc_proto.clone()).unwrap(); assert_eq!( fragmented_messages, vec![rpc_proto.clone()], "Messages under the limit shouldn't be fragmented" ); // Messages over the limit should be split while rpc_proto.encoded_len() < max_transmit_size { rpc.messages.push(test_message()); rpc_proto = rpc.clone().into_protobuf(); } let fragmented_messages = gs .fragment_message(rpc_proto) .expect("Should be able to fragment the messages"); assert!( fragmented_messages.len() > 1, "the message should be fragmented" ); // all fragmented messages should be under the limit for message in fragmented_messages { assert!( message.encoded_len() < max_transmit_size, "all messages should be less than the transmission size" ); } } #[test] fn test_message_fragmentation() { fn prop(rpc: GossipsubRpc) { let max_transmit_size = 500; let config = crate::GossipsubConfigBuilder::default() .max_transmit_size(max_transmit_size) .validation_mode(ValidationMode::Permissive) .build() .unwrap(); let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap(); let mut length_codec = unsigned_varint::codec::UviBytes::default(); length_codec.set_max_len(max_transmit_size); let mut codec = crate::protocol::GossipsubCodec::new(length_codec, ValidationMode::Permissive); let rpc_proto = rpc.into_protobuf(); let fragmented_messages = gs .fragment_message(rpc_proto.clone()) .expect("Messages must be valid"); if rpc_proto.encoded_len() < max_transmit_size { assert_eq!( fragmented_messages.len(), 1, "the message should not be fragmented" ); } else { assert!( fragmented_messages.len() > 1, "the message should be fragmented" ); } // all fragmented messages should be under the limit for message in fragmented_messages { assert!( message.encoded_len() < max_transmit_size, "all messages should be less than the transmission size: list size {} max size{}", message.encoded_len(), max_transmit_size ); // ensure they can all be encoded let mut buf = bytes::BytesMut::with_capacity(message.encoded_len()); codec.encode(message, &mut buf).unwrap() } } QuickCheck::new() .max_tests(100) .quickcheck(prop as fn(_) -> _) } }