protocols/mdns: Optimise InterfaceState::poll for low latency (#2939)

This commit is contained in:
Thomas Eizinger
2022-10-04 18:45:39 +11:00
committed by GitHub
parent 1b793242e6
commit a905a36cbc
4 changed files with 158 additions and 135 deletions

View File

@ -5,8 +5,11 @@
- Update to `libp2p-core` `v0.37.0`. - Update to `libp2p-core` `v0.37.0`.
- Update to `libp2p-swarm` `v0.40.0`. - Update to `libp2p-swarm` `v0.40.0`.
- Fix a bug that could cause a delay of ~10s until peers would get discovered when using the tokio runtime. See [PR 2939].
[PR 2918]: https://github.com/libp2p/rust-libp2p/pull/2918 [PR 2918]: https://github.com/libp2p/rust-libp2p/pull/2918
[PR 2939]: https://github.com/libp2p/rust-libp2p/pull/2939
# 0.40.0 # 0.40.0

View File

@ -203,7 +203,7 @@ where
// Emit discovered event. // Emit discovered event.
let mut discovered = SmallVec::<[(PeerId, Multiaddr); 4]>::new(); let mut discovered = SmallVec::<[(PeerId, Multiaddr); 4]>::new();
for iface_state in self.iface_states.values_mut() { for iface_state in self.iface_states.values_mut() {
while let Some((peer, addr, expiration)) = iface_state.poll(cx, params) { while let Poll::Ready((peer, addr, expiration)) = iface_state.poll(cx, params) {
if let Some((_, _, cur_expires)) = self if let Some((_, _, cur_expires)) = self
.discovered_nodes .discovered_nodes
.iter_mut() .iter_mut()

View File

@ -25,12 +25,12 @@ use self::dns::{build_query, build_query_response, build_service_discovery_respo
use self::query::MdnsPacket; use self::query::MdnsPacket;
use crate::behaviour::{socket::AsyncSocket, timer::Builder}; use crate::behaviour::{socket::AsyncSocket, timer::Builder};
use crate::MdnsConfig; use crate::MdnsConfig;
use libp2p_core::{address_translation, multiaddr::Protocol, Multiaddr, PeerId}; use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::PollParameters; use libp2p_swarm::PollParameters;
use socket2::{Domain, Socket, Type}; use socket2::{Domain, Socket, Type};
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
io, iter, io,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
@ -145,106 +145,101 @@ where
self.timeout = T::interval_at(Instant::now(), self.query_interval); self.timeout = T::interval_at(Instant::now(), self.query_interval);
} }
fn inject_mdns_packet(&mut self, packet: MdnsPacket, params: &impl PollParameters) {
log::trace!("received packet on iface {} {:?}", self.addr, packet);
match packet {
MdnsPacket::Query(query) => {
self.reset_timer();
log::trace!("sending response on iface {}", self.addr);
for packet in build_query_response(
query.query_id(),
*params.local_peer_id(),
params.listened_addresses(),
self.ttl,
) {
self.send_buffer.push_back(packet);
}
}
MdnsPacket::Response(response) => {
// We replace the IP address with the address we observe the
// remote as and the address they listen on.
let obs_ip = Protocol::from(response.remote_addr().ip());
let obs_port = Protocol::Udp(response.remote_addr().port());
let observed: Multiaddr = iter::once(obs_ip).chain(iter::once(obs_port)).collect();
for peer in response.discovered_peers() {
if peer.id() == params.local_peer_id() {
continue;
}
let new_expiration = Instant::now() + peer.ttl();
for addr in peer.addresses() {
if let Some(new_addr) = address_translation(addr, &observed) {
self.discovered.push_back((
*peer.id(),
new_addr.clone(),
new_expiration,
));
}
self.discovered
.push_back((*peer.id(), addr.clone(), new_expiration));
}
}
}
MdnsPacket::ServiceDiscovery(disc) => {
let resp = build_service_discovery_response(disc.query_id(), self.ttl);
self.send_buffer.push_back(resp);
}
}
}
pub fn poll( pub fn poll(
&mut self, &mut self,
cx: &mut Context, cx: &mut Context,
params: &impl PollParameters, params: &impl PollParameters,
) -> Option<(PeerId, Multiaddr, Instant)> { ) -> Poll<(PeerId, Multiaddr, Instant)> {
// Poll receive socket. loop {
while let Poll::Ready(data) = // 1st priority: Low latency: Create packet ASAP after timeout.
Pin::new(&mut self.recv_socket).poll_read(cx, &mut self.recv_buffer) if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
{ log::trace!("sending query on iface {}", self.addr);
match data { self.send_buffer.push_back(build_query());
Ok((len, from)) => { }
if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from)
{ // 2nd priority: Keep local buffers small: Send packets to remote.
self.inject_mdns_packet(packet, params); if let Some(packet) = self.send_buffer.pop_front() {
match Pin::new(&mut self.send_socket).poll_write(
cx,
&packet,
SocketAddr::new(self.multicast_addr, 5353),
) {
Poll::Ready(Ok(_)) => {
log::trace!("sent packet on iface {}", self.addr);
continue;
}
Poll::Ready(Err(err)) => {
log::error!("error sending packet on iface {} {}", self.addr, err);
continue;
}
Poll::Pending => {
self.send_buffer.push_front(packet);
} }
} }
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { }
// No more bytes available on the socket to read
break; // 3rd priority: Keep local buffers small: Return discovered addresses.
if let Some(discovered) = self.discovered.pop_front() {
return Poll::Ready(discovered);
}
// 4th priority: Remote work: Answer incoming requests.
match Pin::new(&mut self.recv_socket)
.poll_read(cx, &mut self.recv_buffer)
.map_ok(|(len, from)| MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from))
{
Poll::Ready(Ok(Ok(Some(MdnsPacket::Query(query))))) => {
self.reset_timer();
log::trace!(
"received query from {} on {}",
query.remote_addr(),
self.addr
);
self.send_buffer.extend(build_query_response(
query.query_id(),
*params.local_peer_id(),
params.listened_addresses(),
self.ttl,
));
continue;
} }
Err(err) => { Poll::Ready(Ok(Ok(Some(MdnsPacket::Response(response))))) => {
log::trace!(
"received response from {} on {}",
response.remote_addr(),
self.addr
);
self.discovered.extend(
response.extract_discovered(Instant::now(), *params.local_peer_id()),
);
continue;
}
Poll::Ready(Ok(Ok(Some(MdnsPacket::ServiceDiscovery(disc))))) => {
log::trace!(
"received service discovery from {} on {}",
disc.remote_addr(),
self.addr
);
self.send_buffer
.push_back(build_service_discovery_response(disc.query_id(), self.ttl));
continue;
}
Poll::Ready(Err(err)) if err.kind() == std::io::ErrorKind::WouldBlock => {
// No more bytes available on the socket to read
}
Poll::Ready(Err(err)) => {
log::error!("failed reading datagram: {}", err); log::error!("failed reading datagram: {}", err);
} }
} Poll::Ready(Ok(Err(err))) => {
} log::debug!("Parsing mdns packet failed: {:?}", err);
// Send responses.
while let Some(packet) = self.send_buffer.pop_front() {
match Pin::new(&mut self.send_socket).poll_write(
cx,
&packet,
SocketAddr::new(self.multicast_addr, 5353),
) {
Poll::Ready(Ok(_)) => log::trace!("sent packet on iface {}", self.addr),
Poll::Ready(Err(err)) => {
log::error!("error sending packet on iface {} {}", self.addr, err);
}
Poll::Pending => {
self.send_buffer.push_front(packet);
break;
} }
Poll::Ready(Ok(Ok(None))) | Poll::Pending => {}
} }
}
if Pin::new(&mut self.timeout).poll_next(cx).is_ready() { return Poll::Pending;
log::trace!("sending query on iface {}", self.addr);
self.send_buffer.push_back(build_query());
} }
// Emit discovered event.
self.discovered.pop_front()
} }
} }

View File

@ -22,9 +22,11 @@ use super::dns;
use crate::{META_QUERY_SERVICE, SERVICE_NAME}; use crate::{META_QUERY_SERVICE, SERVICE_NAME};
use dns_parser::{Packet, RData}; use dns_parser::{Packet, RData};
use libp2p_core::{ use libp2p_core::{
address_translation,
multiaddr::{Multiaddr, Protocol}, multiaddr::{Multiaddr, Protocol},
PeerId, PeerId,
}; };
use std::time::Instant;
use std::{convert::TryFrom, fmt, net::SocketAddr, str, time::Duration}; use std::{convert::TryFrom, fmt, net::SocketAddr, str, time::Duration};
/// A valid mDNS packet received by the service. /// A valid mDNS packet received by the service.
@ -39,44 +41,40 @@ pub enum MdnsPacket {
} }
impl MdnsPacket { impl MdnsPacket {
pub fn new_from_bytes(buf: &[u8], from: SocketAddr) -> Option<MdnsPacket> { pub fn new_from_bytes(
match Packet::parse(buf) { buf: &[u8],
Ok(packet) => { from: SocketAddr,
if packet.header.query { ) -> Result<Option<MdnsPacket>, dns_parser::Error> {
if packet let packet = Packet::parse(buf)?;
.questions
.iter() if !packet.header.query {
.any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME) return Ok(Some(MdnsPacket::Response(MdnsResponse::new(packet, from))));
{
let query = MdnsPacket::Query(MdnsQuery {
from,
query_id: packet.header.id,
});
Some(query)
} else if packet
.questions
.iter()
.any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE)
{
// TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE?
let discovery = MdnsPacket::ServiceDiscovery(MdnsServiceDiscovery {
from,
query_id: packet.header.id,
});
Some(discovery)
} else {
None
}
} else {
let resp = MdnsPacket::Response(MdnsResponse::new(packet, from));
Some(resp)
}
}
Err(err) => {
log::debug!("Parsing mdns packet failed: {:?}", err);
None
}
} }
if packet
.questions
.iter()
.any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME)
{
return Ok(Some(MdnsPacket::Query(MdnsQuery {
from,
query_id: packet.header.id,
})));
}
if packet
.questions
.iter()
.any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE)
{
// TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE?
return Ok(Some(MdnsPacket::ServiceDiscovery(MdnsServiceDiscovery {
from,
query_id: packet.header.id,
})));
}
Ok(None)
} }
} }
@ -167,18 +165,45 @@ impl MdnsResponse {
MdnsResponse { peers, from } MdnsResponse { peers, from }
} }
/// Returns the list of peers that have been reported in this packet. pub fn extract_discovered(
/// &self,
/// > **Note**: Keep in mind that this will also contain the responses we sent ourselves. now: Instant,
pub fn discovered_peers(&self) -> impl Iterator<Item = &MdnsPeer> { local_peer_id: PeerId,
self.peers.iter() ) -> impl Iterator<Item = (PeerId, Multiaddr, Instant)> + '_ {
self.discovered_peers()
.filter(move |peer| peer.id() != &local_peer_id)
.flat_map(move |peer| {
let observed = self.observed_address();
let new_expiration = now + peer.ttl();
peer.addresses().iter().filter_map(move |address| {
let new_addr = address_translation(address, &observed)?;
Some((*peer.id(), new_addr, new_expiration))
})
})
} }
/// Source address of the packet. /// Source address of the packet.
#[inline]
pub fn remote_addr(&self) -> &SocketAddr { pub fn remote_addr(&self) -> &SocketAddr {
&self.from &self.from
} }
fn observed_address(&self) -> Multiaddr {
// We replace the IP address with the address we observe the
// remote as and the address they listen on.
let obs_ip = Protocol::from(self.remote_addr().ip());
let obs_port = Protocol::Udp(self.remote_addr().port());
Multiaddr::empty().with(obs_ip).with(obs_port)
}
/// Returns the list of peers that have been reported in this packet.
///
/// > **Note**: Keep in mind that this will also contain the responses we sent ourselves.
fn discovered_peers(&self) -> impl Iterator<Item = &MdnsPeer> {
self.peers.iter()
}
} }
impl fmt::Debug for MdnsResponse { impl fmt::Debug for MdnsResponse {