mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-24 07:11:38 +00:00
protocols/gossipsub: Implement unsub backoff spec changes (#2403)
Implements the changes specified by https://github.com/libp2p/specs/pull/383. Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
@ -13,11 +13,15 @@
|
|||||||
|
|
||||||
- Fix `GossipsubConfigBuilder::build()` requiring `&self` to live for `'static` (see [PR 2409])
|
- Fix `GossipsubConfigBuilder::build()` requiring `&self` to live for `'static` (see [PR 2409])
|
||||||
|
|
||||||
|
- Implement Unsubscribe backoff as per [libp2p specs PR 383] (see [PR 2403]).
|
||||||
|
|
||||||
[PR 2346]: https://github.com/libp2p/rust-libp2p/pull/2346
|
[PR 2346]: https://github.com/libp2p/rust-libp2p/pull/2346
|
||||||
[PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339
|
[PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339
|
||||||
[PR 2327]: https://github.com/libp2p/rust-libp2p/pull/2327
|
[PR 2327]: https://github.com/libp2p/rust-libp2p/pull/2327
|
||||||
[PR 2408]: https://github.com/libp2p/rust-libp2p/pull/2408
|
[PR 2408]: https://github.com/libp2p/rust-libp2p/pull/2408
|
||||||
[PR 2409]: https://github.com/libp2p/rust-libp2p/pull/2409
|
[PR 2409]: https://github.com/libp2p/rust-libp2p/pull/2409
|
||||||
|
[PR 2403]: https://github.com/libp2p/rust-libp2p/pull/2403
|
||||||
|
[libp2p specs PR 383]: https://github.com/libp2p/specs/pull/383
|
||||||
|
|
||||||
# 0.34.0 [2021-11-16]
|
# 0.34.0 [2021-11-16]
|
||||||
|
|
||||||
|
@ -1049,6 +1049,7 @@ where
|
|||||||
topic_hash: &TopicHash,
|
topic_hash: &TopicHash,
|
||||||
peer: &PeerId,
|
peer: &PeerId,
|
||||||
do_px: bool,
|
do_px: bool,
|
||||||
|
on_unsubscribe: bool,
|
||||||
) -> GossipsubControlAction {
|
) -> GossipsubControlAction {
|
||||||
if let Some((peer_score, ..)) = &mut self.peer_score {
|
if let Some((peer_score, ..)) = &mut self.peer_score {
|
||||||
peer_score.prune(peer, topic_hash.clone());
|
peer_score.prune(peer, topic_hash.clone());
|
||||||
@ -1088,14 +1089,19 @@ where
|
|||||||
Vec::new()
|
Vec::new()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let backoff = if on_unsubscribe {
|
||||||
|
self.config.unsubscribe_backoff()
|
||||||
|
} else {
|
||||||
|
self.config.prune_backoff()
|
||||||
|
};
|
||||||
|
|
||||||
// update backoff
|
// update backoff
|
||||||
self.backoffs
|
self.backoffs.update_backoff(topic_hash, peer, backoff);
|
||||||
.update_backoff(topic_hash, peer, self.config.prune_backoff());
|
|
||||||
|
|
||||||
GossipsubControlAction::Prune {
|
GossipsubControlAction::Prune {
|
||||||
topic_hash: topic_hash.clone(),
|
topic_hash: topic_hash.clone(),
|
||||||
peers,
|
peers,
|
||||||
backoff: Some(self.config.prune_backoff().as_secs()),
|
backoff: Some(backoff.as_secs()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1111,7 +1117,9 @@ where
|
|||||||
for peer in peers {
|
for peer in peers {
|
||||||
// Send a PRUNE control message
|
// Send a PRUNE control message
|
||||||
debug!("LEAVE: Sending PRUNE to peer: {:?}", peer);
|
debug!("LEAVE: Sending PRUNE to peer: {:?}", peer);
|
||||||
let control = self.make_prune(topic_hash, &peer, self.config.do_px());
|
let on_unsubscribe = true;
|
||||||
|
let control =
|
||||||
|
self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe);
|
||||||
Self::control_pool_add(&mut self.control_pool, peer, control);
|
Self::control_pool_add(&mut self.control_pool, peer, control);
|
||||||
|
|
||||||
// If the peer did not previously exist in any mesh, inform the handler
|
// If the peer did not previously exist in any mesh, inform the handler
|
||||||
@ -1487,9 +1495,10 @@ where
|
|||||||
|
|
||||||
if !to_prune_topics.is_empty() {
|
if !to_prune_topics.is_empty() {
|
||||||
// build the prune messages to send
|
// build the prune messages to send
|
||||||
|
let on_unsubscribe = false;
|
||||||
let prune_messages = to_prune_topics
|
let prune_messages = to_prune_topics
|
||||||
.iter()
|
.iter()
|
||||||
.map(|t| self.make_prune(t, peer_id, do_px))
|
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
|
||||||
.collect();
|
.collect();
|
||||||
// Send the prune messages to the peer
|
// Send the prune messages to the peer
|
||||||
debug!(
|
debug!(
|
||||||
@ -2598,6 +2607,9 @@ where
|
|||||||
// NOTE: In this case a peer has been added to a topic mesh, and removed from another.
|
// NOTE: In this case a peer has been added to a topic mesh, and removed from another.
|
||||||
// It therefore must be in at least one mesh and we do not need to inform the handler
|
// It therefore must be in at least one mesh and we do not need to inform the handler
|
||||||
// of its removal from another.
|
// of its removal from another.
|
||||||
|
|
||||||
|
// The following prunes are not due to unsubscribing.
|
||||||
|
let on_unsubscribe = false;
|
||||||
if let Some(topics) = to_prune.remove(&peer) {
|
if let Some(topics) = to_prune.remove(&peer) {
|
||||||
let mut prunes = topics
|
let mut prunes = topics
|
||||||
.iter()
|
.iter()
|
||||||
@ -2606,6 +2618,7 @@ where
|
|||||||
topic_hash,
|
topic_hash,
|
||||||
&peer,
|
&peer,
|
||||||
self.config.do_px() && !no_px.contains(&peer),
|
self.config.do_px() && !no_px.contains(&peer),
|
||||||
|
on_unsubscribe,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
@ -2630,6 +2643,8 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handle the remaining prunes
|
// handle the remaining prunes
|
||||||
|
// The following prunes are not due to unsubscribing.
|
||||||
|
let on_unsubscribe = false;
|
||||||
for (peer, topics) in to_prune.iter() {
|
for (peer, topics) in to_prune.iter() {
|
||||||
let mut remaining_prunes = Vec::new();
|
let mut remaining_prunes = Vec::new();
|
||||||
for topic_hash in topics {
|
for topic_hash in topics {
|
||||||
@ -2637,6 +2652,7 @@ where
|
|||||||
topic_hash,
|
topic_hash,
|
||||||
peer,
|
peer,
|
||||||
self.config.do_px() && !no_px.contains(peer),
|
self.config.do_px() && !no_px.contains(peer),
|
||||||
|
on_unsubscribe,
|
||||||
);
|
);
|
||||||
remaining_prunes.push(prune);
|
remaining_prunes.push(prune);
|
||||||
// inform the handler
|
// inform the handler
|
||||||
|
@ -2037,6 +2037,77 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_unsubscribe_backoff() {
|
||||||
|
const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(100);
|
||||||
|
let config = GossipsubConfigBuilder::default()
|
||||||
|
.backoff_slack(1)
|
||||||
|
// ensure a prune_backoff > unsubscribe_backoff
|
||||||
|
.prune_backoff(Duration::from_secs(5))
|
||||||
|
.unsubscribe_backoff(1)
|
||||||
|
.heartbeat_interval(HEARTBEAT_INTERVAL)
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let topic = String::from("test");
|
||||||
|
// only one peer => mesh too small and will try to regraft as early as possible
|
||||||
|
let (mut gs, _, topics) = inject_nodes1()
|
||||||
|
.peer_no(1)
|
||||||
|
.topics(vec![topic.clone()])
|
||||||
|
.to_subscribe(true)
|
||||||
|
.gs_config(config)
|
||||||
|
.create_network();
|
||||||
|
|
||||||
|
let _ = gs.unsubscribe(&Topic::new(topic.clone()));
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
count_control_msgs(&gs, |_, m| match m {
|
||||||
|
GossipsubControlAction::Prune { backoff, .. } => backoff == &Some(1),
|
||||||
|
_ => false,
|
||||||
|
}),
|
||||||
|
1,
|
||||||
|
"Peer should be pruned with `unsubscribe_backoff`."
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = gs.subscribe(&Topic::new(topics[0].to_string()));
|
||||||
|
|
||||||
|
// forget all events until now
|
||||||
|
flush_events(&mut gs);
|
||||||
|
|
||||||
|
// call heartbeat
|
||||||
|
gs.heartbeat();
|
||||||
|
|
||||||
|
// Sleep for one second and apply 10 regular heartbeats (interval = 100ms).
|
||||||
|
for _ in 0..10 {
|
||||||
|
sleep(HEARTBEAT_INTERVAL);
|
||||||
|
gs.heartbeat();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat
|
||||||
|
// is needed).
|
||||||
|
assert_eq!(
|
||||||
|
count_control_msgs(&gs, |_, m| match m {
|
||||||
|
GossipsubControlAction::Graft { .. } => true,
|
||||||
|
_ => false,
|
||||||
|
}),
|
||||||
|
0,
|
||||||
|
"Graft message created too early within backoff period"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Heartbeat one more time this should graft now
|
||||||
|
sleep(HEARTBEAT_INTERVAL);
|
||||||
|
gs.heartbeat();
|
||||||
|
|
||||||
|
// check that graft got created
|
||||||
|
assert!(
|
||||||
|
count_control_msgs(&gs, |_, m| match m {
|
||||||
|
GossipsubControlAction::Graft { .. } => true,
|
||||||
|
_ => false,
|
||||||
|
}) > 0,
|
||||||
|
"No graft message was created after backoff period"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_flood_publish() {
|
fn test_flood_publish() {
|
||||||
let config: GossipsubConfig = GossipsubConfig::default();
|
let config: GossipsubConfig = GossipsubConfig::default();
|
||||||
|
@ -77,6 +77,7 @@ pub struct GossipsubConfig {
|
|||||||
do_px: bool,
|
do_px: bool,
|
||||||
prune_peers: usize,
|
prune_peers: usize,
|
||||||
prune_backoff: Duration,
|
prune_backoff: Duration,
|
||||||
|
unsubscribe_backoff: Duration,
|
||||||
backoff_slack: u32,
|
backoff_slack: u32,
|
||||||
flood_publish: bool,
|
flood_publish: bool,
|
||||||
graft_flood_threshold: Duration,
|
graft_flood_threshold: Duration,
|
||||||
@ -276,6 +277,15 @@ impl GossipsubConfig {
|
|||||||
self.prune_backoff
|
self.prune_backoff
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Controls the backoff time when unsubscribing from a topic.
|
||||||
|
///
|
||||||
|
/// This is how long to wait before resubscribing to the topic. A short backoff period in case
|
||||||
|
/// of an unsubscribe event allows reaching a healthy mesh in a more timely manner. The default
|
||||||
|
/// is 10 seconds.
|
||||||
|
pub fn unsubscribe_backoff(&self) -> Duration {
|
||||||
|
self.unsubscribe_backoff
|
||||||
|
}
|
||||||
|
|
||||||
/// Number of heartbeat slots considered as slack for backoffs. This gurantees that we wait
|
/// Number of heartbeat slots considered as slack for backoffs. This gurantees that we wait
|
||||||
/// at least backoff_slack heartbeats after a backoff is over before we try to graft. This
|
/// at least backoff_slack heartbeats after a backoff is over before we try to graft. This
|
||||||
/// solves problems occuring through high latencies. In particular if
|
/// solves problems occuring through high latencies. In particular if
|
||||||
@ -421,6 +431,7 @@ impl Default for GossipsubConfigBuilder {
|
|||||||
do_px: false,
|
do_px: false,
|
||||||
prune_peers: 0, // NOTE: Increasing this currently has little effect until Signed records are implemented.
|
prune_peers: 0, // NOTE: Increasing this currently has little effect until Signed records are implemented.
|
||||||
prune_backoff: Duration::from_secs(60),
|
prune_backoff: Duration::from_secs(60),
|
||||||
|
unsubscribe_backoff: Duration::from_secs(10),
|
||||||
backoff_slack: 1,
|
backoff_slack: 1,
|
||||||
flood_publish: true,
|
flood_publish: true,
|
||||||
graft_flood_threshold: Duration::from_secs(10),
|
graft_flood_threshold: Duration::from_secs(10),
|
||||||
@ -636,6 +647,16 @@ impl GossipsubConfigBuilder {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Controls the backoff time when unsubscribing from a topic.
|
||||||
|
///
|
||||||
|
/// This is how long to wait before resubscribing to the topic. A short backoff period in case
|
||||||
|
/// of an unsubscribe event allows reaching a healthy mesh in a more timely manner. The default
|
||||||
|
/// is 10 seconds.
|
||||||
|
pub fn unsubscribe_backoff(&mut self, unsubscribe_backoff: u64) -> &mut Self {
|
||||||
|
self.config.unsubscribe_backoff = Duration::from_secs(unsubscribe_backoff);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Number of heartbeat slots considered as slack for backoffs. This gurantees that we wait
|
/// Number of heartbeat slots considered as slack for backoffs. This gurantees that we wait
|
||||||
/// at least backoff_slack heartbeats after a backoff is over before we try to graft. This
|
/// at least backoff_slack heartbeats after a backoff is over before we try to graft. This
|
||||||
/// solves problems occuring through high latencies. In particular if
|
/// solves problems occuring through high latencies. In particular if
|
||||||
@ -777,6 +798,11 @@ impl GossipsubConfigBuilder {
|
|||||||
"The following inequality doesn't hold mesh_outbound_min <= self.config.mesh_n / 2",
|
"The following inequality doesn't hold mesh_outbound_min <= self.config.mesh_n / 2",
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if self.config.unsubscribe_backoff.as_millis() == 0 {
|
||||||
|
return Err("The unsubscribe_backoff parameter should be positive.");
|
||||||
|
}
|
||||||
|
|
||||||
Ok(self.config.clone())
|
Ok(self.config.clone())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user