diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index d6f7a89f..d0562caa 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -5,8 +5,11 @@ - Update to `libp2p-core` `v0.37.0`. - Update to `libp2p-swarm` `v0.40.0`. - + +- Fix a bug that could cause a delay of ~10s until peers would get discovered when using the tokio runtime. See [PR 2939]. + [PR 2918]: https://github.com/libp2p/rust-libp2p/pull/2918 +[PR 2939]: https://github.com/libp2p/rust-libp2p/pull/2939 # 0.40.0 diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 854bd885..d645468e 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -203,7 +203,7 @@ where // Emit discovered event. let mut discovered = SmallVec::<[(PeerId, Multiaddr); 4]>::new(); for iface_state in self.iface_states.values_mut() { - while let Some((peer, addr, expiration)) = iface_state.poll(cx, params) { + while let Poll::Ready((peer, addr, expiration)) = iface_state.poll(cx, params) { if let Some((_, _, cur_expires)) = self .discovered_nodes .iter_mut() diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index b2d0506b..0ca9cb3d 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -25,12 +25,12 @@ use self::dns::{build_query, build_query_response, build_service_discovery_respo use self::query::MdnsPacket; use crate::behaviour::{socket::AsyncSocket, timer::Builder}; use crate::MdnsConfig; -use libp2p_core::{address_translation, multiaddr::Protocol, Multiaddr, PeerId}; +use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::PollParameters; use socket2::{Domain, Socket, Type}; use std::{ collections::VecDeque, - io, iter, + io, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, pin::Pin, task::{Context, Poll}, @@ -145,106 +145,101 @@ where self.timeout = T::interval_at(Instant::now(), self.query_interval); } - fn inject_mdns_packet(&mut self, packet: MdnsPacket, params: &impl PollParameters) { - log::trace!("received packet on iface {} {:?}", self.addr, packet); - match packet { - MdnsPacket::Query(query) => { - self.reset_timer(); - log::trace!("sending response on iface {}", self.addr); - for packet in build_query_response( - query.query_id(), - *params.local_peer_id(), - params.listened_addresses(), - self.ttl, - ) { - self.send_buffer.push_back(packet); - } - } - MdnsPacket::Response(response) => { - // We replace the IP address with the address we observe the - // remote as and the address they listen on. - let obs_ip = Protocol::from(response.remote_addr().ip()); - let obs_port = Protocol::Udp(response.remote_addr().port()); - let observed: Multiaddr = iter::once(obs_ip).chain(iter::once(obs_port)).collect(); - - for peer in response.discovered_peers() { - if peer.id() == params.local_peer_id() { - continue; - } - - let new_expiration = Instant::now() + peer.ttl(); - - for addr in peer.addresses() { - if let Some(new_addr) = address_translation(addr, &observed) { - self.discovered.push_back(( - *peer.id(), - new_addr.clone(), - new_expiration, - )); - } - - self.discovered - .push_back((*peer.id(), addr.clone(), new_expiration)); - } - } - } - MdnsPacket::ServiceDiscovery(disc) => { - let resp = build_service_discovery_response(disc.query_id(), self.ttl); - self.send_buffer.push_back(resp); - } - } - } - pub fn poll( &mut self, cx: &mut Context, params: &impl PollParameters, - ) -> Option<(PeerId, Multiaddr, Instant)> { - // Poll receive socket. - while let Poll::Ready(data) = - Pin::new(&mut self.recv_socket).poll_read(cx, &mut self.recv_buffer) - { - match data { - Ok((len, from)) => { - if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) - { - self.inject_mdns_packet(packet, params); + ) -> Poll<(PeerId, Multiaddr, Instant)> { + loop { + // 1st priority: Low latency: Create packet ASAP after timeout. + if Pin::new(&mut self.timeout).poll_next(cx).is_ready() { + log::trace!("sending query on iface {}", self.addr); + self.send_buffer.push_back(build_query()); + } + + // 2nd priority: Keep local buffers small: Send packets to remote. + if let Some(packet) = self.send_buffer.pop_front() { + match Pin::new(&mut self.send_socket).poll_write( + cx, + &packet, + SocketAddr::new(self.multicast_addr, 5353), + ) { + Poll::Ready(Ok(_)) => { + log::trace!("sent packet on iface {}", self.addr); + continue; + } + Poll::Ready(Err(err)) => { + log::error!("error sending packet on iface {} {}", self.addr, err); + continue; + } + Poll::Pending => { + self.send_buffer.push_front(packet); } } - Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { - // No more bytes available on the socket to read - break; + } + + // 3rd priority: Keep local buffers small: Return discovered addresses. + if let Some(discovered) = self.discovered.pop_front() { + return Poll::Ready(discovered); + } + + // 4th priority: Remote work: Answer incoming requests. + match Pin::new(&mut self.recv_socket) + .poll_read(cx, &mut self.recv_buffer) + .map_ok(|(len, from)| MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from)) + { + Poll::Ready(Ok(Ok(Some(MdnsPacket::Query(query))))) => { + self.reset_timer(); + log::trace!( + "received query from {} on {}", + query.remote_addr(), + self.addr + ); + + self.send_buffer.extend(build_query_response( + query.query_id(), + *params.local_peer_id(), + params.listened_addresses(), + self.ttl, + )); + continue; } - Err(err) => { + Poll::Ready(Ok(Ok(Some(MdnsPacket::Response(response))))) => { + log::trace!( + "received response from {} on {}", + response.remote_addr(), + self.addr + ); + + self.discovered.extend( + response.extract_discovered(Instant::now(), *params.local_peer_id()), + ); + continue; + } + Poll::Ready(Ok(Ok(Some(MdnsPacket::ServiceDiscovery(disc))))) => { + log::trace!( + "received service discovery from {} on {}", + disc.remote_addr(), + self.addr + ); + + self.send_buffer + .push_back(build_service_discovery_response(disc.query_id(), self.ttl)); + continue; + } + Poll::Ready(Err(err)) if err.kind() == std::io::ErrorKind::WouldBlock => { + // No more bytes available on the socket to read + } + Poll::Ready(Err(err)) => { log::error!("failed reading datagram: {}", err); } - } - } - - // Send responses. - while let Some(packet) = self.send_buffer.pop_front() { - match Pin::new(&mut self.send_socket).poll_write( - cx, - &packet, - SocketAddr::new(self.multicast_addr, 5353), - ) { - Poll::Ready(Ok(_)) => log::trace!("sent packet on iface {}", self.addr), - Poll::Ready(Err(err)) => { - log::error!("error sending packet on iface {} {}", self.addr, err); - } - Poll::Pending => { - self.send_buffer.push_front(packet); - break; + Poll::Ready(Ok(Err(err))) => { + log::debug!("Parsing mdns packet failed: {:?}", err); } + Poll::Ready(Ok(Ok(None))) | Poll::Pending => {} } - } - if Pin::new(&mut self.timeout).poll_next(cx).is_ready() { - log::trace!("sending query on iface {}", self.addr); - self.send_buffer.push_back(build_query()); + return Poll::Pending; } - - // Emit discovered event. - self.discovered.pop_front() } } diff --git a/protocols/mdns/src/behaviour/iface/query.rs b/protocols/mdns/src/behaviour/iface/query.rs index 67dc2e01..70e38016 100644 --- a/protocols/mdns/src/behaviour/iface/query.rs +++ b/protocols/mdns/src/behaviour/iface/query.rs @@ -22,9 +22,11 @@ use super::dns; use crate::{META_QUERY_SERVICE, SERVICE_NAME}; use dns_parser::{Packet, RData}; use libp2p_core::{ + address_translation, multiaddr::{Multiaddr, Protocol}, PeerId, }; +use std::time::Instant; use std::{convert::TryFrom, fmt, net::SocketAddr, str, time::Duration}; /// A valid mDNS packet received by the service. @@ -39,44 +41,40 @@ pub enum MdnsPacket { } impl MdnsPacket { - pub fn new_from_bytes(buf: &[u8], from: SocketAddr) -> Option { - match Packet::parse(buf) { - Ok(packet) => { - if packet.header.query { - if packet - .questions - .iter() - .any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME) - { - let query = MdnsPacket::Query(MdnsQuery { - from, - query_id: packet.header.id, - }); - Some(query) - } else if packet - .questions - .iter() - .any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE) - { - // TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE? - let discovery = MdnsPacket::ServiceDiscovery(MdnsServiceDiscovery { - from, - query_id: packet.header.id, - }); - Some(discovery) - } else { - None - } - } else { - let resp = MdnsPacket::Response(MdnsResponse::new(packet, from)); - Some(resp) - } - } - Err(err) => { - log::debug!("Parsing mdns packet failed: {:?}", err); - None - } + pub fn new_from_bytes( + buf: &[u8], + from: SocketAddr, + ) -> Result, dns_parser::Error> { + let packet = Packet::parse(buf)?; + + if !packet.header.query { + return Ok(Some(MdnsPacket::Response(MdnsResponse::new(packet, from)))); } + + if packet + .questions + .iter() + .any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME) + { + return Ok(Some(MdnsPacket::Query(MdnsQuery { + from, + query_id: packet.header.id, + }))); + } + + if packet + .questions + .iter() + .any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE) + { + // TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE? + return Ok(Some(MdnsPacket::ServiceDiscovery(MdnsServiceDiscovery { + from, + query_id: packet.header.id, + }))); + } + + Ok(None) } } @@ -167,18 +165,45 @@ impl MdnsResponse { MdnsResponse { peers, from } } - /// Returns the list of peers that have been reported in this packet. - /// - /// > **Note**: Keep in mind that this will also contain the responses we sent ourselves. - pub fn discovered_peers(&self) -> impl Iterator { - self.peers.iter() + pub fn extract_discovered( + &self, + now: Instant, + local_peer_id: PeerId, + ) -> impl Iterator + '_ { + self.discovered_peers() + .filter(move |peer| peer.id() != &local_peer_id) + .flat_map(move |peer| { + let observed = self.observed_address(); + let new_expiration = now + peer.ttl(); + + peer.addresses().iter().filter_map(move |address| { + let new_addr = address_translation(address, &observed)?; + + Some((*peer.id(), new_addr, new_expiration)) + }) + }) } /// Source address of the packet. - #[inline] pub fn remote_addr(&self) -> &SocketAddr { &self.from } + + fn observed_address(&self) -> Multiaddr { + // We replace the IP address with the address we observe the + // remote as and the address they listen on. + let obs_ip = Protocol::from(self.remote_addr().ip()); + let obs_port = Protocol::Udp(self.remote_addr().port()); + + Multiaddr::empty().with(obs_ip).with(obs_port) + } + + /// Returns the list of peers that have been reported in this packet. + /// + /// > **Note**: Keep in mind that this will also contain the responses we sent ourselves. + fn discovered_peers(&self) -> impl Iterator { + self.peers.iter() + } } impl fmt::Debug for MdnsResponse {