diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 350c3d7a..69e0f4f3 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,5 +1,8 @@ # 0.33.0 [unreleased] +- Improve internal peer tracking. + [PR 2175](https://github.com/libp2p/rust-libp2p/pull/2175) + - Update dependencies. - Allow `message_id_fn`s to accept closures that capture variables. diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index a4f4ec24..803f79c9 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1241,6 +1241,15 @@ where let mut do_px = self.config.do_px(); + // For each topic, if a peer has grafted us, then we necessarily must be in their mesh + // and they must be subscribed to the topic. Ensure we have recorded the mapping. + for topic in &topics { + self.peer_topics + .entry(*peer_id) + .or_default() + .insert(topic.clone()); + } + // we don't GRAFT to/from explicit peers; complain loudly if this happens if self.explicit_peers.contains(peer_id) { warn!("GRAFT: ignoring request from direct peer {}", peer_id); @@ -1283,7 +1292,7 @@ where peer_score.add_penalty(peer_id, 1); } } - //no PX + // no PX do_px = false; to_prune_topics.insert(topic_hash.clone()); @@ -2808,34 +2817,33 @@ where // Ignore connections from blacklisted peers. if self.blacklisted_peers.contains(peer_id) { debug!("Ignoring connection from blacklisted peer: {}", peer_id); - return; - } + } else { + debug!("New peer connected: {}", peer_id); + // We need to send our subscriptions to the newly-connected node. + let mut subscriptions = vec![]; + for topic_hash in self.mesh.keys() { + subscriptions.push(GossipsubSubscription { + topic_hash: topic_hash.clone(), + action: GossipsubSubscriptionAction::Subscribe, + }); + } - debug!("New peer connected: {}", peer_id); - // We need to send our subscriptions to the newly-connected node. - let mut subscriptions = vec![]; - for topic_hash in self.mesh.keys() { - subscriptions.push(GossipsubSubscription { - topic_hash: topic_hash.clone(), - action: GossipsubSubscriptionAction::Subscribe, - }); - } - - if !subscriptions.is_empty() { - // send our subscriptions to the peer - if self - .send_message( - *peer_id, - GossipsubRpc { - messages: Vec::new(), - subscriptions, - control_msgs: Vec::new(), - } - .into_protobuf(), - ) - .is_err() - { - error!("Failed to send subscriptions, message too large"); + if !subscriptions.is_empty() { + // send our subscriptions to the peer + if self + .send_message( + *peer_id, + GossipsubRpc { + messages: Vec::new(), + subscriptions, + control_msgs: Vec::new(), + } + .into_protobuf(), + ) + .is_err() + { + error!("Failed to send subscriptions, message too large"); + } } } @@ -2854,9 +2862,10 @@ where let topics = match self.peer_topics.get(peer_id) { Some(topics) => (topics), None => { - if !self.blacklisted_peers.contains(peer_id) { - debug!("Disconnected node, not in connected nodes"); - } + debug_assert!( + self.blacklisted_peers.contains(peer_id), + "Disconnected node not in connected list" + ); return; } }; @@ -2890,12 +2899,12 @@ where .get_mut(&topic) .map(|peers| peers.remove(peer_id)); } - - //forget px and outbound status for this peer - self.px_peers.remove(peer_id); - self.outbound_peers.remove(peer_id); } + // Forget px and outbound status for this peer + self.px_peers.remove(peer_id); + self.outbound_peers.remove(peer_id); + // Remove peer from peer_topics and connected_peers // NOTE: It is possible the peer has already been removed from all mappings if it does not // support the protocol. @@ -2913,11 +2922,6 @@ where connection_id: &ConnectionId, endpoint: &ConnectedPoint, ) { - // Ignore connections from blacklisted peers. - if self.blacklisted_peers.contains(peer_id) { - return; - } - // Check if the peer is an outbound peer if let ConnectedPoint::Dialer { .. } = endpoint { // Diverging from the go implementation we only want to consider a peer as outbound peer diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 81b2267f..463452b0 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -5228,4 +5228,36 @@ mod tests { //nobody got penalized assert!(gs1.peer_score.as_ref().unwrap().0.score(&p2) >= original_score); } + + #[test] + /// Test nodes that send grafts without subscriptions. + fn test_graft_without_subscribe() { + // The node should: + // - Create an empty vector in mesh[topic] + // - Send subscription request to all peers + // - run JOIN(topic) + + let topic = String::from("test_subscribe"); + let subscribe_topic = vec![topic.clone()]; + let subscribe_topic_hash = vec![Topic::new(topic.clone()).hash()]; + let (mut gs, peers, topic_hashes) = inject_nodes1() + .peer_no(1) + .topics(subscribe_topic) + .to_subscribe(false) + .create_network(); + + assert!( + gs.mesh.get(&topic_hashes[0]).is_some(), + "Subscribe should add a new entry to the mesh[topic] hashmap" + ); + + // The node sends a graft for the subscribe topic. + gs.handle_graft(&peers[0], subscribe_topic_hash); + + // The node disconnects + gs.inject_disconnected(&peers[0]); + + // We unsubscribe from the topic. + let _ = gs.unsubscribe(&Topic::new(topic)); + } }