misc/mdns: Update to futures-preview (#1247)

* misc/mdns/service: Use async std with stack pinned futures

* misc/mdns: Define mdns broadcast address as lazy static

* misc/mdns: Drop future before borrowing their arguments again

* misc/mdns: Send queries on query socket, not socket

* misc/mdns: Use poll_next_unpin on query interval stream

* misc/mdns: Ensure underlying task is woken up on next interval tick

* misc/mdns: Wrap match expression in scope to drop future early

* misc/mdns: Adjust 'discovery_ourselves' test

* misc/mdns: Make query interval fire instantly on first tick

This is an optimization only important for short lived use cases, e.g.
unit tests. Instead of waiting for 20 seconds at first, the query
interval fires right away and thereby the service makes progress
instantly.

* misc/mdns: Adjust MdnsService documentation tests

* misc/mdns: Do not drop UDP socket send and reicv futures

Libp2p-mdns uses the async-std crate for network io. This crate only
offers async send and receive functions. In order to use this in non
async/await functions one needs to keep the future returned by the crate
functions around across `poll` invocations.

The future returned by the crate functions references the io resource.
Thus one has to keep both the io resource as well as the future
referencing it. This results in a self-referencing struct which is not
possible to create with safe Rust.

Instead, by having `MdnsService::next` (former `MdnsService::poll`) take
ownership of `self`, the Rust async magic takes care of the above (See
code comments for more details).

As a (negative) side effect, given that `MdnsService::next` takes
ownership of `self`, there is nothing to bind the lifetime of the
returned `MdnsPacket` to. With no better solution in mind, this patch
makes `MdnsPacket` static, not referencing the `MdnsService` receive
buffer.

* misc/mdns: Fix code comments and remove *if Free* TODO

* misc/mdns: Minor refactorings

* misc/mdns: Build responses in behaviour.rs directly

* misc/mdns: Move response ttl duration to constant

* misc/mdns: Remove optimization todo comment

* misc/mdns: Add query interval test

* misc/mdns: Move packet parsing into MdnPacket impl

* misc/mdns: Don't have receiving packets starve the query interval

When we 'await' on receiving a packet on the udp socket without
receiving a single packet we starve the remaining logic of the mdns
service, in this case the logic triggered on the receive interval.

* misc/mdns: Add debug_assert to MaybeBusyMdnsService check

* misc/mdns: Implement Debug for MaybeBusyMdnsService

* misc/mdns: Make ownership note a normal comment, not a doc comment

* misc/mdns: Have discovered_peers return an iterator
This commit is contained in:
Max Inden
2019-11-20 13:25:12 +01:00
committed by Pierre Krieger
parent 02c5f34fc0
commit a26620bf39
3 changed files with 416 additions and 271 deletions

View File

@@ -13,7 +13,9 @@ categories = ["network-programming", "asynchronous"]
async-std = "0.99"
data-encoding = "2.0"
dns-parser = "0.8"
futures-preview = "0.3.0-alpha.18"
either = "1.5.3"
futures-preview = { version = "0.3.0-alpha.19", features = ["async-await"] }
lazy_static = "1.2"
libp2p-core = { version = "0.13.0", path = "../../core" }
libp2p-swarm = { version = "0.3.0", path = "../../swarm" }
log = "0.4"
@@ -21,5 +23,8 @@ multiaddr = { package = "parity-multiaddr", version = "0.5.0", path = "../multia
net2 = "0.2"
rand = "0.6"
smallvec = "0.6"
wasm-timer = "0.2"
void = "1.0"
wasm-timer = "0.2"
[dev-dependencies]
get_if_addrs = "0.5.3"

View File

@@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::service::{MdnsService, MdnsPacket};
use crate::service::{MdnsService, MdnsPacket, build_query_response, build_service_discovery_response};
use futures::prelude::*;
use libp2p_core::{address_translation, ConnectedPoint, Multiaddr, PeerId, multiaddr::Protocol};
use libp2p_swarm::{
@@ -30,14 +30,16 @@ use libp2p_swarm::{
};
use log::warn;
use smallvec::SmallVec;
use std::{cmp, fmt, io, iter, marker::PhantomData, pin::Pin, time::Duration, task::Context, task::Poll};
use std::{cmp, fmt, io, iter, marker::PhantomData, mem, pin::Pin, time::Duration, task::Context, task::Poll};
use wasm_timer::{Delay, Instant};
const MDNS_RESPONSE_TTL: std::time::Duration = Duration::from_secs(5 * 60);
/// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds
/// them to the topology.
pub struct Mdns<TSubstream> {
/// The inner service.
service: MdnsService,
service: MaybeBusyMdnsService,
/// List of nodes that we have discovered, the address, and when their TTL expires.
///
@@ -45,7 +47,7 @@ pub struct Mdns<TSubstream> {
/// can appear multiple times.
discovered_nodes: SmallVec<[(PeerId, Multiaddr, Instant); 8]>,
/// Future that fires when the TTL at least one node in `discovered_nodes` expires.
/// Future that fires when the TTL of at least one node in `discovered_nodes` expires.
///
/// `None` if `discovered_nodes` is empty.
closest_expiration: Option<Delay>,
@@ -54,11 +56,41 @@ pub struct Mdns<TSubstream> {
marker: PhantomData<TSubstream>,
}
/// `MdnsService::next` takes ownership of `self`, returning a future that resolves with both itself
/// and a `MdnsPacket` (similar to the old Tokio socket send style). The two states are thus `Free`
/// with an `MdnsService` or `Busy` with a future returning the original `MdnsService` and an
/// `MdnsPacket`.
enum MaybeBusyMdnsService {
Free(MdnsService),
Busy(Pin<Box<dyn Future<Output = (MdnsService, MdnsPacket)> + Send>>),
Poisoned,
}
impl fmt::Debug for MaybeBusyMdnsService {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
MaybeBusyMdnsService::Free(service) => {
fmt.debug_struct("MaybeBusyMdnsService::Free")
.field("service", service)
.finish()
},
MaybeBusyMdnsService::Busy(_) => {
fmt.debug_struct("MaybeBusyMdnsService::Busy")
.finish()
}
MaybeBusyMdnsService::Poisoned => {
fmt.debug_struct("MaybeBusyMdnsService::Poisoned")
.finish()
}
}
}
}
impl<TSubstream> Mdns<TSubstream> {
/// Builds a new `Mdns` behaviour.
pub async fn new() -> io::Result<Mdns<TSubstream>> {
Ok(Mdns {
service: MdnsService::new().await?,
service: MaybeBusyMdnsService::Free(MdnsService::new().await?),
discovered_nodes: SmallVec::new(),
closest_expiration: None,
marker: PhantomData,
@@ -80,7 +112,7 @@ pub enum MdnsEvent {
/// The given combinations of `PeerId` and `Multiaddr` have expired.
///
/// Each discovered record has a time-to-live. When this TTL expires and the address hasn't
/// been refreshed, we remove it from the list emit it as an `Expired` event.
/// been refreshed, we remove it from the list and emit it as an `Expired` event.
Expired(ExpiredAddrsIter),
}
@@ -210,18 +242,40 @@ where
// Polling the mDNS service, and obtain the list of nodes discovered this round.
let discovered = loop {
let event = match self.service.poll(cx) {
Poll::Ready(ev) => ev,
Poll::Pending => return Poll::Pending,
let service = mem::replace(&mut self.service, MaybeBusyMdnsService::Poisoned);
let packet = match service {
MaybeBusyMdnsService::Free(service) => {
self.service = MaybeBusyMdnsService::Busy(Box::pin(service.next()));
continue;
},
MaybeBusyMdnsService::Busy(mut fut) => {
match fut.as_mut().poll(cx) {
Poll::Ready((service, packet)) => {
self.service = MaybeBusyMdnsService::Free(service);
packet
},
Poll::Pending => {
self.service = MaybeBusyMdnsService::Busy(fut);
return Poll::Pending;
}
}
},
MaybeBusyMdnsService::Poisoned => panic!("Mdns poisoned"),
};
match event {
match packet {
MdnsPacket::Query(query) => {
let _ = query.respond(
params.local_peer_id().clone(),
params.listened_addresses(),
Duration::from_secs(5 * 60)
);
// MaybeBusyMdnsService should always be Free.
if let MaybeBusyMdnsService::Free(ref mut service) = self.service {
let resp = build_query_response(
query.query_id(),
params.local_peer_id().clone(),
params.listened_addresses().into_iter(),
MDNS_RESPONSE_TTL,
);
service.enqueue_response(resp.unwrap());
} else { debug_assert!(false); }
},
MdnsPacket::Response(response) => {
// We replace the IP address with the address we observe the
@@ -240,12 +294,12 @@ where
let new_expiration = Instant::now() + peer.ttl();
let mut addrs = Vec::new();
let mut addrs: Vec<Multiaddr> = Vec::new();
for addr in peer.addresses() {
if let Some(new_addr) = address_translation(&addr, &observed) {
addrs.push(new_addr)
addrs.push(new_addr.clone())
}
addrs.push(addr)
addrs.push(addr.clone())
}
for addr in addrs {
@@ -264,17 +318,26 @@ where
break discovered;
},
MdnsPacket::ServiceDiscovery(disc) => {
disc.respond(Duration::from_secs(5 * 60));
// MaybeBusyMdnsService should always be Free.
if let MaybeBusyMdnsService::Free(ref mut service) = self.service {
let resp = build_service_discovery_response(
disc.query_id(),
MDNS_RESPONSE_TTL,
);
service.enqueue_response(resp);
} else { debug_assert!(false); }
},
}
};
// As the final step, we need to refresh `closest_expiration`.
// Getting this far implies that we discovered new nodes. As the final step, we need to
// refresh `closest_expiration`.
self.closest_expiration = self.discovered_nodes.iter()
.fold(None, |exp, &(_, _, elem_exp)| {
Some(exp.map(|exp| cmp::min(exp, elem_exp)).unwrap_or(elem_exp))
})
.map(Delay::new_at);
Poll::Ready(NetworkBehaviourAction::GenerateEvent(MdnsEvent::Discovered(DiscoveredAddrsIter {
inner: discovered.into_iter(),
})))
@@ -288,4 +351,3 @@ impl<TSubstream> fmt::Debug for Mdns<TSubstream> {
.finish()
}
}

View File

@@ -21,13 +21,22 @@
use crate::{SERVICE_NAME, META_QUERY_SERVICE, dns};
use async_std::net::UdpSocket;
use dns_parser::{Packet, RData};
use futures::prelude::*;
use either::Either::{Left, Right};
use futures::{future, prelude::*};
use libp2p_core::{Multiaddr, PeerId};
use multiaddr::Protocol;
use std::{fmt, io, net::Ipv4Addr, net::SocketAddr, pin::Pin, str, task::Context, task::Poll, time::Duration};
use std::{fmt, io, net::Ipv4Addr, net::SocketAddr, str, time::{Duration, Instant}};
use wasm_timer::Interval;
use lazy_static::lazy_static;
pub use dns::MdnsResponseError;
pub use dns::{MdnsResponseError, build_query_response, build_service_discovery_response};
lazy_static! {
static ref IPV4_MDNS_MULTICAST_ADDRESS: SocketAddr = SocketAddr::from((
Ipv4Addr::new(224, 0, 0, 251),
5353,
));
}
/// A running service that discovers libp2p peers and responds to other libp2p peers' queries on
/// the local network.
@@ -52,43 +61,47 @@ pub use dns::MdnsResponseError;
///
/// ```rust
/// # use futures::prelude::*;
/// # use libp2p_core::{identity, PeerId};
/// # use libp2p_mdns::service::{MdnsService, MdnsPacket};
/// # use std::{io, time::Duration};
/// # use futures::executor::block_on;
/// # use libp2p_core::{identity, Multiaddr, PeerId};
/// # use libp2p_mdns::service::{MdnsService, MdnsPacket, build_query_response, build_service_discovery_response};
/// # use std::{io, time::Duration, task::Poll};
/// # fn main() {
/// # let my_peer_id = PeerId::from(identity::Keypair::generate_ed25519().public());
/// # let my_listened_addrs = Vec::new();
/// let mut service = MdnsService::new().expect("Error while creating mDNS service");
/// let _future_to_poll = futures::stream::poll_fn(move || -> Poll<Option<()>, io::Error> {
/// loop {
/// let packet = match service.poll() {
/// Poll::Ready(packet) => packet,
/// Poll::Pending => return Poll::Pending,
/// };
/// # let my_listened_addrs: Vec<Multiaddr> = vec![];
/// # block_on(async {
/// let mut service = MdnsService::new().await.expect("Error while creating mDNS service");
/// let _future_to_poll = async {
/// let (mut service, packet) = service.next().await;
///
/// match packet {
/// MdnsPacket::Query(query) => {
/// println!("Query from {:?}", query.remote_addr());
/// query.respond(
/// my_peer_id.clone(),
/// my_listened_addrs.clone(),
/// Duration::from_secs(120),
/// );
/// }
/// MdnsPacket::Response(response) => {
/// for peer in response.discovered_peers() {
/// println!("Discovered peer {:?}", peer.id());
/// for addr in peer.addresses() {
/// println!("Address = {:?}", addr);
/// }
/// match packet {
/// MdnsPacket::Query(query) => {
/// println!("Query from {:?}", query.remote_addr());
/// let resp = build_query_response(
/// query.query_id(),
/// my_peer_id.clone(),
/// vec![].into_iter(),
/// Duration::from_secs(120),
/// ).unwrap();
/// service.enqueue_response(resp);
/// }
/// MdnsPacket::Response(response) => {
/// for peer in response.discovered_peers() {
/// println!("Discovered peer {:?}", peer.id());
/// for addr in peer.addresses() {
/// println!("Address = {:?}", addr);
/// }
/// }
/// MdnsPacket::ServiceDiscovery(query) => {
/// query.respond(std::time::Duration::from_secs(120));
/// }
/// }
/// MdnsPacket::ServiceDiscovery(disc) => {
/// let resp = build_service_discovery_response(
/// disc.query_id(),
/// Duration::from_secs(120),
/// );
/// service.enqueue_response(resp);
/// }
/// }
/// }).for_each(|_| Ok(()));
/// };
/// # })
/// # }
pub struct MdnsService {
/// Main socket for listening.
@@ -142,12 +155,12 @@ impl MdnsService {
socket.set_multicast_loop_v4(true)?;
socket.set_multicast_ttl_v4(255)?;
// TODO: correct interfaces?
socket.join_multicast_v4(&From::from([224, 0, 0, 251]), &Ipv4Addr::UNSPECIFIED)?;
socket.join_multicast_v4(From::from([224, 0, 0, 251]), Ipv4Addr::UNSPECIFIED)?;
Ok(MdnsService {
socket,
query_socket: UdpSocket::bind((Ipv4Addr::from([0u8, 0, 0, 0]), 0u16)).await?,
query_interval: Interval::new(Duration::from_secs(20)),
query_interval: Interval::new_at(Instant::now(), Duration::from_secs(20)),
silent,
recv_buffer: [0; 2048],
send_buffers: Vec::new(),
@@ -155,116 +168,102 @@ impl MdnsService {
})
}
pub async fn next_packet(&mut self) -> MdnsPacket {
// TODO: refactor this block
// Send a query every time `query_interval` fires.
// Note that we don't use a loop here—it is pretty unlikely that we need it, and there is
// no point in sending multiple requests in a row.
match Stream::poll_next(Pin::new(&mut self.query_interval), cx) {
Poll::Ready(_) => {
if !self.silent {
let query = dns::build_query();
self.query_send_buffers.push(query.to_vec());
}
}
Poll::Pending => (),
};
pub fn enqueue_response(&mut self, rsp: Vec<u8>) {
self.send_buffers.push(rsp);
}
// Flush the send buffer of the main socket.
while !self.send_buffers.is_empty() {
let to_send = self.send_buffers.remove(0);
match self.socket.send_to(&to_send, &From::from(([224, 0, 0, 251], 5353))).await {
Ok(bytes_written) => {
debug_assert_eq!(bytes_written, to_send.len());
}
Err(_) => {
// Errors are non-fatal because they can happen for example if we lose
// connection to the network.
self.send_buffers.clear();
break;
}
}
}
/// Returns a future resolving to itself and the next received `MdnsPacket`.
//
// **Note**: Why does `next` take ownership of itself?
//
// `MdnsService::next` needs to be called from within `NetworkBehaviour`
// implementations. Given that traits can not have async methods the
// respective `NetworkBehaviour` implementation needs to somehow keep the
// Future returned by `MdnsService::next` across classic `poll`
// invocations. The instance method `next` can either take a reference or
// ownership of itself:
//
// 1. Taking a reference - If `MdnsService::poll` takes a reference to
// `&self` the respective `NetworkBehaviour` implementation would need to
// keep both the Future as well as its `MdnsService` instance across poll
// invocations. Given that in this case the Future would have a reference
// to `MdnsService`, the `NetworkBehaviour` implementation struct would
// need to be self-referential which is not possible without unsafe code in
// Rust.
//
// 2. Taking ownership - Instead `MdnsService::next` takes ownership of
// self and returns it alongside an `MdnsPacket` once the actual future
// resolves, not forcing self-referential structures on the caller.
pub async fn next(mut self) -> (Self, MdnsPacket) {
loop {
// Flush the send buffer of the main socket.
while !self.send_buffers.is_empty() {
let to_send = self.send_buffers.remove(0);
// Flush the query send buffer.
// This has to be after the push to `query_send_buffers`.
while !self.query_send_buffers.is_empty() {
let to_send = self.query_send_buffers.remove(0);
match self.socket.send_to(&to_send, &From::from(([224, 0, 0, 251], 5353))).await {
Ok(bytes_written) => {
debug_assert_eq!(bytes_written, to_send.len());
}
Err(_) => {
// Errors are non-fatal because they can happen for example if we lose
// connection to the network.
self.query_send_buffers.clear();
break;
}
}
}
// TODO: block needs to be refactored
// Check for any incoming packet.
match AsyncDatagram::poll_recv_from(Pin::new(&mut self.socket), cx, &mut self.recv_buffer) {
Poll::Ready(Ok((len, from))) => {
match Packet::parse(&self.recv_buffer[..len]) {
Ok(packet) => {
if packet.header.query {
if packet
.questions
.iter()
.any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME)
{
return Poll::Ready(MdnsPacket::Query(MdnsQuery {
from,
query_id: packet.header.id,
send_buffers: &mut self.send_buffers,
}));
} else if packet
.questions
.iter()
.any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE)
{
// TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE?
return Poll::Ready(MdnsPacket::ServiceDiscovery(
MdnsServiceDiscovery {
from,
query_id: packet.header.id,
send_buffers: &mut self.send_buffers,
},
));
} else {
// Note that ideally we would use a loop instead. However as of the
// writing of this code non-lexical lifetimes haven't been merged
// yet, and I can't manage to write this code without having borrow
// issues.
cx.waker().wake_by_ref();
return Poll::Pending;
}
} else {
return Poll::Ready(MdnsPacket::Response(MdnsResponse {
packet,
from,
}));
}
match self.socket.send_to(&to_send, *IPV4_MDNS_MULTICAST_ADDRESS).await {
Ok(bytes_written) => {
debug_assert_eq!(bytes_written, to_send.len());
}
Err(_) => {
// Ignore errors while parsing the packet. We need to poll again for the
// next packet.
// Note that ideally we would use a loop instead. However as of the writing
// of this code non-lexical lifetimes haven't been merged yet, and I can't
// manage to write this code without having borrow issues.
cx.waker().wake_by_ref();
return Poll::Pending;
// Errors are non-fatal because they can happen for example if we lose
// connection to the network.
self.send_buffers.clear();
break;
}
}
}
Poll::Pending => (),
Poll::Ready(Err(_)) => {
// Error are non-fatal and can happen if we get disconnected from example.
// The query interval will wake up the task at some point so that we can try again.
// Flush the query send buffer.
while !self.query_send_buffers.is_empty() {
let to_send = self.query_send_buffers.remove(0);
match self.query_socket.send_to(&to_send, *IPV4_MDNS_MULTICAST_ADDRESS).await {
Ok(bytes_written) => {
debug_assert_eq!(bytes_written, to_send.len());
}
Err(_) => {
// Errors are non-fatal because they can happen for example if we lose
// connection to the network.
self.query_send_buffers.clear();
break;
}
}
}
};
// Either (left) listen for incoming packets or (right) send query packets whenever the
// query interval fires.
let selected_output = match futures::future::select(
Box::pin(self.socket.recv_from(&mut self.recv_buffer)),
Box::pin(self.query_interval.next()),
).await {
future::Either::Left((recved, _)) => Left(recved),
future::Either::Right(_) => Right(()),
};
match selected_output {
Left(left) => match left {
Ok((len, from)) => {
match MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) {
Some(packet) => return (self, packet),
None => {},
}
},
Err(_) => {
// Error are non-fatal and can happen if we get disconnected from example.
// The query interval will wake up the task at some point so that we can try again.
},
},
Right(_) => {
// Ensure underlying task is woken up on the next interval tick.
while let Some(_) = self.query_interval.next().now_or_never() {};
if !self.silent {
let query = dns::build_query();
self.query_send_buffers.push(query.to_vec());
}
}
};
}
}
}
@@ -278,58 +277,82 @@ impl fmt::Debug for MdnsService {
/// A valid mDNS packet received by the service.
#[derive(Debug)]
pub enum MdnsPacket<'a> {
pub enum MdnsPacket {
/// A query made by a remote.
Query(MdnsQuery<'a>),
Query(MdnsQuery),
/// A response sent by a remote in response to one of our queries.
Response(MdnsResponse<'a>),
Response(MdnsResponse),
/// A request for service discovery.
ServiceDiscovery(MdnsServiceDiscovery<'a>),
ServiceDiscovery(MdnsServiceDiscovery),
}
impl MdnsPacket {
fn new_from_bytes(buf: &[u8], from: SocketAddr) -> Option<MdnsPacket> {
match Packet::parse(buf) {
Ok(packet) => {
if packet.header.query {
if packet
.questions
.iter()
.any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME)
{
let query = MdnsPacket::Query(MdnsQuery {
from,
query_id: packet.header.id,
});
return Some(query);
} else if packet
.questions
.iter()
.any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE)
{
// TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE?
let discovery = MdnsPacket::ServiceDiscovery(
MdnsServiceDiscovery {
from,
query_id: packet.header.id,
},
);
return Some(discovery);
} else {
return None;
}
} else {
let resp = MdnsPacket::Response(MdnsResponse::new (
packet,
from,
));
return Some(resp);
}
}
Err(_) => {
return None;
}
}
}
}
/// A received mDNS query.
pub struct MdnsQuery<'a> {
pub struct MdnsQuery {
/// Sender of the address.
from: SocketAddr,
/// Id of the received DNS query. We need to pass this ID back in the results.
query_id: u16,
/// Queue of pending buffers.
send_buffers: &'a mut Vec<Vec<u8>>,
}
impl<'a> MdnsQuery<'a> {
/// Respond to the query.
///
/// Pass the ID of the local peer, and the list of addresses we're listening on.
///
/// If there are more than 2^16-1 addresses, ignores the others.
///
/// > **Note**: Keep in mind that we will also receive this response in an `MdnsResponse`.
#[inline]
pub fn respond<TAddresses>(
self,
peer_id: PeerId,
addresses: TAddresses,
ttl: Duration,
) -> Result<(), MdnsResponseError>
where
TAddresses: IntoIterator<Item = Multiaddr>,
TAddresses::IntoIter: ExactSizeIterator,
{
let response =
dns::build_query_response(self.query_id, peer_id, addresses.into_iter(), ttl)?;
self.send_buffers.push(response);
Ok(())
}
impl MdnsQuery {
/// Source address of the packet.
#[inline]
pub fn remote_addr(&self) -> &SocketAddr {
&self.from
}
/// Query id of the packet.
pub fn query_id(&self) -> u16 {
self.query_id
}
}
impl<'a> fmt::Debug for MdnsQuery<'a> {
impl fmt::Debug for MdnsQuery {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MdnsQuery")
.field("from", self.remote_addr())
@@ -339,31 +362,26 @@ impl<'a> fmt::Debug for MdnsQuery<'a> {
}
/// A received mDNS service discovery query.
pub struct MdnsServiceDiscovery<'a> {
pub struct MdnsServiceDiscovery {
/// Sender of the address.
from: SocketAddr,
/// Id of the received DNS query. We need to pass this ID back in the results.
query_id: u16,
/// Queue of pending buffers.
send_buffers: &'a mut Vec<Vec<u8>>,
}
impl<'a> MdnsServiceDiscovery<'a> {
/// Respond to the query.
#[inline]
pub fn respond(self, ttl: Duration) {
let response = dns::build_service_discovery_response(self.query_id, ttl);
self.send_buffers.push(response);
}
impl MdnsServiceDiscovery {
/// Source address of the packet.
#[inline]
pub fn remote_addr(&self) -> &SocketAddr {
&self.from
}
/// Query id of the packet.
pub fn query_id(&self) -> u16 {
self.query_id
}
}
impl<'a> fmt::Debug for MdnsServiceDiscovery<'a> {
impl fmt::Debug for MdnsServiceDiscovery {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MdnsServiceDiscovery")
.field("from", self.remote_addr())
@@ -373,18 +391,15 @@ impl<'a> fmt::Debug for MdnsServiceDiscovery<'a> {
}
/// A received mDNS response.
pub struct MdnsResponse<'a> {
packet: Packet<'a>,
pub struct MdnsResponse {
peers: Vec<MdnsPeer>,
from: SocketAddr,
}
impl<'a> MdnsResponse<'a> {
/// Returns the list of peers that have been reported in this packet.
///
/// > **Note**: Keep in mind that this will also contain the responses we sent ourselves.
pub fn discovered_peers<'b>(&'b self) -> impl Iterator<Item = MdnsPeer<'b>> {
let packet = &self.packet;
self.packet.answers.iter().filter_map(move |record| {
impl MdnsResponse {
/// Creates a new `MdnsResponse` based on the provided `Packet`.
fn new(packet: Packet, from: SocketAddr) -> MdnsResponse {
let peers = packet.answers.iter().filter_map(|record| {
if record.name.to_string().as_bytes() != SERVICE_NAME {
return None;
}
@@ -410,13 +425,25 @@ impl<'a> MdnsResponse<'a> {
Err(_) => return None,
};
Some(MdnsPeer {
packet,
Some(MdnsPeer::new (
&packet,
record_value,
peer_id,
ttl: record.ttl,
})
})
record.ttl,
))
}).collect();
MdnsResponse {
peers,
from,
}
}
/// Returns the list of peers that have been reported in this packet.
///
/// > **Note**: Keep in mind that this will also contain the responses we sent ourselves.
pub fn discovered_peers(&self) -> impl Iterator<Item = &MdnsPeer> {
self.peers.iter()
}
/// Source address of the packet.
@@ -426,7 +453,7 @@ impl<'a> MdnsResponse<'a> {
}
}
impl<'a> fmt::Debug for MdnsResponse<'a> {
impl fmt::Debug for MdnsResponse {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MdnsResponse")
.field("from", self.remote_addr())
@@ -435,41 +462,22 @@ impl<'a> fmt::Debug for MdnsResponse<'a> {
}
/// A peer discovered by the service.
pub struct MdnsPeer<'a> {
/// The original packet which will be used to determine the addresses.
packet: &'a Packet<'a>,
/// Cached value of `concat(base32(peer_id), service name)`.
record_value: String,
pub struct MdnsPeer {
addrs: Vec<Multiaddr>,
/// Id of the peer.
peer_id: PeerId,
/// TTL of the record in seconds.
ttl: u32,
}
impl<'a> MdnsPeer<'a> {
/// Returns the id of the peer.
#[inline]
pub fn id(&self) -> &PeerId {
&self.peer_id
}
/// Returns the requested time-to-live for the record.
#[inline]
pub fn ttl(&self) -> Duration {
Duration::from_secs(u64::from(self.ttl))
}
/// Returns the list of addresses the peer says it is listening on.
///
/// Filters out invalid addresses.
pub fn addresses<'b>(&'b self) -> impl Iterator<Item = Multiaddr> + 'b {
let my_peer_id = &self.peer_id;
let record_value = &self.record_value;
self.packet
impl MdnsPeer {
/// Creates a new `MdnsPeer` based on the provided `Packet`.
pub fn new(packet: &Packet, record_value: String, my_peer_id: PeerId, ttl: u32) -> MdnsPeer {
let addrs = packet
.additional
.iter()
.filter_map(move |add_record| {
if &add_record.name.to_string() != record_value {
.filter_map(|add_record| {
if add_record.name.to_string() != record_value {
return None;
}
@@ -480,7 +488,7 @@ impl<'a> MdnsPeer<'a> {
}
})
.flat_map(|txt| txt.iter())
.filter_map(move |txt| {
.filter_map(|txt| {
// TODO: wrong, txt can be multiple character strings
let addr = match dns::decode_character_string(txt) {
Ok(a) => a,
@@ -498,15 +506,40 @@ impl<'a> MdnsPeer<'a> {
Err(_) => return None,
};
match addr.pop() {
Some(Protocol::P2p(ref peer_id)) if peer_id == my_peer_id => (),
Some(Protocol::P2p(ref peer_id)) if peer_id == &my_peer_id => (),
_ => return None,
};
Some(addr)
})
}).collect();
MdnsPeer {
addrs,
peer_id: my_peer_id.clone(),
ttl,
}
}
/// Returns the id of the peer.
#[inline]
pub fn id(&self) -> &PeerId {
&self.peer_id
}
/// Returns the requested time-to-live for the record.
#[inline]
pub fn ttl(&self) -> Duration {
Duration::from_secs(u64::from(self.ttl))
}
/// Returns the list of addresses the peer says it is listening on.
///
/// Filters out invalid addresses.
pub fn addresses(&self) -> &Vec<Multiaddr> {
&self.addrs
}
}
impl<'a> fmt::Debug for MdnsPeer<'a> {
impl fmt::Debug for MdnsPeer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MdnsPeer")
.field("peer_id", &self.peer_id)
@@ -516,42 +549,87 @@ impl<'a> fmt::Debug for MdnsPeer<'a> {
#[cfg(test)]
mod tests {
use futures::prelude::*;
use futures::executor::block_on;
use libp2p_core::PeerId;
use std::{io, task::Poll, time::Duration};
use std::{io::{Error, ErrorKind}, time::Duration};
use wasm_timer::ext::TryFutureExt;
use crate::service::{MdnsPacket, MdnsService};
use multiaddr::multihash::*;
fn discover(peer_id: PeerId) {
let mut service = MdnsService::new().unwrap();
let stream = stream::poll_fn(move |cx| -> Poll<Option<Result<(), io::Error>>> {
block_on(async {
let mut service = MdnsService::new().await.unwrap();
loop {
let packet = match service.poll(cx) {
Poll::Ready(packet) => packet,
Poll::Pending => return Poll::Pending,
};
let next = service.next().await;
service = next.0;
match packet {
match next.1 {
MdnsPacket::Query(query) => {
query.respond(peer_id.clone(), None, Duration::from_secs(120)).unwrap();
let resp = crate::dns::build_query_response(
query.query_id(),
peer_id.clone(),
vec![].into_iter(),
Duration::from_secs(120),
).unwrap();
service.enqueue_response(resp);
}
MdnsPacket::Response(response) => {
for peer in response.discovered_peers() {
if peer.id() == &peer_id {
return Poll::Ready(None);
return;
}
}
}
MdnsPacket::ServiceDiscovery(_) => {}
MdnsPacket::ServiceDiscovery(_) => panic!("did not expect a service discovery packet")
}
}
});
})
}
futures::executor::block_on(
stream
.map_err(|err| panic!("{:?}", err))
.for_each(|_| future::ready(())),
);
// As of today the underlying UDP socket is not stubbed out. Thus tests run in parallel to this
// unit tests inter fear with it. Test needs to be run in sequence to ensure test properties.
#[test]
fn respect_query_interval() {
let own_ips: Vec<std::net::IpAddr> = get_if_addrs::get_if_addrs().unwrap()
.into_iter()
.map(|i| i.addr.ip())
.collect();
let fut = async {
let mut service = MdnsService::new().await.unwrap();
let mut sent_queries = vec![];
loop {
let next = service.next().await;
service = next.0;
match next.1 {
MdnsPacket::Query(query) => {
// Ignore queries from other nodes.
let source_ip = query.remote_addr().ip();
if !own_ips.contains(&source_ip) {
continue;
}
sent_queries.push(query);
if sent_queries.len() > 1 {
return Ok(())
}
}
// Ignore response packets. We don't stub out the UDP socket, thus this is
// either random noise from the network, or noise from other unit tests running
// in parallel.
MdnsPacket::Response(_) => {},
MdnsPacket::ServiceDiscovery(_) => {
return Err(Error::new(ErrorKind::Other, "did not expect a service discovery packet"));
},
}
}
};
// TODO: This might be too long for a unit test.
block_on(fut.timeout(Duration::from_secs(41))).unwrap();
}
#[test]