feat(mdns): adaptive initial interval peer discovery

Peer discovery with mDNS can be very slow, particularly if the first mDNS query is lost. This patch resolves it by adjusting the timer with an adaptive initial interval. We start with a very short timer (500 ms). If a peer is discovered before the end of the timer, then the timer is reset to the normal query interval value (300s), otherwise the timer's value is multiplied by 2 until it reaches the normal query interval value.

Related: https://github.com/libp2p/rust-libp2p/pull/3323.
Resolves: https://github.com/libp2p/rust-libp2p/issues/3319.

Pull-Request: #3975.
This commit is contained in:
Philippe Jalaber 2023-05-25 06:09:24 +02:00 committed by GitHub
parent 137443b9df
commit 67b26cce4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 5 deletions

View File

@ -6,7 +6,9 @@
- Raise MSRV to 1.65.
See [PR 3715].
- Remove deprecated `Mdns` prefixed items. See [PR 3699].
- Faster peer discovery with adaptive initial interval. See [PR 3975].
[PR 3975]: https://github.com/libp2p/rust-libp2p/pull/3975
[PR 3621]: https://github.com/libp2p/rust-libp2p/pull/3621
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3699]: https://github.com/libp2p/rust-libp2p/pull/3699

View File

@ -38,6 +38,30 @@ use std::{
time::{Duration, Instant},
};
/// Initial interval for starting probe
const INITIAL_TIMEOUT_INTERVAL: Duration = Duration::from_millis(500);
#[derive(Debug, Clone)]
enum ProbeState {
Probing(Duration),
Finished(Duration),
}
impl Default for ProbeState {
fn default() -> Self {
ProbeState::Probing(INITIAL_TIMEOUT_INTERVAL)
}
}
impl ProbeState {
fn interval(&self) -> &Duration {
match self {
ProbeState::Probing(query_interval) => query_interval,
ProbeState::Finished(query_interval) => query_interval,
}
}
}
/// An mDNS instance for a networking interface. To discover all peers when having multiple
/// interfaces an [`InterfaceState`] is required for each interface.
#[derive(Debug)]
@ -67,7 +91,7 @@ pub(crate) struct InterfaceState<U, T> {
discovered: VecDeque<(PeerId, Multiaddr, Instant)>,
/// TTL
ttl: Duration,
probe_state: ProbeState,
local_peer_id: PeerId,
}
@ -134,19 +158,22 @@ where
send_buffer: Default::default(),
discovered: Default::default(),
query_interval,
timeout: T::interval_at(Instant::now(), query_interval),
timeout: T::interval_at(Instant::now(), INITIAL_TIMEOUT_INTERVAL),
multicast_addr,
ttl: config.ttl,
probe_state: Default::default(),
local_peer_id,
})
}
pub(crate) fn reset_timer(&mut self) {
self.timeout = T::interval(self.query_interval);
log::trace!("reset timer on {:#?} {:#?}", self.addr, self.probe_state);
let interval = *self.probe_state.interval();
self.timeout = T::interval(interval);
}
pub(crate) fn fire_timer(&mut self) {
self.timeout = T::interval_at(Instant::now(), self.query_interval);
self.timeout = T::interval_at(Instant::now(), INITIAL_TIMEOUT_INTERVAL);
}
pub(crate) fn poll(
@ -159,6 +186,19 @@ where
if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
log::trace!("sending query on iface {}", self.addr);
self.send_buffer.push_back(build_query());
log::trace!("tick on {:#?} {:#?}", self.addr, self.probe_state);
// Stop to probe when the initial interval reach the query interval
if let ProbeState::Probing(interval) = self.probe_state {
let interval = interval * 2;
self.probe_state = if interval >= self.query_interval {
ProbeState::Finished(self.query_interval)
} else {
ProbeState::Probing(interval)
};
}
self.reset_timer();
}
// 2nd priority: Keep local buffers small: Send packets to remote.
@ -193,7 +233,6 @@ where
.map_ok(|(len, from)| MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from))
{
Poll::Ready(Ok(Ok(Some(MdnsPacket::Query(query))))) => {
self.reset_timer();
log::trace!(
"received query from {} on {}",
query.remote_addr(),
@ -217,6 +256,12 @@ where
self.discovered
.extend(response.extract_discovered(Instant::now(), self.local_peer_id));
// Stop probing when we have a valid response
if !self.discovered.is_empty() {
self.probe_state = ProbeState::Finished(self.query_interval);
self.reset_timer();
}
continue;
}
Poll::Ready(Ok(Ok(Some(MdnsPacket::ServiceDiscovery(disc))))) => {