protocols/mdns: Allow users to choose between async-io and tokio runtime (#2748)

Allow users to choose between async-io and tokio runtime
in the mdns protocol implementation. `async-io` is a default
feature, with an additional `tokio` feature.

Fix high CPU usage with Tokio library.
This commit is contained in:
Yolier Galan Tasse 2022-09-01 23:53:38 -04:00 committed by GitHub
parent 36a2773861
commit 89f898c69f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 593 additions and 133 deletions

View File

@ -19,7 +19,7 @@ default = [
"identify", "identify",
"kad", "kad",
"gossipsub", "gossipsub",
"mdns", "mdns-async-io",
"mplex", "mplex",
"noise", "noise",
"ping", "ping",
@ -46,7 +46,8 @@ identify = ["dep:libp2p-identify", "libp2p-metrics?/identify"]
kad = ["dep:libp2p-kad", "libp2p-metrics?/kad"] kad = ["dep:libp2p-kad", "libp2p-metrics?/kad"]
gossipsub = ["dep:libp2p-gossipsub", "libp2p-metrics?/gossipsub"] gossipsub = ["dep:libp2p-gossipsub", "libp2p-metrics?/gossipsub"]
metrics = ["dep:libp2p-metrics"] metrics = ["dep:libp2p-metrics"]
mdns = ["dep:libp2p-mdns"] mdns-async-io = ["dep:libp2p-mdns", "libp2p-mdns?/async-io"]
mdns-tokio = ["dep:libp2p-mdns", "libp2p-mdns?/tokio"]
mplex = ["dep:libp2p-mplex"] mplex = ["dep:libp2p-mplex"]
noise = ["dep:libp2p-noise"] noise = ["dep:libp2p-noise"]
ping = ["dep:libp2p-ping", "libp2p-metrics?/ping"] ping = ["dep:libp2p-ping", "libp2p-metrics?/ping"]
@ -106,7 +107,7 @@ smallvec = "1.6.1"
[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies] [target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.35.0", path = "transports/deflate", optional = true } libp2p-deflate = { version = "0.35.0", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.35.0", path = "transports/dns", optional = true, default-features = false } libp2p-dns = { version = "0.35.0", path = "transports/dns", optional = true, default-features = false }
libp2p-mdns = { version = "0.40.0", path = "protocols/mdns", optional = true } libp2p-mdns = { version = "0.40.0", path = "protocols/mdns", optional = true, default-features = false }
libp2p-tcp = { version = "0.35.0", path = "transports/tcp", default-features = false, optional = true } libp2p-tcp = { version = "0.35.0", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.37.0", path = "transports/websocket", optional = true } libp2p-websocket = { version = "0.37.0", path = "transports/websocket", optional = true }
@ -160,7 +161,7 @@ required-features = ["floodsub"]
[[example]] [[example]]
name = "chat-tokio" name = "chat-tokio"
required-features = ["tcp-tokio", "mdns"] required-features = ["tcp-tokio", "mdns-tokio"]
[[example]] [[example]]
name = "file-sharing" name = "file-sharing"

View File

@ -25,7 +25,7 @@
//! The example is run per node as follows: //! The example is run per node as follows:
//! //!
//! ```sh //! ```sh
//! cargo run --example chat-tokio --features="tcp-tokio mdns" //! cargo run --example chat-tokio --features="tcp-tokio mdns-tokio"
//! ``` //! ```
//! //!
//! Alternatively, to run with the minimal set of features and crates: //! Alternatively, to run with the minimal set of features and crates:
@ -33,7 +33,7 @@
//! ```sh //! ```sh
//!cargo run --example chat-tokio \\ //!cargo run --example chat-tokio \\
//! --no-default-features \\ //! --no-default-features \\
//! --features="floodsub mplex noise tcp-tokio mdns" //! --features="floodsub mplex noise tcp-tokio mdns-tokio"
//! ``` //! ```
use futures::StreamExt; use futures::StreamExt;
@ -41,7 +41,11 @@ use libp2p::{
core::upgrade, core::upgrade,
floodsub::{self, Floodsub, FloodsubEvent}, floodsub::{self, Floodsub, FloodsubEvent},
identity, identity,
mdns::{Mdns, MdnsEvent}, mdns::{
MdnsEvent,
// `TokioMdns` is available through the `mdns-tokio` feature.
TokioMdns,
},
mplex, mplex,
noise, noise,
swarm::{SwarmBuilder, SwarmEvent}, swarm::{SwarmBuilder, SwarmEvent},
@ -88,7 +92,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
#[behaviour(out_event = "MyBehaviourEvent")] #[behaviour(out_event = "MyBehaviourEvent")]
struct MyBehaviour { struct MyBehaviour {
floodsub: Floodsub, floodsub: Floodsub,
mdns: Mdns, mdns: TokioMdns,
} }
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
@ -111,7 +115,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a Swarm to manage peers and events. // Create a Swarm to manage peers and events.
let mut swarm = { let mut swarm = {
let mdns = Mdns::new(Default::default()).await?; let mdns = TokioMdns::new(Default::default()).await?;
let mut behaviour = MyBehaviour { let mut behaviour = MyBehaviour {
floodsub: Floodsub::new(peer_id), floodsub: Floodsub::new(peer_id),
mdns, mdns,

View File

@ -2,6 +2,14 @@
- Update to `libp2p-swarm` `v0.39.0`. - Update to `libp2p-swarm` `v0.39.0`.
- Allow users to choose between async-io and tokio runtime
in the mdns protocol implementation. `async-io` is a default
feature, with an additional `tokio` feature (see [PR 2748])
- Fix high CPU usage with Tokio library (see [PR 2748]).
[PR 2748]: https://github.com/libp2p/rust-libp2p/pull/2748
# 0.39.0 # 0.39.0
- Update to `libp2p-swarm` `v0.38.0`. - Update to `libp2p-swarm` `v0.38.0`.

View File

@ -11,7 +11,6 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
async-io = "1.3.1"
data-encoding = "2.3.2" data-encoding = "2.3.2"
dns-parser = "0.8.0" dns-parser = "0.8.0"
futures = "0.3.13" futures = "0.3.13"
@ -25,8 +24,26 @@ smallvec = "1.6.1"
socket2 = { version = "0.4.0", features = ["all"] } socket2 = { version = "0.4.0", features = ["all"] }
void = "1.0.2" void = "1.0.2"
async-io = { version = "1.3.1", optional = true }
tokio = { version = "1.19", default-features = false, features = ["net", "time"], optional = true}
[features]
default = ["async-io"]
tokio = ["dep:tokio"]
async-io = ["dep:async-io"]
[dev-dependencies] [dev-dependencies]
async-std = { version = "1.9.0", features = ["attributes"] } async-std = { version = "1.9.0", features = ["attributes"] }
env_logger = "0.9.0" env_logger = "0.9.0"
libp2p = { path = "../..", default-features = false, features = ["mdns", "tcp-async-io", "dns-async-std", "websocket", "noise", "mplex", "yamux"] } libp2p = { path = "../..", default-features = false, features = ["mdns-async-io", "tcp-async-io", "dns-async-std", "tcp-tokio", "dns-tokio", "websocket", "noise", "mplex", "yamux"] }
tokio = { version = "1.15", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] } tokio = { version = "1.19", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] }
[[test]]
name = "use-async-std"
required-features = ["async-io"]
[[test]]
name = "use-tokio"
required-features = ["tokio"]

View File

@ -19,11 +19,14 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
mod iface; mod iface;
mod socket;
mod timer;
use self::iface::InterfaceState; use self::iface::InterfaceState;
use crate::behaviour::{socket::AsyncSocket, timer::Builder};
use crate::MdnsConfig; use crate::MdnsConfig;
use async_io::Timer;
use futures::prelude::*; use futures::prelude::*;
use futures::Stream;
use if_watch::{IfEvent, IfWatcher}; use if_watch::{IfEvent, IfWatcher};
use libp2p_core::transport::ListenerId; use libp2p_core::transport::ListenerId;
use libp2p_core::{Multiaddr, PeerId}; use libp2p_core::{Multiaddr, PeerId};
@ -35,10 +38,24 @@ use smallvec::SmallVec;
use std::collections::hash_map::{Entry, HashMap}; use std::collections::hash_map::{Entry, HashMap};
use std::{cmp, fmt, io, net::IpAddr, pin::Pin, task::Context, task::Poll, time::Instant}; use std::{cmp, fmt, io, net::IpAddr, pin::Pin, task::Context, task::Poll, time::Instant};
#[cfg(feature = "async-io")]
use crate::behaviour::{socket::asio::AsyncUdpSocket, timer::asio::AsyncTimer};
/// The type of a [`GenMdns`] using the `async-io` implementation.
#[cfg(feature = "async-io")]
pub type Mdns = GenMdns<AsyncUdpSocket, AsyncTimer>;
#[cfg(feature = "tokio")]
use crate::behaviour::{socket::tokio::TokioUdpSocket, timer::tokio::TokioTimer};
/// The type of a [`GenMdns`] using the `tokio` implementation.
#[cfg(feature = "tokio")]
pub type TokioMdns = GenMdns<TokioUdpSocket, TokioTimer>;
/// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds /// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds
/// them to the topology. /// them to the topology.
#[derive(Debug)] #[derive(Debug)]
pub struct Mdns { pub struct GenMdns<S, T> {
/// InterfaceState config. /// InterfaceState config.
config: MdnsConfig, config: MdnsConfig,
@ -46,7 +63,7 @@ pub struct Mdns {
if_watch: IfWatcher, if_watch: IfWatcher,
/// Mdns interface states. /// Mdns interface states.
iface_states: HashMap<IpAddr, InterfaceState>, iface_states: HashMap<IpAddr, InterfaceState<S, T>>,
/// List of nodes that we have discovered, the address, and when their TTL expires. /// List of nodes that we have discovered, the address, and when their TTL expires.
/// ///
@ -57,10 +74,13 @@ pub struct Mdns {
/// Future that fires when the TTL of at least one node in `discovered_nodes` expires. /// Future that fires when the TTL of at least one node in `discovered_nodes` expires.
/// ///
/// `None` if `discovered_nodes` is empty. /// `None` if `discovered_nodes` is empty.
closest_expiration: Option<Timer>, closest_expiration: Option<T>,
} }
impl Mdns { impl<S, T> GenMdns<S, T>
where
T: Builder,
{
/// Builds a new `Mdns` behaviour. /// Builds a new `Mdns` behaviour.
pub async fn new(config: MdnsConfig) -> io::Result<Self> { pub async fn new(config: MdnsConfig) -> io::Result<Self> {
let if_watch = if_watch::IfWatcher::new().await?; let if_watch = if_watch::IfWatcher::new().await?;
@ -91,11 +111,15 @@ impl Mdns {
*expires = now; *expires = now;
} }
} }
self.closest_expiration = Some(Timer::at(now)); self.closest_expiration = Some(T::at(now));
} }
} }
impl NetworkBehaviour for Mdns { impl<S, T> NetworkBehaviour for GenMdns<S, T>
where
T: Builder + Stream,
S: AsyncSocket,
{
type ConnectionHandler = DummyConnectionHandler; type ConnectionHandler = DummyConnectionHandler;
type OutEvent = MdnsEvent; type OutEvent = MdnsEvent;
@ -219,8 +243,9 @@ impl NetworkBehaviour for Mdns {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
} }
if let Some(closest_expiration) = closest_expiration { if let Some(closest_expiration) = closest_expiration {
let mut timer = Timer::at(closest_expiration); let mut timer = T::at(closest_expiration);
let _ = Pin::new(&mut timer).poll(cx); let _ = Pin::new(&mut timer).poll_next(cx);
self.closest_expiration = Some(timer); self.closest_expiration = Some(timer);
} }
Poll::Pending Poll::Pending

View File

@ -23,9 +23,8 @@ mod query;
use self::dns::{build_query, build_query_response, build_service_discovery_response}; use self::dns::{build_query, build_query_response, build_service_discovery_response};
use self::query::MdnsPacket; use self::query::MdnsPacket;
use crate::behaviour::{socket::AsyncSocket, timer::Builder};
use crate::MdnsConfig; use crate::MdnsConfig;
use async_io::{Async, Timer};
use futures::prelude::*;
use libp2p_core::{address_translation, multiaddr::Protocol, Multiaddr, PeerId}; use libp2p_core::{address_translation, multiaddr::Protocol, Multiaddr, PeerId};
use libp2p_swarm::PollParameters; use libp2p_swarm::PollParameters;
use socket2::{Domain, Socket, Type}; use socket2::{Domain, Socket, Type};
@ -34,20 +33,20 @@ use std::{
io, iter, io, iter,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
pin::Pin, pin::Pin,
task::Context, task::{Context, Poll},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
/// An mDNS instance for a networking interface. To discover all peers when having multiple /// An mDNS instance for a networking interface. To discover all peers when having multiple
/// interfaces an [`InterfaceState`] is required for each interface. /// interfaces an [`InterfaceState`] is required for each interface.
#[derive(Debug)] #[derive(Debug)]
pub struct InterfaceState { pub struct InterfaceState<U, T> {
/// Address this instance is bound to. /// Address this instance is bound to.
addr: IpAddr, addr: IpAddr,
/// Receive socket. /// Receive socket.
recv_socket: Async<UdpSocket>, recv_socket: U,
/// Send socket. /// Send socket.
send_socket: Async<UdpSocket>, send_socket: U,
/// Buffer used for receiving data from the main socket. /// Buffer used for receiving data from the main socket.
/// RFC6762 discourages packets larger than the interface MTU, but allows sizes of up to 9000 /// RFC6762 discourages packets larger than the interface MTU, but allows sizes of up to 9000
/// bytes, if it can be ensured that all participating devices can handle such large packets. /// bytes, if it can be ensured that all participating devices can handle such large packets.
@ -60,7 +59,7 @@ pub struct InterfaceState {
/// Discovery interval. /// Discovery interval.
query_interval: Duration, query_interval: Duration,
/// Discovery timer. /// Discovery timer.
timeout: Timer, timeout: T,
/// Multicast address. /// Multicast address.
multicast_addr: IpAddr, multicast_addr: IpAddr,
/// Discovered addresses. /// Discovered addresses.
@ -69,7 +68,11 @@ pub struct InterfaceState {
ttl: Duration, ttl: Duration,
} }
impl InterfaceState { impl<U, T> InterfaceState<U, T>
where
U: AsyncSocket,
T: Builder + futures::Stream,
{
/// Builds a new [`InterfaceState`]. /// Builds a new [`InterfaceState`].
pub fn new(addr: IpAddr, config: MdnsConfig) -> io::Result<Self> { pub fn new(addr: IpAddr, config: MdnsConfig) -> io::Result<Self> {
log::info!("creating instance on iface {}", addr); log::info!("creating instance on iface {}", addr);
@ -83,7 +86,7 @@ impl InterfaceState {
socket.set_multicast_loop_v4(true)?; socket.set_multicast_loop_v4(true)?;
socket.set_multicast_ttl_v4(255)?; socket.set_multicast_ttl_v4(255)?;
socket.join_multicast_v4(&*crate::IPV4_MDNS_MULTICAST_ADDRESS, &addr)?; socket.join_multicast_v4(&*crate::IPV4_MDNS_MULTICAST_ADDRESS, &addr)?;
Async::new(UdpSocket::from(socket))? U::from_std(UdpSocket::from(socket))?
} }
IpAddr::V6(_) => { IpAddr::V6(_) => {
let socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(socket2::Protocol::UDP))?; let socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(socket2::Protocol::UDP))?;
@ -94,7 +97,7 @@ impl InterfaceState {
socket.set_multicast_loop_v6(true)?; socket.set_multicast_loop_v6(true)?;
// TODO: find interface matching addr. // TODO: find interface matching addr.
socket.join_multicast_v6(&*crate::IPV6_MDNS_MULTICAST_ADDRESS, 0)?; socket.join_multicast_v6(&*crate::IPV6_MDNS_MULTICAST_ADDRESS, 0)?;
Async::new(UdpSocket::from(socket))? U::from_std(UdpSocket::from(socket))?
} }
}; };
let bind_addr = match addr { let bind_addr = match addr {
@ -107,7 +110,8 @@ impl InterfaceState {
SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0) SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)
} }
}; };
let send_socket = Async::new(UdpSocket::bind(bind_addr)?)?; let send_socket = U::from_std(UdpSocket::bind(bind_addr)?)?;
// randomize timer to prevent all converging and firing at the same time. // randomize timer to prevent all converging and firing at the same time.
let query_interval = { let query_interval = {
use rand::Rng; use rand::Rng;
@ -127,19 +131,18 @@ impl InterfaceState {
send_buffer: Default::default(), send_buffer: Default::default(),
discovered: Default::default(), discovered: Default::default(),
query_interval, query_interval,
timeout: Timer::interval_at(Instant::now(), query_interval), timeout: T::interval_at(Instant::now(), query_interval),
multicast_addr, multicast_addr,
ttl: config.ttl, ttl: config.ttl,
}) })
} }
pub fn reset_timer(&mut self) { pub fn reset_timer(&mut self) {
self.timeout.set_interval(self.query_interval); self.timeout = T::interval(self.query_interval);
} }
pub fn fire_timer(&mut self) { pub fn fire_timer(&mut self) {
self.timeout self.timeout = T::interval_at(Instant::now(), self.query_interval);
.set_interval_at(Instant::now(), self.query_interval);
} }
fn inject_mdns_packet(&mut self, packet: MdnsPacket, params: &impl PollParameters) { fn inject_mdns_packet(&mut self, packet: MdnsPacket, params: &impl PollParameters) {
@ -171,17 +174,17 @@ impl InterfaceState {
let new_expiration = Instant::now() + peer.ttl(); let new_expiration = Instant::now() + peer.ttl();
let mut addrs: Vec<Multiaddr> = Vec::new();
for addr in peer.addresses() { for addr in peer.addresses() {
if let Some(new_addr) = address_translation(addr, &observed) { if let Some(new_addr) = address_translation(addr, &observed) {
addrs.push(new_addr.clone()) self.discovered.push_back((
*peer.id(),
new_addr.clone(),
new_expiration,
));
} }
addrs.push(addr.clone())
}
for addr in addrs {
self.discovered self.discovered
.push_back((*peer.id(), addr, new_expiration)); .push_back((*peer.id(), addr.clone(), new_expiration));
} }
} }
} }
@ -198,43 +201,49 @@ impl InterfaceState {
params: &impl PollParameters, params: &impl PollParameters,
) -> Option<(PeerId, Multiaddr, Instant)> { ) -> Option<(PeerId, Multiaddr, Instant)> {
// Poll receive socket. // Poll receive socket.
while self.recv_socket.poll_readable(cx).is_ready() { while let Poll::Ready(data) =
match self Pin::new(&mut self.recv_socket).poll_read(cx, &mut self.recv_buffer)
.recv_socket {
.recv_from(&mut self.recv_buffer) match data {
.now_or_never() Ok((len, from)) => {
{
Some(Ok((len, from))) => {
if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from)
{ {
self.inject_mdns_packet(packet, params); self.inject_mdns_packet(packet, params);
} }
} }
Some(Err(err)) => log::error!("Failed reading datagram: {}", err), Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
None => {} // No more bytes available on the socket to read
} break;
} }
// Send responses. Err(err) => {
while self.send_socket.poll_writable(cx).is_ready() { log::error!("failed reading datagram: {}", err);
if let Some(packet) = self.send_buffer.pop_front() {
match self
.send_socket
.send_to(&packet, SocketAddr::new(self.multicast_addr, 5353))
.now_or_never()
{
Some(Ok(_)) => log::trace!("sent packet on iface {}", self.addr),
Some(Err(err)) => {
log::error!("error sending packet on iface {}: {}", self.addr, err)
}
None => self.send_buffer.push_front(packet),
} }
} else if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
log::trace!("sending query on iface {}", self.addr);
self.send_buffer.push_back(build_query());
} else {
break;
} }
} }
// Send responses.
while let Some(packet) = self.send_buffer.pop_front() {
match Pin::new(&mut self.send_socket).poll_write(
cx,
&packet,
SocketAddr::new(self.multicast_addr, 5353),
) {
Poll::Ready(Ok(_)) => log::trace!("sent packet on iface {}", self.addr),
Poll::Ready(Err(err)) => {
log::error!("error sending packet on iface {} {}", self.addr, err);
}
Poll::Pending => {
self.send_buffer.push_front(packet);
break;
}
}
}
if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
log::trace!("sending query on iface {}", self.addr);
self.send_buffer.push_back(build_query());
}
// Emit discovered event. // Emit discovered event.
self.discovered.pop_front() self.discovered.pop_front()
} }

View File

@ -0,0 +1,134 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::{
io::Error,
marker::Unpin,
net::{SocketAddr, UdpSocket},
task::{Context, Poll},
};
/// Interface that must be implemented by the different runtimes to use the [`UdpSocket`] in async mode
pub trait AsyncSocket: Unpin + Send + 'static {
/// Create the async socket from the [`std::net::UdpSocket`]
fn from_std(socket: UdpSocket) -> std::io::Result<Self>
where
Self: Sized;
/// Attempts to receive a single packet on the socket from the remote address to which it is connected.
fn poll_read(
&mut self,
_cx: &mut Context,
_buf: &mut [u8],
) -> Poll<Result<(usize, SocketAddr), Error>>;
/// Attempts to send data on the socket to a given address.
fn poll_write(
&mut self,
_cx: &mut Context,
_packet: &[u8],
_to: SocketAddr,
) -> Poll<Result<(), Error>>;
}
#[cfg(feature = "async-io")]
pub mod asio {
use super::*;
use async_io::Async;
use futures::FutureExt;
/// AsyncIo UdpSocket
pub type AsyncUdpSocket = Async<UdpSocket>;
impl AsyncSocket for AsyncUdpSocket {
fn from_std(socket: UdpSocket) -> std::io::Result<Self> {
Async::new(socket)
}
fn poll_read(
&mut self,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<Result<(usize, SocketAddr), Error>> {
// Poll receive socket.
futures::ready!(self.poll_readable(cx))?;
match self.recv_from(buf).now_or_never() {
Some(data) => Poll::Ready(data),
None => Poll::Pending,
}
}
fn poll_write(
&mut self,
cx: &mut Context,
packet: &[u8],
to: SocketAddr,
) -> Poll<Result<(), Error>> {
futures::ready!(self.poll_writable(cx))?;
match self.send_to(packet, to).now_or_never() {
Some(Ok(_)) => Poll::Ready(Ok(())),
Some(Err(err)) => Poll::Ready(Err(err)),
None => Poll::Pending,
}
}
}
}
#[cfg(feature = "tokio")]
pub mod tokio {
use super::*;
use ::tokio::{io::ReadBuf, net::UdpSocket as TkUdpSocket};
/// Tokio ASync Socket`
pub type TokioUdpSocket = TkUdpSocket;
impl AsyncSocket for TokioUdpSocket {
fn from_std(socket: UdpSocket) -> std::io::Result<Self> {
socket.set_nonblocking(true)?;
TokioUdpSocket::from_std(socket)
}
fn poll_read(
&mut self,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<Result<(usize, SocketAddr), Error>> {
let mut rbuf = ReadBuf::new(buf);
match self.poll_recv_from(cx, &mut rbuf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Ready(Ok(addr)) => Poll::Ready(Ok((rbuf.filled().len(), addr))),
}
}
fn poll_write(
&mut self,
cx: &mut Context,
packet: &[u8],
to: SocketAddr,
) -> Poll<Result<(), Error>> {
match self.poll_send_to(cx, packet, to) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Ready(Ok(_len)) => Poll::Ready(Ok(())),
}
}
}
}

View File

@ -0,0 +1,128 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::{
marker::Unpin,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
/// Simple wrapper for the differents type of timers
#[derive(Debug)]
pub struct Timer<T> {
inner: T,
}
/// Builder interface to homogenize the differents implementations
pub trait Builder: Send + Unpin + 'static {
/// Creates a timer that emits an event once at the given time instant.
fn at(instant: Instant) -> Self;
/// Creates a timer that emits events periodically.
fn interval(duration: Duration) -> Self;
/// Creates a timer that emits events periodically, starting at start.
fn interval_at(start: Instant, duration: Duration) -> Self;
}
#[cfg(feature = "async-io")]
pub mod asio {
use super::*;
use async_io::Timer as AsioTimer;
use futures::Stream;
/// Async Timer
pub type AsyncTimer = Timer<AsioTimer>;
impl Builder for AsyncTimer {
fn at(instant: Instant) -> Self {
Self {
inner: AsioTimer::at(instant),
}
}
fn interval(duration: Duration) -> Self {
Self {
inner: AsioTimer::interval(duration),
}
}
fn interval_at(start: Instant, duration: Duration) -> Self {
Self {
inner: AsioTimer::interval_at(start, duration),
}
}
}
impl Stream for AsyncTimer {
type Item = Instant;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx)
}
}
}
#[cfg(feature = "tokio")]
pub mod tokio {
use super::*;
use ::tokio::time::{self, Instant as TokioInstant, Interval, MissedTickBehavior};
use futures::Stream;
/// Tokio wrapper
pub type TokioTimer = Timer<Interval>;
impl Builder for TokioTimer {
fn at(instant: Instant) -> Self {
// Taken from: https://docs.rs/async-io/1.7.0/src/async_io/lib.rs.html#91
let mut inner = time::interval_at(
TokioInstant::from_std(instant),
Duration::new(std::u64::MAX, 1_000_000_000 - 1),
);
inner.set_missed_tick_behavior(MissedTickBehavior::Skip);
Self { inner }
}
fn interval(duration: Duration) -> Self {
let mut inner = time::interval_at(TokioInstant::now() + duration, duration);
inner.set_missed_tick_behavior(MissedTickBehavior::Skip);
Self { inner }
}
fn interval_at(start: Instant, duration: Duration) -> Self {
let mut inner = time::interval_at(TokioInstant::from_std(start), duration);
inner.set_missed_tick_behavior(MissedTickBehavior::Skip);
Self { inner }
}
}
impl Stream for TokioTimer {
type Item = TokioInstant;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_tick(cx).map(Some)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(std::usize::MAX, None)
}
}
}

View File

@ -26,16 +26,22 @@
//! //!
//! # Usage //! # Usage
//! //!
//! This crate provides the `Mdns` struct which implements the `NetworkBehaviour` trait. This //! This crate provides a `Mdns` and `TokioMdns`, depending on the enabled features, which
//! struct will automatically discover other libp2p nodes on the local network. //! implements the `NetworkBehaviour` trait. This struct will automatically discover other
//! libp2p nodes on the local network.
//! //!
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::net::{Ipv4Addr, Ipv6Addr}; use std::net::{Ipv4Addr, Ipv6Addr};
use std::time::Duration; use std::time::Duration;
mod behaviour; mod behaviour;
pub use crate::behaviour::{GenMdns, MdnsEvent};
pub use crate::behaviour::{Mdns, MdnsEvent}; #[cfg(feature = "async-io")]
pub use crate::behaviour::Mdns;
#[cfg(feature = "tokio")]
pub use crate::behaviour::TokioMdns;
/// The DNS service name for all libp2p peers used to query for addresses. /// The DNS service name for all libp2p peers used to query for addresses.
const SERVICE_NAME: &[u8] = b"_p2p._udp.local"; const SERVICE_NAME: &[u8] = b"_p2p._udp.local";

View File

@ -28,6 +28,35 @@ use libp2p::{
use std::error::Error; use std::error::Error;
use std::time::Duration; use std::time::Duration;
#[async_std::test]
async fn test_discovery_async_std_ipv4() -> Result<(), Box<dyn Error>> {
run_discovery_test(MdnsConfig::default()).await
}
#[async_std::test]
async fn test_discovery_async_std_ipv6() -> Result<(), Box<dyn Error>> {
let config = MdnsConfig {
enable_ipv6: true,
..Default::default()
};
run_discovery_test(config).await
}
#[async_std::test]
async fn test_expired_async_std() -> Result<(), Box<dyn Error>> {
env_logger::try_init().ok();
let config = MdnsConfig {
ttl: Duration::from_secs(1),
query_interval: Duration::from_secs(10),
..Default::default()
};
async_std::future::timeout(Duration::from_secs(6), run_peer_expiration_test(config))
.await
.map(|_| ())
.map_err(|e| Box::new(e) as Box<dyn Error>)
}
async fn create_swarm(config: MdnsConfig) -> Result<Swarm<Mdns>, Box<dyn Error>> { async fn create_swarm(config: MdnsConfig) -> Result<Swarm<Mdns>, Box<dyn Error>> {
let id_keys = identity::Keypair::generate_ed25519(); let id_keys = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(id_keys.public()); let peer_id = PeerId::from(id_keys.public());
@ -78,34 +107,6 @@ async fn run_discovery_test(config: MdnsConfig) -> Result<(), Box<dyn Error>> {
} }
} }
#[async_std::test]
async fn test_discovery_async_std_ipv4() -> Result<(), Box<dyn Error>> {
run_discovery_test(MdnsConfig::default()).await
}
#[tokio::test]
async fn test_discovery_tokio_ipv4() -> Result<(), Box<dyn Error>> {
run_discovery_test(MdnsConfig::default()).await
}
#[async_std::test]
async fn test_discovery_async_std_ipv6() -> Result<(), Box<dyn Error>> {
let config = MdnsConfig {
enable_ipv6: true,
..Default::default()
};
run_discovery_test(config).await
}
#[tokio::test]
async fn test_discovery_tokio_ipv6() -> Result<(), Box<dyn Error>> {
let config = MdnsConfig {
enable_ipv6: true,
..Default::default()
};
run_discovery_test(config).await
}
async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box<dyn Error>> { async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box<dyn Error>> {
let mut a = create_swarm(config.clone()).await?; let mut a = create_swarm(config.clone()).await?;
let mut b = create_swarm(config).await?; let mut b = create_swarm(config).await?;
@ -136,32 +137,3 @@ async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box<dyn Erro
} }
} }
} }
#[async_std::test]
async fn test_expired_async_std() -> Result<(), Box<dyn Error>> {
env_logger::try_init().ok();
let config = MdnsConfig {
ttl: Duration::from_secs(1),
query_interval: Duration::from_secs(10),
..Default::default()
};
async_std::future::timeout(Duration::from_secs(6), run_peer_expiration_test(config))
.await
.map(|_| ())
.map_err(|e| Box::new(e) as Box<dyn Error>)
}
#[tokio::test]
async fn test_expired_tokio() -> Result<(), Box<dyn Error>> {
env_logger::try_init().ok();
let config = MdnsConfig {
ttl: Duration::from_secs(1),
query_interval: Duration::from_secs(10),
..Default::default()
};
tokio::time::timeout(Duration::from_secs(6), run_peer_expiration_test(config))
.await
.unwrap()
}

View File

@ -0,0 +1,153 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.use futures::StreamExt;
use futures::StreamExt;
use libp2p::{
identity,
mdns::{MdnsConfig, MdnsEvent, TokioMdns},
swarm::{Swarm, SwarmEvent},
PeerId,
};
use std::error::Error;
use std::time::Duration;
#[tokio::test]
async fn test_discovery_tokio_ipv4() -> Result<(), Box<dyn Error>> {
run_discovery_test(MdnsConfig::default()).await
}
#[tokio::test]
async fn test_discovery_tokio_ipv6() -> Result<(), Box<dyn Error>> {
let config = MdnsConfig {
enable_ipv6: true,
..Default::default()
};
run_discovery_test(config).await
}
#[tokio::test]
async fn test_expired_tokio() -> Result<(), Box<dyn Error>> {
env_logger::try_init().ok();
let config = MdnsConfig {
ttl: Duration::from_secs(1),
query_interval: Duration::from_secs(10),
..Default::default()
};
run_peer_expiration_test(config).await
}
async fn create_swarm(config: MdnsConfig) -> Result<Swarm<TokioMdns>, Box<dyn Error>> {
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(id_keys.public());
let transport = libp2p::tokio_development_transport(id_keys)?;
let behaviour = TokioMdns::new(config).await?;
let mut swarm = Swarm::new(transport, behaviour, peer_id);
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
Ok(swarm)
}
async fn run_discovery_test(config: MdnsConfig) -> Result<(), Box<dyn Error>> {
env_logger::try_init().ok();
let mut a = create_swarm(config.clone()).await?;
let mut b = create_swarm(config).await?;
let mut discovered_a = false;
let mut discovered_b = false;
loop {
futures::select! {
ev = a.select_next_some() => match ev {
SwarmEvent::Behaviour(MdnsEvent::Discovered(peers)) => {
for (peer, _addr) in peers {
if peer == *b.local_peer_id() {
if discovered_a {
return Ok(());
} else {
discovered_b = true;
}
}
}
}
_ => {}
},
ev = b.select_next_some() => match ev {
SwarmEvent::Behaviour(MdnsEvent::Discovered(peers)) => {
for (peer, _addr) in peers {
if peer == *a.local_peer_id() {
if discovered_b {
return Ok(());
} else {
discovered_a = true;
}
}
}
}
_ => {}
}
}
}
}
async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box<dyn Error>> {
let mut a = create_swarm(config.clone()).await?;
let mut b = create_swarm(config).await?;
let expired_at = tokio::time::sleep(Duration::from_secs(15));
tokio::pin!(expired_at);
loop {
tokio::select! {
_ev = &mut expired_at => {
panic!();
},
ev = a.select_next_some() => match ev {
SwarmEvent::Behaviour(MdnsEvent::Expired(peers)) => {
for (peer, _addr) in peers {
if peer == *b.local_peer_id() {
return Ok(());
}
}
}
SwarmEvent::Behaviour(MdnsEvent::Discovered(peers)) => {
for (peer, _addr) in peers {
if peer == *b.local_peer_id() {
expired_at.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(2));
}
}
}
_ => {}
},
ev = b.select_next_some() => match ev {
SwarmEvent::Behaviour(MdnsEvent::Expired(peers)) => {
for (peer, _addr) in peers {
if peer == *a.local_peer_id() {
return Ok(());
}
}
}
SwarmEvent::Behaviour(MdnsEvent::Discovered(peers)) => {
for (peer, _addr) in peers {
if peer == *a.local_peer_id() {
expired_at.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(2));
}
}
}
_ => {}
}
}
}
}

View File

@ -79,8 +79,11 @@ pub use libp2p_identify as identify;
#[cfg_attr(docsrs, doc(cfg(feature = "kad")))] #[cfg_attr(docsrs, doc(cfg(feature = "kad")))]
#[doc(inline)] #[doc(inline)]
pub use libp2p_kad as kad; pub use libp2p_kad as kad;
#[cfg(feature = "mdns")] #[cfg(any(feature = "mdns-async-io", feature = "mdns-tokio"))]
#[cfg_attr(docsrs, doc(cfg(feature = "mdns")))] #[cfg_attr(
docsrs,
doc(cfg(any(feature = "mdns-tokio", feature = "mdns-async-io")))
)]
#[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))] #[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))]
#[doc(inline)] #[doc(inline)]
pub use libp2p_mdns as mdns; pub use libp2p_mdns as mdns;