protocols/mdns: Fix discovered event emission. (#2065)

mdns keeps rediscovering nodes. this PR changes that to only emit events for new
nodes it discovered. In addition we make sure to only send a query if it is
really needed. Some logging is added for debugging purposes.
This commit is contained in:
David Craven
2021-05-06 17:08:29 +02:00
committed by GitHub
parent 5c541a16f1
commit 8988ac247e
3 changed files with 30 additions and 28 deletions

View File

@ -1,3 +1,8 @@
# 0.30.2 [2021-05-06]
- Fix discovered event emission.
[PR 2065](https://github.com/libp2p/rust-libp2p/pull/2065)
# 0.30.1 [2021-04-21]
- Fix timely discovery of peers after listening on a new address.

View File

@ -1,7 +1,7 @@
[package]
name = "libp2p-mdns"
edition = "2018"
version = "0.30.1"
version = "0.30.2"
description = "Implementation of the libp2p mDNS discovery method"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"

View File

@ -24,10 +24,10 @@ use async_io::{Async, Timer};
use futures::prelude::*;
use if_watch::{IfEvent, IfWatcher};
use lazy_static::lazy_static;
use libp2p_core::connection::ListenerId;
use libp2p_core::{
address_translation, connection::ConnectionId, multiaddr::Protocol, Multiaddr, PeerId,
};
use libp2p_core::connection::ListenerId;
use libp2p_swarm::{
protocols_handler::DummyProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction,
PollParameters, ProtocolsHandler,
@ -124,11 +124,7 @@ impl Mdns {
/// Builds a new `Mdns` behaviour.
pub async fn new(config: MdnsConfig) -> io::Result<Self> {
let recv_socket = {
let socket = Socket::new(
Domain::IPV4,
Type::DGRAM,
Some(socket2::Protocol::UDP),
)?;
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(socket2::Protocol::UDP))?;
socket.set_reuse_address(true)?;
#[cfg(unix)]
socket.set_reuse_port(true)?;
@ -169,9 +165,10 @@ impl Mdns {
}
fn inject_mdns_packet(&mut self, packet: MdnsPacket, params: &impl PollParameters) {
self.timeout.set_interval(self.query_interval);
match packet {
MdnsPacket::Query(query) => {
self.timeout.set_interval(self.query_interval);
log::trace!("sending response");
for packet in build_query_response(
query.query_id(),
*params.local_peer_id(),
@ -214,8 +211,8 @@ impl Mdns {
} else {
self.discovered_nodes
.push((*peer.id(), addr.clone(), new_expiration));
discovered.push((*peer.id(), addr));
}
discovered.push((*peer.id(), addr));
}
}
@ -271,7 +268,8 @@ impl NetworkBehaviour for Mdns {
}
fn inject_new_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
self.send_buffer.push_back(build_query());
self.timeout
.set_interval_at(Instant::now(), self.query_interval);
}
fn poll(
@ -297,7 +295,8 @@ impl NetworkBehaviour for Mdns {
if let Err(err) = socket.join_multicast_v4(&multicast, &addr) {
log::error!("join multicast failed: {}", err);
} else {
self.send_buffer.push_back(build_query());
self.timeout
.set_interval_at(Instant::now(), self.query_interval);
}
}
}
@ -332,25 +331,23 @@ impl NetworkBehaviour for Mdns {
_ => {}
}
}
if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
self.send_buffer.push_back(build_query());
}
// Send responses.
if !self.send_buffer.is_empty() {
while self.send_socket.poll_writable(cx).is_ready() {
if let Some(packet) = self.send_buffer.pop_front() {
match self
.send_socket
.send_to(&packet, *IPV4_MDNS_MULTICAST_ADDRESS)
.now_or_never()
{
Some(Ok(_)) => {}
Some(Err(err)) => log::error!("{}", err),
None => self.send_buffer.push_front(packet),
}
} else {
break;
while self.send_socket.poll_writable(cx).is_ready() {
if let Some(packet) = self.send_buffer.pop_front() {
match self
.send_socket
.send_to(&packet, *IPV4_MDNS_MULTICAST_ADDRESS)
.now_or_never()
{
Some(Ok(_)) => {}
Some(Err(err)) => log::error!("{}", err),
None => self.send_buffer.push_front(packet),
}
} else if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
log::trace!("sending query");
self.send_buffer.push_back(build_query());
} else {
break;
}
}
// Emit discovered event.