feat(identify): push changed listen protocols to remote

Now that we have the infrastructure for notifying protocols about changes to our listen protocols, we can use this to actively push those changes to our remotes. This allows other peers to re-configure themselves with very low-latency instead of waiting for the periodic identify event.

Resolves: #3613.

Pull-Request: #3980.
This commit is contained in:
Thomas Eizinger
2023-05-25 03:09:35 +02:00
committed by GitHub
parent 6329d74334
commit 137443b9df
2 changed files with 42 additions and 3 deletions

View File

@ -10,10 +10,14 @@
- Fix aborting the answering of an identify request in rare situations. - Fix aborting the answering of an identify request in rare situations.
See [PR 3876]. See [PR 3876].
- Actively push changes in listen protocols to remote.
See [PR 3980].
[PR 3545]: https://github.com/libp2p/rust-libp2p/pull/3545
[PR 3698]: https://github.com/libp2p/rust-libp2p/pull/3698 [PR 3698]: https://github.com/libp2p/rust-libp2p/pull/3698
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3545]: https://github.com/libp2p/rust-libp2p/pull/3545
[PR 3876]: https://github.com/libp2p/rust-libp2p/pull/3876 [PR 3876]: https://github.com/libp2p/rust-libp2p/pull/3876
[PR 3980]: https://github.com/libp2p/rust-libp2p/pull/3980
## 0.42.2 ## 0.42.2

View File

@ -36,7 +36,7 @@ use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError, ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError,
SubstreamProtocol, SupportedProtocols, SubstreamProtocol, SupportedProtocols,
}; };
use log::warn; use log::{warn, Level};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::collections::HashSet; use std::collections::HashSet;
use std::{io, task::Context, task::Poll, time::Duration}; use std::{io, task::Context, task::Poll, time::Duration};
@ -60,6 +60,9 @@ pub struct Handler {
/// Future that fires when we need to identify the node again. /// Future that fires when we need to identify the node again.
trigger_next_identify: Delay, trigger_next_identify: Delay,
/// Whether we have exchanged at least one periodic identify.
exchanged_one_periodic_identify: bool,
/// The interval of `trigger_next_identify`, i.e. the recurrent delay. /// The interval of `trigger_next_identify`, i.e. the recurrent delay.
interval: Duration, interval: Duration,
@ -122,6 +125,7 @@ impl Handler {
events: SmallVec::new(), events: SmallVec::new(),
pending_replies: FuturesUnordered::new(), pending_replies: FuturesUnordered::new(),
trigger_next_identify: Delay::new(initial_delay), trigger_next_identify: Delay::new(initial_delay),
exchanged_one_periodic_identify: false,
interval, interval,
public_key, public_key,
protocol_version, protocol_version,
@ -238,6 +242,14 @@ impl Handler {
self.remote_supported_protocols = new_remote_protocols; self.remote_supported_protocols = new_remote_protocols;
} }
fn local_protocols_to_string(&mut self) -> String {
self.local_supported_protocols
.iter()
.map(|p| p.to_string())
.collect::<Vec<_>>()
.join(", ")
}
} }
impl ConnectionHandler for Handler { impl ConnectionHandler for Handler {
@ -319,6 +331,7 @@ impl ConnectionHandler for Handler {
let event = result let event = result
.map(|()| Event::Identification) .map(|()| Event::Identification)
.unwrap_or_else(|err| Event::IdentificationError(StreamUpgradeError::Apply(err))); .unwrap_or_else(|err| Event::IdentificationError(StreamUpgradeError::Apply(err)));
self.exchanged_one_periodic_identify = true;
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
} }
@ -349,7 +362,29 @@ impl ConnectionHandler for Handler {
| ConnectionEvent::ListenUpgradeError(_) | ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::RemoteProtocolsChange(_) => {} | ConnectionEvent::RemoteProtocolsChange(_) => {}
ConnectionEvent::LocalProtocolsChange(change) => { ConnectionEvent::LocalProtocolsChange(change) => {
self.local_supported_protocols.on_protocols_change(change); let before = log::log_enabled!(Level::Debug)
.then(|| self.local_protocols_to_string())
.unwrap_or_default();
let protocols_changed = self.local_supported_protocols.on_protocols_change(change);
let after = log::log_enabled!(Level::Debug)
.then(|| self.local_protocols_to_string())
.unwrap_or_default();
if protocols_changed && self.exchanged_one_periodic_identify {
log::debug!(
"Supported listen protocols changed from [{before}] to [{after}], pushing to {}",
self.remote_peer_id
);
let info = self.build_info();
self.events
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
Either::Right(Push::outbound(info)),
(),
),
});
}
} }
} }
} }