protocols/gossipsub: Rework connection keep-alive (#2043)

Keep connections to peers in a mesh alive. Allow closing idle connections to
peers not in a mesh.

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Age Manning
2021-05-14 17:16:50 +10:00
committed by GitHub
parent 9c5dd84099
commit c5bcada2c2
9 changed files with 589 additions and 253 deletions

View File

@ -50,7 +50,7 @@ use crate::backoff::BackoffStorage;
use crate::config::{GossipsubConfig, ValidationMode};
use crate::error::{PublishError, SubscriptionError, ValidationError};
use crate::gossip_promises::GossipPromises;
use crate::handler::{GossipsubHandler, HandlerEvent};
use crate::handler::{GossipsubHandler, GossipsubHandlerIn, HandlerEvent};
use crate::mcache::MessageCache;
use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
use crate::protocol::SIGNING_PREFIX;
@ -62,7 +62,7 @@ use crate::types::{
FastMessageId, GossipsubControlAction, GossipsubMessage, GossipsubSubscription,
GossipsubSubscriptionAction, MessageAcceptance, MessageId, PeerInfo, RawGossipsubMessage,
};
use crate::types::{GossipsubRpc, PeerKind};
use crate::types::{GossipsubRpc, PeerConnections, PeerKind};
use crate::{rpc_proto, TopicScoreParams};
use std::{cmp::Ordering::Equal, fmt::Debug};
@ -193,7 +193,8 @@ impl From<MessageAuthenticity> for PublishConfig {
}
}
type GossipsubNetworkBehaviourAction = NetworkBehaviourAction<Arc<rpc_proto::Rpc>, GossipsubEvent>;
type GossipsubNetworkBehaviourAction =
NetworkBehaviourAction<Arc<GossipsubHandlerIn>, GossipsubEvent>;
/// Network behaviour that handles the gossipsub protocol.
///
@ -226,9 +227,9 @@ pub struct Gossipsub<
/// duplicates from being propagated to the application and on the network.
duplicate_cache: DuplicateCache<MessageId>,
/// A map of peers to their protocol kind. This is to identify different kinds of gossipsub
/// peers.
peer_protocols: HashMap<PeerId, PeerKind>,
/// A set of connected peers, indexed by their [`PeerId`]. tracking both the [`PeerKind`] and
/// the set of [`ConnectionId`]s.
connected_peers: HashMap<PeerId, PeerConnections>,
/// A map of all connected peers - A map of topic hash to a list of gossipsub peer Ids.
topic_peers: HashMap<TopicHash, BTreeSet<PeerId>>,
@ -414,7 +415,7 @@ where
peer_score: None,
count_received_ihave: HashMap::new(),
count_sent_iwant: HashMap::new(),
peer_protocols: HashMap::new(),
connected_peers: HashMap::new(),
published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
config,
subscription_filter,
@ -460,7 +461,7 @@ where
/// Lists all known peers and their associated protocol.
pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
self.peer_protocols.iter()
self.connected_peers.iter().map(|(k, v)| (k, &v.kind))
}
/// Returns the gossipsub score for a given peer, if one exists.
@ -489,17 +490,15 @@ where
// send subscription request to all peers
let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
if !peer_list.is_empty() {
let event = Arc::new(
GossipsubRpc {
messages: Vec::new(),
subscriptions: vec![GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
}],
control_msgs: Vec::new(),
}
.into_protobuf(),
);
let event = GossipsubRpc {
messages: Vec::new(),
subscriptions: vec![GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
}],
control_msgs: Vec::new(),
}
.into_protobuf();
for peer in peer_list {
debug!("Sending SUBSCRIBE to peer: {:?}", peer);
@ -531,17 +530,15 @@ where
// announce to all peers
let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
if !peer_list.is_empty() {
let event = Arc::new(
GossipsubRpc {
messages: Vec::new(),
subscriptions: vec![GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Unsubscribe,
}],
control_msgs: Vec::new(),
}
.into_protobuf(),
);
let event = GossipsubRpc {
messages: Vec::new(),
subscriptions: vec![GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Unsubscribe,
}],
control_msgs: Vec::new(),
}
.into_protobuf();
for peer in peer_list {
debug!("Sending UNSUBSCRIBE to peer: {}", peer.to_string());
@ -580,14 +577,12 @@ where
topic: raw_message.topic.clone(),
});
let event = Arc::new(
GossipsubRpc {
subscriptions: Vec::new(),
messages: vec![raw_message.clone()],
control_msgs: Vec::new(),
}
.into_protobuf(),
);
let event = GossipsubRpc {
subscriptions: Vec::new(),
messages: vec![raw_message.clone()],
control_msgs: Vec::new(),
}
.into_protobuf();
// check that the size doesn't exceed the max transmission size
if event.encoded_len() > self.config.max_transmit_size() {
@ -634,8 +629,8 @@ where
}
// Floodsub peers
for (peer, kind) in &self.peer_protocols {
if kind == &PeerKind::Floodsub
for (peer, connections) in &self.connected_peers {
if connections.kind == PeerKind::Floodsub
&& !self
.score_below_threshold(peer, |ts| ts.publish_threshold)
.0
@ -657,7 +652,7 @@ where
let mesh_n = self.config.mesh_n();
let new_peers = get_random_peers(
&self.topic_peers,
&self.peer_protocols,
&self.connected_peers,
&topic_hash,
mesh_n,
{
@ -898,6 +893,7 @@ where
add_peers, topic_hash
);
added_peers.extend(peers.iter().cloned().take(add_peers));
self.mesh.insert(
topic_hash.clone(),
peers.into_iter().take(add_peers).collect(),
@ -911,7 +907,7 @@ where
// get the peers
let new_peers = get_random_peers(
&self.topic_peers,
&self.peer_protocols,
&self.connected_peers,
topic_hash,
self.config.mesh_n() - added_peers.len(),
|peer| {
@ -947,6 +943,16 @@ where
topic_hash: topic_hash.clone(),
},
);
// If the peer did not previously exist in any mesh, inform the handler
peer_added_to_mesh(
peer_id,
vec![topic_hash],
&self.mesh,
self.peer_topics.get(&peer_id),
&mut self.events,
&self.connected_peers,
);
}
debug!("Completed JOIN for topic: {:?}", topic_hash);
}
@ -962,7 +968,7 @@ where
peer_score.prune(peer, topic_hash.clone());
}
match self.peer_protocols.get(peer) {
match self.connected_peers.get(peer).map(|v| &v.kind) {
Some(PeerKind::Floodsub) => {
error!("Attempted to prune a Floodsub peer");
}
@ -984,7 +990,7 @@ where
let peers = if do_px {
get_random_peers(
&self.topic_peers,
&self.peer_protocols,
&self.connected_peers,
&topic_hash,
self.config.prune_peers(),
|p| p != peer && !self.score_below_threshold(p, |_| 0.0).0,
@ -1018,6 +1024,16 @@ where
info!("LEAVE: Sending PRUNE to peer: {:?}", peer);
let control = self.make_prune(topic_hash, &peer, self.config.do_px());
Self::control_pool_add(&mut self.control_pool, peer, control);
// If the peer did not previously exist in any mesh, inform the handler
peer_removed_from_mesh(
peer,
topic_hash,
&self.mesh,
self.peer_topics.get(&peer),
&mut self.events,
&self.connected_peers,
);
}
}
debug!("Completed LEAVE for topic: {:?}", topic_hash);
@ -1074,10 +1090,7 @@ where
}
// IHAVE flood protection
let peer_have = self
.count_received_ihave
.entry(*peer_id)
.or_insert(0);
let peer_have = self.count_received_ihave.entry(*peer_id).or_insert(0);
*peer_have += 1;
if *peer_have > self.config.max_ihave_messages() {
debug!(
@ -1201,10 +1214,7 @@ where
if !cached_messages.is_empty() {
debug!("IWANT: Sending cached messages to peer: {:?}", peer_id);
// Send the messages to the peer
let message_list = cached_messages
.into_iter()
.map(|entry| entry.1)
.collect();
let message_list = cached_messages.into_iter().map(|entry| entry.1).collect();
if self
.send_message(
*peer_id,
@ -1282,7 +1292,7 @@ where
}
}
//check the score
// check the score
if below_zero {
// we don't GRAFT peers with negative score
debug!(
@ -1312,6 +1322,15 @@ where
peer_id, &topic_hash
);
peers.insert(*peer_id);
// If the peer did not previously exist in any mesh, inform the handler
peer_added_to_mesh(
*peer_id,
vec![&topic_hash],
&self.mesh,
self.peer_topics.get(&peer_id),
&mut self.events,
&self.connected_peers,
);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.graft(peer_id, topic_hash);
@ -1381,6 +1400,16 @@ where
}
update_backoff = true;
// inform the handler
peer_removed_from_mesh(
*peer_id,
topic_hash,
&self.mesh,
self.peer_topics.get(&peer_id),
&mut self.events,
&self.connected_peers,
);
}
}
if update_backoff {
@ -1605,10 +1634,7 @@ where
.or_insert_with(|| msg_id.clone());
}
if !self.duplicate_cache.insert(msg_id.clone()) {
debug!(
"Message already received, ignoring. Message: {}",
msg_id
);
debug!("Message already received, ignoring. Message: {}", msg_id);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
}
@ -1710,8 +1736,8 @@ where
}
};
// Collect potential graft messages for the peer.
let mut grafts = Vec::new();
// Collect potential graft topics for the peer.
let mut topics_to_graft = Vec::new();
// Notify the application about the subscription, after the grafts are sent.
let mut application_event = Vec::new();
@ -1753,7 +1779,11 @@ where
// if the mesh needs peers add the peer to the mesh
if !self.explicit_peers.contains(propagation_source)
&& match self.peer_protocols.get(propagation_source) {
&& match self
.connected_peers
.get(propagation_source)
.map(|v| &v.kind)
{
Some(PeerKind::Gossipsubv1_1) => true,
Some(PeerKind::Gossipsub) => true,
_ => false,
@ -1787,9 +1817,7 @@ where
peer_score
.graft(propagation_source, subscription.topic_hash.clone());
}
grafts.push(GossipsubControlAction::Graft {
topic_hash: subscription.topic_hash.clone(),
});
topics_to_graft.push(subscription.topic_hash.clone());
}
}
}
@ -1811,8 +1839,7 @@ where
}
// remove topic from the peer_topics mapping
subscribed_topics.remove(&subscription.topic_hash);
unsubscribed_peers
.push((*propagation_source, subscription.topic_hash.clone()));
unsubscribed_peers.push((*propagation_source, subscription.topic_hash.clone()));
// generate an unsubscribe event to be polled
application_event.push(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Unsubscribed {
@ -1829,16 +1856,32 @@ where
self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false);
}
// Potentially inform the handler if we have added this peer to a mesh for the first time.
let topics_joined = topics_to_graft.iter().collect::<Vec<_>>();
if !topics_joined.is_empty() {
peer_added_to_mesh(
propagation_source.clone(),
topics_joined,
&self.mesh,
self.peer_topics.get(propagation_source),
&mut self.events,
&self.connected_peers,
);
}
// If we need to send grafts to peer, do so immediately, rather than waiting for the
// heartbeat.
if !grafts.is_empty()
if !topics_to_graft.is_empty()
&& self
.send_message(
*propagation_source,
GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: grafts,
control_msgs: topics_to_graft
.into_iter()
.map(|topic_hash| GossipsubControlAction::Graft { topic_hash })
.collect(),
}
.into_protobuf(),
)
@ -1898,9 +1941,7 @@ where
let mut scores = HashMap::new();
let peer_score = &self.peer_score;
let mut score = |p: &PeerId| match peer_score {
Some((peer_score, ..)) => *scores
.entry(*p)
.or_insert_with(|| peer_score.score(p)),
Some((peer_score, ..)) => *scores.entry(*p).or_insert_with(|| peer_score.score(p)),
_ => 0.0,
};
@ -1952,7 +1993,7 @@ where
let desired_peers = self.config.mesh_n() - peers.len();
let peer_list = get_random_peers(
topic_peers,
&self.peer_protocols,
&self.connected_peers,
topic_hash,
desired_peers,
|peer| {
@ -2008,15 +2049,15 @@ where
}
if self.outbound_peers.contains(&peer) {
if outbound <= self.config.mesh_outbound_min() {
//do not remove anymore outbound peers
// do not remove anymore outbound peers
continue;
} else {
//an outbound peer gets removed
// an outbound peer gets removed
outbound -= 1;
}
}
//remove the peer
// remove the peer
peers.remove(&peer);
let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
current_topic.push(topic_hash.clone());
@ -2034,7 +2075,7 @@ where
let needed = self.config.mesh_outbound_min() - outbound;
let peer_list = get_random_peers(
topic_peers,
&self.peer_protocols,
&self.connected_peers,
topic_hash,
needed,
|peer| {
@ -2091,7 +2132,7 @@ where
if median < thresholds.opportunistic_graft_threshold {
let peer_list = get_random_peers(
topic_peers,
&self.peer_protocols,
&self.connected_peers,
topic_hash,
self.config.opportunistic_graft_peers(),
|peer| {
@ -2174,7 +2215,7 @@ where
let explicit_peers = &self.explicit_peers;
let new_peers = get_random_peers(
&self.topic_peers,
&self.peer_protocols,
&self.connected_peers,
topic_hash,
needed_peers,
|peer| {
@ -2268,7 +2309,7 @@ where
// get gossip_lazy random peers
let to_msg_peers = get_random_peers_dynamic(
&self.topic_peers,
&self.peer_protocols,
&self.connected_peers,
&topic_hash,
n_map,
|peer| {
@ -2313,12 +2354,23 @@ where
no_px: HashSet<PeerId>,
) {
// handle the grafts and overlapping prunes per peer
for (peer, topics) in to_graft.iter() {
for topic in topics {
//inform scoring of graft
for (peer, topics) in to_graft.into_iter() {
for topic in &topics {
// inform scoring of graft
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.graft(peer, topic.clone());
peer_score.graft(&peer, topic.clone());
}
// inform the handler of the peer being added to the mesh
// If the peer did not previously exist in any mesh, inform the handler
peer_added_to_mesh(
peer,
vec![topic],
&self.mesh,
self.peer_topics.get(&peer),
&mut self.events,
&self.connected_peers,
);
}
let mut control_msgs: Vec<GossipsubControlAction> = topics
.iter()
@ -2328,14 +2380,17 @@ where
.collect();
// If there are prunes associated with the same peer add them.
if let Some(topics) = to_prune.remove(peer) {
// NOTE: In this case a peer has been added to a topic mesh, and removed from another.
// It therefore must be in at least one mesh and we do not need to inform the handler
// of its removal from another.
if let Some(topics) = to_prune.remove(&peer) {
let mut prunes = topics
.iter()
.map(|topic_hash| {
self.make_prune(
topic_hash,
peer,
self.config.do_px() && !no_px.contains(peer),
&peer,
self.config.do_px() && !no_px.contains(&peer),
)
})
.collect::<Vec<_>>();
@ -2345,7 +2400,7 @@ where
// send the control messages
if self
.send_message(
*peer,
peer,
GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
@ -2361,16 +2416,25 @@ where
// handle the remaining prunes
for (peer, topics) in to_prune.iter() {
let remaining_prunes = topics
.iter()
.map(|topic_hash| {
self.make_prune(
topic_hash,
peer,
self.config.do_px() && !no_px.contains(peer),
)
})
.collect();
let mut remaining_prunes = Vec::new();
for topic_hash in topics {
let prune = self.make_prune(
topic_hash,
peer,
self.config.do_px() && !no_px.contains(peer),
);
remaining_prunes.push(prune);
// inform the handler
peer_removed_from_mesh(
peer.clone(),
topic_hash,
&self.mesh,
self.peer_topics.get(&peer),
&mut self.events,
&self.connected_peers,
);
}
if self
.send_message(
*peer,
@ -2432,14 +2496,12 @@ where
// forward the message to peers
if !recipient_peers.is_empty() {
let event = Arc::new(
GossipsubRpc {
subscriptions: Vec::new(),
messages: vec![message.clone()],
control_msgs: Vec::new(),
}
.into_protobuf(),
);
let event = GossipsubRpc {
subscriptions: Vec::new(),
messages: vec![message.clone()],
control_msgs: Vec::new(),
}
.into_protobuf();
for peer in recipient_peers.iter() {
debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
@ -2579,7 +2641,7 @@ where
fn send_message(
&mut self,
peer_id: PeerId,
message: impl Into<Arc<rpc_proto::Rpc>>,
message: rpc_proto::Rpc,
) -> Result<(), PublishError> {
// If the message is oversized, try and fragment it. If it cannot be fragmented, log an
// error and drop the message (all individual messages should be small enough to fit in the
@ -2591,7 +2653,7 @@ where
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event: message,
event: Arc::new(GossipsubHandlerIn::Message(message)),
handler: NotifyHandler::Any,
})
}
@ -2600,10 +2662,7 @@ where
// If a message is too large to be sent as-is, this attempts to fragment it into smaller RPC
// messages to be sent.
fn fragment_message(
&self,
rpc: Arc<rpc_proto::Rpc>,
) -> Result<Vec<Arc<rpc_proto::Rpc>>, PublishError> {
fn fragment_message(&self, rpc: rpc_proto::Rpc) -> Result<Vec<rpc_proto::Rpc>, PublishError> {
if rpc.encoded_len() < self.config.max_transmit_size() {
return Ok(vec![rpc]);
}
@ -2719,7 +2778,7 @@ where
}
}
Ok(rpc_list.into_iter().map(Arc::new).collect())
Ok(rpc_list)
}
}
@ -2744,6 +2803,7 @@ where
self.config.protocol_id_prefix().clone(),
self.config.max_transmit_size(),
self.config.validation_mode().clone(),
self.config.idle_timeout().clone(),
self.config.support_floodsub(),
)
}
@ -2790,15 +2850,6 @@ where
// Insert an empty set of the topics of this peer until known.
self.peer_topics.insert(*peer_id, Default::default());
// By default we assume a peer is only a floodsub peer.
//
// The protocol negotiation occurs once a message is sent/received. Once this happens we
// update the type of peer that this is in order to determine which kind of routing should
// occur.
self.peer_protocols
.entry(*peer_id)
.or_insert(PeerKind::Floodsub);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.add_peer(*peer_id);
}
@ -2853,11 +2904,11 @@ where
self.outbound_peers.remove(peer_id);
}
// Remove peer from peer_topics and peer_protocols
// Remove peer from peer_topics and connected_peers
// NOTE: It is possible the peer has already been removed from all mappings if it does not
// support the protocol.
self.peer_topics.remove(peer_id);
self.peer_protocols.remove(peer_id);
self.connected_peers.remove(peer_id);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.remove_peer(peer_id);
@ -2867,7 +2918,7 @@ where
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
_: &ConnectionId,
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
) {
// Ignore connections from blacklisted peers.
@ -2901,26 +2952,73 @@ where
)
}
}
// By default we assume a peer is only a floodsub peer.
//
// The protocol negotiation occurs once a message is sent/received. Once this happens we
// update the type of peer that this is in order to determine which kind of routing should
// occur.
self.connected_peers
.entry(*peer_id)
.or_insert(PeerConnections {
kind: PeerKind::Floodsub,
connections: vec![*connection_id],
})
.connections
.push(*connection_id);
}
fn inject_connection_closed(
&mut self,
peer: &PeerId,
_: &ConnectionId,
peer_id: &PeerId,
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
) {
// Remove IP from peer scoring system
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
peer_score.remove_ip(peer, &ip);
peer_score.remove_ip(peer_id, &ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer,
peer_id,
endpoint
)
}
}
// Remove the connection from the list
// If there are no connections left, inject_disconnected will remove the mapping entirely.
if let Some(connections) = self.connected_peers.get_mut(peer_id) {
let index = connections
.connections
.iter()
.position(|v| v == connection_id)
.expect(
"Previously established connection to a non-black-listed peer to be present",
);
connections.connections.remove(index);
// If there are more connections and this peer is in a mesh, inform the first connection
// handler.
if !connections.connections.is_empty() {
if let Some(topics) = self.peer_topics.get(peer_id) {
for topic in topics {
if let Some(mesh_peers) = self.mesh.get(topic) {
if mesh_peers.contains(peer_id) {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
event: Arc::new(GossipsubHandlerIn::JoinedMesh),
handler: NotifyHandler::One(connections.connections[0]),
});
break;
}
}
}
}
}
}
}
fn inject_address_change(
@ -2969,15 +3067,15 @@ where
);
// We treat this peer as disconnected
self.inject_disconnected(&propagation_source);
} else if let Some(old_kind) = self.peer_protocols.get_mut(&propagation_source) {
} else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) {
// Only change the value if the old value is Floodsub (the default set in
// inject_connected). All other PeerKind changes are ignored.
debug!(
"New peer type found: {} for peer: {}",
kind, propagation_source
);
if let PeerKind::Floodsub = *old_kind {
*old_kind = kind;
if let PeerKind::Floodsub = conn.kind {
conn.kind = kind;
}
}
}
@ -3126,12 +3224,92 @@ where
}
}
/// This is called when peers are added to any mesh. It checks if the peer existed
/// in any other mesh. If this is the first mesh they have joined, it queues a message to notify
/// the appropriate connection handler to maintain a connection.
fn peer_added_to_mesh(
peer_id: PeerId,
new_topics: Vec<&TopicHash>,
mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
known_topics: Option<&BTreeSet<TopicHash>>,
events: &mut VecDeque<GossipsubNetworkBehaviourAction>,
connections: &HashMap<PeerId, PeerConnections>,
) {
// Ensure there is an active connection
let connection_id = {
let conn = connections.get(&peer_id).expect("To be connected to peer.");
assert!(
!conn.connections.is_empty(),
"Must have at least one connection"
);
conn.connections[0]
};
if let Some(topics) = known_topics {
for topic in topics {
if !new_topics.contains(&topic) {
if let Some(mesh_peers) = mesh.get(topic) {
if mesh_peers.contains(&peer_id) {
// the peer is already in a mesh for another topic
return;
}
}
}
}
}
// This is the first mesh the peer has joined, inform the handler
events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event: Arc::new(GossipsubHandlerIn::JoinedMesh),
handler: NotifyHandler::One(connection_id),
});
}
/// This is called when peers are removed from a mesh. It checks if the peer exists
/// in any other mesh. If this is the last mesh they have joined, we return true, in order to
/// notify the handler to no longer maintain a connection.
fn peer_removed_from_mesh(
peer_id: PeerId,
old_topic: &TopicHash,
mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
known_topics: Option<&BTreeSet<TopicHash>>,
events: &mut VecDeque<GossipsubNetworkBehaviourAction>,
connections: &HashMap<PeerId, PeerConnections>,
) {
// Ensure there is an active connection
let connection_id = connections
.get(&peer_id)
.expect("To be connected to peer.")
.connections
.get(0)
.expect("There should be at least one connection to a peer.");
if let Some(topics) = known_topics {
for topic in topics {
if topic != old_topic {
if let Some(mesh_peers) = mesh.get(topic) {
if mesh_peers.contains(&peer_id) {
// the peer exists in another mesh still
return;
}
}
}
}
}
// The peer is not in any other mesh, inform the handler
events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event: Arc::new(GossipsubHandlerIn::LeftMesh),
handler: NotifyHandler::One(*connection_id),
});
}
/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
/// filtered by the function `f`. The number of peers to get equals the output of `n_map`
/// that gets as input the number of filtered peers.
fn get_random_peers_dynamic(
topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
peer_protocols: &HashMap<PeerId, PeerKind>,
connected_peers: &HashMap<PeerId, PeerConnections>,
topic_hash: &TopicHash,
// maps the number of total peers to the number of selected peers
n_map: impl Fn(usize) -> usize,
@ -3143,9 +3321,9 @@ fn get_random_peers_dynamic(
.iter()
.cloned()
.filter(|p| {
f(p) && match peer_protocols.get(p) {
Some(PeerKind::Gossipsub) => true,
Some(PeerKind::Gossipsubv1_1) => true,
f(p) && match connected_peers.get(p) {
Some(connections) if connections.kind == PeerKind::Gossipsub => true,
Some(connections) if connections.kind == PeerKind::Gossipsubv1_1 => true,
_ => false,
}
})
@ -3173,12 +3351,12 @@ fn get_random_peers_dynamic(
/// filtered by the function `f`.
fn get_random_peers(
topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
peer_protocols: &HashMap<PeerId, PeerKind>,
connected_peers: &HashMap<PeerId, PeerConnections>,
topic_hash: &TopicHash,
n: usize,
f: impl FnMut(&PeerId) -> bool,
) -> BTreeSet<PeerId> {
get_random_peers_dynamic(topic_peers, peer_protocols, topic_hash, |_| n, f)
get_random_peers_dynamic(topic_peers, connected_peers, topic_hash, |_| n, f)
}
/// Validates the combination of signing, privacy and message validation to ensure the
@ -3319,10 +3497,10 @@ mod local_test {
rpc.messages.push(test_message());
let mut rpc_proto = rpc.clone().into_protobuf();
let fragmented_messages = gs.fragment_message(Arc::new(rpc_proto.clone())).unwrap();
let fragmented_messages = gs.fragment_message(rpc_proto.clone()).unwrap();
assert_eq!(
fragmented_messages,
vec![Arc::new(rpc_proto.clone())],
vec![rpc_proto.clone()],
"Messages under the limit shouldn't be fragmented"
);
@ -3334,7 +3512,7 @@ mod local_test {
}
let fragmented_messages = gs
.fragment_message(Arc::new(rpc_proto))
.fragment_message(rpc_proto)
.expect("Should be able to fragment the messages");
assert!(
@ -3369,7 +3547,7 @@ mod local_test {
let rpc_proto = rpc.into_protobuf();
let fragmented_messages = gs
.fragment_message(Arc::new(rpc_proto.clone()))
.fragment_message(rpc_proto.clone())
.expect("Messages must be valid");
if rpc_proto.encoded_len() < max_transmit_size {
@ -3394,9 +3572,7 @@ mod local_test {
// ensure they can all be encoded
let mut buf = bytes::BytesMut::with_capacity(message.encoded_len());
codec
.encode(Arc::try_unwrap(message).unwrap(), &mut buf)
.unwrap()
codec.encode(message, &mut buf).unwrap()
}
}
QuickCheck::new()