diff --git a/CHANGELOG.md b/CHANGELOG.md index cc3f4ebb..8cfa61ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,10 @@ - [`parity-multiaddr` CHANGELOG](misc/multiaddr/CHANGELOG.md) - [`libp2p-core-derive` CHANGELOG](misc/core-derive/CHANGELOG.md) +# Version 0.32.0 [unreleased] + +- Update to `libp2p-mdns-0.26`. + # Version 0.31.2 [2020-12-02] - Bump minimum `libp2p-core` patch version. diff --git a/Cargo.toml b/Cargo.toml index c275fa91..04f5dafd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p" edition = "2018" description = "Peer-to-peer networking library" -version = "0.31.2" +version = "0.32.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -17,7 +17,7 @@ default = [ "identify", "kad", "gossipsub", - "mdns-async-std", + "mdns", "mplex", "noise", "ping", @@ -37,8 +37,7 @@ floodsub = ["libp2p-floodsub"] identify = ["libp2p-identify"] kad = ["libp2p-kad"] gossipsub = ["libp2p-gossipsub"] -mdns-async-std = ["libp2p-mdns", "libp2p-mdns/async-std"] -mdns-tokio = ["libp2p-mdns", "libp2p-mdns/tokio"] +mdns = ["libp2p-mdns"] mplex = ["libp2p-mplex"] noise = ["libp2p-noise"] ping = ["libp2p-ping"] @@ -87,7 +86,7 @@ wasm-timer = "0.2.4" [target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies] libp2p-deflate = { version = "0.25.0", path = "protocols/deflate", optional = true } libp2p-dns = { version = "0.25.0", path = "transports/dns", optional = true } -libp2p-mdns = { version = "0.25.0", path = "protocols/mdns", optional = true } +libp2p-mdns = { version = "0.26.0", path = "protocols/mdns", optional = true } libp2p-tcp = { version = "0.25.1", path = "transports/tcp", optional = true } libp2p-websocket = { version = "0.26.0", path = "transports/websocket", optional = true } @@ -125,4 +124,4 @@ members = [ [[example]] name = "chat-tokio" -required-features = ["tcp-tokio", "mdns-tokio"] +required-features = ["tcp-tokio", "mdns"] diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index 7d919dbb..15577ac6 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -46,8 +46,7 @@ use libp2p::{ core::upgrade, identity, floodsub::{self, Floodsub, FloodsubEvent}, - // `TokioMdns` is available through the `mdns-tokio` feature. - mdns::{TokioMdns, MdnsEvent}, + mdns::{Mdns, MdnsEvent}, mplex, noise, swarm::{NetworkBehaviourEventProcess, SwarmBuilder}, @@ -90,7 +89,7 @@ async fn main() -> Result<(), Box> { #[derive(NetworkBehaviour)] struct MyBehaviour { floodsub: Floodsub, - mdns: TokioMdns, + mdns: Mdns, } impl NetworkBehaviourEventProcess for MyBehaviour { @@ -122,7 +121,7 @@ async fn main() -> Result<(), Box> { // Create a Swarm to manage peers and events. let mut swarm = { - let mdns = TokioMdns::new()?; + let mdns = Mdns::new().await?; let mut behaviour = MyBehaviour { floodsub: Floodsub::new(peer_id.clone()), mdns, @@ -172,4 +171,4 @@ async fn main() -> Result<(), Box> { } } } -} \ No newline at end of file +} diff --git a/examples/chat.rs b/examples/chat.rs index ba82849d..67966e07 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -121,7 +121,7 @@ fn main() -> Result<(), Box> { // Create a Swarm to manage peers and events let mut swarm = { - let mdns = Mdns::new()?; + let mdns = task::block_on(Mdns::new())?; let mut behaviour = MyBehaviour { floodsub: Floodsub::new(local_peer_id.clone()), mdns, diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index 5be91d29..6d5df32f 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -151,7 +151,7 @@ fn main() -> Result<(), Box> { // Create a Kademlia behaviour. let store = MemoryStore::new(local_peer_id.clone()); let kademlia = Kademlia::new(local_peer_id.clone(), store); - let mdns = Mdns::new()?; + let mdns = task::block_on(Mdns::new())?; let behaviour = MyBehaviour { kademlia, mdns }; Swarm::new(transport, behaviour, local_peer_id) }; diff --git a/examples/mdns-passive-discovery.rs b/examples/mdns-passive-discovery.rs index b3b4ab78..a8f4323a 100644 --- a/examples/mdns-passive-discovery.rs +++ b/examples/mdns-passive-discovery.rs @@ -26,7 +26,7 @@ fn main() -> Result<(), Box> { // This example provides passive discovery of the libp2p nodes on the // network that send mDNS queries and answers. task::block_on(async move { - let mut service = MdnsService::new()?; + let mut service = MdnsService::new().await?; loop { let (srv, packet) = service.next().await; match packet { diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 3cfe9154..16c58ca0 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -1,3 +1,16 @@ +# 0.26.0 [unreleased] + +- Detect interface changes and join the MDNS multicast + group on all interfaces as they become available. + [PR 1830](https://github.com/libp2p/rust-libp2p/pull/1830). + +- Replace the use of macros for abstracting over `tokio` + and `async-std` with the use of `async-io`. As a result + there may now be an additional reactor thread running + called `async-io` when using `tokio`, with the futures + still being polled by the `tokio` runtime. + [PR 1830](https://github.com/libp2p/rust-libp2p/pull/1830). + # 0.25.0 [2020-11-25] - Update `libp2p-swarm` and `libp2p-core`. diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index 168a341e..d1af2fde 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "libp2p-mdns" edition = "2018" -version = "0.25.0" +version = "0.26.0" description = "Implementation of the libp2p mDNS discovery method" authors = ["Parity Technologies "] license = "MIT" @@ -10,22 +10,21 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -async-std = { version = "1.6.2", optional = true } -data-encoding = "2.0" -dns-parser = "0.8" -either = "1.5.3" -futures = "0.3.1" -lazy_static = "1.2" +async-io = "1.3.0" +data-encoding = "2.3.1" +dns-parser = "0.8.0" +futures = "0.3.8" +if-watch = "0.1.6" +lazy_static = "1.4.0" libp2p-core = { version = "0.25.0", path = "../../core" } libp2p-swarm = { version = "0.25.0", path = "../../swarm" } -log = "0.4" -net2 = "0.2" -rand = "0.7" -smallvec = "1.0" -tokio = { version = "0.3", default-features = false, features = ["net"], optional = true } -void = "1.0" -wasm-timer = "0.2.4" +log = "0.4.11" +rand = "0.7.3" +smallvec = "1.5.0" +socket2 = { version = "0.3.17", features = ["reuseport"] } +void = "1.0.2" [dev-dependencies] -if-addrs = "0.6.4" -tokio = { version = "0.3", default-features = false, features = ["rt", "rt-multi-thread"] } +async-std = "1.7.0" +if-addrs = "0.6.5" +tokio = { version = "0.3.4", default-features = false, features = ["rt", "rt-multi-thread"] } diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index b9b5649d..f2e9ee52 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -18,7 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::service::{MdnsPacket, build_query_response, build_service_discovery_response}; +use crate::service::{MdnsPacket, MdnsService, build_query_response, build_service_discovery_response}; +use async_io::Timer; use futures::prelude::*; use libp2p_core::{ Multiaddr, @@ -34,21 +35,16 @@ use libp2p_swarm::{ ProtocolsHandler, protocols_handler::DummyProtocolsHandler }; -use log::warn; use smallvec::SmallVec; -use std::{cmp, fmt, io, iter, mem, pin::Pin, time::Duration, task::Context, task::Poll}; -use wasm_timer::{Delay, Instant}; +use std::{cmp, fmt, io, iter, mem, pin::Pin, time::{Duration, Instant}, task::Context, task::Poll}; const MDNS_RESPONSE_TTL: std::time::Duration = Duration::from_secs(5 * 60); -macro_rules! codegen { - ($feature_name:expr, $behaviour_name:ident, $maybe_busy_wrapper:ident, $service_name:ty) => { - /// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds /// them to the topology. -pub struct $behaviour_name { +pub struct Mdns { /// The inner service. - service: $maybe_busy_wrapper, + service: MdnsBusyWrapper, /// List of nodes that we have discovered, the address, and when their TTL expires. /// @@ -59,44 +55,44 @@ pub struct $behaviour_name { /// Future that fires when the TTL of at least one node in `discovered_nodes` expires. /// /// `None` if `discovered_nodes` is empty. - closest_expiration: Option, + closest_expiration: Option, } /// `MdnsService::next` takes ownership of `self`, returning a future that resolves with both itself /// and a `MdnsPacket` (similar to the old Tokio socket send style). The two states are thus `Free` /// with an `MdnsService` or `Busy` with a future returning the original `MdnsService` and an /// `MdnsPacket`. -enum $maybe_busy_wrapper { - Free($service_name), - Busy(Pin + Send>>), +enum MdnsBusyWrapper { + Free(MdnsService), + Busy(Pin + Send>>), Poisoned, } -impl fmt::Debug for $maybe_busy_wrapper { +impl fmt::Debug for MdnsBusyWrapper { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - $maybe_busy_wrapper::Free(service) => { - fmt.debug_struct("$maybe_busy_wrapper::Free") + Self::Free(service) => { + fmt.debug_struct("MdnsBusyWrapper::Free") .field("service", service) .finish() }, - $maybe_busy_wrapper::Busy(_) => { - fmt.debug_struct("$maybe_busy_wrapper::Busy") + Self::Busy(_) => { + fmt.debug_struct("MdnsBusyWrapper::Busy") .finish() } - $maybe_busy_wrapper::Poisoned => { - fmt.debug_struct("$maybe_busy_wrapper::Poisoned") + Self::Poisoned => { + fmt.debug_struct("MdnsBusyWrapper::Poisoned") .finish() } } } } -impl $behaviour_name { +impl Mdns { /// Builds a new `Mdns` behaviour. - pub fn new() -> io::Result<$behaviour_name> { - Ok($behaviour_name { - service: $maybe_busy_wrapper::Free(<$service_name>::new()?), + pub async fn new() -> io::Result { + Ok(Self { + service: MdnsBusyWrapper::Free(MdnsService::new().await?), discovered_nodes: SmallVec::new(), closest_expiration: None, }) @@ -113,7 +109,7 @@ impl $behaviour_name { } } -impl NetworkBehaviour for $behaviour_name { +impl NetworkBehaviour for Mdns { type ProtocolsHandler = DummyProtocolsHandler; type OutEvent = MdnsEvent; @@ -138,9 +134,9 @@ impl NetworkBehaviour for $behaviour_name { &mut self, _: PeerId, _: ConnectionId, - _ev: ::OutEvent, + ev: ::OutEvent, ) { - void::unreachable(_ev) + void::unreachable(ev) } fn poll( @@ -155,9 +151,8 @@ impl NetworkBehaviour for $behaviour_name { > { // Remove expired peers. if let Some(ref mut closest_expiration) = self.closest_expiration { - match Future::poll(Pin::new(closest_expiration), cx) { - Poll::Ready(Ok(())) => { - let now = Instant::now(); + match Pin::new(closest_expiration).poll(cx) { + Poll::Ready(now) => { let mut expired = SmallVec::<[(PeerId, Multiaddr); 4]>::new(); while let Some(pos) = self.discovered_nodes.iter().position(|(_, _, exp)| *exp < now) { let (peer_id, addr, _) = self.discovered_nodes.remove(pos); @@ -173,38 +168,37 @@ impl NetworkBehaviour for $behaviour_name { } }, Poll::Pending => (), - Poll::Ready(Err(err)) => warn!("timer has errored: {:?}", err), } } // Polling the mDNS service, and obtain the list of nodes discovered this round. let discovered = loop { - let service = mem::replace(&mut self.service, $maybe_busy_wrapper::Poisoned); + let service = mem::replace(&mut self.service, MdnsBusyWrapper::Poisoned); let packet = match service { - $maybe_busy_wrapper::Free(service) => { - self.service = $maybe_busy_wrapper::Busy(Box::pin(service.next())); + MdnsBusyWrapper::Free(service) => { + self.service = MdnsBusyWrapper::Busy(Box::pin(service.next())); continue; }, - $maybe_busy_wrapper::Busy(mut fut) => { + MdnsBusyWrapper::Busy(mut fut) => { match fut.as_mut().poll(cx) { Poll::Ready((service, packet)) => { - self.service = $maybe_busy_wrapper::Free(service); + self.service = MdnsBusyWrapper::Free(service); packet }, Poll::Pending => { - self.service = $maybe_busy_wrapper::Busy(fut); + self.service = MdnsBusyWrapper::Busy(fut); return Poll::Pending; } } }, - $maybe_busy_wrapper::Poisoned => panic!("Mdns poisoned"), + MdnsBusyWrapper::Poisoned => panic!("Mdns poisoned"), }; match packet { MdnsPacket::Query(query) => { // MaybeBusyMdnsService should always be Free. - if let $maybe_busy_wrapper::Free(ref mut service) = self.service { + if let MdnsBusyWrapper::Free(ref mut service) = self.service { let resp = build_query_response( query.query_id(), params.local_peer_id().clone(), @@ -256,7 +250,7 @@ impl NetworkBehaviour for $behaviour_name { }, MdnsPacket::ServiceDiscovery(disc) => { // MaybeBusyMdnsService should always be Free. - if let $maybe_busy_wrapper::Free(ref mut service) = self.service { + if let MdnsBusyWrapper::Free(ref mut service) = self.service { let resp = build_service_discovery_response( disc.query_id(), MDNS_RESPONSE_TTL, @@ -273,7 +267,7 @@ impl NetworkBehaviour for $behaviour_name { .fold(None, |exp, &(_, _, elem_exp)| { Some(exp.map(|exp| cmp::min(exp, elem_exp)).unwrap_or(elem_exp)) }) - .map(Delay::new_at); + .map(Timer::at); Poll::Ready(NetworkBehaviourAction::GenerateEvent(MdnsEvent::Discovered(DiscoveredAddrsIter { inner: discovered.into_iter(), @@ -281,7 +275,7 @@ impl NetworkBehaviour for $behaviour_name { } } -impl fmt::Debug for $behaviour_name { +impl fmt::Debug for Mdns { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Mdns") .field("service", &self.service) @@ -289,15 +283,6 @@ impl fmt::Debug for $behaviour_name { } } -}; -} - -#[cfg(feature = "async-std")] -codegen!("async-std", Mdns, MaybeBusyMdnsService, crate::service::MdnsService); - -#[cfg(feature = "tokio")] -codegen!("tokio", TokioMdns, MaybeBusyTokioMdnsService, crate::service::TokioMdnsService); - /// Event that can be produced by the `Mdns` behaviour. #[derive(Debug)] pub enum MdnsEvent { diff --git a/protocols/mdns/src/dns.rs b/protocols/mdns/src/dns.rs index 4ac96584..81adcdc2 100644 --- a/protocols/mdns/src/dns.rs +++ b/protocols/mdns/src/dns.rs @@ -22,9 +22,7 @@ //! `dns_parser` library. use crate::{META_QUERY_SERVICE, SERVICE_NAME}; -use data_encoding; use libp2p_core::{Multiaddr, PeerId}; -use rand; use std::{borrow::Cow, cmp, error, fmt, str, time::Duration}; /// Maximum size of a DNS label as per RFC1035 @@ -226,7 +224,7 @@ fn segment_peer_id(peer_id: String) -> String { /// Combines and encodes a `PeerId` and service name for a DNS query. fn encode_peer_id(peer_id: &PeerId) -> Vec { - // DNS-safe encoding for the Peer ID + // DNS-safe encoding for the Peer ID let raw_peer_id = data_encoding::BASE32_DNSCURVE.encode(&peer_id.as_bytes()); // ensure we don't have any labels over 63 bytes long let encoded_peer_id = segment_peer_id(raw_peer_id); diff --git a/protocols/mdns/src/lib.rs b/protocols/mdns/src/lib.rs index 292bd01b..e8e152b9 100644 --- a/protocols/mdns/src/lib.rs +++ b/protocols/mdns/src/lib.rs @@ -35,12 +35,10 @@ const SERVICE_NAME: &[u8] = b"_p2p._udp.local"; /// Hardcoded name of the service used for DNS-SD. const META_QUERY_SERVICE: &[u8] = b"_services._dns-sd._udp.local"; -#[cfg(feature = "async-std")] -pub use self::{behaviour::Mdns, service::MdnsService}; -#[cfg(feature = "tokio")] -pub use self::{behaviour::TokioMdns, service::TokioMdnsService}; - -pub use self::behaviour::MdnsEvent; +pub use crate::{ + behaviour::{Mdns, MdnsEvent}, + service::MdnsService, +}; mod behaviour; mod dns; diff --git a/protocols/mdns/src/service.rs b/protocols/mdns/src/service.rs index 7b1a8de2..1b4dcfa7 100644 --- a/protocols/mdns/src/service.rs +++ b/protocols/mdns/src/service.rs @@ -19,14 +19,15 @@ // DEALINGS IN THE SOFTWARE. use crate::{SERVICE_NAME, META_QUERY_SERVICE, dns}; +use async_io::{Async, Timer}; use dns_parser::{Packet, RData}; -use either::Either::{Left, Right}; -use futures::{future, prelude::*}; +use futures::{prelude::*, select}; +use if_watch::{IfEvent, IfWatcher}; +use lazy_static::lazy_static; use libp2p_core::{multiaddr::{Multiaddr, Protocol}, PeerId}; use log::warn; -use std::{convert::TryFrom as _, fmt, io, net::Ipv4Addr, net::SocketAddr, str, time::{Duration, Instant}}; -use wasm_timer::Interval; -use lazy_static::lazy_static; +use socket2::{Socket, Domain, Type}; +use std::{convert::TryFrom, fmt, io, net::{IpAddr, Ipv4Addr, UdpSocket, SocketAddr}, str, time::{Duration, Instant}}; pub use dns::{MdnsResponseError, build_query_response, build_service_discovery_response}; @@ -37,9 +38,6 @@ lazy_static! { )); } -macro_rules! codegen { - ($feature_name:expr, $service_name:ident, $udp_socket:ty, $udp_socket_from_std:tt) => { - /// A running service that discovers libp2p peers and responds to other libp2p peers' queries on /// the local network. /// @@ -71,10 +69,7 @@ macro_rules! codegen { /// # let my_peer_id = PeerId::from(identity::Keypair::generate_ed25519().public()); /// # let my_listened_addrs: Vec = vec![]; /// # async { -/// # #[cfg(feature = "async-std")] -/// # let mut service = libp2p_mdns::service::MdnsService::new().unwrap(); -/// # #[cfg(feature = "tokio")] -/// # let mut service = libp2p_mdns::service::TokioMdnsService::new().unwrap(); +/// # let mut service = libp2p_mdns::service::MdnsService::new().await.unwrap(); /// let _future_to_poll = async { /// let (mut service, packet) = service.next().await; /// @@ -108,19 +103,18 @@ macro_rules! codegen { /// }; /// # }; /// # } -#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))] -pub struct $service_name { +pub struct MdnsService { /// Main socket for listening. - socket: $udp_socket, + socket: Async, /// Socket for sending queries on the network. - query_socket: $udp_socket, + query_socket: Async, /// Interval for sending queries. - query_interval: Interval, + query_interval: Timer, /// Whether we send queries on the network at all. /// Note that we still need to have an interval for querying, as we need to wake up the socket - /// regularly to recover from errors. Otherwise we could simply use an `Option`. + /// regularly to recover from errors. Otherwise we could simply use an `Option`. silent: bool, /// Buffer used for receiving data from the main socket. /// RFC6762 discourages packets larger than the interface MTU, but allows sizes of up to 9000 @@ -133,55 +127,54 @@ pub struct $service_name { send_buffers: Vec>, /// Buffers pending to send on the query socket. query_send_buffers: Vec>, + /// Iface watch. + if_watch: IfWatcher, } -impl $service_name { +impl MdnsService { /// Starts a new mDNS service. - pub fn new() -> io::Result<$service_name> { - Self::new_inner(false) + pub async fn new() -> io::Result { + Self::new_inner(false).await } /// Same as `new`, but we don't automatically send queries on the network. - pub fn silent() -> io::Result<$service_name> { - Self::new_inner(true) + pub async fn silent() -> io::Result { + Self::new_inner(true).await } /// Starts a new mDNS service. - fn new_inner(silent: bool) -> io::Result<$service_name> { - let std_socket = { + async fn new_inner(silent: bool) -> io::Result { + let socket = { + let socket = Socket::new(Domain::ipv4(), Type::dgram(), Some(socket2::Protocol::udp()))?; + socket.set_reuse_address(true)?; #[cfg(unix)] - fn platform_specific(s: &net2::UdpBuilder) -> io::Result<()> { - net2::unix::UnixUdpBuilderExt::reuse_port(s, true)?; - Ok(()) - } - #[cfg(not(unix))] - fn platform_specific(_: &net2::UdpBuilder) -> io::Result<()> { Ok(()) } - let builder = net2::UdpBuilder::new_v4()?; - builder.reuse_address(true)?; - platform_specific(&builder)?; - builder.bind(("0.0.0.0", 5353))? + socket.set_reuse_port(true)?; + socket.bind(&SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 5353).into())?; + let socket = socket.into_udp_socket(); + socket.set_multicast_loop_v4(true)?; + socket.set_multicast_ttl_v4(255)?; + Async::new(socket)? }; - let socket = $udp_socket_from_std(std_socket)?; // Given that we pass an IP address to bind, which does not need to be resolved, we can // use std::net::UdpSocket::bind, instead of its async counterpart from async-std. - let query_socket = $udp_socket_from_std( - std::net::UdpSocket::bind((Ipv4Addr::from([0u8, 0, 0, 0]), 0u16))?, - )?; + let query_socket = { + let socket = std::net::UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0))?; + Async::new(socket)? + }; - socket.set_multicast_loop_v4(true)?; - socket.set_multicast_ttl_v4(255)?; - // TODO: correct interfaces? - socket.join_multicast_v4(From::from([224, 0, 0, 251]), Ipv4Addr::UNSPECIFIED)?; - Ok($service_name { + let if_watch = if_watch::IfWatcher::new().await?; + + Ok(Self { socket, query_socket, - query_interval: Interval::new_at(Instant::now(), Duration::from_secs(20)), + query_interval: Timer::interval_at(Instant::now(), Duration::from_secs(20)), silent, recv_buffer: [0; 4096], send_buffers: Vec::new(), query_send_buffers: Vec::new(), + if_watch, }) } @@ -247,18 +240,8 @@ impl $service_name { } } - // Either (left) listen for incoming packets or (right) send query packets whenever the - // query interval fires. - let selected_output = match futures::future::select( - Box::pin(self.socket.recv_from(&mut self.recv_buffer)), - Box::pin(self.query_interval.next()), - ).await { - future::Either::Left((recved, _)) => Left(recved), - future::Either::Right(_) => Right(()), - }; - - match selected_output { - Left(left) => match left { + select! { + res = self.socket.recv_from(&mut self.recv_buffer).fuse() => match res { Ok((len, from)) => { match MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) { Some(packet) => return (self, packet), @@ -270,7 +253,7 @@ impl $service_name { // The query interval will wake up the task at some point so that we can try again. }, }, - Right(_) => { + _ = self.query_interval.next().fuse() => { // Ensure underlying task is woken up on the next interval tick. while let Some(_) = self.query_interval.next().now_or_never() {}; @@ -278,13 +261,42 @@ impl $service_name { let query = dns::build_query(); self.query_send_buffers.push(query.to_vec()); } + }, + event = self.if_watch.next().fuse() => { + let multicast = From::from([224, 0, 0, 251]); + let socket = self.socket.get_ref(); + match event { + Ok(IfEvent::Up(inet)) => { + if inet.addr().is_loopback() { + continue; + } + if let IpAddr::V4(addr) = inet.addr() { + log::trace!("joining multicast on iface {}", addr); + if let Err(err) = socket.join_multicast_v4(&multicast, &addr) { + log::error!("join multicast failed: {}", err); + } + } + } + Ok(IfEvent::Down(inet)) => { + if inet.addr().is_loopback() { + continue; + } + if let IpAddr::V4(addr) = inet.addr() { + log::trace!("leaving multicast on iface {}", addr); + if let Err(err) = socket.leave_multicast_v4(&multicast, &addr) { + log::error!("leave multicast failed: {}", err); + } + } + } + Err(err) => log::error!("if watch returned an error: {}", err), + } } }; } } } -impl fmt::Debug for $service_name { +impl fmt::Debug for MdnsService { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("$service_name") .field("silent", &self.silent) @@ -292,17 +304,6 @@ impl fmt::Debug for $service_name { } } -}; -} - -#[cfg(feature = "async-std")] -codegen!("async-std", MdnsService, async_std::net::UdpSocket, (|socket| Ok::<_, std::io::Error>(async_std::net::UdpSocket::from(socket)))); - -// Note: Tokio's UdpSocket::from_std does not set the socket into non-blocking mode. -#[cfg(feature = "tokio")] -codegen!("tokio", TokioMdnsService, tokio::net::UdpSocket, (|socket: std::net::UdpSocket| { socket.set_nonblocking(true); tokio::net::UdpSocket::from_std(socket) })); - - /// A valid mDNS packet received by the service. #[derive(Debug)] pub enum MdnsPacket { @@ -595,7 +596,7 @@ mod tests { fn discover(peer_id: PeerId) { let fut = async { - let mut service = <$service_name>::new().unwrap(); + let mut service = <$service_name>::new().await.unwrap(); loop { let next = service.next().await; @@ -639,7 +640,7 @@ mod tests { .collect(); let fut = async { - let mut service = <$service_name>::new().unwrap(); + let mut service = <$service_name>::new().await.unwrap(); let mut sent_queries = vec![]; @@ -690,17 +691,15 @@ mod tests { } } - #[cfg(feature = "async-std")] testgen!( async_std, crate::service::MdnsService, (|fut| async_std::task::block_on::<_, ()>(fut)) ); - #[cfg(feature = "tokio")] testgen!( tokio, - crate::service::TokioMdnsService, + crate::service::MdnsService, (|fut| tokio::runtime::Runtime::new().unwrap().block_on::>(fut)) ); } diff --git a/src/lib.rs b/src/lib.rs index 8481a322..514c352d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -196,8 +196,8 @@ pub use libp2p_gossipsub as gossipsub; #[cfg_attr(docsrs, doc(cfg(feature = "mplex")))] #[doc(inline)] pub use libp2p_mplex as mplex; -#[cfg(any(feature = "mdns-async-std", feature = "mdns-tokio"))] -#[cfg_attr(docsrs, doc(cfg(any(feature = "mdns-async-std", feature = "mdns-tokio"))))] +#[cfg(feature = "mdns")] +#[cfg_attr(docsrs, doc(cfg(feature = "mdns")))] #[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))] #[doc(inline)] pub use libp2p_mdns as mdns;