mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-05-30 11:11:21 +00:00
protocols/gossipsub: Fix inconsistency in mesh peer tracking (#2189)
Co-authored-by: Age Manning <Age@AgeManning.com>
This commit is contained in:
parent
f701b24ec0
commit
ce23cbe76a
@ -1,5 +1,8 @@
|
|||||||
# 0.33.0 [unreleased]
|
# 0.33.0 [unreleased]
|
||||||
|
|
||||||
|
- Improve internal peer tracking.
|
||||||
|
[PR 2175](https://github.com/libp2p/rust-libp2p/pull/2175)
|
||||||
|
|
||||||
- Update dependencies.
|
- Update dependencies.
|
||||||
|
|
||||||
- Allow `message_id_fn`s to accept closures that capture variables.
|
- Allow `message_id_fn`s to accept closures that capture variables.
|
||||||
|
@ -1241,6 +1241,15 @@ where
|
|||||||
|
|
||||||
let mut do_px = self.config.do_px();
|
let mut do_px = self.config.do_px();
|
||||||
|
|
||||||
|
// For each topic, if a peer has grafted us, then we necessarily must be in their mesh
|
||||||
|
// and they must be subscribed to the topic. Ensure we have recorded the mapping.
|
||||||
|
for topic in &topics {
|
||||||
|
self.peer_topics
|
||||||
|
.entry(*peer_id)
|
||||||
|
.or_default()
|
||||||
|
.insert(topic.clone());
|
||||||
|
}
|
||||||
|
|
||||||
// we don't GRAFT to/from explicit peers; complain loudly if this happens
|
// we don't GRAFT to/from explicit peers; complain loudly if this happens
|
||||||
if self.explicit_peers.contains(peer_id) {
|
if self.explicit_peers.contains(peer_id) {
|
||||||
warn!("GRAFT: ignoring request from direct peer {}", peer_id);
|
warn!("GRAFT: ignoring request from direct peer {}", peer_id);
|
||||||
@ -2808,9 +2817,7 @@ where
|
|||||||
// Ignore connections from blacklisted peers.
|
// Ignore connections from blacklisted peers.
|
||||||
if self.blacklisted_peers.contains(peer_id) {
|
if self.blacklisted_peers.contains(peer_id) {
|
||||||
debug!("Ignoring connection from blacklisted peer: {}", peer_id);
|
debug!("Ignoring connection from blacklisted peer: {}", peer_id);
|
||||||
return;
|
} else {
|
||||||
}
|
|
||||||
|
|
||||||
debug!("New peer connected: {}", peer_id);
|
debug!("New peer connected: {}", peer_id);
|
||||||
// We need to send our subscriptions to the newly-connected node.
|
// We need to send our subscriptions to the newly-connected node.
|
||||||
let mut subscriptions = vec![];
|
let mut subscriptions = vec![];
|
||||||
@ -2838,6 +2845,7 @@ where
|
|||||||
error!("Failed to send subscriptions, message too large");
|
error!("Failed to send subscriptions, message too large");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Insert an empty set of the topics of this peer until known.
|
// Insert an empty set of the topics of this peer until known.
|
||||||
self.peer_topics.insert(*peer_id, Default::default());
|
self.peer_topics.insert(*peer_id, Default::default());
|
||||||
@ -2854,9 +2862,10 @@ where
|
|||||||
let topics = match self.peer_topics.get(peer_id) {
|
let topics = match self.peer_topics.get(peer_id) {
|
||||||
Some(topics) => (topics),
|
Some(topics) => (topics),
|
||||||
None => {
|
None => {
|
||||||
if !self.blacklisted_peers.contains(peer_id) {
|
debug_assert!(
|
||||||
debug!("Disconnected node, not in connected nodes");
|
self.blacklisted_peers.contains(peer_id),
|
||||||
}
|
"Disconnected node not in connected list"
|
||||||
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -2890,11 +2899,11 @@ where
|
|||||||
.get_mut(&topic)
|
.get_mut(&topic)
|
||||||
.map(|peers| peers.remove(peer_id));
|
.map(|peers| peers.remove(peer_id));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//forget px and outbound status for this peer
|
// Forget px and outbound status for this peer
|
||||||
self.px_peers.remove(peer_id);
|
self.px_peers.remove(peer_id);
|
||||||
self.outbound_peers.remove(peer_id);
|
self.outbound_peers.remove(peer_id);
|
||||||
}
|
|
||||||
|
|
||||||
// Remove peer from peer_topics and connected_peers
|
// Remove peer from peer_topics and connected_peers
|
||||||
// NOTE: It is possible the peer has already been removed from all mappings if it does not
|
// NOTE: It is possible the peer has already been removed from all mappings if it does not
|
||||||
@ -2913,11 +2922,6 @@ where
|
|||||||
connection_id: &ConnectionId,
|
connection_id: &ConnectionId,
|
||||||
endpoint: &ConnectedPoint,
|
endpoint: &ConnectedPoint,
|
||||||
) {
|
) {
|
||||||
// Ignore connections from blacklisted peers.
|
|
||||||
if self.blacklisted_peers.contains(peer_id) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if the peer is an outbound peer
|
// Check if the peer is an outbound peer
|
||||||
if let ConnectedPoint::Dialer { .. } = endpoint {
|
if let ConnectedPoint::Dialer { .. } = endpoint {
|
||||||
// Diverging from the go implementation we only want to consider a peer as outbound peer
|
// Diverging from the go implementation we only want to consider a peer as outbound peer
|
||||||
|
@ -5228,4 +5228,36 @@ mod tests {
|
|||||||
//nobody got penalized
|
//nobody got penalized
|
||||||
assert!(gs1.peer_score.as_ref().unwrap().0.score(&p2) >= original_score);
|
assert!(gs1.peer_score.as_ref().unwrap().0.score(&p2) >= original_score);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
/// Test nodes that send grafts without subscriptions.
|
||||||
|
fn test_graft_without_subscribe() {
|
||||||
|
// The node should:
|
||||||
|
// - Create an empty vector in mesh[topic]
|
||||||
|
// - Send subscription request to all peers
|
||||||
|
// - run JOIN(topic)
|
||||||
|
|
||||||
|
let topic = String::from("test_subscribe");
|
||||||
|
let subscribe_topic = vec![topic.clone()];
|
||||||
|
let subscribe_topic_hash = vec![Topic::new(topic.clone()).hash()];
|
||||||
|
let (mut gs, peers, topic_hashes) = inject_nodes1()
|
||||||
|
.peer_no(1)
|
||||||
|
.topics(subscribe_topic)
|
||||||
|
.to_subscribe(false)
|
||||||
|
.create_network();
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
gs.mesh.get(&topic_hashes[0]).is_some(),
|
||||||
|
"Subscribe should add a new entry to the mesh[topic] hashmap"
|
||||||
|
);
|
||||||
|
|
||||||
|
// The node sends a graft for the subscribe topic.
|
||||||
|
gs.handle_graft(&peers[0], subscribe_topic_hash);
|
||||||
|
|
||||||
|
// The node disconnects
|
||||||
|
gs.inject_disconnected(&peers[0]);
|
||||||
|
|
||||||
|
// We unsubscribe from the topic.
|
||||||
|
let _ = gs.unsubscribe(&Topic::new(topic));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user