diff --git a/core/src/swarm.rs b/core/src/swarm.rs index 4a763d6b..28837f89 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -267,11 +267,13 @@ where TBehaviour: NetworkBehaviour, } let behaviour_poll = { + let transport = self.raw_swarm.transport(); let mut parameters = PollParameters { topology: &mut self.topology, supported_protocols: &self.supported_protocols, listened_addrs: &self.listened_addrs, external_addresses: &self.external_addresses, + nat_traversal: &move |a, b| transport.nat_traversal(a, b), local_public_key: &self.local_public_key, local_peer_id: &self.raw_swarm.local_peer_id(), }; @@ -345,12 +347,13 @@ pub trait NetworkBehaviour { } /// Parameters passed to `poll()` that the `NetworkBehaviour` has access to. -#[derive(Debug)] +// TODO: #[derive(Debug)] pub struct PollParameters<'a, TTopology: 'a> { topology: &'a mut TTopology, supported_protocols: &'a [Vec], listened_addrs: &'a [Multiaddr], external_addresses: &'a [Multiaddr], + nat_traversal: &'a dyn Fn(&Multiaddr, &Multiaddr) -> Option, local_public_key: &'a PublicKey, local_peer_id: &'a PeerId, } @@ -398,6 +401,12 @@ impl<'a, TTopology> PollParameters<'a, TTopology> { pub fn local_peer_id(&self) -> &PeerId { self.local_peer_id } + + /// Calls the `nat_traversal` method on the underlying transport of the `Swarm`. + #[inline] + pub fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + (self.nat_traversal)(server, observed) + } } /// Action to perform. diff --git a/examples/chat.rs b/examples/chat.rs index 5218ea45..5c2cfae2 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -18,27 +18,36 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! A basic chat application demonstrating libp2p and the Floodsub protocol. +//! A basic chat application demonstrating libp2p and the mDNS and floodsub protocols. //! -//! Using two terminal windows, start two instances. Take note of the listening -//! address of the first instance and start the second with this address as the -//! first argument. In the first terminal window, run: -//! ```text -//! cargo run --example chat -//! ``` -//! It will print the PeerId and the listening address, e.g. `Listening on -//! "/ip4/0.0.0.0/tcp/24915"` -//! -//! In the second terminal window, start a new instance of the example with: -//! ```text -//! cargo run --example chat -- /ip4/127.0.0.1/tcp/24915 -//! ``` -//! The two nodes connect. Type a message in either terminal and hit return: the +//! Using two terminal windows, start two instances. If you local network allows mDNS, +//! they will automatically connect. Type a message in either terminal and hit return: the //! message is sent and printed in the other terminal. Close with Ctrl-c. //! //! You can of course open more terminal windows and add more participants. //! Dialing any of the other peers will propagate the new participant to all //! chat members and everyone will receive all messages. +//! +//! # If they don't automatically connect +//! +//! If the nodes don't automatically connect, take note of the listening address of the first +//! instance and start the second with this address as the first argument. In the first terminal +//! window, run: +//! +//! ```sh +//! cargo run --example chat +//! ``` +//! +//! It will print the PeerId and the listening address, e.g. `Listening on +//! "/ip4/0.0.0.0/tcp/24915"` +//! +//! In the second terminal window, start a new instance of the example with: +//! +//! ```sh +//! cargo run --example chat -- /ip4/127.0.0.1/tcp/24915 +//! ``` +//! +//! The two nodes then connect. extern crate env_logger; extern crate futures; @@ -47,7 +56,7 @@ extern crate tokio; use futures::prelude::*; use libp2p::{ - Transport, + NetworkBehaviour, Transport, core::upgrade::{self, OutboundUpgradeExt}, secio, mplex, @@ -74,10 +83,32 @@ fn main() { // Create a Floodsub topic let floodsub_topic = libp2p::floodsub::TopicBuilder::new("chat").build(); + // We create a custom network behaviour that combines floodsub and mDNS. + // In the future, we want to improve libp2p to make this easier to do. + #[derive(NetworkBehaviour)] + struct MyBehaviour { + #[behaviour(handler = "on_floodsub")] + floodsub: libp2p::floodsub::Floodsub, + mdns: libp2p::mdns::Mdns, + } + + impl MyBehaviour { + // Called when `floodsub` produces an event. + fn on_floodsub(&mut self, message: as libp2p::core::swarm::NetworkBehaviour>::OutEvent) + where TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite + { + println!("Received: '{:?}' from {:?}", String::from_utf8_lossy(&message.data), message.source); + } + } + // Create a Swarm to manage peers and events let mut swarm = { - let mut behaviour = libp2p::floodsub::Floodsub::new(local_pub_key.clone().into_peer_id()); - behaviour.subscribe(floodsub_topic.clone()); + let mut behaviour = MyBehaviour { + floodsub: libp2p::floodsub::Floodsub::new(local_pub_key.clone().into_peer_id()), + mdns: libp2p::mdns::Mdns::new().expect("Failed to create mDNS service"), + }; + + behaviour.floodsub.subscribe(floodsub_topic.clone()); libp2p::Swarm::new(transport, behaviour, libp2p::core::topology::MemoryTopology::empty(), local_pub_key) }; @@ -85,7 +116,7 @@ fn main() { let addr = libp2p::Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); println!("Listening on {:?}", addr); - // Reach out to another node + // Reach out to another node if specified if let Some(to_dial) = std::env::args().nth(1) { let dialing = to_dial.clone(); match to_dial.parse() { @@ -107,7 +138,7 @@ fn main() { tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { loop { match framed_stdin.poll().expect("Error while polling stdin") { - Async::Ready(Some(line)) => swarm.publish(&floodsub_topic, line.as_bytes()), + Async::Ready(Some(line)) => swarm.floodsub.publish(&floodsub_topic, line.as_bytes()), Async::Ready(None) => panic!("Stdin closed"), Async::NotReady => break, }; @@ -115,8 +146,8 @@ fn main() { loop { match swarm.poll().expect("Error while polling swarm") { - Async::Ready(Some(message)) => { - println!("Received: '{:?}' from {:?}", String::from_utf8_lossy(&message.data), message.source); + Async::Ready(Some(_)) => { + }, Async::Ready(None) | Async::NotReady => break, } diff --git a/examples/mdns-passive-discovery.rs b/examples/mdns-passive-discovery.rs index 63a64502..2a17994b 100644 --- a/examples/mdns-passive-discovery.rs +++ b/examples/mdns-passive-discovery.rs @@ -24,7 +24,7 @@ extern crate rand; extern crate tokio; use futures::prelude::*; -use libp2p::mdns::{MdnsPacket, MdnsService}; +use libp2p::mdns::service::{MdnsPacket, MdnsService}; use std::io; fn main() { diff --git a/misc/mdns/Cargo.toml b/misc/mdns/Cargo.toml index 068c2c71..ac707a79 100644 --- a/misc/mdns/Cargo.toml +++ b/misc/mdns/Cargo.toml @@ -16,9 +16,12 @@ libp2p-core = { path = "../../core" } multiaddr = { package = "parity-multiaddr", path = "../multiaddr" } net2 = "0.2" rand = "0.6" +smallvec = "0.6" +tokio-io = "0.1" tokio-reactor = "0.1" tokio-timer = "0.2" tokio-udp = "0.1" +void = "1.0" [dev-dependencies] tokio = "0.1" diff --git a/misc/mdns/src/behaviour.rs b/misc/mdns/src/behaviour.rs new file mode 100644 index 00000000..5fee8652 --- /dev/null +++ b/misc/mdns/src/behaviour.rs @@ -0,0 +1,171 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::service::{MdnsService, MdnsPacket}; +use futures::prelude::*; +use libp2p_core::protocols_handler::{DummyProtocolsHandler, ProtocolsHandler}; +use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use libp2p_core::{Multiaddr, PeerId, multiaddr::Protocol, topology::MemoryTopology}; +use smallvec::SmallVec; +use std::{fmt, io, iter, marker::PhantomData, time::Duration}; +use tokio_io::{AsyncRead, AsyncWrite}; +use void::{self, Void}; + +/// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds +/// them to the topology. +pub struct Mdns { + /// The inner service. + service: MdnsService, + + /// If `Some`, then we automatically connect to nodes we discover and this is the list of nodes + /// to connect to. Drained in `poll()`. + /// If `None`, then we don't automatically connect. + to_connect_to: Option>, + + /// Marker to pin the generic. + marker: PhantomData, +} + +impl Mdns { + /// Builds a new `Mdns` behaviour. + pub fn new() -> io::Result> { + Ok(Mdns { + service: MdnsService::new()?, + to_connect_to: Some(SmallVec::new()), + marker: PhantomData, + }) + } +} + +/// Trait that must be implemented on the network topology for it to be usable with `Mdns`. +pub trait MdnsTopology { + /// Adds an address discovered by mDNS. + /// + /// Will never be called with the local peer ID. + fn add_mdns_discovered_address(&mut self, peer: PeerId, addr: Multiaddr); +} + +impl MdnsTopology for MemoryTopology { + #[inline] + fn add_mdns_discovered_address(&mut self, peer: PeerId, addr: Multiaddr) { + self.add_address(peer, addr) + } +} + +impl NetworkBehaviour for Mdns +where + TSubstream: AsyncRead + AsyncWrite, + TTopology: MdnsTopology, +{ + type ProtocolsHandler = DummyProtocolsHandler; + type OutEvent = Void; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + DummyProtocolsHandler::default() + } + + fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {} + + fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {} + + fn inject_node_event( + &mut self, + _: PeerId, + _ev: ::OutEvent, + ) { + void::unreachable(_ev) + } + + fn poll( + &mut self, + params: &mut PollParameters, + ) -> Async< + NetworkBehaviourAction< + ::InEvent, + Self::OutEvent, + >, + > { + loop { + if let Some(ref mut to_connect_to) = self.to_connect_to { + if !to_connect_to.is_empty() { + let peer_id = to_connect_to.remove(0); + return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }); + } else { + to_connect_to.shrink_to_fit(); + } + } + + let event = match self.service.poll() { + Async::Ready(ev) => ev, + Async::NotReady => return Async::NotReady, + }; + + match event { + MdnsPacket::Query(query) => { + let _ = query.respond( + params.local_peer_id().clone(), + params.listened_addresses().cloned(), + Duration::from_secs(5 * 60) + ); + }, + MdnsPacket::Response(response) => { + // We perform a call to `nat_traversal()` with the address we observe the + // remote as and the address they listen on. + let obs_ip = Protocol::from(response.remote_addr().ip()); + let obs_port = Protocol::Udp(response.remote_addr().port()); + let observed: Multiaddr = iter::once(obs_ip) + .chain(iter::once(obs_port)) + .collect(); + + for peer in response.discovered_peers() { + if peer.id() == params.local_peer_id() { + continue; + } + + for addr in peer.addresses() { + let to_insert = if let Some(new_addr) = params.nat_traversal(&addr, &observed) { + new_addr + } else { + addr + }; + + params.topology().add_mdns_discovered_address(peer.id().clone(), to_insert); + } + + if let Some(ref mut to_connect_to) = self.to_connect_to { + to_connect_to.push(peer.id().clone()); + } + } + }, + MdnsPacket::ServiceDiscovery(disc) => { + disc.respond(Duration::from_secs(5 * 60)); + }, + } + } + } +} + +impl fmt::Debug for Mdns { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Mdns") + .field("service", &self.service) + .finish() + } +} diff --git a/misc/mdns/src/lib.rs b/misc/mdns/src/lib.rs index b388fdcc..7f5fd59d 100644 --- a/misc/mdns/src/lib.rs +++ b/misc/mdns/src/lib.rs @@ -26,64 +26,9 @@ //! //! # Usage //! -//! In order to use mDNS to discover peers on the local network, use the `MdnsService`. This is -//! done by creating a `MdnsService` then polling it in the same way as you would poll a stream. +//! This crate provides the `Mdns` struct which implements the `NetworkBehaviour` trait. This +//! struct will automatically discover other libp2p nodes on the local network. //! -//! Polling the `MdnsService` can produce either an `MdnsQuery`, corresponding to an mDNS query -//! received by another node on the local network, or an `MdnsResponse` corresponding to a response -//! to a query previously emitted locally. The `MdnsService` will automatically produce queries, -//! which means that you will receive responses automatically. -//! -//! When you receive an `MdnsQuery`, use the `respond` method to send back an answer to the node -//! that emitted the query. -//! -//! When you receive an `MdnsResponse`, use the provided methods to query the information received -//! in the response. -//! -//! # Example -//! -//! ```rust -//! # extern crate futures; -//! # extern crate libp2p_core; -//! # extern crate libp2p_mdns; -//! # use futures::prelude::*; -//! # use libp2p_mdns::{MdnsService, MdnsPacket}; -//! # use std::{io, time::Duration}; -//! # fn main() { -//! # let my_peer_id = libp2p_core::PublicKey::Rsa(vec![1, 2, 3, 4]).into_peer_id(); -//! # let my_listened_addrs = Vec::new(); -//! let mut service = MdnsService::new().expect("Error while creating mDNS service"); -//! let _future_to_poll = futures::stream::poll_fn(move || -> Poll, io::Error> { -//! loop { -//! let packet = match service.poll() { -//! Async::Ready(packet) => packet, -//! Async::NotReady => return Ok(Async::NotReady), -//! }; -//! -//! match packet { -//! MdnsPacket::Query(query) => { -//! println!("Query from {:?}", query.remote_addr()); -//! query.respond( -//! my_peer_id.clone(), -//! my_listened_addrs.clone(), -//! Duration::from_secs(120), -//! ); -//! } -//! MdnsPacket::Response(response) => { -//! for peer in response.discovered_peers() { -//! println!("Discovered peer {:?}", peer.id()); -//! for addr in peer.addresses() { -//! println!("Address = {:?}", addr); -//! } -//! } -//! } -//! MdnsPacket::ServiceDiscovery(query) => { -//! query.respond(std::time::Duration::from_secs(120)); -//! } -//! } -//! } -//! }).for_each(|_| Ok(())); -//! # } extern crate data_encoding; extern crate dns_parser; @@ -92,501 +37,25 @@ extern crate libp2p_core; extern crate multiaddr; extern crate net2; extern crate rand; +extern crate smallvec; +extern crate tokio_io; extern crate tokio_reactor; extern crate tokio_timer; extern crate tokio_udp; +extern crate void; #[cfg(test)] extern crate tokio; -use dns_parser::{Packet, RData}; -use futures::{prelude::*, task}; -use libp2p_core::{Multiaddr, PeerId}; -use multiaddr::Protocol; -use std::{fmt, io, net::Ipv4Addr, net::SocketAddr, str, time::Duration, time::Instant}; -use tokio_reactor::Handle; -use tokio_timer::Interval; -use tokio_udp::UdpSocket; - -pub use dns::MdnsResponseError; - -mod dns; - /// Hardcoded name of the mDNS service. Part of the mDNS libp2p specifications. const SERVICE_NAME: &'static [u8] = b"_p2p._udp.local"; /// Hardcoded name of the service used for DNS-SD. const META_QUERY_SERVICE: &'static [u8] = b"_services._dns-sd._udp.local"; -/// A running service that discovers libp2p peers and responds to other libp2p peers' queries on -/// the local network. -/// -/// See the crate root documentation for more info. -pub struct MdnsService { - /// Main socket for listening. - socket: UdpSocket, - /// Socket for sending queries on the network. - query_socket: UdpSocket, - /// Interval for sending queries. - query_interval: Interval, - /// Whether we send queries on the network at all. - /// Note that we still need to have an interval for querying, as we need to wake up the socket - /// regularly to recover from errors. Otherwise we could simply use an `Option`. - silent: bool, - /// Buffer used for receiving data from the main socket. - recv_buffer: [u8; 2048], - /// Buffers pending to send on the main socket. - send_buffers: Vec>, - /// Buffers pending to send on the query socket. - query_send_buffers: Vec>, -} +pub use self::behaviour::{Mdns, MdnsTopology}; +pub use self::service::MdnsService; -impl MdnsService { - /// Starts a new mDNS service. - #[inline] - pub fn new() -> io::Result { - Self::new_inner(false) - } +mod behaviour; +mod dns; - /// Same as `new`, but we don't send automatically send queries on the network. - #[inline] - pub fn silent() -> io::Result { - Self::new_inner(true) - } - - /// Starts a new mDNS service. - fn new_inner(silent: bool) -> io::Result { - let socket = { - #[cfg(unix)] - fn platform_specific(s: &net2::UdpBuilder) -> io::Result<()> { - net2::unix::UnixUdpBuilderExt::reuse_port(s, true)?; - Ok(()) - } - #[cfg(not(unix))] - fn platform_specific(_: &net2::UdpBuilder) -> io::Result<()> { Ok(()) } - let builder = net2::UdpBuilder::new_v4()?; - builder.reuse_address(true)?; - platform_specific(&builder)?; - builder.bind(("0.0.0.0", 5353))? - }; - - let socket = UdpSocket::from_std(socket, &Handle::default())?; - socket.set_multicast_loop_v4(true)?; - socket.set_multicast_ttl_v4(255)?; - // TODO: correct interfaces? - socket.join_multicast_v4(&From::from([224, 0, 0, 251]), &Ipv4Addr::UNSPECIFIED)?; - - Ok(MdnsService { - socket, - query_socket: UdpSocket::bind(&From::from(([0, 0, 0, 0], 0)))?, - query_interval: Interval::new(Instant::now(), Duration::from_secs(20)), - silent, - recv_buffer: [0; 2048], - send_buffers: Vec::new(), - query_send_buffers: Vec::new(), - }) - } - - /// Polls the service for packets. - pub fn poll(&mut self) -> Async { - // Send a query every time `query_interval` fires. - // Note that we don't use a loop here ; it is pretty unlikely that we need it, and there is - // no point in sending multiple requests in a row. - match self.query_interval.poll() { - Ok(Async::Ready(_)) => { - if !self.silent { - let query = dns::build_query(); - self.query_send_buffers.push(query.to_vec()); - } - } - Ok(Async::NotReady) => (), - _ => unreachable!("A tokio_timer::Interval never errors"), // TODO: is that true? - }; - - // Flush the send buffer of the main socket. - while !self.send_buffers.is_empty() { - let to_send = self.send_buffers.remove(0); - match self - .socket - .poll_send_to(&to_send, &From::from(([224, 0, 0, 251], 5353))) - { - Ok(Async::Ready(bytes_written)) => { - debug_assert_eq!(bytes_written, to_send.len()); - } - Ok(Async::NotReady) => { - self.send_buffers.insert(0, to_send); - break; - } - Err(_) => { - // Errors are non-fatal because they can happen for example if we lose - // connection to the network. - self.send_buffers.clear(); - break; - } - } - } - - // Flush the query send buffer. - // This has to be after the push to `query_send_buffers`. - while !self.query_send_buffers.is_empty() { - let to_send = self.query_send_buffers.remove(0); - match self - .query_socket - .poll_send_to(&to_send, &From::from(([224, 0, 0, 251], 5353))) - { - Ok(Async::Ready(bytes_written)) => { - debug_assert_eq!(bytes_written, to_send.len()); - } - Ok(Async::NotReady) => { - self.query_send_buffers.insert(0, to_send); - break; - } - Err(_) => { - // Errors are non-fatal because they can happen for example if we lose - // connection to the network. - self.query_send_buffers.clear(); - break; - } - } - } - - // Check for any incoming packet. - match self.socket.poll_recv_from(&mut self.recv_buffer) { - Ok(Async::Ready((len, from))) => { - match Packet::parse(&self.recv_buffer[..len]) { - Ok(packet) => { - if packet.header.query { - if packet - .questions - .iter() - .any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME) - { - return Async::Ready(MdnsPacket::Query(MdnsQuery { - from, - query_id: packet.header.id, - send_buffers: &mut self.send_buffers, - })); - } else if packet - .questions - .iter() - .any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE) - { - // TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE? - return Async::Ready(MdnsPacket::ServiceDiscovery( - MdnsServiceDiscovery { - from, - query_id: packet.header.id, - send_buffers: &mut self.send_buffers, - }, - )); - } else { - // Note that ideally we would use a loop instead. However as of the - // writing of this code non-lexical lifetimes haven't been merged - // yet, and I can't manage to write this code without having borrow - // issues. - task::current().notify(); - return Async::NotReady; - } - } else { - return Async::Ready(MdnsPacket::Response(MdnsResponse { - packet, - from, - })); - } - } - Err(_) => { - // Ignore errors while parsing the packet. We need to poll again for the - // next packet. - // Note that ideally we would use a loop instead. However as of the writing - // of this code non-lexical lifetimes haven't been merged yet, and I can't - // manage to write this code without having borrow issues. - task::current().notify(); - return Async::NotReady; - } - } - } - Ok(Async::NotReady) => (), - Err(_) => { - // Error are non-fatal and can happen if we get disconnected from example. - // The query interval will wake up the task at some point so that we can try again. - } - }; - - Async::NotReady - } -} - -/// A valid mDNS packet received by the service. -#[derive(Debug)] -pub enum MdnsPacket<'a> { - /// A query made by a remote. - Query(MdnsQuery<'a>), - /// A response sent by a remote in response to one of our queries. - Response(MdnsResponse<'a>), - /// A request for service discovery. - ServiceDiscovery(MdnsServiceDiscovery<'a>), -} - -/// A received mDNS query. -pub struct MdnsQuery<'a> { - /// Sender of the address. - from: SocketAddr, - /// Id of the received DNS query. We need to pass this ID back in the results. - query_id: u16, - /// Queue of pending buffers. - send_buffers: &'a mut Vec>, -} - -impl<'a> MdnsQuery<'a> { - /// Respond to the query. - /// - /// Pass the ID of the local peer, and the list o addresses we're listening on. - /// - /// If there are more than 2^16-1 addresses, ignores the others. - /// - /// > **Note**: Keep in mind that we will also receive this response in an `MdnsResponse`. - #[inline] - pub fn respond( - self, - peer_id: PeerId, - addresses: TAddresses, - ttl: Duration, - ) -> Result<(), MdnsResponseError> - where - TAddresses: IntoIterator, - TAddresses::IntoIter: ExactSizeIterator, - { - let response = - dns::build_query_response(self.query_id, peer_id, addresses.into_iter(), ttl)?; - self.send_buffers.push(response); - Ok(()) - } - - /// Source address of the packet. - #[inline] - pub fn remote_addr(&self) -> &SocketAddr { - &self.from - } -} - -impl<'a> fmt::Debug for MdnsQuery<'a> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("MdnsQuery") - .field("from", self.remote_addr()) - .field("query_id", &self.query_id) - .finish() - } -} - -/// A received mDNS service discovery query. -pub struct MdnsServiceDiscovery<'a> { - /// Sender of the address. - from: SocketAddr, - /// Id of the received DNS query. We need to pass this ID back in the results. - query_id: u16, - /// Queue of pending buffers. - send_buffers: &'a mut Vec>, -} - -impl<'a> MdnsServiceDiscovery<'a> { - /// Respond to the query. - #[inline] - pub fn respond(self, ttl: Duration) { - let response = dns::build_service_discovery_response(self.query_id, ttl); - self.send_buffers.push(response); - } - - /// Source address of the packet. - #[inline] - pub fn remote_addr(&self) -> &SocketAddr { - &self.from - } -} - -impl<'a> fmt::Debug for MdnsServiceDiscovery<'a> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("MdnsServiceDiscovery") - .field("from", self.remote_addr()) - .field("query_id", &self.query_id) - .finish() - } -} - -/// A received mDNS response. -pub struct MdnsResponse<'a> { - packet: Packet<'a>, - from: SocketAddr, -} - -impl<'a> MdnsResponse<'a> { - /// Returns the list of peers that have been reported in this packet. - /// - /// > **Note**: Keep in mind that this will also contain the responses we sent ourselves. - pub fn discovered_peers<'b>(&'b self) -> impl Iterator> { - let packet = &self.packet; - self.packet.answers.iter().filter_map(move |record| { - if record.name.to_string().as_bytes() != SERVICE_NAME { - return None; - } - - let record_value = match record.data { - RData::PTR(record) => record.0.to_string(), - _ => return None, - }; - - let peer_name = { - let mut iter = record_value.splitn(2, |c| c == '.'); - let name = match iter.next() { - Some(n) => n.to_owned(), - None => return None, - }; - if iter.next().map(|v| v.as_bytes()) != Some(SERVICE_NAME) { - return None; - } - name - }; - - let peer_id = match data_encoding::BASE32_DNSCURVE.decode(peer_name.as_bytes()) { - Ok(bytes) => match PeerId::from_bytes(bytes) { - Ok(id) => id, - Err(_) => return None, - }, - Err(_) => return None, - }; - - Some(MdnsPeer { - packet, - record_value, - peer_id, - }) - }) - } - - /// Source address of the packet. - #[inline] - pub fn remote_addr(&self) -> &SocketAddr { - &self.from - } -} - -impl<'a> fmt::Debug for MdnsResponse<'a> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("MdnsResponse") - .field("from", self.remote_addr()) - .finish() - } -} - -/// A peer discovered by the service. -pub struct MdnsPeer<'a> { - /// The original packet ; will be used to determine the addresses. - packet: &'a Packet<'a>, - /// Cached value of `concat(base32(peer_id), service name)`. - record_value: String, - /// Id of the peer. - peer_id: PeerId, -} - -impl<'a> MdnsPeer<'a> { - /// Returns the id of the peer. - #[inline] - pub fn id(&self) -> &PeerId { - &self.peer_id - } - - /// Returns the list of addresses the peer says it is listening on. - /// - /// Filters out invalid addresses. - pub fn addresses<'b>(&'b self) -> impl Iterator + 'b { - let my_peer_id = &self.peer_id; - let record_value = &self.record_value; - self.packet - .additional - .iter() - .filter_map(move |add_record| { - if &add_record.name.to_string() != record_value { - return None; - } - - if let RData::TXT(ref txt) = add_record.data { - Some(txt) - } else { - None - } - }) - .flat_map(|txt| txt.iter()) - .filter_map(move |txt| { - // TODO: wrong, txt can be multiple character strings - let addr = match dns::decode_character_string(txt) { - Ok(a) => a, - Err(_) => return None, - }; - if !addr.starts_with(b"dnsaddr=") { - return None; - } - let addr = match str::from_utf8(&addr[8..]) { - Ok(a) => a, - Err(_) => return None, - }; - let mut addr = match addr.parse::() { - Ok(a) => a, - Err(_) => return None, - }; - match addr.pop() { - Some(Protocol::P2p(ref peer_id)) if peer_id == my_peer_id => (), - _ => return None, - }; - Some(addr) - }) - } -} - -impl<'a> fmt::Debug for MdnsPeer<'a> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("MdnsPeer") - .field("peer_id", &self.peer_id) - .finish() - } -} - -#[cfg(test)] -mod tests { - use libp2p_core::PublicKey; - use std::{io, time::Duration}; - use tokio::{self, prelude::*}; - use {MdnsPacket, MdnsService}; - - #[test] - fn discover_ourselves() { - let mut service = MdnsService::new().unwrap(); - let peer_id = - PublicKey::Rsa((0..32).map(|_| rand::random::()).collect()).into_peer_id(); - let stream = stream::poll_fn(move || -> Poll, io::Error> { - loop { - let packet = match service.poll() { - Async::Ready(packet) => packet, - Async::NotReady => return Ok(Async::NotReady), - }; - - match packet { - MdnsPacket::Query(query) => { - query.respond(peer_id.clone(), None, Duration::from_secs(120)).unwrap(); - } - MdnsPacket::Response(response) => { - for peer in response.discovered_peers() { - if peer.id() == &peer_id { - return Ok(Async::Ready(None)); - } - } - } - MdnsPacket::ServiceDiscovery(_) => {} - } - } - }); - - tokio::run( - stream - .map_err(|err| panic!("{:?}", err)) - .for_each(|_| Ok(())), - ); - } -} +pub mod service; diff --git a/misc/mdns/src/service.rs b/misc/mdns/src/service.rs new file mode 100644 index 00000000..8fd093c6 --- /dev/null +++ b/misc/mdns/src/service.rs @@ -0,0 +1,586 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +extern crate data_encoding; +extern crate dns_parser; +extern crate futures; +extern crate libp2p_core; +extern crate multiaddr; +extern crate net2; +extern crate rand; +extern crate tokio_reactor; +extern crate tokio_timer; +extern crate tokio_udp; + +#[cfg(test)] +extern crate tokio; + +use crate::{SERVICE_NAME, META_QUERY_SERVICE, dns}; +use dns_parser::{Packet, RData}; +use futures::{prelude::*, task}; +use libp2p_core::{Multiaddr, PeerId}; +use multiaddr::Protocol; +use std::{fmt, io, net::Ipv4Addr, net::SocketAddr, str, time::Duration, time::Instant}; +use tokio_reactor::Handle; +use tokio_timer::Interval; +use tokio_udp::UdpSocket; + +pub use dns::MdnsResponseError; + +/// A running service that discovers libp2p peers and responds to other libp2p peers' queries on +/// the local network. +/// +/// # Usage +/// +/// In order to use mDNS to discover peers on the local network, use the `MdnsService`. This is +/// done by creating a `MdnsService` then polling it in the same way as you would poll a stream. +/// +/// Polling the `MdnsService` can produce either an `MdnsQuery`, corresponding to an mDNS query +/// received by another node on the local network, or an `MdnsResponse` corresponding to a response +/// to a query previously emitted locally. The `MdnsService` will automatically produce queries, +/// which means that you will receive responses automatically. +/// +/// When you receive an `MdnsQuery`, use the `respond` method to send back an answer to the node +/// that emitted the query. +/// +/// When you receive an `MdnsResponse`, use the provided methods to query the information received +/// in the response. +/// +/// # Example +/// +/// ```rust +/// # extern crate futures; +/// # extern crate libp2p_core; +/// # extern crate libp2p_mdns; +/// # use futures::prelude::*; +/// # use libp2p_mdns::service::{MdnsService, MdnsPacket}; +/// # use std::{io, time::Duration}; +/// # fn main() { +/// # let my_peer_id = libp2p_core::PublicKey::Rsa(vec![1, 2, 3, 4]).into_peer_id(); +/// # let my_listened_addrs = Vec::new(); +/// let mut service = MdnsService::new().expect("Error while creating mDNS service"); +/// let _future_to_poll = futures::stream::poll_fn(move || -> Poll, io::Error> { +/// loop { +/// let packet = match service.poll() { +/// Async::Ready(packet) => packet, +/// Async::NotReady => return Ok(Async::NotReady), +/// }; +/// +/// match packet { +/// MdnsPacket::Query(query) => { +/// println!("Query from {:?}", query.remote_addr()); +/// query.respond( +/// my_peer_id.clone(), +/// my_listened_addrs.clone(), +/// Duration::from_secs(120), +/// ); +/// } +/// MdnsPacket::Response(response) => { +/// for peer in response.discovered_peers() { +/// println!("Discovered peer {:?}", peer.id()); +/// for addr in peer.addresses() { +/// println!("Address = {:?}", addr); +/// } +/// } +/// } +/// MdnsPacket::ServiceDiscovery(query) => { +/// query.respond(std::time::Duration::from_secs(120)); +/// } +/// } +/// } +/// }).for_each(|_| Ok(())); +/// # } +pub struct MdnsService { + /// Main socket for listening. + socket: UdpSocket, + /// Socket for sending queries on the network. + query_socket: UdpSocket, + /// Interval for sending queries. + query_interval: Interval, + /// Whether we send queries on the network at all. + /// Note that we still need to have an interval for querying, as we need to wake up the socket + /// regularly to recover from errors. Otherwise we could simply use an `Option`. + silent: bool, + /// Buffer used for receiving data from the main socket. + recv_buffer: [u8; 2048], + /// Buffers pending to send on the main socket. + send_buffers: Vec>, + /// Buffers pending to send on the query socket. + query_send_buffers: Vec>, +} + +impl MdnsService { + /// Starts a new mDNS service. + #[inline] + pub fn new() -> io::Result { + Self::new_inner(false) + } + + /// Same as `new`, but we don't send automatically send queries on the network. + #[inline] + pub fn silent() -> io::Result { + Self::new_inner(true) + } + + /// Starts a new mDNS service. + fn new_inner(silent: bool) -> io::Result { + let socket = { + #[cfg(unix)] + fn platform_specific(s: &net2::UdpBuilder) -> io::Result<()> { + net2::unix::UnixUdpBuilderExt::reuse_port(s, true)?; + Ok(()) + } + #[cfg(not(unix))] + fn platform_specific(_: &net2::UdpBuilder) -> io::Result<()> { Ok(()) } + let builder = net2::UdpBuilder::new_v4()?; + builder.reuse_address(true)?; + platform_specific(&builder)?; + builder.bind(("0.0.0.0", 5353))? + }; + + let socket = UdpSocket::from_std(socket, &Handle::default())?; + socket.set_multicast_loop_v4(true)?; + socket.set_multicast_ttl_v4(255)?; + // TODO: correct interfaces? + socket.join_multicast_v4(&From::from([224, 0, 0, 251]), &Ipv4Addr::UNSPECIFIED)?; + + Ok(MdnsService { + socket, + query_socket: UdpSocket::bind(&From::from(([0, 0, 0, 0], 0)))?, + query_interval: Interval::new(Instant::now(), Duration::from_secs(20)), + silent, + recv_buffer: [0; 2048], + send_buffers: Vec::new(), + query_send_buffers: Vec::new(), + }) + } + + /// Polls the service for packets. + pub fn poll(&mut self) -> Async { + // Send a query every time `query_interval` fires. + // Note that we don't use a loop here ; it is pretty unlikely that we need it, and there is + // no point in sending multiple requests in a row. + match self.query_interval.poll() { + Ok(Async::Ready(_)) => { + if !self.silent { + let query = dns::build_query(); + self.query_send_buffers.push(query.to_vec()); + } + } + Ok(Async::NotReady) => (), + _ => unreachable!("A tokio_timer::Interval never errors"), // TODO: is that true? + }; + + // Flush the send buffer of the main socket. + while !self.send_buffers.is_empty() { + let to_send = self.send_buffers.remove(0); + match self + .socket + .poll_send_to(&to_send, &From::from(([224, 0, 0, 251], 5353))) + { + Ok(Async::Ready(bytes_written)) => { + debug_assert_eq!(bytes_written, to_send.len()); + } + Ok(Async::NotReady) => { + self.send_buffers.insert(0, to_send); + break; + } + Err(_) => { + // Errors are non-fatal because they can happen for example if we lose + // connection to the network. + self.send_buffers.clear(); + break; + } + } + } + + // Flush the query send buffer. + // This has to be after the push to `query_send_buffers`. + while !self.query_send_buffers.is_empty() { + let to_send = self.query_send_buffers.remove(0); + match self + .query_socket + .poll_send_to(&to_send, &From::from(([224, 0, 0, 251], 5353))) + { + Ok(Async::Ready(bytes_written)) => { + debug_assert_eq!(bytes_written, to_send.len()); + } + Ok(Async::NotReady) => { + self.query_send_buffers.insert(0, to_send); + break; + } + Err(_) => { + // Errors are non-fatal because they can happen for example if we lose + // connection to the network. + self.query_send_buffers.clear(); + break; + } + } + } + + // Check for any incoming packet. + match self.socket.poll_recv_from(&mut self.recv_buffer) { + Ok(Async::Ready((len, from))) => { + match Packet::parse(&self.recv_buffer[..len]) { + Ok(packet) => { + if packet.header.query { + if packet + .questions + .iter() + .any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME) + { + return Async::Ready(MdnsPacket::Query(MdnsQuery { + from, + query_id: packet.header.id, + send_buffers: &mut self.send_buffers, + })); + } else if packet + .questions + .iter() + .any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE) + { + // TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE? + return Async::Ready(MdnsPacket::ServiceDiscovery( + MdnsServiceDiscovery { + from, + query_id: packet.header.id, + send_buffers: &mut self.send_buffers, + }, + )); + } else { + // Note that ideally we would use a loop instead. However as of the + // writing of this code non-lexical lifetimes haven't been merged + // yet, and I can't manage to write this code without having borrow + // issues. + task::current().notify(); + return Async::NotReady; + } + } else { + return Async::Ready(MdnsPacket::Response(MdnsResponse { + packet, + from, + })); + } + } + Err(_) => { + // Ignore errors while parsing the packet. We need to poll again for the + // next packet. + // Note that ideally we would use a loop instead. However as of the writing + // of this code non-lexical lifetimes haven't been merged yet, and I can't + // manage to write this code without having borrow issues. + task::current().notify(); + return Async::NotReady; + } + } + } + Ok(Async::NotReady) => (), + Err(_) => { + // Error are non-fatal and can happen if we get disconnected from example. + // The query interval will wake up the task at some point so that we can try again. + } + }; + + Async::NotReady + } +} + +impl fmt::Debug for MdnsService { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("MdnsService") + .field("silent", &self.silent) + .finish() + } +} + +/// A valid mDNS packet received by the service. +#[derive(Debug)] +pub enum MdnsPacket<'a> { + /// A query made by a remote. + Query(MdnsQuery<'a>), + /// A response sent by a remote in response to one of our queries. + Response(MdnsResponse<'a>), + /// A request for service discovery. + ServiceDiscovery(MdnsServiceDiscovery<'a>), +} + +/// A received mDNS query. +pub struct MdnsQuery<'a> { + /// Sender of the address. + from: SocketAddr, + /// Id of the received DNS query. We need to pass this ID back in the results. + query_id: u16, + /// Queue of pending buffers. + send_buffers: &'a mut Vec>, +} + +impl<'a> MdnsQuery<'a> { + /// Respond to the query. + /// + /// Pass the ID of the local peer, and the list of addresses we're listening on. + /// + /// If there are more than 2^16-1 addresses, ignores the others. + /// + /// > **Note**: Keep in mind that we will also receive this response in an `MdnsResponse`. + #[inline] + pub fn respond( + self, + peer_id: PeerId, + addresses: TAddresses, + ttl: Duration, + ) -> Result<(), MdnsResponseError> + where + TAddresses: IntoIterator, + TAddresses::IntoIter: ExactSizeIterator, + { + let response = + dns::build_query_response(self.query_id, peer_id, addresses.into_iter(), ttl)?; + self.send_buffers.push(response); + Ok(()) + } + + /// Source address of the packet. + #[inline] + pub fn remote_addr(&self) -> &SocketAddr { + &self.from + } +} + +impl<'a> fmt::Debug for MdnsQuery<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("MdnsQuery") + .field("from", self.remote_addr()) + .field("query_id", &self.query_id) + .finish() + } +} + +/// A received mDNS service discovery query. +pub struct MdnsServiceDiscovery<'a> { + /// Sender of the address. + from: SocketAddr, + /// Id of the received DNS query. We need to pass this ID back in the results. + query_id: u16, + /// Queue of pending buffers. + send_buffers: &'a mut Vec>, +} + +impl<'a> MdnsServiceDiscovery<'a> { + /// Respond to the query. + #[inline] + pub fn respond(self, ttl: Duration) { + let response = dns::build_service_discovery_response(self.query_id, ttl); + self.send_buffers.push(response); + } + + /// Source address of the packet. + #[inline] + pub fn remote_addr(&self) -> &SocketAddr { + &self.from + } +} + +impl<'a> fmt::Debug for MdnsServiceDiscovery<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("MdnsServiceDiscovery") + .field("from", self.remote_addr()) + .field("query_id", &self.query_id) + .finish() + } +} + +/// A received mDNS response. +pub struct MdnsResponse<'a> { + packet: Packet<'a>, + from: SocketAddr, +} + +impl<'a> MdnsResponse<'a> { + /// Returns the list of peers that have been reported in this packet. + /// + /// > **Note**: Keep in mind that this will also contain the responses we sent ourselves. + pub fn discovered_peers<'b>(&'b self) -> impl Iterator> { + let packet = &self.packet; + self.packet.answers.iter().filter_map(move |record| { + if record.name.to_string().as_bytes() != SERVICE_NAME { + return None; + } + + let record_value = match record.data { + RData::PTR(record) => record.0.to_string(), + _ => return None, + }; + + let peer_name = { + let mut iter = record_value.splitn(2, |c| c == '.'); + let name = match iter.next() { + Some(n) => n.to_owned(), + None => return None, + }; + if iter.next().map(|v| v.as_bytes()) != Some(SERVICE_NAME) { + return None; + } + name + }; + + let peer_id = match data_encoding::BASE32_DNSCURVE.decode(peer_name.as_bytes()) { + Ok(bytes) => match PeerId::from_bytes(bytes) { + Ok(id) => id, + Err(_) => return None, + }, + Err(_) => return None, + }; + + Some(MdnsPeer { + packet, + record_value, + peer_id, + }) + }) + } + + /// Source address of the packet. + #[inline] + pub fn remote_addr(&self) -> &SocketAddr { + &self.from + } +} + +impl<'a> fmt::Debug for MdnsResponse<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("MdnsResponse") + .field("from", self.remote_addr()) + .finish() + } +} + +/// A peer discovered by the service. +pub struct MdnsPeer<'a> { + /// The original packet ; will be used to determine the addresses. + packet: &'a Packet<'a>, + /// Cached value of `concat(base32(peer_id), service name)`. + record_value: String, + /// Id of the peer. + peer_id: PeerId, +} + +impl<'a> MdnsPeer<'a> { + /// Returns the id of the peer. + #[inline] + pub fn id(&self) -> &PeerId { + &self.peer_id + } + + /// Returns the list of addresses the peer says it is listening on. + /// + /// Filters out invalid addresses. + pub fn addresses<'b>(&'b self) -> impl Iterator + 'b { + let my_peer_id = &self.peer_id; + let record_value = &self.record_value; + self.packet + .additional + .iter() + .filter_map(move |add_record| { + if &add_record.name.to_string() != record_value { + return None; + } + + if let RData::TXT(ref txt) = add_record.data { + Some(txt) + } else { + None + } + }) + .flat_map(|txt| txt.iter()) + .filter_map(move |txt| { + // TODO: wrong, txt can be multiple character strings + let addr = match dns::decode_character_string(txt) { + Ok(a) => a, + Err(_) => return None, + }; + if !addr.starts_with(b"dnsaddr=") { + return None; + } + let addr = match str::from_utf8(&addr[8..]) { + Ok(a) => a, + Err(_) => return None, + }; + let mut addr = match addr.parse::() { + Ok(a) => a, + Err(_) => return None, + }; + match addr.pop() { + Some(Protocol::P2p(ref peer_id)) if peer_id == my_peer_id => (), + _ => return None, + }; + Some(addr) + }) + } +} + +impl<'a> fmt::Debug for MdnsPeer<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("MdnsPeer") + .field("peer_id", &self.peer_id) + .finish() + } +} + +#[cfg(test)] +mod tests { + use libp2p_core::PublicKey; + use std::{io, time::Duration}; + use tokio::{self, prelude::*}; + use crate::service::{MdnsPacket, MdnsService}; + + #[test] + fn discover_ourselves() { + let mut service = MdnsService::new().unwrap(); + let peer_id = + PublicKey::Rsa((0..32).map(|_| rand::random::()).collect()).into_peer_id(); + let stream = stream::poll_fn(move || -> Poll, io::Error> { + loop { + let packet = match service.poll() { + Async::Ready(packet) => packet, + Async::NotReady => return Ok(Async::NotReady), + }; + + match packet { + MdnsPacket::Query(query) => { + query.respond(peer_id.clone(), None, Duration::from_secs(120)).unwrap(); + } + MdnsPacket::Response(response) => { + for peer in response.discovered_peers() { + if peer.id() == &peer_id { + return Ok(Async::Ready(None)); + } + } + } + MdnsPacket::ServiceDiscovery(_) => {} + } + } + }); + + tokio::run( + stream + .map_err(|err| panic!("{:?}", err)) + .for_each(|_| Ok(())), + ); + } +}