[libp2p-dns] Use trust-dns-resolver (with either async-std or tokio). Remove thread pool. (#1927)

* [libp2p-dns] Use trust-dns-resolver.

Use the `trust-dns-resolver` library for DNS resolution,
thereby removing current use of the thread pool.

Since `trust-dns-resolver` and related crates already
provide support for both `async-std` and `tokio`, we
make use of that here in our own feature flags.

Since `trust-dns-resolver` provides many useful
configuration options and error detail, central
types of `trust-dns-resolver` like `ResolverConfig`,
`ResolverOpts` and `ResolveError` are re-exposed
in the API of `libp2p-dns`. Full encapsulation
does not seem preferable in this case.

* Cleanup

* Fix two intra-doc links.

* Simplify slightly.

* Incorporate review feedback.

* Remove git dependency and fix example.

* Update version and changelogs.
This commit is contained in:
Roman Borschel
2021-03-16 11:48:48 +01:00
committed by GitHub
parent 9dbc90efe7
commit cd15bc9c62
12 changed files with 334 additions and 211 deletions

View File

@ -44,6 +44,13 @@
## Version 0.36.0 [unreleased]
- Consolidate top-level utility functions for constructing development
transports. There is now just `development_transport()` (available with default features)
and `tokio_development_transport()` (available when the corresponding tokio features are enabled).
Furthermore, these are now `async fn`s. The minor variations that also included `pnet`
support have been removed.
[PR 1927](https://github.com/libp2p/rust-libp2p/pull/1927)
- Update libp2p crates.
- Do not leak default features from libp2p crates.

View File

@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"]
[features]
default = [
"deflate",
"dns",
"dns-async-std",
"floodsub",
"identify",
"kad",
@ -33,7 +33,8 @@ default = [
"yamux",
]
deflate = ["libp2p-deflate"]
dns = ["libp2p-dns"]
dns-async-std = ["libp2p-dns", "libp2p-dns/async-std"]
dns-tokio = ["libp2p-dns", "libp2p-dns/tokio"]
floodsub = ["libp2p-floodsub"]
identify = ["libp2p-identify"]
kad = ["libp2p-kad"]
@ -88,7 +89,7 @@ wasm-timer = "0.2.4"
[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.27.1", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.27.0", path = "transports/dns", optional = true }
libp2p-dns = { version = "0.28.0", path = "transports/dns", optional = true, default-features = false }
libp2p-mdns = { version = "0.29.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.27.1", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.28.0", path = "transports/websocket", optional = true }

View File

@ -63,7 +63,8 @@ use libp2p::{
};
use std::{error::Error, task::{Context, Poll}};
fn main() -> Result<(), Box<dyn Error>> {
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
// Create a random PeerId
@ -72,7 +73,7 @@ fn main() -> Result<(), Box<dyn Error>> {
println!("Local peer id: {:?}", local_peer_id);
// Set up a an encrypted DNS-enabled TCP Transport over the Mplex and Yamux protocols
let transport = libp2p::build_development_transport(local_key)?;
let transport = libp2p::development_transport(local_key).await?;
// Create a Floodsub topic
let floodsub_topic = floodsub::Topic::new("chat");

View File

@ -58,14 +58,15 @@ use libp2p::{
NetworkBehaviour,
PeerId,
Swarm,
build_development_transport,
development_transport,
identity,
mdns::{Mdns, MdnsConfig, MdnsEvent},
swarm::NetworkBehaviourEventProcess
};
use std::{error::Error, task::{Context, Poll}};
fn main() -> Result<(), Box<dyn Error>> {
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
// Create a random key for ourselves.
@ -73,7 +74,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let local_peer_id = PeerId::from(local_key.public());
// Set up a an encrypted DNS-enabled TCP Transport over the Mplex protocol.
let transport = build_development_transport(local_key)?;
let transport = development_transport(local_key).await?;
// We create a custom network behaviour that combines Kademlia and mDNS.
#[derive(NetworkBehaviour)]

View File

@ -62,7 +62,8 @@ use std::{
task::{Context, Poll},
};
fn main() -> Result<(), Box<dyn Error>> {
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
Builder::from_env(Env::default().default_filter_or("info")).init();
// Create a random PeerId
@ -71,7 +72,7 @@ fn main() -> Result<(), Box<dyn Error>> {
println!("Local peer id: {:?}", local_peer_id);
// Set up an encrypted TCP Transport over the Mplex and Yamux protocols
let transport = libp2p::build_development_transport(local_key.clone())?;
let transport = libp2p::development_transport(local_key.clone()).await?;
// Create a Gossipsub topic
let topic = Topic::new("test-net");

View File

@ -28,7 +28,7 @@ use libp2p::{
Swarm,
PeerId,
identity,
build_development_transport
development_transport
};
use libp2p::kad::{
Kademlia,
@ -40,7 +40,8 @@ use libp2p::kad::{
use libp2p::kad::record::store::MemoryStore;
use std::{env, error::Error, time::Duration};
fn main() -> Result<(), Box<dyn Error>> {
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
// Create a random key for ourselves.
@ -48,7 +49,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let local_peer_id = PeerId::from(local_key.public());
// Set up a an encrypted DNS-enabled TCP Transport over the Mplex protocol
let transport = build_development_transport(local_key)?;
let transport = development_transport(local_key).await?;
// Create a swarm to manage peers and events.
let mut swarm = {

View File

@ -31,7 +31,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
println!("Local peer id: {:?}", peer_id);
// Create a transport.
let transport = libp2p::build_development_transport(id_keys)?;
let transport = libp2p::development_transport(id_keys).await?;
// Create an MDNS network behaviour.
let behaviour = Mdns::new(MdnsConfig::default()).await?;

View File

@ -43,7 +43,8 @@ use futures::{future, prelude::*};
use libp2p::{identity, PeerId, ping::{Ping, PingConfig}, Swarm};
use std::{error::Error, task::{Context, Poll}};
fn main() -> Result<(), Box<dyn Error>> {
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
// Create a random PeerId.
@ -52,7 +53,7 @@ fn main() -> Result<(), Box<dyn Error>> {
println!("Local peer id: {:?}", peer_id);
// Create a transport.
let transport = libp2p::build_development_transport(id_keys)?;
let transport = libp2p::development_transport(id_keys).await?;
// Create a ping network behaviour.
//

View File

@ -56,7 +56,7 @@
//! the dialing to take place and eventually resolve to a connection. Polling
//! futures is typically done through a [tokio] runtime.
//!
//! The easiest way to create a transport is to use [`build_development_transport`].
//! The easiest way to create a transport is to use [`development_transport`].
//! This function provides support for the most common protocols but it is also
//! subject to change over time and should thus not be used in production
//! configurations.
@ -65,8 +65,8 @@
//!
//! ```rust
//! let keypair = libp2p::identity::Keypair::generate_ed25519();
//! let _transport = libp2p::build_development_transport(keypair);
//! // _transport.dial(...);
//! let _transport = libp2p::development_transport(keypair);
//! // _transport.await?.dial(...);
//! ```
//!
//! The keypair that is passed as an argument in the above example is used
@ -85,7 +85,7 @@
//! Example ([`noise`] + [`yamux`] Protocol Upgrade):
//!
//! ```rust
//! # #[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "noise", feature = "yamux"))] {
//! # #[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "tcp-async-io", feature = "noise", feature = "yamux"))] {
//! use libp2p::{Transport, core::upgrade, tcp::TcpConfig, noise, identity::Keypair, yamux};
//! let tcp = TcpConfig::new();
//! let id_keys = Keypair::generate_ed25519();
@ -130,7 +130,7 @@
//! identity of the remote peer of the established connection, which is
//! usually obtained through a transport encryption protocol such as
//! [`noise`] that authenticates the peer. See the implementation of
//! [`build_development_transport`] for an example.
//! [`development_transport`] for an example.
//! 3. Creating a struct that implements the [`NetworkBehaviour`] trait and combines all the
//! desired network behaviours, implementing the event handlers as per the
//! desired application's networking logic.
@ -154,9 +154,6 @@
#![doc(html_logo_url = "https://libp2p.io/img/logo_small.png")]
#![doc(html_favicon_url = "https://libp2p.io/img/favicon.png")]
#[cfg(feature = "pnet")]
use libp2p_pnet::{PnetConfig, PreSharedKey};
pub use bytes;
pub use futures;
#[doc(inline)]
@ -171,8 +168,8 @@ pub use libp2p_core as core;
#[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))]
#[doc(inline)]
pub use libp2p_deflate as deflate;
#[cfg(feature = "dns")]
#[cfg_attr(docsrs, doc(cfg(feature = "dns")))]
#[cfg(any(feature = "dns-async-std", feature = "dns-tokio"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "dns-async-std", feature = "dns-tokio"))))]
#[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))]
#[doc(inline)]
pub use libp2p_dns as dns;
@ -268,35 +265,27 @@ pub use self::simple::SimpleProtocol;
pub use self::swarm::Swarm;
pub use self::transport_ext::TransportExt;
/// Builds a `Transport` that supports the most commonly-used protocols that libp2p supports.
/// Builds a `Transport` based on TCP/IP that supports the most commonly-used features of libp2p:
///
/// * DNS resolution.
/// * Noise protocol encryption.
/// * Websockets.
/// * Both Yamux and Mplex for substream multiplexing.
///
/// All async I/O of the transport is based on `async-std`.
///
/// > **Note**: This `Transport` is not suitable for production usage, as its implementation
/// > reserves the right to support additional protocols or remove deprecated protocols.
#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))]
#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))))]
pub fn build_development_transport(keypair: identity::Keypair)
-> std::io::Result<core::transport::Boxed<(PeerId, core::muxing::StreamMuxerBox)>>
{
build_tcp_ws_noise_mplex_yamux(keypair)
}
/// Builds an implementation of `Transport` that is suitable for usage with the `Swarm`.
///
/// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer,
/// and mplex or yamux as the multiplexing layer.
#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))]
#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))))]
pub fn build_tcp_ws_noise_mplex_yamux(keypair: identity::Keypair)
#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "tcp-async-io", feature = "dns-async-std", feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))]
#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "tcp-async-io", feature = "dns-async-std", feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))))]
pub async fn development_transport(keypair: identity::Keypair)
-> std::io::Result<core::transport::Boxed<(PeerId, core::muxing::StreamMuxerBox)>>
{
let transport = {
#[cfg(feature = "tcp-async-io")]
let tcp = tcp::TcpConfig::new().nodelay(true);
#[cfg(feature = "tcp-tokio")]
let tcp = tcp::TokioTcpConfig::new().nodelay(true);
let transport = dns::DnsConfig::new(tcp)?;
let trans_clone = transport.clone();
transport.or_transport(websocket::WsConfig::new(trans_clone))
let transport = dns::DnsConfig::system(tcp).await?;
let websockets = websocket::WsConfig::new(transport.clone());
transport.or_transport(websockets)
};
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
@ -311,23 +300,27 @@ pub fn build_tcp_ws_noise_mplex_yamux(keypair: identity::Keypair)
.boxed())
}
/// Builds an implementation of `Transport` that is suitable for usage with the `Swarm`.
/// Builds a `Transport` based on TCP/IP that supports the most commonly-used features of libp2p:
///
/// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer,
/// and mplex or yamux as the multiplexing layer.
#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux", feature = "pnet"))]
#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux", feature = "pnet"))))]
pub fn build_tcp_ws_pnet_noise_mplex_yamux(keypair: identity::Keypair, psk: PreSharedKey)
/// * DNS resolution.
/// * Noise protocol encryption.
/// * Websockets.
/// * Both Yamux and Mplex for substream multiplexing.
///
/// All async I/O of the transport is based on `tokio`.
///
/// > **Note**: This `Transport` is not suitable for production usage, as its implementation
/// > reserves the right to support additional protocols or remove deprecated protocols.
#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "tcp-tokio", feature = "dns-tokio", feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))]
#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "tcp-tokio", feature = "dns-tokio", feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))))]
pub fn tokio_development_transport(keypair: identity::Keypair)
-> std::io::Result<core::transport::Boxed<(PeerId, core::muxing::StreamMuxerBox)>>
{
let transport = {
#[cfg(feature = "tcp-async-io")]
let tcp = tcp::TcpConfig::new().nodelay(true);
#[cfg(feature = "tcp-tokio")]
let tcp = tcp::TokioTcpConfig::new().nodelay(true);
let transport = dns::DnsConfig::new(tcp)?;
let trans_clone = transport.clone();
transport.or_transport(websocket::WsConfig::new(trans_clone))
let transport = dns::TokioDnsConfig::system(tcp)?;
let websockets = websocket::WsConfig::new(transport.clone());
transport.or_transport(websockets)
};
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
@ -335,7 +328,6 @@ pub fn build_tcp_ws_pnet_noise_mplex_yamux(keypair: identity::Keypair, psk: PreS
.expect("Signing libp2p-noise static DH keypair failed.");
Ok(transport
.and_then(move |socket, _| PnetConfig::new(psk).handshake(socket))
.upgrade(core::upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(core::upgrade::SelectUpgrade::new(yamux::YamuxConfig::default(), mplex::MplexConfig::default()))

View File

@ -1,3 +1,10 @@
# 0.28.0 [unreleased]
- Use `trust-dns-resolver`, removing the internal thread pool and
expanding the configurability of `libp2p-dns` by largely exposing the
configuration of `trust-dns-resolver`.
[PR 1927](https://github.com/libp2p/rust-libp2p/pull/1927)
# 0.27.0 [2021-01-12]
- Update dependencies.

View File

@ -2,7 +2,7 @@
name = "libp2p-dns"
edition = "2018"
description = "DNS transport implementation for libp2p"
version = "0.27.0"
version = "0.28.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -13,3 +13,21 @@ categories = ["network-programming", "asynchronous"]
libp2p-core = { version = "0.27.0", path = "../../core" }
log = "0.4.1"
futures = "0.3.1"
trust-dns-resolver = { version = "0.20", default-features = false, features = ["system-config"] }
async-std-resolver = { version = "0.20", optional = true }
[dev-dependencies]
env_logger = "0.6"
tokio-crate = { package = "tokio", version = "1.0", default-features = false, features = ["rt", "time"] }
async-std-crate = { package = "async-std", version = "1.6" }
[features]
default = ["async-std"]
async-std = ["async-std-resolver"]
tokio = ["trust-dns-resolver/tokio-runtime"]
# The `tokio-` prefix and feature dependency is just to be explicit,
# since these features of `trust-dns-resolver` are currently only
# available for `tokio`.
tokio-dns-over-rustls = ["tokio", "trust-dns-resolver/dns-over-rustls"]
tokio-dns-over-https-rustls = ["tokio", "trust-dns-resolver/dns-over-https-rustls"]

View File

@ -20,85 +20,135 @@
//! # libp2p-dns
//!
//! This crate provides the type `DnsConfig` that allows one to resolve the `/dns4/` and `/dns6/`
//! components of multiaddresses.
//! This crate provides the type [`GenDnsConfig`] with its instantiations
//! [`DnsConfig`] and `TokioDnsConfig` for use with `async-std` and `tokio`,
//! respectively.
//!
//! ## Usage
//! A [`GenDnsConfig`] is a [`Transport`] wrapper that is created around
//! an inner `Transport`. The composed transport behaves like the inner
//! transport, except that [`Transport::dial`] resolves `/dns`, `/dns4/` and
//! `/dns6/` components of a given `Multiaddr` through a DNS.
//!
//! In order to use this crate, create a `DnsConfig` with one of its constructors and pass it an
//! implementation of the `Transport` trait.
//!
//! Whenever we want to dial an address through the `DnsConfig` and that address contains a
//! `/dns/`, `/dns4/`, or `/dns6/` component, a DNS resolve will be performed and the component
//! will be replaced with `/ip4/` and/or `/ip6/` components.
//! The `async-std` feature and hence the `DnsConfig` are
//! enabled by default. Tokio users can furthermore opt-in
//! to the `tokio-dns-over-rustls` and `tokio-dns-over-https-rustls`
//! features. For more information about these features, please
//! refer to the documentation of [trust-dns-resolver].
//!
//![trust-dns-resolver]: https://docs.rs/trust-dns-resolver/latest/trust_dns_resolver/#dns-over-tls-and-dns-over-https
use futures::{prelude::*, channel::oneshot, future::BoxFuture};
use futures::{prelude::*, future::BoxFuture, stream::FuturesOrdered};
use libp2p_core::{
Transport,
multiaddr::{Protocol, Multiaddr},
transport::{TransportError, ListenerEvent}
};
use log::{error, debug, trace};
use std::{error, fmt, io, net::ToSocketAddrs};
use log::{debug, trace};
use std::{error, fmt, net::IpAddr};
#[cfg(any(feature = "async-std", feature = "tokio"))]
use std::io;
#[cfg(any(feature = "async-std", feature = "tokio"))]
use trust_dns_resolver::system_conf;
use trust_dns_resolver::{
AsyncResolver,
ConnectionProvider,
proto::xfer::dns_handle::DnsHandle,
};
#[cfg(feature = "tokio")]
use trust_dns_resolver::{TokioAsyncResolver, TokioConnection, TokioConnectionProvider};
#[cfg(feature = "async-std")]
use async_std_resolver::{AsyncStdConnection, AsyncStdConnectionProvider};
/// Represents the configuration for a DNS transport capability of libp2p.
///
/// This struct implements the `Transport` trait and holds an underlying transport. Any call to
/// `dial` with a multiaddr that contains `/dns/`, `/dns4/`, or `/dns6/` will be first be resolved,
/// then passed to the underlying transport.
///
/// Listening is unaffected.
pub use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
pub use trust_dns_resolver::error::{ResolveError, ResolveErrorKind};
/// A `Transport` wrapper for performing DNS lookups when dialing `Multiaddr`esses
/// using `async-std` for all async I/O.
#[cfg(feature = "async-std")]
pub type DnsConfig<T> = GenDnsConfig<T, AsyncStdConnection, AsyncStdConnectionProvider>;
/// A `Transport` wrapper for performing DNS lookups when dialing `Multiaddr`esses
/// using `tokio` for all async I/O.
#[cfg(feature = "tokio")]
pub type TokioDnsConfig<T> = GenDnsConfig<T, TokioConnection, TokioConnectionProvider>;
/// A `Transport` wrapper for performing DNS lookups when dialing `Multiaddr`esses.
#[derive(Clone)]
pub struct DnsConfig<T> {
/// Underlying transport to use once the DNS addresses have been resolved.
pub struct GenDnsConfig<T, C, P>
where
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>
{
/// The underlying transport.
inner: T,
/// Pool of threads to use when resolving DNS addresses.
thread_pool: futures::executor::ThreadPool,
/// The DNS resolver used when dialing addresses with DNS components.
resolver: AsyncResolver<C, P>,
}
#[cfg(feature = "async-std")]
impl<T> DnsConfig<T> {
/// Creates a new configuration object for DNS.
pub fn new(inner: T) -> Result<DnsConfig<T>, io::Error> {
DnsConfig::with_resolve_threads(inner, 1)
/// Creates a new [`DnsConfig`] from the OS's DNS configuration and defaults.
pub async fn system(inner: T) -> Result<DnsConfig<T>, io::Error> {
let (cfg, opts) = system_conf::read_system_conf()?;
Self::custom(inner, cfg, opts).await
}
/// Same as `new`, but allows specifying a number of threads for the resolving.
pub fn with_resolve_threads(inner: T, num_threads: usize) -> Result<DnsConfig<T>, io::Error> {
let thread_pool = futures::executor::ThreadPool::builder()
.pool_size(num_threads)
.name_prefix("libp2p-dns-")
.create()?;
trace!("Created a DNS thread pool");
/// Creates a [`DnsConfig`] with a custom resolver configuration and options.
pub async fn custom(inner: T, cfg: ResolverConfig, opts: ResolverOpts)
-> Result<DnsConfig<T>, io::Error>
{
Ok(DnsConfig {
inner,
thread_pool,
resolver: async_std_resolver::resolver(cfg, opts).await?
})
}
}
impl<T> fmt::Debug for DnsConfig<T>
where
T: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_tuple("DnsConfig").field(&self.inner).finish()
#[cfg(feature = "tokio")]
impl<T> TokioDnsConfig<T> {
/// Creates a new [`TokioDnsConfig`] from the OS's DNS configuration and defaults.
pub fn system(inner: T) -> Result<TokioDnsConfig<T>, io::Error> {
let (cfg, opts) = system_conf::read_system_conf()?;
Self::custom(inner, cfg, opts)
}
/// Creates a [`TokioDnsConfig`] with a custom resolver configuration
/// and options.
pub fn custom(inner: T, cfg: ResolverConfig, opts: ResolverOpts)
-> Result<TokioDnsConfig<T>, io::Error>
{
Ok(TokioDnsConfig {
inner,
resolver: TokioAsyncResolver::tokio(cfg, opts)?
})
}
}
impl<T> Transport for DnsConfig<T>
impl<T, C, P> fmt::Debug for GenDnsConfig<T, C, P>
where
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>,
T: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_tuple("GenDnsConfig").field(&self.inner).finish()
}
}
impl<T, C, P> Transport for GenDnsConfig<T, C, P>
where
T: Transport + Send + 'static,
T::Error: Send,
T::Dial: Send
T::Dial: Send,
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>,
{
type Output = T::Output;
type Error = DnsErr<T::Error>;
type Listener = stream::MapErr<
stream::MapOk<T::Listener,
fn(ListenerEvent<T::ListenerUpgrade, T::Error>) -> ListenerEvent<Self::ListenerUpgrade, Self::Error>>,
fn(ListenerEvent<T::ListenerUpgrade, T::Error>)
-> ListenerEvent<Self::ListenerUpgrade, Self::Error>>,
fn(T::Error) -> Self::Error>;
type ListenerUpgrade = future::MapErr<T::ListenerUpgrade, fn(T::Error) -> Self::Error>;
type Dial = future::Either<
@ -107,100 +157,59 @@ where
>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
let listener = self.inner.listen_on(addr).map_err(|err| err.map(DnsErr::Underlying))?;
let listener = self.inner.listen_on(addr).map_err(|err| err.map(DnsErr::Transport))?;
let listener = listener
.map_ok::<_, fn(_) -> _>(|event| {
event
.map(|upgr| {
upgr.map_err::<_, fn(_) -> _>(DnsErr::Underlying)
upgr.map_err::<_, fn(_) -> _>(DnsErr::Transport)
})
.map_err(DnsErr::Underlying)
.map_err(DnsErr::Transport)
})
.map_err::<_, fn(_) -> _>(DnsErr::Underlying);
.map_err::<_, fn(_) -> _>(DnsErr::Transport);
Ok(listener)
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
// As an optimization, we immediately pass through if no component of the address contain
// a DNS protocol.
let contains_dns = addr.iter().any(|cmp| match cmp {
Protocol::Dns(_) => true,
Protocol::Dns4(_) => true,
Protocol::Dns6(_) => true,
_ => false,
});
if !contains_dns {
// Check if there are any domain names in the address. If not, proceed
// straight away with dialing on the underlying transport.
if !addr.iter().any(|p| match p {
Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) => true,
_ => false
}) {
trace!("Pass-through address without DNS: {}", addr);
let inner_dial = self.inner.dial(addr)
.map_err(|err| err.map(DnsErr::Underlying))?;
return Ok(inner_dial.map_err::<_, fn(_) -> _>(DnsErr::Underlying).left_future());
.map_err(|err| err.map(DnsErr::Transport))?;
return Ok(inner_dial.map_err::<_, fn(_) -> _>(DnsErr::Transport).left_future());
}
trace!("Dialing address with DNS: {}", addr);
let resolve_futs = addr.iter()
.map(|cmp| match cmp {
Protocol::Dns(ref name) | Protocol::Dns4(ref name) | Protocol::Dns6(ref name) => {
let name = name.to_string();
let to_resolve = format!("{}:0", name);
let (tx, rx) = oneshot::channel();
self.thread_pool.spawn_ok(async {
let to_resolve = to_resolve;
let _ = tx.send(match to_resolve[..].to_socket_addrs() {
Ok(list) => Ok(list.map(|s| s.ip()).collect::<Vec<_>>()),
Err(e) => Err(e),
});
});
// Asynchronlously resolve all DNS names in the address before proceeding
// with dialing on the underlying transport.
Ok(async move {
let resolver = self.resolver;
let inner = self.inner;
let (dns4, dns6) = match cmp {
Protocol::Dns(_) => (true, true),
Protocol::Dns4(_) => (true, false),
Protocol::Dns6(_) => (false, true),
_ => unreachable!(),
};
trace!("Resolving DNS: {}", addr);
async move {
let list = rx.await
.map_err(|_| {
error!("DNS resolver crashed");
DnsErr::ResolveFail(name.clone())
})?
.map_err(|err| DnsErr::ResolveError {
domain_name: name.clone(),
error: err,
})?;
let resolved = addr.into_iter()
.map(|proto| resolve(proto, &resolver))
.collect::<FuturesOrdered<_>>()
.collect::<Vec<Result<Protocol<'_>, Self::Error>>>()
.await
.into_iter()
.collect::<Result<Vec<Protocol<'_>>, Self::Error>>()?
.into_iter()
.collect::<Multiaddr>();
list.into_iter()
.filter_map(|addr| {
if (dns4 && addr.is_ipv4()) || (dns6 && addr.is_ipv6()) {
Some(Protocol::from(addr))
} else {
None
}
})
.next()
.ok_or_else(|| DnsErr::ResolveFail(name))
}.left_future()
},
cmp => future::ready(Ok(cmp.acquire())).right_future()
})
.collect::<stream::FuturesOrdered<_>>();
debug!("DNS resolved: {} => {}", addr, resolved);
let future = resolve_futs.collect::<Vec<_>>()
.then(move |outcome| async move {
let outcome = outcome.into_iter().collect::<Result<Vec<_>, _>>()?;
let outcome = outcome.into_iter().collect::<Multiaddr>();
debug!("DNS resolution outcome: {} => {}", addr, outcome);
match self.inner.dial(outcome) {
Ok(d) => d.await.map_err(DnsErr::Underlying),
Err(TransportError::MultiaddrNotSupported(_addr)) =>
Err(DnsErr::MultiaddrNotSupported),
Err(TransportError::Other(err)) => Err(DnsErr::Underlying(err))
}
});
Ok(future.boxed().right_future())
match inner.dial(resolved) {
Ok(out) => out.await.map_err(DnsErr::Transport),
Err(TransportError::MultiaddrNotSupported(a)) =>
Err(DnsErr::MultiaddrNotSupported(a)),
Err(TransportError::Other(err)) => Err(DnsErr::Transport(err))
}
}.boxed().right_future())
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
@ -208,20 +217,15 @@ where
}
}
/// Error that can be generated by the DNS layer.
/// The possible errors of a [`GenDnsConfig`] wrapped transport.
#[derive(Debug)]
pub enum DnsErr<TErr> {
/// Error in the underlying transport layer.
Underlying(TErr),
/// Failed to find any IP address for this DNS address.
ResolveFail(String),
/// Error while resolving a DNS address.
ResolveError {
domain_name: String,
error: io::Error,
},
/// Found an IP address, but the underlying transport doesn't support the multiaddr.
MultiaddrNotSupported,
/// The underlying transport encountered an error.
Transport(TErr),
/// DNS resolution failed.
ResolveError(ResolveError),
/// DNS resolution was successful, but the underlying transport refused the resolved address.
MultiaddrNotSupported(Multiaddr),
}
impl<TErr> fmt::Display for DnsErr<TErr>
@ -229,12 +233,9 @@ where TErr: fmt::Display
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DnsErr::Underlying(err) => write!(f, "{}", err),
DnsErr::ResolveFail(addr) => write!(f, "Failed to resolve DNS address: {:?}", addr),
DnsErr::ResolveError { domain_name, error } => {
write!(f, "Failed to resolve DNS address: {:?}; {:?}", domain_name, error)
},
DnsErr::MultiaddrNotSupported => write!(f, "Resolve multiaddr not supported"),
DnsErr::Transport(err) => write!(f, "{}", err),
DnsErr::ResolveError(err) => write!(f, "{}", err),
DnsErr::MultiaddrNotSupported(a) => write!(f, "Unsupported resolved address: {}", a),
}
}
}
@ -244,18 +245,67 @@ where TErr: error::Error + 'static
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
DnsErr::Underlying(err) => Some(err),
DnsErr::ResolveFail(_) => None,
DnsErr::ResolveError { error, .. } => Some(error),
DnsErr::MultiaddrNotSupported => None,
DnsErr::Transport(err) => Some(err),
DnsErr::ResolveError(err) => Some(err),
DnsErr::MultiaddrNotSupported(_) => None,
}
}
}
/// Asynchronously resolves the domain name of a `Dns`, `Dns4` or `Dns6` protocol
/// component. If the given protocol is not a DNS component, it is returned unchanged.
fn resolve<'a, E: 'a, C, P>(proto: Protocol<'a>, resolver: &'a AsyncResolver<C,P>)
-> impl Future<Output = Result<Protocol<'a>, DnsErr<E>>> + 'a
where
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>,
{
match proto {
Protocol::Dns(ref name) => {
resolver.lookup_ip(fqdn(name)).map(move |res| match res {
Ok(ips) => Ok(ips.into_iter()
.next()
.map(Protocol::from)
.expect("If there are no results, `Err(NoRecordsFound)` is expected.")),
Err(e) => return Err(DnsErr::ResolveError(e))
}).left_future()
}
Protocol::Dns4(ref name) => {
resolver.ipv4_lookup(fqdn(name)).map(move |res| match res {
Ok(ips) => Ok(ips.into_iter()
.map(IpAddr::from)
.next()
.map(Protocol::from)
.expect("If there are no results, `Err(NoRecordsFound)` is expected.")),
Err(e) => return Err(DnsErr::ResolveError(e))
}).left_future().left_future().right_future()
}
Protocol::Dns6(ref name) => {
resolver.ipv6_lookup(fqdn(name)).map(move |res| match res {
Ok(ips) => Ok(ips.into_iter()
.map(IpAddr::from)
.next()
.map(Protocol::from)
.expect("If there are no results, `Err(NoRecordsFound)` is expected.")),
Err(e) => return Err(DnsErr::ResolveError(e))
}).right_future().left_future().right_future()
},
proto => future::ready(Ok(proto)).right_future().right_future()
}
}
fn fqdn(name: &std::borrow::Cow<'_, str>) -> String {
if name.ends_with(".") {
name.to_string()
} else {
format!("{}.", name)
}
}
#[cfg(test)]
mod tests {
use super::DnsConfig;
use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
use super::*;
use futures::{future::BoxFuture, stream::BoxStream};
use libp2p_core::{
Transport,
multiaddr::{Protocol, Multiaddr},
@ -265,6 +315,8 @@ mod tests {
#[test]
fn basic_resolve() {
let _ = env_logger::try_init();
#[derive(Clone)]
struct CustomTransport;
@ -299,9 +351,15 @@ mod tests {
}
}
futures::executor::block_on(async move {
let transport = DnsConfig::new(CustomTransport).unwrap();
async fn run<T, C, P>(transport: GenDnsConfig<T, C, P>)
where
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>,
T: Transport + Clone + Send + 'static,
T::Error: Send,
T::Dial: Send,
{
// Success due to existing A record for example.com.
let _ = transport
.clone()
.dial("/dns4/example.com/tcp/20000".parse().unwrap())
@ -309,6 +367,7 @@ mod tests {
.await
.unwrap();
// Success due to existing AAAA record for example.com.
let _ = transport
.clone()
.dial("/dns6/example.com/tcp/20000".parse().unwrap())
@ -316,11 +375,45 @@ mod tests {
.await
.unwrap();
// Success due to pass-through, i.e. nothing to resolve.
let _ = transport
.clone()
.dial("/ip4/1.2.3.4/tcp/20000".parse().unwrap())
.unwrap()
.await
.unwrap();
});
// Failure due to no records.
match transport
.clone()
.dial("/dns4/example.invalid/tcp/20000".parse().unwrap())
.unwrap()
.await
{
Err(DnsErr::ResolveError(e)) => match e.kind() {
ResolveErrorKind::NoRecordsFound { .. } => {},
_ => panic!("Unexpected DNS error: {:?}", e),
},
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(_) => panic!("Unexpected success."),
}
}
#[cfg(feature = "async-std")]
{
async_std_crate::task::block_on(
DnsConfig::system(CustomTransport).then(|dns| run(dns.unwrap()))
);
}
#[cfg(feature = "tokio")]
{
let rt = tokio_crate::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.unwrap();
rt.block_on(run(TokioDnsConfig::system(CustomTransport).unwrap()));
}
}
}