diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index a4c7452a..87754497 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -120,7 +120,7 @@ async fn main() -> Result<(), Box> { // Create a Swarm to manage peers and events. let mut swarm = { - let mdns = Mdns::new().await?; + let mdns = Mdns::new(Default::default()).await?; let mut behaviour = MyBehaviour { floodsub: Floodsub::new(peer_id.clone()), mdns, @@ -175,4 +175,4 @@ async fn main() -> Result<(), Box> { } } } -} \ No newline at end of file +} diff --git a/examples/chat.rs b/examples/chat.rs index 67966e07..e7050da9 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -58,7 +58,7 @@ use libp2p::{ NetworkBehaviour, identity, floodsub::{self, Floodsub, FloodsubEvent}, - mdns::{Mdns, MdnsEvent}, + mdns::{Mdns, MdnsConfig, MdnsEvent}, swarm::NetworkBehaviourEventProcess }; use std::{error::Error, task::{Context, Poll}}; @@ -121,7 +121,7 @@ fn main() -> Result<(), Box> { // Create a Swarm to manage peers and events let mut swarm = { - let mdns = task::block_on(Mdns::new())?; + let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?; let mut behaviour = MyBehaviour { floodsub: Floodsub::new(local_peer_id.clone()), mdns, diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index 6d5df32f..412e933f 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -60,7 +60,7 @@ use libp2p::{ Swarm, build_development_transport, identity, - mdns::{Mdns, MdnsEvent}, + mdns::{Mdns, MdnsConfig, MdnsEvent}, swarm::NetworkBehaviourEventProcess }; use std::{error::Error, task::{Context, Poll}}; @@ -151,7 +151,7 @@ fn main() -> Result<(), Box> { // Create a Kademlia behaviour. let store = MemoryStore::new(local_peer_id.clone()); let kademlia = Kademlia::new(local_peer_id.clone(), store); - let mdns = task::block_on(Mdns::new())?; + let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?; let behaviour = MyBehaviour { kademlia, mdns }; Swarm::new(transport, behaviour, local_peer_id) }; diff --git a/examples/mdns-passive-discovery.rs b/examples/mdns-passive-discovery.rs index a8f4323a..774fc9e6 100644 --- a/examples/mdns-passive-discovery.rs +++ b/examples/mdns-passive-discovery.rs @@ -18,43 +18,42 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use async_std::task; -use libp2p::mdns::service::{MdnsPacket, MdnsService}; +use libp2p::{identity, mdns::{Mdns, MdnsConfig, MdnsEvent}, PeerId, Swarm}; use std::error::Error; -fn main() -> Result<(), Box> { - // This example provides passive discovery of the libp2p nodes on the - // network that send mDNS queries and answers. - task::block_on(async move { - let mut service = MdnsService::new().await?; - loop { - let (srv, packet) = service.next().await; - match packet { - MdnsPacket::Query(query) => { - // We detected a libp2p mDNS query on the network. In a real application, you - // probably want to answer this query by doing `query.respond(...)`. - println!("Detected query from {:?}", query.remote_addr()); - } - MdnsPacket::Response(response) => { - // We detected a libp2p mDNS response on the network. Responses are for - // everyone and not just for the requester, which makes it possible to - // passively listen. - for peer in response.discovered_peers() { - println!("Discovered peer {:?}", peer.id()); - // These are the self-reported addresses of the peer we just discovered. - for addr in peer.addresses() { - println!(" Address = {:?}", addr); - } - } - } - MdnsPacket::ServiceDiscovery(query) => { - // The last possibility is a service detection query from DNS-SD. - // Just like `Query`, in a real application you probably want to call - // `query.respond`. - println!("Detected service query from {:?}", query.remote_addr()); +#[async_std::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + // Create a random PeerId. + let id_keys = identity::Keypair::generate_ed25519(); + let peer_id = PeerId::from(id_keys.public()); + println!("Local peer id: {:?}", peer_id); + + // Create a transport. + let transport = libp2p::build_development_transport(id_keys)?; + + // Create an MDNS network behaviour. + let behaviour = Mdns::new(MdnsConfig::default()).await?; + + // Create a Swarm that establishes connections through the given transport. + // Note that the MDNS behaviour itself will not actually inititiate any connections, + // as it only uses UDP. + let mut swarm = Swarm::new(transport, behaviour, peer_id); + Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?; + + loop { + match swarm.next().await { + MdnsEvent::Discovered(peers) => { + for (peer, addr) in peers { + println!("discovered {} {}", peer, addr); + } + } + MdnsEvent::Expired(expired) => { + for (peer, addr) in expired { + println!("expired {} {}", peer, addr); } } - service = srv } - }) + } } diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 86ee1faf..529e1337 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -1,5 +1,18 @@ # 0.29.0 [unreleased] +- Introduce `MdnsConfig` with configurable TTL of discovered peer + records and configurable multicast query interval. The default + query interval is increased from 20 seconds to 5 minutes, to + significantly reduce bandwidth usage. To ensure timely peer + discovery in the majority of cases, a multicast query is + initiated whenever a change on a network interface is detected, + which includes MDNS initialisation at node startup. If necessary + the MDNS query interval can be reduced via the `MdnsConfig`. + The `MdnsService` has been removed from the public API, making + it compulsory that all uses occur through the `Mdns` `NetworkBehaviour`. + An `MdnsConfig` must now be given to `Mdns::new()`. + [PR 1977](https://github.com/libp2p/rust-libp2p/pull/1977). + - Update `libp2p-swarm`. # 0.28.1 [2021-02-15] diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index bf6ea511..5b3b230a 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -10,21 +10,21 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -async-io = "1.3.0" -data-encoding = "2.3.1" +async-io = "1.3.1" +data-encoding = "2.3.2" dns-parser = "0.8.0" -futures = "0.3.8" -if-watch = "0.1.8" +futures = "0.3.13" +if-watch = "0.2.0" lazy_static = "1.4.0" libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-swarm = { version = "0.28.0", path = "../../swarm" } -log = "0.4.11" -rand = "0.7.3" -smallvec = "1.5.0" -socket2 = { version = "0.3.17", features = ["reuseport"] } +log = "0.4.14" +rand = "0.8.3" +smallvec = "1.6.1" +socket2 = { version = "0.3.19", features = ["reuseport"] } void = "1.0.2" [dev-dependencies] -async-std = "1.7.0" +async-std = "1.9.0" if-addrs = "0.6.5" -tokio = { version = "1.0.1", default-features = false, features = ["rt", "rt-multi-thread"] } +tokio = { version = "1.2.0", default-features = false, features = ["rt", "rt-multi-thread"] } diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index d066af31..19040f37 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -18,33 +18,80 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::service::{MdnsPacket, MdnsService, build_query_response, build_service_discovery_response}; -use async_io::Timer; +use crate::dns::{build_query, build_query_response, build_service_discovery_response}; +use crate::query::MdnsPacket; +use async_io::{Async, Timer}; use futures::prelude::*; +use if_watch::{IfEvent, IfWatcher}; +use lazy_static::lazy_static; use libp2p_core::{ - Multiaddr, - PeerId, - address_translation, - connection::ConnectionId, - multiaddr::Protocol + address_translation, connection::ConnectionId, multiaddr::Protocol, Multiaddr, PeerId, }; use libp2p_swarm::{ - NetworkBehaviour, - NetworkBehaviourAction, - PollParameters, - ProtocolsHandler, - protocols_handler::DummyProtocolsHandler + protocols_handler::DummyProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, + PollParameters, ProtocolsHandler, }; use smallvec::SmallVec; -use std::{cmp, fmt, io, iter, mem, pin::Pin, time::{Duration, Instant}, task::Context, task::Poll}; +use socket2::{Domain, Socket, Type}; +use std::{ + cmp, + collections::VecDeque, + fmt, io, iter, + net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, + pin::Pin, + task::Context, + task::Poll, + time::{Duration, Instant}, +}; -const MDNS_RESPONSE_TTL: std::time::Duration = Duration::from_secs(5 * 60); +lazy_static! { + static ref IPV4_MDNS_MULTICAST_ADDRESS: SocketAddr = + SocketAddr::from((Ipv4Addr::new(224, 0, 0, 251), 5353)); +} + +pub struct MdnsConfig { + /// TTL to use for mdns records. + pub ttl: Duration, + /// Interval at which to poll the network for new peers. This isn't + /// necessary during normal operation but avoids the case that an + /// initial packet was lost and not discovering any peers until a new + /// peer joins the network. Receiving an mdns packet resets the timer + /// preventing unnecessary traffic. + pub query_interval: Duration, +} + +impl Default for MdnsConfig { + fn default() -> Self { + Self { + ttl: Duration::from_secs(6 * 60), + query_interval: Duration::from_secs(5 * 60), + } + } +} /// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds /// them to the topology. +#[derive(Debug)] pub struct Mdns { - /// The inner service. - service: MdnsBusyWrapper, + /// Main socket for listening. + recv_socket: Async, + + /// Query socket for making queries. + send_socket: Async, + + /// Iface watcher. + if_watch: IfWatcher, + + /// Buffer used for receiving data from the main socket. + /// RFC6762 discourages packets larger than the interface MTU, but allows sizes of up to 9000 + /// bytes, if it can be ensured that all participating devices can handle such large packets. + /// For computers with several interfaces and IP addresses responses can easily reach sizes in + /// the range of 3000 bytes, so 4096 seems sensible for now. For more information see + /// [rfc6762](https://tools.ietf.org/html/rfc6762#page-46). + recv_buffer: [u8; 4096], + + /// Buffers pending to send on the main socket. + send_buffer: VecDeque>, /// List of nodes that we have discovered, the address, and when their TTL expires. /// @@ -56,45 +103,55 @@ pub struct Mdns { /// /// `None` if `discovered_nodes` is empty. closest_expiration: Option, -} -/// `MdnsService::next` takes ownership of `self`, returning a future that resolves with both itself -/// and a `MdnsPacket` (similar to the old Tokio socket send style). The two states are thus `Free` -/// with an `MdnsService` or `Busy` with a future returning the original `MdnsService` and an -/// `MdnsPacket`. -enum MdnsBusyWrapper { - Free(MdnsService), - Busy(Pin + Send>>), - Poisoned, -} + /// Queued events. + events: VecDeque, -impl fmt::Debug for MdnsBusyWrapper { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Free(service) => { - fmt.debug_struct("MdnsBusyWrapper::Free") - .field("service", service) - .finish() - }, - Self::Busy(_) => { - fmt.debug_struct("MdnsBusyWrapper::Busy") - .finish() - } - Self::Poisoned => { - fmt.debug_struct("MdnsBusyWrapper::Poisoned") - .finish() - } - } - } + /// Discovery interval. + query_interval: Duration, + + /// Record ttl. + ttl: Duration, + + /// Discovery timer. + timeout: Timer, } impl Mdns { /// Builds a new `Mdns` behaviour. - pub async fn new() -> io::Result { + pub async fn new(config: MdnsConfig) -> io::Result { + let recv_socket = { + let socket = Socket::new( + Domain::ipv4(), + Type::dgram(), + Some(socket2::Protocol::udp()), + )?; + socket.set_reuse_address(true)?; + #[cfg(unix)] + socket.set_reuse_port(true)?; + socket.bind(&SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 5353).into())?; + let socket = socket.into_udp_socket(); + socket.set_multicast_loop_v4(true)?; + socket.set_multicast_ttl_v4(255)?; + Async::new(socket)? + }; + let send_socket = { + let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0))?; + Async::new(socket)? + }; + let if_watch = if_watch::IfWatcher::new().await?; Ok(Self { - service: MdnsBusyWrapper::Free(MdnsService::new().await?), + recv_socket, + send_socket, + if_watch, + recv_buffer: [0; 4096], + send_buffer: Default::default(), discovered_nodes: SmallVec::new(), closest_expiration: None, + events: Default::default(), + query_interval: config.query_interval, + ttl: config.ttl, + timeout: Timer::interval(config.query_interval), }) } @@ -107,6 +164,77 @@ impl Mdns { pub fn discovered_nodes(&self) -> impl ExactSizeIterator { self.discovered_nodes.iter().map(|(p, _, _)| p) } + + fn inject_mdns_packet(&mut self, packet: MdnsPacket, params: &impl PollParameters) { + self.timeout.set_interval(self.query_interval); + match packet { + MdnsPacket::Query(query) => { + for packet in build_query_response( + query.query_id(), + *params.local_peer_id(), + params.listened_addresses(), + self.ttl, + ) { + self.send_buffer.push_back(packet); + } + } + MdnsPacket::Response(response) => { + // We replace the IP address with the address we observe the + // remote as and the address they listen on. + let obs_ip = Protocol::from(response.remote_addr().ip()); + let obs_port = Protocol::Udp(response.remote_addr().port()); + let observed: Multiaddr = iter::once(obs_ip).chain(iter::once(obs_port)).collect(); + + let mut discovered: SmallVec<[_; 4]> = SmallVec::new(); + for peer in response.discovered_peers() { + if peer.id() == params.local_peer_id() { + continue; + } + + let new_expiration = Instant::now() + peer.ttl(); + + let mut addrs: Vec = Vec::new(); + for addr in peer.addresses() { + if let Some(new_addr) = address_translation(&addr, &observed) { + addrs.push(new_addr.clone()) + } + addrs.push(addr.clone()) + } + + for addr in addrs { + if let Some((_, _, cur_expires)) = self + .discovered_nodes + .iter_mut() + .find(|(p, a, _)| p == peer.id() && *a == addr) + { + *cur_expires = cmp::max(*cur_expires, new_expiration); + } else { + self.discovered_nodes + .push((*peer.id(), addr.clone(), new_expiration)); + } + discovered.push((*peer.id(), addr)); + } + } + + self.closest_expiration = self + .discovered_nodes + .iter() + .fold(None, |exp, &(_, _, elem_exp)| { + Some(exp.map(|exp| cmp::min(exp, elem_exp)).unwrap_or(elem_exp)) + }) + .map(Timer::at); + + self.events + .push_back(MdnsEvent::Discovered(DiscoveredAddrsIter { + inner: discovered.into_iter(), + })); + } + MdnsPacket::ServiceDiscovery(disc) => { + let resp = build_service_discovery_response(disc.query_id(), self.ttl); + self.send_buffer.push_back(resp); + } + } + } } impl NetworkBehaviour for Mdns { @@ -149,138 +277,102 @@ impl NetworkBehaviour for Mdns { Self::OutEvent, >, > { - // Remove expired peers. - if let Some(ref mut closest_expiration) = self.closest_expiration { - match Pin::new(closest_expiration).poll(cx) { - Poll::Ready(now) => { - let mut expired = SmallVec::<[(PeerId, Multiaddr); 4]>::new(); - while let Some(pos) = self.discovered_nodes.iter().position(|(_, _, exp)| *exp < now) { - let (peer_id, addr, _) = self.discovered_nodes.remove(pos); - expired.push((peer_id, addr)); + while let Poll::Ready(event) = Pin::new(&mut self.if_watch).poll(cx) { + let multicast = From::from([224, 0, 0, 251]); + let socket = self.recv_socket.get_ref(); + match event { + Ok(IfEvent::Up(inet)) => { + if inet.addr().is_loopback() { + continue; } - - if !expired.is_empty() { - let event = MdnsEvent::Expired(ExpiredAddrsIter { - inner: expired.into_iter(), - }); - - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + if let IpAddr::V4(addr) = inet.addr() { + log::trace!("joining multicast on iface {}", addr); + if let Err(err) = socket.join_multicast_v4(&multicast, &addr) { + log::error!("join multicast failed: {}", err); + } else { + self.send_buffer.push_back(build_query()); + } } - }, - Poll::Pending => (), + } + Ok(IfEvent::Down(inet)) => { + if inet.addr().is_loopback() { + continue; + } + if let IpAddr::V4(addr) = inet.addr() { + log::trace!("leaving multicast on iface {}", addr); + if let Err(err) = socket.leave_multicast_v4(&multicast, &addr) { + log::error!("leave multicast failed: {}", err); + } + } + } + Err(err) => log::error!("if watch returned an error: {}", err), } } - - // Polling the mDNS service, and obtain the list of nodes discovered this round. - let discovered = loop { - let service = mem::replace(&mut self.service, MdnsBusyWrapper::Poisoned); - - let packet = match service { - MdnsBusyWrapper::Free(service) => { - self.service = MdnsBusyWrapper::Busy(Box::pin(service.next())); - continue; - }, - MdnsBusyWrapper::Busy(mut fut) => { - match fut.as_mut().poll(cx) { - Poll::Ready((service, packet)) => { - self.service = MdnsBusyWrapper::Free(service); - packet - }, - Poll::Pending => { - self.service = MdnsBusyWrapper::Busy(fut); - return Poll::Pending; - } + // Poll receive socket. + while self.recv_socket.poll_readable(cx).is_ready() { + match self + .recv_socket + .recv_from(&mut self.recv_buffer) + .now_or_never() + { + Some(Ok((len, from))) => { + if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) + { + self.inject_mdns_packet(packet, params); } - }, - MdnsBusyWrapper::Poisoned => panic!("Mdns poisoned"), - }; - - match packet { - MdnsPacket::Query(query) => { - // MaybeBusyMdnsService should always be Free. - if let MdnsBusyWrapper::Free(ref mut service) = self.service { - for packet in build_query_response( - query.query_id(), - *params.local_peer_id(), - params.listened_addresses(), - MDNS_RESPONSE_TTL, - ) { - service.enqueue_response(packet) - } - } else { debug_assert!(false); } - }, - MdnsPacket::Response(response) => { - // We replace the IP address with the address we observe the - // remote as and the address they listen on. - let obs_ip = Protocol::from(response.remote_addr().ip()); - let obs_port = Protocol::Udp(response.remote_addr().port()); - let observed: Multiaddr = iter::once(obs_ip) - .chain(iter::once(obs_port)) - .collect(); - - let mut discovered: SmallVec<[_; 4]> = SmallVec::new(); - for peer in response.discovered_peers() { - if peer.id() == params.local_peer_id() { - continue; - } - - let new_expiration = Instant::now() + peer.ttl(); - - let mut addrs: Vec = Vec::new(); - for addr in peer.addresses() { - if let Some(new_addr) = address_translation(&addr, &observed) { - addrs.push(new_addr.clone()) - } - addrs.push(addr.clone()) - } - - for addr in addrs { - if let Some((_, _, cur_expires)) = self.discovered_nodes.iter_mut() - .find(|(p, a, _)| p == peer.id() && *a == addr) - { - *cur_expires = cmp::max(*cur_expires, new_expiration); - } else { - self.discovered_nodes.push((*peer.id(), addr.clone(), new_expiration)); - } - - discovered.push((*peer.id(), addr)); - } - } - - break discovered; - }, - MdnsPacket::ServiceDiscovery(disc) => { - // MaybeBusyMdnsService should always be Free. - if let MdnsBusyWrapper::Free(ref mut service) = self.service { - let resp = build_service_discovery_response( - disc.query_id(), - MDNS_RESPONSE_TTL, - ); - service.enqueue_response(resp); - } else { debug_assert!(false); } - }, + } + Some(Err(err)) => log::error!("Failed reading datagram: {}", err), + _ => {} } - }; + } + if Pin::new(&mut self.timeout).poll_next(cx).is_ready() { + self.send_buffer.push_back(build_query()); + } + // Send responses. + if !self.send_buffer.is_empty() { + while self.send_socket.poll_writable(cx).is_ready() { + if let Some(packet) = self.send_buffer.pop_front() { + match self + .send_socket + .send_to(&packet, *IPV4_MDNS_MULTICAST_ADDRESS) + .now_or_never() + { + Some(Ok(_)) => {} + Some(Err(err)) => log::error!("{}", err), + None => self.send_buffer.push_front(packet), + } + } else { + break; + } + } + } + // Emit discovered event. + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + // Emit expired event. + if let Some(ref mut closest_expiration) = self.closest_expiration { + if let Poll::Ready(now) = Pin::new(closest_expiration).poll(cx) { + let mut expired = SmallVec::<[(PeerId, Multiaddr); 4]>::new(); + while let Some(pos) = self + .discovered_nodes + .iter() + .position(|(_, _, exp)| *exp < now) + { + let (peer_id, addr, _) = self.discovered_nodes.remove(pos); + expired.push((peer_id, addr)); + } - // Getting this far implies that we discovered new nodes. As the final step, we need to - // refresh `closest_expiration`. - self.closest_expiration = self.discovered_nodes.iter() - .fold(None, |exp, &(_, _, elem_exp)| { - Some(exp.map(|exp| cmp::min(exp, elem_exp)).unwrap_or(elem_exp)) - }) - .map(Timer::at); + if !expired.is_empty() { + let event = MdnsEvent::Expired(ExpiredAddrsIter { + inner: expired.into_iter(), + }); - Poll::Ready(NetworkBehaviourAction::GenerateEvent(MdnsEvent::Discovered(DiscoveredAddrsIter { - inner: discovered.into_iter(), - }))) - } -} - -impl fmt::Debug for Mdns { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Mdns") - .field("service", &self.service) - .finish() + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + } + } + Poll::Pending } } @@ -299,7 +391,7 @@ pub enum MdnsEvent { /// Iterator that produces the list of addresses that have been discovered. pub struct DiscoveredAddrsIter { - inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]> + inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]>, } impl Iterator for DiscoveredAddrsIter { @@ -316,19 +408,17 @@ impl Iterator for DiscoveredAddrsIter { } } -impl ExactSizeIterator for DiscoveredAddrsIter { -} +impl ExactSizeIterator for DiscoveredAddrsIter {} impl fmt::Debug for DiscoveredAddrsIter { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("DiscoveredAddrsIter") - .finish() + fmt.debug_struct("DiscoveredAddrsIter").finish() } } /// Iterator that produces the list of addresses that have expired. pub struct ExpiredAddrsIter { - inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]> + inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]>, } impl Iterator for ExpiredAddrsIter { @@ -345,12 +435,10 @@ impl Iterator for ExpiredAddrsIter { } } -impl ExactSizeIterator for ExpiredAddrsIter { -} +impl ExactSizeIterator for ExpiredAddrsIter {} impl fmt::Debug for ExpiredAddrsIter { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("ExpiredAddrsIter") - .finish() + fmt.debug_struct("ExpiredAddrsIter").finish() } } diff --git a/protocols/mdns/src/dns.rs b/protocols/mdns/src/dns.rs index 771bf92a..e0645fea 100644 --- a/protocols/mdns/src/dns.rs +++ b/protocols/mdns/src/dns.rs @@ -264,7 +264,9 @@ fn append_u16(out: &mut Vec, value: u16) { /// be compatible with RFC 1035. fn segment_peer_id(peer_id: String) -> String { // Guard for the most common case - if peer_id.len() <= MAX_LABEL_LENGTH { return peer_id } + if peer_id.len() <= MAX_LABEL_LENGTH { + return peer_id; + } // This will only perform one allocation except in extreme circumstances. let mut out = String::with_capacity(peer_id.len() + 8); @@ -391,8 +393,10 @@ impl fmt::Display for MdnsResponseError { MdnsResponseError::TxtRecordTooLong => { write!(f, "TXT record invalid because it is too long") } - MdnsResponseError::NonAsciiMultiaddr => - write!(f, "A multiaddr contains non-ASCII characters when serialized"), + MdnsResponseError::NonAsciiMultiaddr => write!( + f, + "A multiaddr contains non-ASCII characters when serialized" + ), } } } @@ -414,7 +418,9 @@ mod tests { #[test] fn build_query_response_correct() { - let my_peer_id = identity::Keypair::generate_ed25519().public().into_peer_id(); + let my_peer_id = identity::Keypair::generate_ed25519() + .public() + .into_peer_id(); let addr1 = "/ip4/1.2.3.4/tcp/5000".parse().unwrap(); let addr2 = "/ip6/::1/udp/10000".parse().unwrap(); let packets = build_query_response( @@ -446,7 +452,10 @@ mod tests { assert_eq!(segment_peer_id(str_63.clone()), str_63); assert_eq!(segment_peer_id(str_64), [&str_63, "x"].join(".")); - assert_eq!(segment_peer_id(str_126), [&str_63, str_63.as_str()].join(".")); + assert_eq!( + segment_peer_id(str_126), + [&str_63, str_63.as_str()].join(".") + ); assert_eq!(segment_peer_id(str_127), [&str_63, &str_63, "x"].join(".")); } diff --git a/protocols/mdns/src/lib.rs b/protocols/mdns/src/lib.rs index 1d3ffa03..23806138 100644 --- a/protocols/mdns/src/lib.rs +++ b/protocols/mdns/src/lib.rs @@ -35,12 +35,8 @@ const SERVICE_NAME: &[u8] = b"_p2p._udp.local"; /// The meta query for looking up the `SERVICE_NAME`. const META_QUERY_SERVICE: &[u8] = b"_services._dns-sd._udp.local"; -pub use crate::{ - behaviour::{Mdns, MdnsEvent}, - service::MdnsService, -}; +pub use crate::behaviour::{Mdns, MdnsConfig, MdnsEvent}; mod behaviour; mod dns; - -pub mod service; +mod query; diff --git a/protocols/mdns/src/query.rs b/protocols/mdns/src/query.rs new file mode 100644 index 00000000..c605298a --- /dev/null +++ b/protocols/mdns/src/query.rs @@ -0,0 +1,305 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::{dns, META_QUERY_SERVICE, SERVICE_NAME}; +use dns_parser::{Packet, RData}; +use libp2p_core::{ + multiaddr::{Multiaddr, Protocol}, + PeerId, +}; +use std::{convert::TryFrom, fmt, net::SocketAddr, str, time::Duration}; + +/// A valid mDNS packet received by the service. +#[derive(Debug)] +pub enum MdnsPacket { + /// A query made by a remote. + Query(MdnsQuery), + /// A response sent by a remote in response to one of our queries. + Response(MdnsResponse), + /// A request for service discovery. + ServiceDiscovery(MdnsServiceDiscovery), +} + +impl MdnsPacket { + pub fn new_from_bytes(buf: &[u8], from: SocketAddr) -> Option { + match Packet::parse(buf) { + Ok(packet) => { + if packet.header.query { + if packet + .questions + .iter() + .any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME) + { + let query = MdnsPacket::Query(MdnsQuery { + from, + query_id: packet.header.id, + }); + Some(query) + } else if packet + .questions + .iter() + .any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE) + { + // TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE? + let discovery = MdnsPacket::ServiceDiscovery(MdnsServiceDiscovery { + from, + query_id: packet.header.id, + }); + Some(discovery) + } else { + None + } + } else { + let resp = MdnsPacket::Response(MdnsResponse::new(packet, from)); + Some(resp) + } + } + Err(err) => { + log::debug!("Parsing mdns packet failed: {:?}", err); + None + } + } + } +} + +/// A received mDNS query. +pub struct MdnsQuery { + /// Sender of the address. + from: SocketAddr, + /// Id of the received DNS query. We need to pass this ID back in the results. + query_id: u16, +} + +impl MdnsQuery { + /// Source address of the packet. + pub fn remote_addr(&self) -> &SocketAddr { + &self.from + } + + /// Query id of the packet. + pub fn query_id(&self) -> u16 { + self.query_id + } +} + +impl fmt::Debug for MdnsQuery { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MdnsQuery") + .field("from", self.remote_addr()) + .field("query_id", &self.query_id) + .finish() + } +} + +/// A received mDNS service discovery query. +pub struct MdnsServiceDiscovery { + /// Sender of the address. + from: SocketAddr, + /// Id of the received DNS query. We need to pass this ID back in the results. + query_id: u16, +} + +impl MdnsServiceDiscovery { + /// Source address of the packet. + pub fn remote_addr(&self) -> &SocketAddr { + &self.from + } + + /// Query id of the packet. + pub fn query_id(&self) -> u16 { + self.query_id + } +} + +impl fmt::Debug for MdnsServiceDiscovery { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MdnsServiceDiscovery") + .field("from", self.remote_addr()) + .field("query_id", &self.query_id) + .finish() + } +} + +/// A received mDNS response. +pub struct MdnsResponse { + peers: Vec, + from: SocketAddr, +} + +impl MdnsResponse { + /// Creates a new `MdnsResponse` based on the provided `Packet`. + pub fn new(packet: Packet<'_>, from: SocketAddr) -> MdnsResponse { + let peers = packet + .answers + .iter() + .filter_map(|record| { + if record.name.to_string().as_bytes() != SERVICE_NAME { + return None; + } + + let record_value = match record.data { + RData::PTR(record) => record.0.to_string(), + _ => return None, + }; + + let mut peer_name = match record_value.rsplitn(4, |c| c == '.').last() { + Some(n) => n.to_owned(), + None => return None, + }; + + // if we have a segmented name, remove the '.' + peer_name.retain(|c| c != '.'); + + let peer_id = match data_encoding::BASE32_DNSCURVE.decode(peer_name.as_bytes()) { + Ok(bytes) => match PeerId::from_bytes(&bytes) { + Ok(id) => id, + Err(_) => return None, + }, + Err(_) => return None, + }; + + Some(MdnsPeer::new(&packet, record_value, peer_id, record.ttl)) + }) + .collect(); + + MdnsResponse { peers, from } + } + + /// Returns the list of peers that have been reported in this packet. + /// + /// > **Note**: Keep in mind that this will also contain the responses we sent ourselves. + pub fn discovered_peers(&self) -> impl Iterator { + self.peers.iter() + } + + /// Source address of the packet. + #[inline] + pub fn remote_addr(&self) -> &SocketAddr { + &self.from + } +} + +impl fmt::Debug for MdnsResponse { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MdnsResponse") + .field("from", self.remote_addr()) + .finish() + } +} + +/// A peer discovered by the service. +pub struct MdnsPeer { + addrs: Vec, + /// Id of the peer. + peer_id: PeerId, + /// TTL of the record in seconds. + ttl: u32, +} + +impl MdnsPeer { + /// Creates a new `MdnsPeer` based on the provided `Packet`. + pub fn new( + packet: &Packet<'_>, + record_value: String, + my_peer_id: PeerId, + ttl: u32, + ) -> MdnsPeer { + let addrs = packet + .additional + .iter() + .filter_map(|add_record| { + if add_record.name.to_string() != record_value { + return None; + } + + if let RData::TXT(ref txt) = add_record.data { + Some(txt) + } else { + None + } + }) + .flat_map(|txt| txt.iter()) + .filter_map(|txt| { + // TODO: wrong, txt can be multiple character strings + let addr = match dns::decode_character_string(txt) { + Ok(a) => a, + Err(_) => return None, + }; + if !addr.starts_with(b"dnsaddr=") { + return None; + } + let addr = match str::from_utf8(&addr[8..]) { + Ok(a) => a, + Err(_) => return None, + }; + let mut addr = match addr.parse::() { + Ok(a) => a, + Err(_) => return None, + }; + match addr.pop() { + Some(Protocol::P2p(peer_id)) => { + if let Ok(peer_id) = PeerId::try_from(peer_id) { + if peer_id != my_peer_id { + return None; + } + } else { + return None; + } + } + _ => return None, + }; + Some(addr) + }) + .collect(); + + MdnsPeer { + addrs, + peer_id: my_peer_id, + ttl, + } + } + + /// Returns the id of the peer. + #[inline] + pub fn id(&self) -> &PeerId { + &self.peer_id + } + + /// Returns the requested time-to-live for the record. + #[inline] + pub fn ttl(&self) -> Duration { + Duration::from_secs(u64::from(self.ttl)) + } + + /// Returns the list of addresses the peer says it is listening on. + /// + /// Filters out invalid addresses. + pub fn addresses(&self) -> &Vec { + &self.addrs + } +} + +impl fmt::Debug for MdnsPeer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MdnsPeer") + .field("peer_id", &self.peer_id) + .finish() + } +} diff --git a/protocols/mdns/src/service.rs b/protocols/mdns/src/service.rs deleted file mode 100644 index 39852906..00000000 --- a/protocols/mdns/src/service.rs +++ /dev/null @@ -1,709 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use crate::{SERVICE_NAME, META_QUERY_SERVICE, dns}; -use async_io::{Async, Timer}; -use dns_parser::{Packet, RData}; -use futures::{prelude::*, select}; -use if_watch::{IfEvent, IfWatcher}; -use lazy_static::lazy_static; -use libp2p_core::{multiaddr::{Multiaddr, Protocol}, PeerId}; -use log::debug; -use socket2::{Socket, Domain, Type}; -use std::{convert::TryFrom, fmt, io, net::{IpAddr, Ipv4Addr, UdpSocket, SocketAddr}, str, time::{Duration, Instant}}; - -pub use dns::{build_query_response, build_service_discovery_response}; - -lazy_static! { - static ref IPV4_MDNS_MULTICAST_ADDRESS: SocketAddr = SocketAddr::from(( - Ipv4Addr::new(224, 0, 0, 251), - 5353, - )); -} - -/// A running service that discovers libp2p peers and responds to other libp2p peers' queries on -/// the local network. -/// -/// # Usage -/// -/// In order to use mDNS to discover peers on the local network, use the `MdnsService`. This is -/// done by creating a `MdnsService` then polling it in the same way as you would poll a stream. -/// -/// Polling the `MdnsService` can produce either an `MdnsQuery`, corresponding to an mDNS query -/// received by another node on the local network, or an `MdnsResponse` corresponding to a response -/// to a query previously emitted locally. The `MdnsService` will automatically produce queries, -/// which means that you will receive responses automatically. -/// -/// When you receive an `MdnsQuery`, use the `respond` method to send back an answer to the node -/// that emitted the query. -/// -/// When you receive an `MdnsResponse`, use the provided methods to query the information received -/// in the response. -/// -/// # Example -/// -/// ```rust -/// # use futures::prelude::*; -/// # use futures::executor::block_on; -/// # use libp2p_core::{identity, Multiaddr, PeerId}; -/// # use libp2p_mdns::service::{MdnsPacket, build_query_response, build_service_discovery_response}; -/// # use std::{io, time::Duration, task::Poll}; -/// # fn main() { -/// # let my_peer_id = PeerId::from(identity::Keypair::generate_ed25519().public()); -/// # let my_listened_addrs: Vec = vec![]; -/// # async { -/// # let mut service = libp2p_mdns::service::MdnsService::new().await.unwrap(); -/// let _future_to_poll = async { -/// let (mut service, packet) = service.next().await; -/// -/// match packet { -/// MdnsPacket::Query(query) => { -/// println!("Query from {:?}", query.remote_addr()); -/// let packets = build_query_response( -/// query.query_id(), -/// my_peer_id.clone(), -/// vec![].into_iter(), -/// Duration::from_secs(120), -/// ); -/// for packet in packets { -/// service.enqueue_response(packet); -/// } -/// } -/// MdnsPacket::Response(response) => { -/// for peer in response.discovered_peers() { -/// println!("Discovered peer {:?}", peer.id()); -/// for addr in peer.addresses() { -/// println!("Address = {:?}", addr); -/// } -/// } -/// } -/// MdnsPacket::ServiceDiscovery(disc) => { -/// let resp = build_service_discovery_response( -/// disc.query_id(), -/// Duration::from_secs(120), -/// ); -/// service.enqueue_response(resp); -/// } -/// } -/// }; -/// # }; -/// # } -pub struct MdnsService { - /// Main socket for listening. - socket: Async, - - /// Socket for sending queries on the network. - query_socket: Async, - - /// Interval for sending queries. - query_interval: Timer, - /// Whether we send queries on the network at all. - /// Note that we still need to have an interval for querying, as we need to wake up the socket - /// regularly to recover from errors. Otherwise we could simply use an `Option`. - silent: bool, - /// Buffer used for receiving data from the main socket. - /// RFC6762 discourages packets larger than the interface MTU, but allows sizes of up to 9000 - /// bytes, if it can be ensured that all participating devices can handle such large packets. - /// For computers with several interfaces and IP addresses responses can easily reach sizes in - /// the range of 3000 bytes, so 4096 seems sensible for now. For more information see - /// [rfc6762](https://tools.ietf.org/html/rfc6762#page-46). - recv_buffer: [u8; 4096], - /// Buffers pending to send on the main socket. - send_buffers: Vec>, - /// Buffers pending to send on the query socket. - query_send_buffers: Vec>, - /// Iface watch. - if_watch: IfWatcher, -} - -impl MdnsService { - /// Starts a new mDNS service. - pub async fn new() -> io::Result { - Self::new_inner(false).await - } - - /// Same as `new`, but we don't automatically send queries on the network. - pub async fn silent() -> io::Result { - Self::new_inner(true).await - } - - /// Starts a new mDNS service. - async fn new_inner(silent: bool) -> io::Result { - let socket = { - let socket = Socket::new(Domain::ipv4(), Type::dgram(), Some(socket2::Protocol::udp()))?; - socket.set_reuse_address(true)?; - #[cfg(unix)] - socket.set_reuse_port(true)?; - socket.bind(&SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 5353).into())?; - let socket = socket.into_udp_socket(); - socket.set_multicast_loop_v4(true)?; - socket.set_multicast_ttl_v4(255)?; - Async::new(socket)? - }; - - // Given that we pass an IP address to bind, which does not need to be resolved, we can - // use std::net::UdpSocket::bind, instead of its async counterpart from async-std. - let query_socket = { - let socket = std::net::UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0))?; - Async::new(socket)? - }; - - - let if_watch = if_watch::IfWatcher::new().await?; - - Ok(Self { - socket, - query_socket, - query_interval: Timer::interval_at(Instant::now(), Duration::from_secs(20)), - silent, - recv_buffer: [0; 4096], - send_buffers: Vec::new(), - query_send_buffers: Vec::new(), - if_watch, - }) - } - - pub fn enqueue_response(&mut self, rsp: Vec) { - self.send_buffers.push(rsp); - } - - /// Returns a future resolving to itself and the next received `MdnsPacket`. - // - // **Note**: Why does `next` take ownership of itself? - // - // `MdnsService::next` needs to be called from within `NetworkBehaviour` - // implementations. Given that traits cannot have async methods the - // respective `NetworkBehaviour` implementation needs to somehow keep the - // Future returned by `MdnsService::next` across classic `poll` - // invocations. The instance method `next` can either take a reference or - // ownership of itself: - // - // 1. Taking a reference - If `MdnsService::poll` takes a reference to - // `&self` the respective `NetworkBehaviour` implementation would need to - // keep both the Future as well as its `MdnsService` instance across poll - // invocations. Given that in this case the Future would have a reference - // to `MdnsService`, the `NetworkBehaviour` implementation struct would - // need to be self-referential which is not possible without unsafe code in - // Rust. - // - // 2. Taking ownership - Instead `MdnsService::next` takes ownership of - // self and returns it alongside an `MdnsPacket` once the actual future - // resolves, not forcing self-referential structures on the caller. - pub async fn next(mut self) -> (Self, MdnsPacket) { - loop { - // Flush the send buffer of the main socket. - while !self.send_buffers.is_empty() { - let to_send = self.send_buffers.remove(0); - - match self.socket.send_to(&to_send, *IPV4_MDNS_MULTICAST_ADDRESS).await { - Ok(bytes_written) => { - debug_assert_eq!(bytes_written, to_send.len()); - } - Err(_) => { - // Errors are non-fatal because they can happen for example if we lose - // connection to the network. - self.send_buffers.clear(); - break; - } - } - } - - // Flush the query send buffer. - while !self.query_send_buffers.is_empty() { - let to_send = self.query_send_buffers.remove(0); - - match self.query_socket.send_to(&to_send, *IPV4_MDNS_MULTICAST_ADDRESS).await { - Ok(bytes_written) => { - debug_assert_eq!(bytes_written, to_send.len()); - } - Err(_) => { - // Errors are non-fatal because they can happen for example if we lose - // connection to the network. - self.query_send_buffers.clear(); - break; - } - } - } - - select! { - res = self.socket.recv_from(&mut self.recv_buffer).fuse() => match res { - Ok((len, from)) => { - match MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) { - Some(packet) => return (self, packet), - None => {}, - } - }, - Err(_) => { - // Errors are non-fatal and can happen if we get disconnected from the network. - // The query interval will wake up the task at some point so that we can try again. - }, - }, - _ = self.query_interval.next().fuse() => { - // Ensure underlying task is woken up on the next interval tick. - while let Some(_) = self.query_interval.next().now_or_never() {}; - - if !self.silent { - let query = dns::build_query(); - self.query_send_buffers.push(query.to_vec()); - } - }, - event = self.if_watch.next().fuse() => { - let multicast = From::from([224, 0, 0, 251]); - let socket = self.socket.get_ref(); - match event { - Ok(IfEvent::Up(inet)) => { - if inet.addr().is_loopback() { - continue; - } - if let IpAddr::V4(addr) = inet.addr() { - log::trace!("joining multicast on iface {}", addr); - if let Err(err) = socket.join_multicast_v4(&multicast, &addr) { - log::error!("join multicast failed: {}", err); - } - } - } - Ok(IfEvent::Down(inet)) => { - if inet.addr().is_loopback() { - continue; - } - if let IpAddr::V4(addr) = inet.addr() { - log::trace!("leaving multicast on iface {}", addr); - if let Err(err) = socket.leave_multicast_v4(&multicast, &addr) { - log::error!("leave multicast failed: {}", err); - } - } - } - Err(err) => log::error!("if watch returned an error: {}", err), - } - } - }; - } - } -} - -impl fmt::Debug for MdnsService { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("$service_name") - .field("silent", &self.silent) - .finish() - } -} - -/// A valid mDNS packet received by the service. -#[derive(Debug)] -pub enum MdnsPacket { - /// A query made by a remote. - Query(MdnsQuery), - /// A response sent by a remote in response to one of our queries. - Response(MdnsResponse), - /// A request for service discovery. - ServiceDiscovery(MdnsServiceDiscovery), -} - -impl MdnsPacket { - fn new_from_bytes(buf: &[u8], from: SocketAddr) -> Option { - match Packet::parse(buf) { - Ok(packet) => { - if packet.header.query { - if packet - .questions - .iter() - .any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME) - { - let query = MdnsPacket::Query(MdnsQuery { - from, - query_id: packet.header.id, - }); - Some(query) - } else if packet - .questions - .iter() - .any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE) - { - // TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE? - let discovery = MdnsPacket::ServiceDiscovery( - MdnsServiceDiscovery { - from, - query_id: packet.header.id, - }, - ); - Some(discovery) - } else { - None - } - } else { - let resp = MdnsPacket::Response(MdnsResponse::new ( - packet, - from, - )); - Some(resp) - } - } - Err(err) => { - debug!("Parsing mdns packet failed: {:?}", err); - None - } - } - } -} - -/// A received mDNS query. -pub struct MdnsQuery { - /// Sender of the address. - from: SocketAddr, - /// Id of the received DNS query. We need to pass this ID back in the results. - query_id: u16, -} - -impl MdnsQuery { - /// Source address of the packet. - pub fn remote_addr(&self) -> &SocketAddr { - &self.from - } - - /// Query id of the packet. - pub fn query_id(&self) -> u16 { - self.query_id - } -} - -impl fmt::Debug for MdnsQuery { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MdnsQuery") - .field("from", self.remote_addr()) - .field("query_id", &self.query_id) - .finish() - } -} - -/// A received mDNS service discovery query. -pub struct MdnsServiceDiscovery { - /// Sender of the address. - from: SocketAddr, - /// Id of the received DNS query. We need to pass this ID back in the results. - query_id: u16, -} - -impl MdnsServiceDiscovery { - /// Source address of the packet. - pub fn remote_addr(&self) -> &SocketAddr { - &self.from - } - - /// Query id of the packet. - pub fn query_id(&self) -> u16 { - self.query_id - } -} - -impl fmt::Debug for MdnsServiceDiscovery { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MdnsServiceDiscovery") - .field("from", self.remote_addr()) - .field("query_id", &self.query_id) - .finish() - } -} - -/// A received mDNS response. -pub struct MdnsResponse { - peers: Vec, - from: SocketAddr, -} - -impl MdnsResponse { - /// Creates a new `MdnsResponse` based on the provided `Packet`. - fn new(packet: Packet<'_>, from: SocketAddr) -> MdnsResponse { - let peers = packet.answers.iter().filter_map(|record| { - if record.name.to_string().as_bytes() != SERVICE_NAME { - return None; - } - - let record_value = match record.data { - RData::PTR(record) => record.0.to_string(), - _ => return None, - }; - - let mut peer_name = match record_value.rsplitn(4, |c| c == '.').last() { - Some(n) => n.to_owned(), - None => return None, - }; - - // if we have a segmented name, remove the '.' - peer_name.retain(|c| c != '.'); - - let peer_id = match data_encoding::BASE32_DNSCURVE.decode(peer_name.as_bytes()) { - Ok(bytes) => match PeerId::from_bytes(&bytes) { - Ok(id) => id, - Err(_) => return None, - }, - Err(_) => return None, - }; - - Some(MdnsPeer::new ( - &packet, - record_value, - peer_id, - record.ttl, - )) - }).collect(); - - MdnsResponse { - peers, - from, - } - } - - /// Returns the list of peers that have been reported in this packet. - /// - /// > **Note**: Keep in mind that this will also contain the responses we sent ourselves. - pub fn discovered_peers(&self) -> impl Iterator { - self.peers.iter() - } - - /// Source address of the packet. - #[inline] - pub fn remote_addr(&self) -> &SocketAddr { - &self.from - } -} - -impl fmt::Debug for MdnsResponse { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MdnsResponse") - .field("from", self.remote_addr()) - .finish() - } -} - -/// A peer discovered by the service. -pub struct MdnsPeer { - addrs: Vec, - /// Id of the peer. - peer_id: PeerId, - /// TTL of the record in seconds. - ttl: u32, -} - -impl MdnsPeer { - /// Creates a new `MdnsPeer` based on the provided `Packet`. - pub fn new(packet: &Packet<'_>, record_value: String, my_peer_id: PeerId, ttl: u32) -> MdnsPeer { - let addrs = packet - .additional - .iter() - .filter_map(|add_record| { - if add_record.name.to_string() != record_value { - return None; - } - - if let RData::TXT(ref txt) = add_record.data { - Some(txt) - } else { - None - } - }) - .flat_map(|txt| txt.iter()) - .filter_map(|txt| { - // TODO: wrong, txt can be multiple character strings - let addr = match dns::decode_character_string(txt) { - Ok(a) => a, - Err(_) => return None, - }; - if !addr.starts_with(b"dnsaddr=") { - return None; - } - let addr = match str::from_utf8(&addr[8..]) { - Ok(a) => a, - Err(_) => return None, - }; - let mut addr = match addr.parse::() { - Ok(a) => a, - Err(_) => return None, - }; - match addr.pop() { - Some(Protocol::P2p(peer_id)) => { - if let Ok(peer_id) = PeerId::try_from(peer_id) { - if peer_id != my_peer_id { - return None; - } - } else { - return None; - } - }, - _ => return None, - }; - Some(addr) - }).collect(); - - MdnsPeer { - addrs, - peer_id: my_peer_id, - ttl, - } - } - - /// Returns the id of the peer. - #[inline] - pub fn id(&self) -> &PeerId { - &self.peer_id - } - - /// Returns the requested time-to-live for the record. - #[inline] - pub fn ttl(&self) -> Duration { - Duration::from_secs(u64::from(self.ttl)) - } - - /// Returns the list of addresses the peer says it is listening on. - /// - /// Filters out invalid addresses. - pub fn addresses(&self) -> &Vec { - &self.addrs - } -} - -impl fmt::Debug for MdnsPeer { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MdnsPeer") - .field("peer_id", &self.peer_id) - .finish() - } -} - -#[cfg(test)] -mod tests { - macro_rules! testgen { - ($runtime_name:ident, $service_name:ty, $block_on_fn:tt) => { - mod $runtime_name { - use libp2p_core::{PeerId, multihash::{Code, MultihashDigest}}; - use std::time::Duration; - use crate::service::MdnsPacket; - - fn discover(peer_id: PeerId) { - let fut = async { - let mut service = <$service_name>::new().await.unwrap(); - - loop { - let next = service.next().await; - service = next.0; - - match next.1 { - MdnsPacket::Query(query) => { - let resp = crate::dns::build_query_response( - query.query_id(), - peer_id.clone(), - vec![].into_iter(), - Duration::from_secs(120), - ); - for r in resp { - service.enqueue_response(r); - } - } - MdnsPacket::Response(response) => { - for peer in response.discovered_peers() { - if peer.id() == &peer_id { - return; - } - } - } - MdnsPacket::ServiceDiscovery(_) => panic!( - "did not expect a service discovery packet", - ) - } - } - }; - - $block_on_fn(Box::pin(fut)); - } - - // As of today the underlying UDP socket is not stubbed out. Thus tests run in parallel to - // this unit tests inter fear with it. Test needs to be run in sequence to ensure test - // properties. - #[test] - fn respect_query_interval() { - let own_ips: Vec = if_addrs::get_if_addrs().unwrap() - .into_iter() - .map(|i| i.addr.ip()) - .collect(); - - let fut = async { - let mut service = <$service_name>::new().await.unwrap(); - - let mut sent_queries = vec![]; - - loop { - let next = service.next().await; - service = next.0; - - match next.1 { - MdnsPacket::Query(query) => { - // Ignore queries from other nodes. - let source_ip = query.remote_addr().ip(); - if !own_ips.contains(&source_ip) { - continue; - } - - sent_queries.push(query); - - if sent_queries.len() > 1 { - return; - } - } - // Ignore response packets. We don't stub out the UDP socket, thus this is - // either random noise from the network, or noise from other unit tests - // running in parallel. - MdnsPacket::Response(_) => {}, - MdnsPacket::ServiceDiscovery(_) => { - panic!("Did not expect a service discovery packet."); - }, - } - } - }; - - $block_on_fn(Box::pin(fut)); - } - - #[test] - fn discover_normal_peer_id() { - discover(PeerId::random()) - } - - #[test] - fn discover_long_peer_id() { - let max_value = String::from_utf8(vec![b'f'; 42]).unwrap(); - let hash = Code::Identity.digest(max_value.as_ref()); - discover(PeerId::from_multihash(hash).unwrap()) - } - } - } - } - - testgen!( - async_std, - crate::service::MdnsService, - (|fut| async_std::task::block_on::<_, ()>(fut)) - ); - - testgen!( - tokio, - crate::service::MdnsService, - (|fut| tokio::runtime::Runtime::new().unwrap().block_on::>(fut)) - ); -}