1
0
mirror of https://github.com/fluencelabs/rust-libp2p synced 2025-06-17 03:51:22 +00:00

[libp2p-dns] Implement /dnsaddr resolution. ()

* Implement `/dnsaddr` support on `libp2p-dns`.

To that end, since resolving `/dnsaddr` addresses needs
"fully qualified" multiaddresses when dialing, i.e. those
that end with the `/p2p/...` protocol, we make sure that
dialing always uses such fully qualified addresses by
appending the `/p2p` protocol as necessary. As a side-effect,
this adds support for dialing peers via "fully qualified"
addresses, as an alternative to using a `PeerId` together
with a `Multiaddr` with or without the `/p2p` protocol.

* Adapt libp2p-relay.

* Update versions, changelogs and small cleanups.
This commit is contained in:
Roman Borschel
2021-03-17 10:53:19 +01:00
committed by GitHub
parent c1f75eee81
commit 45f07bf863
57 changed files with 738 additions and 309 deletions

@ -64,35 +64,35 @@ atomic = "0.5.0"
bytes = "1" bytes = "1"
futures = "0.3.1" futures = "0.3.1"
lazy_static = "1.2" lazy_static = "1.2"
libp2p-core = { version = "0.27.2", path = "core", default-features = false } libp2p-core = { version = "0.28.0", path = "core", default-features = false }
libp2p-floodsub = { version = "0.28.0", path = "protocols/floodsub", optional = true } libp2p-floodsub = { version = "0.28.0", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.29.0", path = "./protocols/gossipsub", optional = true } libp2p-gossipsub = { version = "0.29.0", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.28.0", path = "protocols/identify", optional = true } libp2p-identify = { version = "0.28.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.29.0", path = "protocols/kad", optional = true } libp2p-kad = { version = "0.29.0", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.27.2", path = "muxers/mplex", optional = true } libp2p-mplex = { version = "0.28.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.29.0", path = "transports/noise", optional = true } libp2p-noise = { version = "0.30.0", path = "transports/noise", optional = true }
libp2p-ping = { version = "0.28.0", path = "protocols/ping", optional = true } libp2p-ping = { version = "0.28.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.27.1", path = "transports/plaintext", optional = true } libp2p-plaintext = { version = "0.28.0", path = "transports/plaintext", optional = true }
libp2p-pnet = { version = "0.20.0", path = "transports/pnet", optional = true } libp2p-pnet = { version = "0.20.0", path = "transports/pnet", optional = true }
libp2p-relay = { version = "0.1.0", path = "protocols/relay", optional = true } libp2p-relay = { version = "0.1.0", path = "protocols/relay", optional = true }
libp2p-request-response = { version = "0.10.0", path = "protocols/request-response", optional = true } libp2p-request-response = { version = "0.10.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.28.0", path = "swarm" } libp2p-swarm = { version = "0.28.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.22.0", path = "swarm-derive" } libp2p-swarm-derive = { version = "0.22.0", path = "swarm-derive" }
libp2p-uds = { version = "0.27.0", path = "transports/uds", optional = true } libp2p-uds = { version = "0.28.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.27.0", path = "transports/wasm-ext", default-features = false, optional = true } libp2p-wasm-ext = { version = "0.28.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.30.1", path = "muxers/yamux", optional = true } libp2p-yamux = { version = "0.31.0", path = "muxers/yamux", optional = true }
multiaddr = { package = "parity-multiaddr", version = "0.11.1", path = "misc/multiaddr" } multiaddr = { package = "parity-multiaddr", version = "0.11.2", path = "misc/multiaddr" }
parking_lot = "0.11.0" parking_lot = "0.11.0"
pin-project = "1.0.0" pin-project = "1.0.0"
smallvec = "1.6.1" smallvec = "1.6.1"
wasm-timer = "0.2.4" wasm-timer = "0.2.4"
[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies] [target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.27.1", path = "transports/deflate", optional = true } libp2p-deflate = { version = "0.28.0", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.28.0", path = "transports/dns", optional = true, default-features = false } libp2p-dns = { version = "0.28.0", path = "transports/dns", optional = true, default-features = false }
libp2p-mdns = { version = "0.29.0", path = "protocols/mdns", optional = true } libp2p-mdns = { version = "0.29.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.27.1", path = "transports/tcp", default-features = false, optional = true } libp2p-tcp = { version = "0.28.0", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.28.0", path = "transports/websocket", optional = true } libp2p-websocket = { version = "0.29.0", path = "transports/websocket", optional = true }
[dev-dependencies] [dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] } async-std = { version = "1.6.2", features = ["attributes"] }

@ -1,4 +1,12 @@
# 0.27.2 [unreleased] # 0.28.0 [unreleased]
- `Network::dial()` understands `/p2p` addresses and `Transport::dial`
gets a "fully qualified" `/p2p` address when dialing a specific peer,
whether through the `Network::peer()` API or via `Network::dial()`
with a `/p2p` address.
- `Network::dial()` and `network::Peer::dial()` return a `DialError`
on error.
- Shorten and unify `Debug` impls of public keys. - Shorten and unify `Debug` impls of public keys.

@ -2,7 +2,7 @@
name = "libp2p-core" name = "libp2p-core"
edition = "2018" edition = "2018"
description = "Core traits and structs of libp2p" description = "Core traits and structs of libp2p"
version = "0.27.2" version = "0.28.0"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT" license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p" repository = "https://github.com/libp2p/rust-libp2p"

@ -554,7 +554,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
/// Returns an iterator over all connected peers, i.e. those that have /// Returns an iterator over all connected peers, i.e. those that have
/// at least one established connection in the pool. /// at least one established connection in the pool.
pub fn iter_connected<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a { pub fn iter_connected(&self) -> impl Iterator<Item = &PeerId> {
self.established.keys() self.established.keys()
} }

@ -209,13 +209,14 @@ where
&self.local_peer_id &self.local_peer_id
} }
/// Dials a multiaddress without expecting a particular remote peer ID. /// Dials a [`Multiaddr`] that may or may not encapsulate a
/// specific expected remote peer ID.
/// ///
/// The given `handler` will be used to create the /// The given `handler` will be used to create the
/// [`Connection`](crate::connection::Connection) upon success and the /// [`Connection`](crate::connection::Connection) upon success and the
/// connection ID is returned. /// connection ID is returned.
pub fn dial(&mut self, address: &Multiaddr, handler: THandler) pub fn dial(&mut self, address: &Multiaddr, handler: THandler)
-> Result<ConnectionId, ConnectionLimit> -> Result<ConnectionId, DialError>
where where
TTrans: Transport<Output = (PeerId, TMuxer)>, TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static, TTrans::Error: Send + 'static,
@ -225,15 +226,32 @@ where
TInEvent: Send + 'static, TInEvent: Send + 'static,
TOutEvent: Send + 'static, TOutEvent: Send + 'static,
{ {
// If the address ultimately encapsulates an expected peer ID, dial that peer
// such that any mismatch is detected. We do not "pop off" the `P2p` protocol
// from the address, because it may be used by the `Transport`, i.e. `P2p`
// is a protocol component that can influence any transport, like `libp2p-dns`.
if let Some(multiaddr::Protocol::P2p(ma)) = address.iter().last() {
if let Ok(peer) = PeerId::try_from(ma) {
return self.dial_peer(DialingOpts {
peer,
address: address.clone(),
handler,
remaining: Vec::new(),
})
}
}
// The address does not specify an expected peer, so just try to dial it as-is,
// accepting any peer ID that the remote identifies as.
let info = OutgoingInfo { address, peer_id: None }; let info = OutgoingInfo { address, peer_id: None };
match self.transport().clone().dial(address.clone()) { match self.transport().clone().dial(address.clone()) {
Ok(f) => { Ok(f) => {
let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err))); let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
self.pool.add_outgoing(f, handler, info) self.pool.add_outgoing(f, handler, info).map_err(DialError::ConnectionLimit)
} }
Err(err) => { Err(err) => {
let f = future::err(PendingConnectionError::Transport(err)); let f = future::err(PendingConnectionError::Transport(err));
self.pool.add_outgoing(f, handler, info) self.pool.add_outgoing(f, handler, info).map_err(DialError::ConnectionLimit)
} }
} }
} }
@ -430,7 +448,7 @@ where
/// Initiates a connection attempt to a known peer. /// Initiates a connection attempt to a known peer.
fn dial_peer(&mut self, opts: DialingOpts<PeerId, THandler>) fn dial_peer(&mut self, opts: DialingOpts<PeerId, THandler>)
-> Result<ConnectionId, ConnectionLimit> -> Result<ConnectionId, DialError>
where where
TTrans: Transport<Output = (PeerId, TMuxer)>, TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Dial: Send + 'static, TTrans::Dial: Send + 'static,
@ -460,7 +478,7 @@ fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans>(
<THandler::Handler as ConnectionHandler>::Error>, <THandler::Handler as ConnectionHandler>::Error>,
dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>, dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
opts: DialingOpts<PeerId, THandler> opts: DialingOpts<PeerId, THandler>
) -> Result<ConnectionId, ConnectionLimit> ) -> Result<ConnectionId, DialError>
where where
THandler: IntoConnectionHandler + Send + 'static, THandler: IntoConnectionHandler + Send + 'static,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static, <THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
@ -478,23 +496,28 @@ where
TInEvent: Send + 'static, TInEvent: Send + 'static,
TOutEvent: Send + 'static, TOutEvent: Send + 'static,
{ {
let result = match transport.dial(opts.address.clone()) { // Ensure the address to dial encapsulates the `p2p` protocol for the
// targeted peer, so that the transport has a "fully qualified" address
// to work with.
let addr = p2p_addr(opts.peer, opts.address).map_err(DialError::InvalidAddress)?;
let result = match transport.dial(addr.clone()) {
Ok(fut) => { Ok(fut) => {
let fut = fut.map_err(|e| PendingConnectionError::Transport(TransportError::Other(e))); let fut = fut.map_err(|e| PendingConnectionError::Transport(TransportError::Other(e)));
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) }; let info = OutgoingInfo { address: &addr, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info) pool.add_outgoing(fut, opts.handler, info).map_err(DialError::ConnectionLimit)
}, },
Err(err) => { Err(err) => {
let fut = future::err(PendingConnectionError::Transport(err)); let fut = future::err(PendingConnectionError::Transport(err));
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) }; let info = OutgoingInfo { address: &addr, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info) pool.add_outgoing(fut, opts.handler, info).map_err(DialError::ConnectionLimit)
}, },
}; };
if let Ok(id) = &result { if let Ok(id) = &result {
dialing.entry(opts.peer).or_default().push( dialing.entry(opts.peer).or_default().push(
peer::DialingState { peer::DialingState {
current: (*id, opts.address), current: (*id, addr),
remaining: opts.remaining, remaining: opts.remaining,
}, },
); );
@ -668,6 +691,37 @@ impl NetworkConfig {
} }
} }
/// Ensures a given `Multiaddr` is a `/p2p/...` address for the given peer.
///
/// If the given address is already a `p2p` address for the given peer,
/// i.e. the last encapsulated protocol is `/p2p/<peer-id>`, this is a no-op.
///
/// If the given address is already a `p2p` address for a different peer
/// than the one given, the given `Multiaddr` is returned as an `Err`.
///
/// If the given address is not yet a `p2p` address for the given peer,
/// the `/p2p/<peer-id>` protocol is appended to the returned address.
fn p2p_addr(peer: PeerId, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
if let Some(multiaddr::Protocol::P2p(hash)) = addr.iter().last() {
if &hash != peer.as_ref() {
return Err(addr)
}
Ok(addr)
} else {
Ok(addr.with(multiaddr::Protocol::P2p(peer.into())))
}
}
/// Possible (synchronous) errors when dialing a peer.
#[derive(Clone, Debug)]
pub enum DialError {
/// The dialing attempt is rejected because of a connection limit.
ConnectionLimit(ConnectionLimit),
/// The address being dialed is invalid, e.g. if it refers to a different
/// remote peer than the one being dialed.
InvalidAddress(Multiaddr),
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

@ -45,7 +45,7 @@ use std::{
error, error,
fmt, fmt,
}; };
use super::{Network, DialingOpts}; use super::{Network, DialingOpts, DialError};
/// The possible representations of a peer in a [`Network`], as /// The possible representations of a peer in a [`Network`], as
/// seen by the local node. /// seen by the local node.
@ -210,7 +210,7 @@ where
pub fn dial<I>(self, address: Multiaddr, remaining: I, handler: THandler) pub fn dial<I>(self, address: Multiaddr, remaining: I, handler: THandler)
-> Result< -> Result<
(ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>), (ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
ConnectionLimit DialError
> >
where where
I: IntoIterator<Item = Multiaddr>, I: IntoIterator<Item = Multiaddr>,
@ -219,7 +219,9 @@ where
Peer::Connected(p) => (p.peer_id, p.network), Peer::Connected(p) => (p.peer_id, p.network),
Peer::Dialing(p) => (p.peer_id, p.network), Peer::Dialing(p) => (p.peer_id, p.network),
Peer::Disconnected(p) => (p.peer_id, p.network), Peer::Disconnected(p) => (p.peer_id, p.network),
Peer::Local => return Err(ConnectionLimit { current: 0, limit: 0 }) Peer::Local => return Err(DialError::ConnectionLimit(ConnectionLimit {
current: 0, limit: 0
}))
}; };
let id = network.dial_peer(DialingOpts { let id = network.dial_peer(DialingOpts {

@ -263,19 +263,14 @@ impl Drop for Listener {
/// If the address is `/memory/n`, returns the value of `n`. /// If the address is `/memory/n`, returns the value of `n`.
fn parse_memory_addr(a: &Multiaddr) -> Result<u64, ()> { fn parse_memory_addr(a: &Multiaddr) -> Result<u64, ()> {
let mut iter = a.iter(); let mut protocols = a.iter();
match protocols.next() {
let port = if let Some(Protocol::Memory(port)) = iter.next() { Some(Protocol::Memory(port)) => match protocols.next() {
port None | Some(Protocol::P2p(_)) => Ok(port),
} else { _ => Err(())
return Err(()); }
}; _ => Err(())
if iter.next().is_some() {
return Err(());
} }
Ok(port)
} }
/// A channel represents an established, in-memory, logical connection between two endpoints. /// A channel represents an established, in-memory, logical connection between two endpoints.

@ -25,7 +25,7 @@ use libp2p_core::multiaddr::{multiaddr, Multiaddr};
use libp2p_core::{ use libp2p_core::{
PeerId, PeerId,
connection::PendingConnectionError, connection::PendingConnectionError,
network::{NetworkEvent, NetworkConfig, ConnectionLimits}, network::{NetworkEvent, NetworkConfig, ConnectionLimits, DialError},
}; };
use rand::Rng; use rand::Rng;
use std::task::Poll; use std::task::Poll;
@ -47,12 +47,16 @@ fn max_outgoing() {
.expect("Unexpected connection limit."); .expect("Unexpected connection limit.");
} }
let err = network.peer(target.clone()) match network.peer(target.clone())
.dial(Multiaddr::empty(), Vec::new(), TestHandler()) .dial(Multiaddr::empty(), Vec::new(), TestHandler())
.expect_err("Unexpected dialing success."); .expect_err("Unexpected dialing success.")
{
assert_eq!(err.current, outgoing_limit); DialError::ConnectionLimit(err) => {
assert_eq!(err.limit, outgoing_limit); assert_eq!(err.current, outgoing_limit);
assert_eq!(err.limit, outgoing_limit);
}
e => panic!("Unexpected error: {:?}", e),
}
let info = network.info(); let info = network.info();
assert_eq!(info.num_peers(), 0); assert_eq!(info.num_peers(), 0);

@ -25,6 +25,7 @@ use libp2p_core::multiaddr::multiaddr;
use libp2p_core::{ use libp2p_core::{
PeerId, PeerId,
connection::PendingConnectionError, connection::PendingConnectionError,
multiaddr::Protocol,
network::{NetworkEvent, NetworkConfig}, network::{NetworkEvent, NetworkConfig},
}; };
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
@ -70,7 +71,7 @@ fn deny_incoming_connec() {
error: PendingConnectionError::Transport(_) error: PendingConnectionError::Transport(_)
}) => { }) => {
assert_eq!(&peer_id, swarm1.local_peer_id()); assert_eq!(&peer_id, swarm1.local_peer_id());
assert_eq!(multiaddr, address); assert_eq!(multiaddr, address.clone().with(Protocol::P2p(peer_id.into())));
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
}, },
Poll::Ready(_) => unreachable!(), Poll::Ready(_) => unreachable!(),
@ -162,21 +163,27 @@ fn dial_self_by_id() {
fn multiple_addresses_err() { fn multiple_addresses_err() {
// Tries dialing multiple addresses, and makes sure there's one dialing error per address. // Tries dialing multiple addresses, and makes sure there's one dialing error per address.
let target = PeerId::random();
let mut swarm = test_network(NetworkConfig::default()); let mut swarm = test_network(NetworkConfig::default());
let mut addresses = Vec::new(); let mut addresses = Vec::new();
for _ in 0 .. 3 { for _ in 0 .. 3 {
addresses.push(multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())]); addresses.push(multiaddr![
Ip4([0, 0, 0, 0]),
Tcp(rand::random::<u16>())
]);
} }
for _ in 0 .. 5 { for _ in 0 .. 5 {
addresses.push(multiaddr![Udp(rand::random::<u16>())]); addresses.push(multiaddr![
Udp(rand::random::<u16>())
]);
} }
addresses.shuffle(&mut rand::thread_rng()); addresses.shuffle(&mut rand::thread_rng());
let first = addresses[0].clone(); let first = addresses[0].clone();
let rest = (&addresses[1..]).iter().cloned(); let rest = (&addresses[1..]).iter().cloned();
let target = PeerId::random();
swarm.peer(target.clone()) swarm.peer(target.clone())
.dial(first, rest, TestHandler()) .dial(first, rest, TestHandler())
.unwrap(); .unwrap();
@ -191,7 +198,7 @@ fn multiple_addresses_err() {
error: PendingConnectionError::Transport(_) error: PendingConnectionError::Transport(_)
}) => { }) => {
assert_eq!(peer_id, target); assert_eq!(peer_id, target);
let expected = addresses.remove(0); let expected = addresses.remove(0).with(Protocol::P2p(target.clone().into()));
assert_eq!(multiaddr, expected); assert_eq!(multiaddr, expected);
if addresses.is_empty() { if addresses.is_empty() {
assert_eq!(attempts_remaining, 0); assert_eq!(attempts_remaining, 0);

@ -25,6 +25,7 @@
use async_std::task; use async_std::task;
use libp2p::{ use libp2p::{
Multiaddr,
Swarm, Swarm,
PeerId, PeerId,
identity, identity,
@ -38,7 +39,14 @@ use libp2p::kad::{
QueryResult, QueryResult,
}; };
use libp2p::kad::record::store::MemoryStore; use libp2p::kad::record::store::MemoryStore;
use std::{env, error::Error, time::Duration}; use std::{env, error::Error, str::FromStr, time::Duration};
const BOOTNODES: [&'static str; 4] = [
"QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
"QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt"
];
#[async_std::main] #[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> { async fn main() -> Result<(), Box<dyn Error>> {
@ -59,28 +67,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
let store = MemoryStore::new(local_peer_id.clone()); let store = MemoryStore::new(local_peer_id.clone());
let mut behaviour = Kademlia::with_config(local_peer_id.clone(), store, cfg); let mut behaviour = Kademlia::with_config(local_peer_id.clone(), store, cfg);
// TODO: the /dnsaddr/ scheme is not supported (https://github.com/libp2p/rust-libp2p/issues/967) // Add the bootnodes to the local routing table. `libp2p-dns` built
/*behaviour.add_address(&"QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap()); // into the `transport` resolves the `dnsaddr` when Kademlia tries
behaviour.add_address(&"QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap()); // to dial these nodes.
behaviour.add_address(&"QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap()); let bootaddr = Multiaddr::from_str("/dnsaddr/bootstrap.libp2p.io")?;
behaviour.add_address(&"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());*/ for peer in &BOOTNODES {
behaviour.add_address(&PeerId::from_str(peer)?, bootaddr.clone());
}
// The only address that currently works.
behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse()?, "/ip4/104.131.131.82/tcp/4001".parse()?);
// The following addresses always fail signature verification, possibly due to
// RSA keys with < 2048 bits.
// behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap());
// The following addresses are permanently unreachable:
// Other(Other(A(Transport(A(Underlying(Os { code: 101, kind: Other, message: "Network is unreachable" }))))))
// behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap());
Swarm::new(transport, behaviour, local_peer_id) Swarm::new(transport, behaviour, local_peer_id)
}; };

@ -1,3 +1,7 @@
# 0.11.2 [unreleased]
- Add `Multiaddr::ends_with()`.
# 0.11.1 [2021-02-15] # 0.11.1 [2021-02-15]
- Update dependencies - Update dependencies

@ -6,7 +6,7 @@ description = "Implementation of the multiaddr format"
homepage = "https://github.com/libp2p/rust-libp2p" homepage = "https://github.com/libp2p/rust-libp2p"
keywords = ["multiaddr", "ipfs"] keywords = ["multiaddr", "ipfs"]
license = "MIT" license = "MIT"
version = "0.11.1" version = "0.11.2"
[features] [features]
default = ["url"] default = ["url"]

@ -174,6 +174,16 @@ impl Multiaddr {
if replaced { Some(address) } else { None } if replaced { Some(address) } else { None }
} }
/// Checks whether the given `Multiaddr` is a suffix of this `Multiaddr`.
pub fn ends_with(&self, other: &Multiaddr) -> bool {
let n = self.bytes.len();
let m = other.bytes.len();
if n < m {
return false
}
self.bytes[(n - m) ..] == other.bytes[..]
}
} }
impl fmt::Debug for Multiaddr { impl fmt::Debug for Multiaddr {

@ -56,6 +56,18 @@ fn push_pop_identity() {
QuickCheck::new().quickcheck(prop as fn(Ma, Proto) -> bool) QuickCheck::new().quickcheck(prop as fn(Ma, Proto) -> bool)
} }
#[test]
fn ends_with() {
fn prop(Ma(m): Ma) {
let n = m.iter().count();
for i in 0 .. n {
let suffix = m.iter().skip(i).collect::<Multiaddr>();
assert!(m.ends_with(&suffix));
}
}
QuickCheck::new().quickcheck(prop as fn(_))
}
// Arbitrary impls // Arbitrary impls

@ -260,7 +260,7 @@ where
let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?;
*this.state = SeqState::SendProtocol { io, protocol } *this.state = SeqState::SendProtocol { io, protocol }
} }
_ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())), _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into()))
} }
} }

@ -1,4 +1,4 @@
# 0.27.2 [unreleased] # 0.28.0 [unreleased]
- Update dependencies. - Update dependencies.

@ -2,7 +2,7 @@
name = "libp2p-mplex" name = "libp2p-mplex"
edition = "2018" edition = "2018"
description = "Mplex multiplexing protocol for libp2p" description = "Mplex multiplexing protocol for libp2p"
version = "0.27.2" version = "0.28.0"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT" license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p" repository = "https://github.com/libp2p/rust-libp2p"
@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
bytes = "1" bytes = "1"
futures = "0.3.1" futures = "0.3.1"
asynchronous-codec = "0.6" asynchronous-codec = "0.6"
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
log = "0.4" log = "0.4"
nohash-hasher = "0.2" nohash-hasher = "0.2"
parking_lot = "0.11" parking_lot = "0.11"

@ -238,7 +238,7 @@ where
num_buffered += 1; num_buffered += 1;
} }
Frame::Close { stream_id } => { Frame::Close { stream_id } => {
self.on_close(stream_id.into_local())?; self.on_close(stream_id.into_local());
} }
Frame::Reset { stream_id } => { Frame::Reset { stream_id } => {
self.on_reset(stream_id.into_local()) self.on_reset(stream_id.into_local())
@ -460,7 +460,7 @@ where
} }
Frame::Close { stream_id } => { Frame::Close { stream_id } => {
let stream_id = stream_id.into_local(); let stream_id = stream_id.into_local();
self.on_close(stream_id)?; self.on_close(stream_id);
if id == stream_id { if id == stream_id {
return Poll::Ready(Ok(None)) return Poll::Ready(Ok(None))
} }
@ -683,7 +683,7 @@ where
} }
/// Processes an inbound `Close` frame. /// Processes an inbound `Close` frame.
fn on_close(&mut self, id: LocalStreamId) -> io::Result<()> { fn on_close(&mut self, id: LocalStreamId) {
if let Some(state) = self.substreams.remove(&id) { if let Some(state) = self.substreams.remove(&id) {
match state { match state {
SubstreamState::RecvClosed { .. } | SubstreamState::Closed { .. } => { SubstreamState::RecvClosed { .. } | SubstreamState::Closed { .. } => {
@ -715,8 +715,6 @@ where
trace!("{}: Ignoring `Close` for unknown substream {}. Possibly dropped earlier.", trace!("{}: Ignoring `Close` for unknown substream {}. Possibly dropped earlier.",
self.id, id); self.id, id);
} }
Ok(())
} }
/// Generates the next outbound stream ID. /// Generates the next outbound stream ID.

@ -1,3 +1,7 @@
# 0.31.0 [unreleased]
- Update `libp2p-core`.
# 0.30.1 [2021-02-17] # 0.30.1 [2021-02-17]
- Update `yamux` to `0.8.1`. - Update `yamux` to `0.8.1`.

@ -2,7 +2,7 @@
name = "libp2p-yamux" name = "libp2p-yamux"
edition = "2018" edition = "2018"
description = "Yamux multiplexing protocol for libp2p" description = "Yamux multiplexing protocol for libp2p"
version = "0.30.1" version = "0.31.0"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT" license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p" repository = "https://github.com/libp2p/rust-libp2p"
@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
futures = "0.3.1" futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
parking_lot = "0.11" parking_lot = "0.11"
thiserror = "1.0" thiserror = "1.0"
yamux = "0.8.1" yamux = "0.8.1"

@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
cuckoofilter = "0.5.0" cuckoofilter = "0.5.0"
fnv = "1.0" fnv = "1.0"
futures = "0.3.1" futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm" } libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
log = "0.4" log = "0.4"
prost = "0.7" prost = "0.7"

@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
libp2p-swarm = { version = "0.28.0", path = "../../swarm" } libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
bytes = "1.0" bytes = "1.0"
byteorder = "1.3.4" byteorder = "1.3.4"
fnv = "1.0.7" fnv = "1.0.7"

@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
futures = "0.3.1" futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm" } libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
log = "0.4.1" log = "0.4.1"
prost = "0.7" prost = "0.7"

@ -17,7 +17,7 @@ fnv = "1.0"
asynchronous-codec = "0.6" asynchronous-codec = "0.6"
futures = "0.3.1" futures = "0.3.1"
log = "0.4" log = "0.4"
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm" } libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
prost = "0.7" prost = "0.7"
rand = "0.7.2" rand = "0.7.2"

@ -1072,7 +1072,11 @@ fn manual_bucket_inserts() {
let mut swarms = build_connected_nodes_with_config(3, 1, cfg); let mut swarms = build_connected_nodes_with_config(3, 1, cfg);
// The peers and their addresses for which we expect `RoutablePeer` events. // The peers and their addresses for which we expect `RoutablePeer` events.
let mut expected = swarms.iter().skip(2) let mut expected = swarms.iter().skip(2)
.map(|(a, s)| (a.clone(), Swarm::local_peer_id(s).clone())) .map(|(a, s)| {
let pid = *Swarm::local_peer_id(s);
let addr = a.clone().with(Protocol::P2p(pid.into()));
(addr, pid)
})
.collect::<HashMap<_,_>>(); .collect::<HashMap<_,_>>();
// We collect the peers for which a `RoutablePeer` event // We collect the peers for which a `RoutablePeer` event
// was received in here to check at the end of the test // was received in here to check at the end of the test
@ -1087,7 +1091,7 @@ fn manual_bucket_inserts() {
Poll::Ready(Some(KademliaEvent::RoutablePeer { Poll::Ready(Some(KademliaEvent::RoutablePeer {
peer, address peer, address
})) => { })) => {
assert_eq!(peer, expected.remove(&address).expect("Unexpected address")); assert_eq!(peer, expected.remove(&address).expect("Missing address"));
routable.push(peer); routable.push(peer);
if expected.is_empty() { if expected.is_empty() {
for peer in routable.iter() { for peer in routable.iter() {

@ -16,7 +16,7 @@ dns-parser = "0.8.0"
futures = "0.3.13" futures = "0.3.13"
if-watch = "0.2.0" if-watch = "0.2.0"
lazy_static = "1.4.0" lazy_static = "1.4.0"
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm" } libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
log = "0.4.14" log = "0.4.14"
rand = "0.8.3" rand = "0.8.3"

@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
futures = "0.3.1" futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm" } libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
log = "0.4.1" log = "0.4.1"
rand = "0.7.2" rand = "0.7.2"

@ -14,7 +14,7 @@ asynchronous-codec = "0.6"
bytes = "1" bytes = "1"
futures = "0.3.1" futures = "0.3.1"
futures-timer = "3" futures-timer = "3"
libp2p-core = { version = "0.27", path = "../../core" } libp2p-core = { version = "0.28", path = "../../core" }
libp2p-swarm = { version = "0.28", path = "../../swarm" } libp2p-swarm = { version = "0.28", path = "../../swarm" }
log = "0.4" log = "0.4"
pin-project = "1" pin-project = "1"

@ -401,14 +401,12 @@ impl<T: Transport> Stream for RelayListener<T> {
Poll::Ready(Some(BehaviourToListenerMsg::IncomingRelayedConnection { Poll::Ready(Some(BehaviourToListenerMsg::IncomingRelayedConnection {
stream, stream,
src_peer_id, src_peer_id,
relay_peer_id,
relay_addr, relay_addr,
relay_peer_id: _
})) => { })) => {
return Poll::Ready(Some(Ok(ListenerEvent::Upgrade { return Poll::Ready(Some(Ok(ListenerEvent::Upgrade {
upgrade: RelayedListenerUpgrade::Relayed(Some(stream)), upgrade: RelayedListenerUpgrade::Relayed(Some(stream)),
local_addr: relay_addr local_addr: relay_addr.with(Protocol::P2pCircuit),
.with(Protocol::P2p(relay_peer_id.into()))
.with(Protocol::P2pCircuit),
remote_addr: Protocol::P2p(src_peer_id.into()).into(), remote_addr: Protocol::P2p(src_peer_id.into()).into(),
}))); })));
} }

@ -381,9 +381,10 @@ fn src_try_connect_to_offline_dst() {
loop { loop {
match src_swarm.next_event().await { match src_swarm.next_event().await {
SwarmEvent::UnknownPeerUnreachableAddr { address, .. } SwarmEvent::UnreachableAddr { address, peer_id, .. }
if address == dst_addr_via_relay => if address == dst_addr_via_relay =>
{ {
assert_eq!(peer_id, dst_peer_id);
break; break;
} }
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {} SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
@ -437,9 +438,10 @@ fn src_try_connect_to_unsupported_dst() {
loop { loop {
match src_swarm.next_event().await { match src_swarm.next_event().await {
SwarmEvent::UnknownPeerUnreachableAddr { address, .. } SwarmEvent::UnreachableAddr { address, peer_id, .. }
if address == dst_addr_via_relay => if address == dst_addr_via_relay =>
{ {
assert_eq!(peer_id, dst_peer_id);
break; break;
} }
SwarmEvent::ConnectionClosed { peer_id, .. } if peer_id == relay_peer_id => {} SwarmEvent::ConnectionClosed { peer_id, .. } if peer_id == relay_peer_id => {}
@ -486,8 +488,10 @@ fn src_try_connect_to_offline_dst_via_offline_relay() {
// Source Node fail to reach Destination Node due to failure reaching Relay. // Source Node fail to reach Destination Node due to failure reaching Relay.
match src_swarm.next_event().await { match src_swarm.next_event().await {
SwarmEvent::UnknownPeerUnreachableAddr { address, .. } SwarmEvent::UnreachableAddr { address, peer_id, .. }
if address == dst_addr_via_relay => {} if address == dst_addr_via_relay => {
assert_eq!(peer_id, dst_peer_id);
}
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
}); });
@ -573,6 +577,9 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
} }
} }
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {} SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
..
})) => {}
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
} }
@ -1005,9 +1012,10 @@ fn yield_incoming_connection_through_correct_listener() {
} }
match src_3_swarm.next_event().boxed().poll_unpin(cx) { match src_3_swarm.next_event().boxed().poll_unpin(cx) {
Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr { address, .. }) Poll::Ready(SwarmEvent::UnreachableAddr { address, peer_id, .. })
if address == dst_addr_via_relay_3 => if address == dst_addr_via_relay_3 =>
{ {
assert_eq!(peer_id, dst_peer_id);
return Poll::Ready(()); return Poll::Ready(());
} }
Poll::Ready(SwarmEvent::Dialing { .. }) => {} Poll::Ready(SwarmEvent::Dialing { .. }) => {}

@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
async-trait = "0.1" async-trait = "0.1"
bytes = "1" bytes = "1"
futures = "0.3.1" futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm" } libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
log = "0.4.11" log = "0.4.11"
lru = "0.6" lru = "0.6"

@ -283,9 +283,9 @@ pub async fn development_transport(keypair: identity::Keypair)
{ {
let transport = { let transport = {
let tcp = tcp::TcpConfig::new().nodelay(true); let tcp = tcp::TcpConfig::new().nodelay(true);
let transport = dns::DnsConfig::system(tcp).await?; let dns_tcp = dns::DnsConfig::system(tcp).await?;
let websockets = websocket::WsConfig::new(transport.clone()); let ws_dns_tcp = websocket::WsConfig::new(dns_tcp.clone());
transport.or_transport(websockets) dns_tcp.or_transport(ws_dns_tcp)
}; };
let noise_keys = noise::Keypair::<noise::X25519Spec>::new() let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
@ -318,9 +318,9 @@ pub fn tokio_development_transport(keypair: identity::Keypair)
{ {
let transport = { let transport = {
let tcp = tcp::TokioTcpConfig::new().nodelay(true); let tcp = tcp::TokioTcpConfig::new().nodelay(true);
let transport = dns::TokioDnsConfig::system(tcp)?; let dns_tcp = dns::TokioDnsConfig::system(tcp)?;
let websockets = websocket::WsConfig::new(transport.clone()); let ws_dns_tcp = websocket::WsConfig::new(dns_tcp.clone());
transport.or_transport(websockets) dns_tcp.or_transport(ws_dns_tcp)
}; };
let noise_keys = noise::Keypair::<noise::X25519Spec>::new() let noise_keys = noise::Keypair::<noise::X25519Spec>::new()

@ -1,5 +1,9 @@
# 0.28.0 [unreleased] # 0.28.0 [unreleased]
- New error variant `DialError::InvalidAddress`
- `Swarm::dial_addr()` now returns a `DialError` on error.
- Remove the option for a substream-specific multistream select protocol override. - Remove the option for a substream-specific multistream select protocol override.
The override at this granularity is no longer deemed useful, in particular because The override at this granularity is no longer deemed useful, in particular because
it can usually not be configured for existing protocols like `libp2p-kad` and others. it can usually not be configured for existing protocols like `libp2p-kad` and others.

@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
either = "1.6.0" either = "1.6.0"
futures = "0.3.1" futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../core" } libp2p-core = { version = "0.28.0", path = "../core" }
log = "0.4" log = "0.4"
rand = "0.7" rand = "0.7"
smallvec = "1.6.1" smallvec = "1.6.1"

@ -113,6 +113,7 @@ use libp2p_core::{
transport::{self, TransportError}, transport::{self, TransportError},
muxing::StreamMuxerBox, muxing::StreamMuxerBox,
network::{ network::{
self,
ConnectionLimits, ConnectionLimits,
Network, Network,
NetworkInfo, NetworkInfo,
@ -359,11 +360,11 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
} }
/// Initiates a new dialing attempt to the given address. /// Initiates a new dialing attempt to the given address.
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> { pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), DialError> {
let handler = me.behaviour.new_handler() let handler = me.behaviour.new_handler()
.into_node_handler_builder() .into_node_handler_builder()
.with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override); .with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
me.network.dial(&addr, handler).map(|_id| ()) Ok(me.network.dial(&addr, handler).map(|_id| ())?)
} }
/// Initiates a new dialing attempt to the given peer. /// Initiates a new dialing attempt to the given peer.
@ -386,7 +387,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
me.network.peer(*peer_id) me.network.peer(*peer_id)
.dial(first, addrs, handler) .dial(first, addrs, handler)
.map(|_| ()) .map(|_| ())
.map_err(DialError::ConnectionLimit) .map_err(DialError::from)
} else { } else {
Err(DialError::NoAddresses) Err(DialError::NoAddresses)
}; };
@ -1053,16 +1054,28 @@ pub enum DialError {
/// The configured limit for simultaneous outgoing connections /// The configured limit for simultaneous outgoing connections
/// has been reached. /// has been reached.
ConnectionLimit(ConnectionLimit), ConnectionLimit(ConnectionLimit),
/// The address given for dialing is invalid.
InvalidAddress(Multiaddr),
/// [`NetworkBehaviour::addresses_of_peer`] returned no addresses /// [`NetworkBehaviour::addresses_of_peer`] returned no addresses
/// for the peer to dial. /// for the peer to dial.
NoAddresses NoAddresses
} }
impl From<network::DialError> for DialError {
fn from(err: network::DialError) -> DialError {
match err {
network::DialError::ConnectionLimit(l) => DialError::ConnectionLimit(l),
network::DialError::InvalidAddress(a) => DialError::InvalidAddress(a),
}
}
}
impl fmt::Display for DialError { impl fmt::Display for DialError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err), DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err),
DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."), DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
DialError::InvalidAddress(a) => write!(f, "Dial error: invalid address: {}", a),
DialError::Banned => write!(f, "Dial error: peer is banned.") DialError::Banned => write!(f, "Dial error: peer is banned.")
} }
} }
@ -1072,6 +1085,7 @@ impl error::Error for DialError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> { fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self { match self {
DialError::ConnectionLimit(err) => Some(err), DialError::ConnectionLimit(err) => Some(err),
DialError::InvalidAddress(_) => None,
DialError::NoAddresses => None, DialError::NoAddresses => None,
DialError::Banned => None DialError::Banned => None
} }

@ -1,3 +1,7 @@
# 0.28.0 [unreleased]
- Update `libp2p-core`.
# 0.27.1 [2021-01-27] # 0.27.1 [2021-01-27]
- Ensure read buffers are initialised. - Ensure read buffers are initialised.

@ -2,7 +2,7 @@
name = "libp2p-deflate" name = "libp2p-deflate"
edition = "2018" edition = "2018"
description = "Deflate encryption protocol for libp2p" description = "Deflate encryption protocol for libp2p"
version = "0.27.1" version = "0.28.0"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT" license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p" repository = "https://github.com/libp2p/rust-libp2p"
@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
futures = "0.3.1" futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
flate2 = "1.0" flate2 = "1.0"
[dev-dependencies] [dev-dependencies]

@ -1,5 +1,9 @@
# 0.28.0 [unreleased] # 0.28.0 [unreleased]
- Update `libp2p-core`.
- Add support for resolving `/dnsaddr` addresses.
- Use `trust-dns-resolver`, removing the internal thread pool and - Use `trust-dns-resolver`, removing the internal thread pool and
expanding the configurability of `libp2p-dns` by largely exposing the expanding the configurability of `libp2p-dns` by largely exposing the
configuration of `trust-dns-resolver`. configuration of `trust-dns-resolver`.

@ -10,11 +10,12 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
log = "0.4.1" log = "0.4.1"
futures = "0.3.1" futures = "0.3.1"
trust-dns-resolver = { version = "0.20", default-features = false, features = ["system-config"] } trust-dns-resolver = { version = "0.20", default-features = false, features = ["system-config"] }
async-std-resolver = { version = "0.20", optional = true } async-std-resolver = { version = "0.20", optional = true }
smallvec = "1.6"
[dev-dependencies] [dev-dependencies]
env_logger = "0.6" env_logger = "0.6"

@ -24,10 +24,11 @@
//! [`DnsConfig`] and `TokioDnsConfig` for use with `async-std` and `tokio`, //! [`DnsConfig`] and `TokioDnsConfig` for use with `async-std` and `tokio`,
//! respectively. //! respectively.
//! //!
//! A [`GenDnsConfig`] is a [`Transport`] wrapper that is created around //! A [`GenDnsConfig`] is an address-rewriting [`Transport`] wrapper around
//! an inner `Transport`. The composed transport behaves like the inner //! an inner `Transport`. The composed transport behaves like the inner
//! transport, except that [`Transport::dial`] resolves `/dns`, `/dns4/` and //! transport, except that [`Transport::dial`] resolves `/dns/...`, `/dns4/...`,
//! `/dns6/` components of a given `Multiaddr` through a DNS. //! `/dns6/...` and `/dnsaddr/...` components of the given `Multiaddr` through
//! a DNS, replacing them with the resolved protocols (typically TCP/IP).
//! //!
//! The `async-std` feature and hence the `DnsConfig` are //! The `async-std` feature and hence the `DnsConfig` are
//! enabled by default. Tokio users can furthermore opt-in //! enabled by default. Tokio users can furthermore opt-in
@ -37,14 +38,14 @@
//! //!
//![trust-dns-resolver]: https://docs.rs/trust-dns-resolver/latest/trust_dns_resolver/#dns-over-tls-and-dns-over-https //![trust-dns-resolver]: https://docs.rs/trust-dns-resolver/latest/trust_dns_resolver/#dns-over-tls-and-dns-over-https
use futures::{prelude::*, future::BoxFuture, stream::FuturesOrdered}; use futures::{prelude::*, future::BoxFuture};
use libp2p_core::{ use libp2p_core::{
Transport, Transport,
multiaddr::{Protocol, Multiaddr}, multiaddr::{Protocol, Multiaddr},
transport::{TransportError, ListenerEvent} transport::{TransportError, ListenerEvent}
}; };
use log::{debug, trace}; use smallvec::SmallVec;
use std::{error, fmt, net::IpAddr}; use std::{borrow::Cow, convert::TryFrom, error, fmt, iter, net::IpAddr, str};
#[cfg(any(feature = "async-std", feature = "tokio"))] #[cfg(any(feature = "async-std", feature = "tokio"))]
use std::io; use std::io;
#[cfg(any(feature = "async-std", feature = "tokio"))] #[cfg(any(feature = "async-std", feature = "tokio"))]
@ -62,6 +63,24 @@ use async_std_resolver::{AsyncStdConnection, AsyncStdConnectionProvider};
pub use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; pub use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
pub use trust_dns_resolver::error::{ResolveError, ResolveErrorKind}; pub use trust_dns_resolver::error::{ResolveError, ResolveErrorKind};
/// The prefix for `dnsaddr` protocol TXT record lookups.
const DNSADDR_PREFIX: &'static str = "_dnsaddr.";
/// The maximum number of dialing attempts to resolved addresses.
const MAX_DIAL_ATTEMPTS: usize = 16;
/// The maximum number of DNS lookups when dialing.
///
/// This limit is primarily a safeguard against too many, possibly
/// even cyclic, indirections in the addresses obtained from the
/// TXT records of a `/dnsaddr`.
const MAX_DNS_LOOKUPS: usize = 32;
/// The maximum number of TXT records applicable for the address
/// being dialed that are considered for further lookups as a
/// result of a single `/dnsaddr` lookup.
const MAX_TXT_RECORDS: usize = 16;
/// A `Transport` wrapper for performing DNS lookups when dialing `Multiaddr`esses /// A `Transport` wrapper for performing DNS lookups when dialing `Multiaddr`esses
/// using `async-std` for all async I/O. /// using `async-std` for all async I/O.
#[cfg(feature = "async-std")] #[cfg(feature = "async-std")]
@ -137,7 +156,7 @@ where
impl<T, C, P> Transport for GenDnsConfig<T, C, P> impl<T, C, P> Transport for GenDnsConfig<T, C, P>
where where
T: Transport + Send + 'static, T: Transport + Clone + Send + 'static,
T::Error: Send, T::Error: Send,
T::Dial: Send, T::Dial: Send,
C: DnsHandle<Error = ResolveError>, C: DnsHandle<Error = ResolveError>,
@ -171,44 +190,120 @@ where
} }
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> { fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
// Check if there are any domain names in the address. If not, proceed
// straight away with dialing on the underlying transport.
if !addr.iter().any(|p| match p {
Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) => true,
_ => false
}) {
trace!("Pass-through address without DNS: {}", addr);
let inner_dial = self.inner.dial(addr)
.map_err(|err| err.map(DnsErr::Transport))?;
return Ok(inner_dial.map_err::<_, fn(_) -> _>(DnsErr::Transport).left_future());
}
// Asynchronlously resolve all DNS names in the address before proceeding // Asynchronlously resolve all DNS names in the address before proceeding
// with dialing on the underlying transport. // with dialing on the underlying transport.
Ok(async move { Ok(async move {
let resolver = self.resolver; let resolver = self.resolver;
let inner = self.inner; let inner = self.inner;
trace!("Resolving DNS: {}", addr); let mut last_err = None;
let mut dns_lookups = 0;
let mut dial_attempts = 0;
// We optimise for the common case of a single DNS component
// in the address that is resolved with a single lookup.
let mut unresolved = SmallVec::<[Multiaddr; 1]>::new();
unresolved.push(addr.clone());
let resolved = addr.into_iter() // Resolve (i.e. replace) all DNS protocol components, initiating
.map(|proto| resolve(proto, &resolver)) // dialing attempts as soon as there is another fully resolved
.collect::<FuturesOrdered<_>>() // address.
.collect::<Vec<Result<Protocol<'_>, Self::Error>>>() while let Some(addr) = unresolved.pop() {
.await if let Some((i, name)) = addr.iter().enumerate().find(|(_, p)| match p {
.into_iter() Protocol::Dns(_) |
.collect::<Result<Vec<Protocol<'_>>, Self::Error>>()? Protocol::Dns4(_) |
.into_iter() Protocol::Dns6(_) |
.collect::<Multiaddr>(); Protocol::Dnsaddr(_) => true,
_ => false
}) {
if dns_lookups == MAX_DNS_LOOKUPS {
log::debug!("Too many DNS lookups. Dropping unresolved {}.", addr);
last_err = Some(DnsErr::TooManyLookups);
// There may still be fully resolved addresses in `unresolved`,
// so keep going until `unresolved` is empty.
continue
}
dns_lookups += 1;
match resolve(&name, &resolver).await {
Err(e) => {
if unresolved.is_empty() {
return Err(e)
}
// If there are still unresolved addresses, there is
// a chance of success, but we track the last error.
last_err = Some(e);
}
Ok(Resolved::One(ip)) => {
log::trace!("Resolved {} -> {}", name, ip);
let addr = addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
unresolved.push(addr);
}
Ok(Resolved::Many(ips)) => {
for ip in ips {
log::trace!("Resolved {} -> {}", name, ip);
let addr = addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
unresolved.push(addr);
}
}
Ok(Resolved::Addrs(addrs)) => {
let suffix = addr.iter().skip(i + 1).collect::<Multiaddr>();
let prefix = addr.iter().take(i).collect::<Multiaddr>();
let mut n = 0;
for a in addrs {
if a.ends_with(&suffix) {
if n < MAX_TXT_RECORDS {
n += 1;
log::trace!("Resolved {} -> {}", name, a);
let addr = prefix.iter().chain(a.iter()).collect::<Multiaddr>();
unresolved.push(addr);
} else {
log::debug!("Too many TXT records. Dropping resolved {}.", a);
}
}
}
}
}
} else {
// We have a fully resolved address, so try to dial it.
log::debug!("Dialing {}", addr);
debug!("DNS resolved: {} => {}", addr, resolved); let transport = inner.clone();
let result = match transport.dial(addr) {
Ok(out) => {
// We only count attempts that the inner transport
// actually accepted, i.e. for which it produced
// a dialing future.
dial_attempts += 1;
out.await.map_err(DnsErr::Transport)
}
Err(TransportError::MultiaddrNotSupported(a)) =>
Err(DnsErr::MultiaddrNotSupported(a)),
Err(TransportError::Other(err)) => Err(DnsErr::Transport(err))
};
match inner.dial(resolved) { match result {
Ok(out) => out.await.map_err(DnsErr::Transport), Ok(out) => return Ok(out),
Err(TransportError::MultiaddrNotSupported(a)) => Err(err) => {
Err(DnsErr::MultiaddrNotSupported(a)), log::debug!("Dial error: {:?}.", err);
Err(TransportError::Other(err)) => Err(DnsErr::Transport(err)) if unresolved.is_empty() {
return Err(err)
}
if dial_attempts == MAX_DIAL_ATTEMPTS {
log::debug!("Aborting dialing after {} attempts.", MAX_DIAL_ATTEMPTS);
return Err(err)
}
last_err = Some(err);
}
}
}
} }
// At this point, if there was at least one failed dialing
// attempt, return that error. Otherwise there were no valid DNS records
// for the given address to begin with (i.e. DNS lookups succeeded but
// produced no records relevant for the given `addr`).
Err(last_err.unwrap_or_else(||
DnsErr::ResolveError(
ResolveErrorKind::Message("No matching records found.").into())))
}.boxed().right_future()) }.boxed().right_future())
} }
@ -226,6 +321,13 @@ pub enum DnsErr<TErr> {
ResolveError(ResolveError), ResolveError(ResolveError),
/// DNS resolution was successful, but the underlying transport refused the resolved address. /// DNS resolution was successful, but the underlying transport refused the resolved address.
MultiaddrNotSupported(Multiaddr), MultiaddrNotSupported(Multiaddr),
/// DNS resolution involved too many lookups.
///
/// DNS resolution on dialing performs up to 32 DNS lookups. If these
/// are not sufficient to obtain a fully-resolved address, this error
/// is returned and the DNS records for the domain(s) being dialed
/// should be investigated.
TooManyLookups,
} }
impl<TErr> fmt::Display for DnsErr<TErr> impl<TErr> fmt::Display for DnsErr<TErr>
@ -236,6 +338,7 @@ where TErr: fmt::Display
DnsErr::Transport(err) => write!(f, "{}", err), DnsErr::Transport(err) => write!(f, "{}", err),
DnsErr::ResolveError(err) => write!(f, "{}", err), DnsErr::ResolveError(err) => write!(f, "{}", err),
DnsErr::MultiaddrNotSupported(a) => write!(f, "Unsupported resolved address: {}", a), DnsErr::MultiaddrNotSupported(a) => write!(f, "Unsupported resolved address: {}", a),
DnsErr::TooManyLookups => write!(f, "Too many DNS lookups"),
} }
} }
} }
@ -248,14 +351,33 @@ where TErr: error::Error + 'static
DnsErr::Transport(err) => Some(err), DnsErr::Transport(err) => Some(err),
DnsErr::ResolveError(err) => Some(err), DnsErr::ResolveError(err) => Some(err),
DnsErr::MultiaddrNotSupported(_) => None, DnsErr::MultiaddrNotSupported(_) => None,
DnsErr::TooManyLookups => None,
} }
} }
} }
/// Asynchronously resolves the domain name of a `Dns`, `Dns4` or `Dns6` protocol /// The successful outcome of [`resolve`] for a given [`Protocol`].
/// component. If the given protocol is not a DNS component, it is returned unchanged. enum Resolved<'a> {
fn resolve<'a, E: 'a, C, P>(proto: Protocol<'a>, resolver: &'a AsyncResolver<C,P>) /// The given `Protocol` has been resolved to a single `Protocol`,
-> impl Future<Output = Result<Protocol<'a>, DnsErr<E>>> + 'a /// which may be identical to the one given, in case it is not
/// a DNS protocol component.
One(Protocol<'a>),
/// The given `Protocol` has been resolved to multiple alternative
/// `Protocol`s as a result of a DNS lookup.
Many(Vec<Protocol<'a>>),
/// The given `Protocol` has been resolved to a new list of `Multiaddr`s
/// obtained from DNS TXT records representing possible alternatives.
/// These addresses may contain further DNS names that need resolving.
Addrs(Vec<Multiaddr>),
}
/// Asynchronously resolves the domain name of a `Dns`, `Dns4`, `Dns6` or `Dnsaddr` protocol
/// component. If the given protocol is of a different type, it is returned unchanged as a
/// [`Resolved::One`].
fn resolve<'a, E: 'a + Send, C, P>(
proto: &Protocol<'a>,
resolver: &'a AsyncResolver<C,P>,
) -> BoxFuture<'a, Result<Resolved<'a>, DnsErr<E>>>
where where
C: DnsHandle<Error = ResolveError>, C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>, P: ConnectionProvider<Conn = C>,
@ -263,39 +385,105 @@ where
match proto { match proto {
Protocol::Dns(ref name) => { Protocol::Dns(ref name) => {
resolver.lookup_ip(fqdn(name)).map(move |res| match res { resolver.lookup_ip(fqdn(name)).map(move |res| match res {
Ok(ips) => Ok(ips.into_iter() Ok(ips) => {
.next() let mut ips = ips.into_iter();
.map(Protocol::from) let one = ips.next()
.expect("If there are no results, `Err(NoRecordsFound)` is expected.")), .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
Err(e) => return Err(DnsErr::ResolveError(e)) if let Some(two) = ips.next() {
}).left_future() Ok(Resolved::Many(
iter::once(one).chain(iter::once(two))
.chain(ips)
.map(Protocol::from)
.collect()))
} else {
Ok(Resolved::One(Protocol::from(one)))
}
}
Err(e) => Err(DnsErr::ResolveError(e))
}).boxed()
} }
Protocol::Dns4(ref name) => { Protocol::Dns4(ref name) => {
resolver.ipv4_lookup(fqdn(name)).map(move |res| match res { resolver.ipv4_lookup(fqdn(name)).map(move |res| match res {
Ok(ips) => Ok(ips.into_iter() Ok(ips) => {
.map(IpAddr::from) let mut ips = ips.into_iter();
.next() let one = ips.next()
.map(Protocol::from) .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
.expect("If there are no results, `Err(NoRecordsFound)` is expected.")), if let Some(two) = ips.next() {
Err(e) => return Err(DnsErr::ResolveError(e)) Ok(Resolved::Many(
}).left_future().left_future().right_future() iter::once(one).chain(iter::once(two))
.chain(ips)
.map(IpAddr::from)
.map(Protocol::from)
.collect()))
} else {
Ok(Resolved::One(Protocol::from(IpAddr::from(one))))
}
}
Err(e) => Err(DnsErr::ResolveError(e))
}).boxed()
} }
Protocol::Dns6(ref name) => { Protocol::Dns6(ref name) => {
resolver.ipv6_lookup(fqdn(name)).map(move |res| match res { resolver.ipv6_lookup(fqdn(name)).map(move |res| match res {
Ok(ips) => Ok(ips.into_iter() Ok(ips) => {
.map(IpAddr::from) let mut ips = ips.into_iter();
.next() let one = ips.next()
.map(Protocol::from) .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
.expect("If there are no results, `Err(NoRecordsFound)` is expected.")), if let Some(two) = ips.next() {
Err(e) => return Err(DnsErr::ResolveError(e)) Ok(Resolved::Many(
}).right_future().left_future().right_future() iter::once(one).chain(iter::once(two))
.chain(ips)
.map(IpAddr::from)
.map(Protocol::from)
.collect()))
} else {
Ok(Resolved::One(Protocol::from(IpAddr::from(one))))
}
}
Err(e) => Err(DnsErr::ResolveError(e))
}).boxed()
}, },
proto => future::ready(Ok(proto)).right_future().right_future() Protocol::Dnsaddr(ref name) => {
let name = Cow::Owned([DNSADDR_PREFIX, name].concat());
resolver.txt_lookup(fqdn(&name)).map(move |res| match res {
Ok(txts) => {
let mut addrs = Vec::new();
for txt in txts {
if let Some(chars) = txt.txt_data().first() {
match parse_dnsaddr_txt(chars) {
Err(e) => {
// Skip over seemingly invalid entries.
log::debug!("Invalid TXT record: {:?}", e);
}
Ok(a) => {
addrs.push(a);
}
}
}
}
Ok(Resolved::Addrs(addrs))
}
Err(e) => Err(DnsErr::ResolveError(e))
}).boxed()
}
proto => future::ready(Ok(Resolved::One(proto.clone()))).boxed()
} }
} }
fn fqdn(name: &std::borrow::Cow<'_, str>) -> String { /// Parses a `<character-string>` of a `dnsaddr` TXT record.
if name.ends_with(".") { fn parse_dnsaddr_txt(txt: &[u8]) -> io::Result<Multiaddr> {
let s = str::from_utf8(txt).map_err(invalid_data)?;
match s.strip_prefix("dnsaddr=") {
None => Err(invalid_data("Missing `dnsaddr=` prefix.")),
Some(a) => Ok(Multiaddr::try_from(a).map_err(invalid_data)?)
}
}
fn invalid_data(e: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
io::Error::new(io::ErrorKind::InvalidData, e)
}
fn fqdn(name: &Cow<'_, str>) -> String {
if name.ends_with('.') {
name.to_string() name.to_string()
} else { } else {
format!("{}.", name) format!("{}.", name)
@ -308,6 +496,7 @@ mod tests {
use futures::{future::BoxFuture, stream::BoxStream}; use futures::{future::BoxFuture, stream::BoxStream};
use libp2p_core::{ use libp2p_core::{
Transport, Transport,
PeerId,
multiaddr::{Protocol, Multiaddr}, multiaddr::{Protocol, Multiaddr},
transport::ListenerEvent, transport::ListenerEvent,
transport::TransportError, transport::TransportError,
@ -332,17 +521,12 @@ mod tests {
} }
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> { fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let addr = addr.iter().collect::<Vec<_>>(); // Check that all DNS components have been resolved, i.e. replaced.
assert_eq!(addr.len(), 2); assert!(!addr.iter().any(|p| match p {
match addr[1] { Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Dnsaddr(_)
Protocol::Tcp(_) => (), => true,
_ => panic!(), _ => false,
}; }));
match addr[0] {
Protocol::Ip4(_) => (),
Protocol::Ip6(_) => (),
_ => panic!(),
};
Ok(Box::pin(future::ready(Ok(())))) Ok(Box::pin(future::ready(Ok(()))))
} }
@ -383,6 +567,37 @@ mod tests {
.await .await
.unwrap(); .unwrap();
// Success due to the DNS TXT records at _dnsaddr.bootstrap.libp2p.io.
let _ = transport
.clone()
.dial("/dnsaddr/bootstrap.libp2p.io".parse().unwrap())
.unwrap()
.await
.unwrap();
// Success due to the DNS TXT records at _dnsaddr.bootstrap.libp2p.io having
// an entry with suffix `/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN`,
// i.e. a bootnode with such a peer ID.
let _ = transport
.clone()
.dial("/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap())
.unwrap()
.await
.unwrap();
// Failure due to the DNS TXT records at _dnsaddr.libp2p.io not having
// an entry with a random `p2p` suffix.
match transport
.clone()
.dial(format!("/dnsaddr/bootstrap.libp2p.io/p2p/{}", PeerId::random()).parse().unwrap())
.unwrap()
.await
{
Err(DnsErr::ResolveError(_)) => {},
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(_) => panic!("Unexpected success.")
}
// Failure due to no records. // Failure due to no records.
match transport match transport
.clone() .clone()
@ -401,19 +616,27 @@ mod tests {
#[cfg(feature = "async-std")] #[cfg(feature = "async-std")]
{ {
// Be explicit about the resolver used. At least on github CI, TXT
// type record lookups may not work with the system DNS resolver.
let config = ResolverConfig::quad9();
let opts = ResolverOpts::default();
async_std_crate::task::block_on( async_std_crate::task::block_on(
DnsConfig::system(CustomTransport).then(|dns| run(dns.unwrap())) DnsConfig::custom(CustomTransport, config, opts).then(|dns| run(dns.unwrap()))
); );
} }
#[cfg(feature = "tokio")] #[cfg(feature = "tokio")]
{ {
// Be explicit about the resolver used. At least on github CI, TXT
// type record lookups may not work with the system DNS resolver.
let config = ResolverConfig::quad9();
let opts = ResolverOpts::default();
let rt = tokio_crate::runtime::Builder::new_current_thread() let rt = tokio_crate::runtime::Builder::new_current_thread()
.enable_io() .enable_io()
.enable_time() .enable_time()
.build() .build()
.unwrap(); .unwrap();
rt.block_on(run(TokioDnsConfig::system(CustomTransport).unwrap())); rt.block_on(run(TokioDnsConfig::custom(CustomTransport, config, opts).unwrap()));
} }
} }
} }

@ -1,3 +1,7 @@
# 0.30.0 [unreleased]
- Update `libp2p-core`.
# 0.29.0 [2021-01-12] # 0.29.0 [2021-01-12]
- Update dependencies. - Update dependencies.

@ -1,7 +1,7 @@
[package] [package]
name = "libp2p-noise" name = "libp2p-noise"
description = "Cryptographic handshake protocol using the noise framework." description = "Cryptographic handshake protocol using the noise framework."
version = "0.29.0" version = "0.30.0"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT" license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p" repository = "https://github.com/libp2p/rust-libp2p"
@ -12,7 +12,7 @@ bytes = "1"
curve25519-dalek = "3.0.0" curve25519-dalek = "3.0.0"
futures = "0.3.1" futures = "0.3.1"
lazy_static = "1.2" lazy_static = "1.2"
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
log = "0.4" log = "0.4"
prost = "0.7" prost = "0.7"
rand = "0.7.2" rand = "0.7.2"

@ -1,3 +1,7 @@
# 0.28.0 [unreleased]
- Update `libp2p-core`.
# 0.27.1 [2021-02-15] # 0.27.1 [2021-02-15]
- Update dependencies. - Update dependencies.

@ -2,7 +2,7 @@
name = "libp2p-plaintext" name = "libp2p-plaintext"
edition = "2018" edition = "2018"
description = "Plaintext encryption dummy protocol for libp2p" description = "Plaintext encryption dummy protocol for libp2p"
version = "0.27.1" version = "0.28.0"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT" license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p" repository = "https://github.com/libp2p/rust-libp2p"
@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
bytes = "1" bytes = "1"
futures = "0.3.1" futures = "0.3.1"
asynchronous-codec = "0.6" asynchronous-codec = "0.6"
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
log = "0.4.8" log = "0.4.8"
prost = "0.7" prost = "0.7"
unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] }

@ -51,7 +51,7 @@ pub struct Remote {
} }
impl HandshakeContext<Local> { impl HandshakeContext<Local> {
fn new(config: PlainText2Config) -> Result<Self, PlainTextError> { fn new(config: PlainText2Config) -> Self {
let exchange = Exchange { let exchange = Exchange {
id: Some(config.local_public_key.clone().into_peer_id().to_bytes()), id: Some(config.local_public_key.clone().into_peer_id().to_bytes()),
pubkey: Some(config.local_public_key.clone().into_protobuf_encoding()) pubkey: Some(config.local_public_key.clone().into_protobuf_encoding())
@ -59,12 +59,12 @@ impl HandshakeContext<Local> {
let mut buf = Vec::with_capacity(exchange.encoded_len()); let mut buf = Vec::with_capacity(exchange.encoded_len());
exchange.encode(&mut buf).expect("Vec<u8> provides capacity as needed"); exchange.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
Ok(Self { Self {
config, config,
state: Local { state: Local {
exchange_bytes: buf exchange_bytes: buf
} }
}) }
} }
fn with_remote(self, exchange_bytes: BytesMut) fn with_remote(self, exchange_bytes: BytesMut)
@ -119,7 +119,7 @@ where
let mut framed_socket = Framed::new(socket, UviBytes::default()); let mut framed_socket = Framed::new(socket, UviBytes::default());
trace!("starting handshake"); trace!("starting handshake");
let context = HandshakeContext::new(config)?; let context = HandshakeContext::new(config);
trace!("sending exchange to remote"); trace!("sending exchange to remote");
framed_socket.send(BytesMut::from(&context.state.exchange_bytes[..])).await?; framed_socket.send(BytesMut::from(&context.state.exchange_bytes[..])).await?;

@ -1,4 +1,8 @@
# 0.27.2 [unreleased] # 0.28.0 [unreleased]
- Update `libp2p-core`.
- Permit `/p2p` addresses.
- Update to `if-watch-0.2`. - Update to `if-watch-0.2`.

@ -2,7 +2,7 @@
name = "libp2p-tcp" name = "libp2p-tcp"
edition = "2018" edition = "2018"
description = "TCP/IP transport protocol for libp2p" description = "TCP/IP transport protocol for libp2p"
version = "0.27.2" version = "0.28.0"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT" license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p" repository = "https://github.com/libp2p/rust-libp2p"
@ -17,7 +17,7 @@ if-watch = { version = "0.2.0", optional = true }
if-addrs = { version = "0.6.4", optional = true } if-addrs = { version = "0.6.4", optional = true }
ipnet = "2.0.0" ipnet = "2.0.0"
libc = "0.2.80" libc = "0.2.80"
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
log = "0.4.11" log = "0.4.11"
socket2 = { version = "0.4.0", features = ["all"] } socket2 = { version = "0.4.0", features = ["all"] }
tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net"], optional = true } tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net"], optional = true }

@ -379,7 +379,7 @@ where
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>; type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> { fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(&addr) { let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(addr.clone()) {
sa sa
} else { } else {
return Err(TransportError::MultiaddrNotSupported(addr)); return Err(TransportError::MultiaddrNotSupported(addr));
@ -390,7 +390,7 @@ where
} }
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> { fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) {
if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
return Err(TransportError::MultiaddrNotSupported(addr)); return Err(TransportError::MultiaddrNotSupported(addr));
} }
@ -653,21 +653,34 @@ where
} }
} }
// This type of logic should probably be moved into the multiaddr package /// Extracts a `SocketAddr` from a given `Multiaddr`.
fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> { ///
let mut iter = addr.iter(); /// Fails if the given `Multiaddr` does not begin with an IP
let proto1 = iter.next().ok_or(())?; /// protocol encapsulating a TCP port.
let proto2 = iter.next().ok_or(())?; fn multiaddr_to_socketaddr(mut addr: Multiaddr) -> Result<SocketAddr, ()> {
// "Pop" the IP address and TCP port from the end of the address,
if iter.next().is_some() { // ignoring a `/p2p/...` suffix as well as any prefix of possibly
return Err(()); // outer protocols, if present.
} let mut port = None;
while let Some(proto) = addr.pop() {
match (proto1, proto2) { match proto {
(Protocol::Ip4(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)), Protocol::Ip4(ipv4) => match port {
(Protocol::Ip6(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)), Some(port) => return Ok(SocketAddr::new(ipv4.into(), port)),
_ => Err(()), None => return Err(())
},
Protocol::Ip6(ipv6) => match port {
Some(port) => return Ok(SocketAddr::new(ipv6.into(), port)),
None => return Err(())
},
Protocol::Tcp(portnum) => match port {
Some(_) => return Err(()),
None => { port = Some(portnum) }
}
Protocol::P2p(_) => {}
_ => return Err(())
}
} }
Err(())
} }
// Create a [`Multiaddr`] from the given IP address and port number. // Create a [`Multiaddr`] from the given IP address and port number.
@ -687,12 +700,12 @@ mod tests {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
assert!( assert!(
multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap()) multiaddr_to_socketaddr("/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
.is_err() .is_err()
); );
assert_eq!( assert_eq!(
multiaddr_to_socketaddr(&"/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()), multiaddr_to_socketaddr("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
Ok(SocketAddr::new( Ok(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
12345, 12345,
@ -700,7 +713,7 @@ mod tests {
); );
assert_eq!( assert_eq!(
multiaddr_to_socketaddr( multiaddr_to_socketaddr(
&"/ip4/255.255.255.255/tcp/8080" "/ip4/255.255.255.255/tcp/8080"
.parse::<Multiaddr>() .parse::<Multiaddr>()
.unwrap() .unwrap()
), ),
@ -710,7 +723,7 @@ mod tests {
)) ))
); );
assert_eq!( assert_eq!(
multiaddr_to_socketaddr(&"/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()), multiaddr_to_socketaddr("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
Ok(SocketAddr::new( Ok(SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
12345, 12345,
@ -718,7 +731,7 @@ mod tests {
); );
assert_eq!( assert_eq!(
multiaddr_to_socketaddr( multiaddr_to_socketaddr(
&"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080" "/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
.parse::<Multiaddr>() .parse::<Multiaddr>()
.unwrap() .unwrap()
), ),

@ -1,3 +1,9 @@
# 0.28.0 [unreleased]
- Update `libp2p-core`.
- Permit `/p2p` addresses.
# 0.27.0 [2021-01-12] # 0.27.0 [2021-01-12]
- Update dependencies. - Update dependencies.

@ -2,7 +2,7 @@
name = "libp2p-uds" name = "libp2p-uds"
edition = "2018" edition = "2018"
description = "Unix domain sockets transport for libp2p" description = "Unix domain sockets transport for libp2p"
version = "0.27.0" version = "0.28.0"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT" license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p" repository = "https://github.com/libp2p/rust-libp2p"
@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
[target.'cfg(all(unix, not(target_os = "emscripten")))'.dependencies] [target.'cfg(all(unix, not(target_os = "emscripten")))'.dependencies]
async-std = { version = "1.6.2", optional = true } async-std = { version = "1.6.2", optional = true }
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
log = "0.4.1" log = "0.4.1"
futures = "0.3.1" futures = "0.3.1"
tokio = { version = "1.0.1", default-features = false, features = ["net"], optional = true } tokio = { version = "1.0.1", default-features = false, features = ["net"], optional = true }

@ -140,23 +140,20 @@ codegen!(
/// paths. /// paths.
// This type of logic should probably be moved into the multiaddr package // This type of logic should probably be moved into the multiaddr package
fn multiaddr_to_path(addr: &Multiaddr) -> Result<PathBuf, ()> { fn multiaddr_to_path(addr: &Multiaddr) -> Result<PathBuf, ()> {
let mut iter = addr.iter(); let mut protocols = addr.iter();
let path = iter.next(); match protocols.next() {
Some(Protocol::Unix(ref path)) => {
if iter.next().is_some() { let path = PathBuf::from(path.as_ref());
return Err(()); if !path.is_absolute() {
return Err(())
}
match protocols.next() {
None | Some(Protocol::P2p(_)) => Ok(path),
Some(_) => Err(())
}
}
_ => Err(())
} }
let out: PathBuf = match path {
Some(Protocol::Unix(ref path)) => path.as_ref().into(),
_ => return Err(())
};
if !out.is_absolute() {
return Err(());
}
Ok(out)
} }
#[cfg(all(test, feature = "async-std"))] #[cfg(all(test, feature = "async-std"))]

@ -1,3 +1,7 @@
# 0.28.0 [unreleased]
- Update `libp2p-core`.
# 0.27.0 [2021-01-12] # 0.27.0 [2021-01-12]
- Update dependencies. - Update dependencies.

@ -1,6 +1,6 @@
[package] [package]
name = "libp2p-wasm-ext" name = "libp2p-wasm-ext"
version = "0.27.0" version = "0.28.0"
authors = ["Pierre Krieger <pierre.krieger1708@gmail.com>"] authors = ["Pierre Krieger <pierre.krieger1708@gmail.com>"]
edition = "2018" edition = "2018"
description = "Allows passing in an external transport in a WASM environment" description = "Allows passing in an external transport in a WASM environment"
@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
futures = "0.3.1" futures = "0.3.1"
js-sys = "0.3.19" js-sys = "0.3.19"
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
parity-send-wrapper = "0.1.0" parity-send-wrapper = "0.1.0"
wasm-bindgen = "0.2.42" wasm-bindgen = "0.2.42"
wasm-bindgen-futures = "0.4.4" wasm-bindgen-futures = "0.4.4"

@ -1,3 +1,9 @@
# 0.29.0 [unreleased]
- Update `libp2p-core`.
- Permit dialing `/p2p` addresses.
# 0.28.0 [2021-01-12] # 0.28.0 [2021-01-12]
- Update dependencies. - Update dependencies.

@ -2,7 +2,7 @@
name = "libp2p-websocket" name = "libp2p-websocket"
edition = "2018" edition = "2018"
description = "WebSocket transport for libp2p" description = "WebSocket transport for libp2p"
version = "0.28.0" version = "0.29.0"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT" license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p" repository = "https://github.com/libp2p/rust-libp2p"
@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
futures-rustls = "0.21" futures-rustls = "0.21"
either = "1.5.3" either = "1.5.3"
futures = "0.3.1" futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core" } libp2p-core = { version = "0.28.0", path = "../../core" }
log = "0.4.8" log = "0.4.8"
quicksink = "0.1" quicksink = "0.1"
rw-stream-sink = "0.2.0" rw-stream-sink = "0.2.0"

@ -231,13 +231,11 @@ where
} }
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> { fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
// Quick sanity check of the provided Multiaddr. let addr = match parse_ws_dial_addr(addr) {
if let Some(Protocol::Ws(_)) | Some(Protocol::Wss(_)) = addr.iter().last() { Ok(addr) => addr,
// ok Err(Error::InvalidMultiaddr(a)) => return Err(TransportError::MultiaddrNotSupported(a)),
} else { Err(e) => return Err(TransportError::Other(e)),
debug!("{} is not a websocket multiaddr", addr); };
return Err(TransportError::MultiaddrNotSupported(addr))
}
// We are looping here in order to follow redirects (if any): // We are looping here in order to follow redirects (if any):
let mut remaining_redirects = self.max_redirects; let mut remaining_redirects = self.max_redirects;
@ -248,11 +246,11 @@ where
match this.dial_once(addr).await { match this.dial_once(addr).await {
Ok(Either::Left(redirect)) => { Ok(Either::Left(redirect)) => {
if remaining_redirects == 0 { if remaining_redirects == 0 {
debug!("too many redirects"); debug!("Too many redirects (> {})", self.max_redirects);
return Err(Error::TooManyRedirects) return Err(Error::TooManyRedirects)
} }
remaining_redirects -= 1; remaining_redirects -= 1;
addr = location_to_multiaddr(&redirect)? addr = parse_ws_dial_addr(location_to_multiaddr(&redirect)?)?
} }
Ok(Either::Right(conn)) => return Ok(conn), Ok(Either::Right(conn)) => return Ok(conn),
Err(e) => return Err(e) Err(e) => return Err(e)
@ -273,46 +271,26 @@ where
T: Transport, T: Transport,
T::Output: AsyncRead + AsyncWrite + Send + Unpin + 'static T::Output: AsyncRead + AsyncWrite + Send + Unpin + 'static
{ {
/// Attempty to dial the given address and perform a websocket handshake. /// Attempts to dial the given address and perform a websocket handshake.
async fn dial_once(self, address: Multiaddr) -> Result<Either<String, Connection<T::Output>>, Error<T::Error>> { async fn dial_once(self, addr: WsAddress) -> Result<Either<String, Connection<T::Output>>, Error<T::Error>> {
trace!("dial address: {}", address); trace!("Dialing websocket address: {:?}", addr);
let (host_port, dns_name) = host_and_dnsname(&address)?; let dial = self.transport.dial(addr.tcp_addr)
let mut inner_addr = address.clone();
let (use_tls, path) =
match inner_addr.pop() {
Some(Protocol::Ws(path)) => (false, path),
Some(Protocol::Wss(path)) => {
if dns_name.is_none() {
debug!("no DNS name in {}", address);
return Err(Error::InvalidMultiaddr(address))
}
(true, path)
}
_ => {
debug!("{} is not a websocket multiaddr", address);
return Err(Error::InvalidMultiaddr(address))
}
};
let dial = self.transport.dial(inner_addr)
.map_err(|e| match e { .map_err(|e| match e {
TransportError::MultiaddrNotSupported(a) => Error::InvalidMultiaddr(a), TransportError::MultiaddrNotSupported(a) => Error::InvalidMultiaddr(a),
TransportError::Other(e) => Error::Transport(e) TransportError::Other(e) => Error::Transport(e)
})?; })?;
let stream = dial.map_err(Error::Transport).await?; let stream = dial.map_err(Error::Transport).await?;
trace!("connected to {}", address); trace!("TCP connection to {} established.", addr.host_port);
let stream = let stream =
if use_tls { // begin TLS session if addr.use_tls { // begin TLS session
let dns_name = dns_name.expect("for use_tls we have checked that dns_name is some"); let dns_name = addr.dns_name.expect("for use_tls we have checked that dns_name is some");
trace!("starting TLS handshake with {}", address); trace!("Starting TLS handshake with {:?}", dns_name);
let stream = self.tls_config.client.connect(dns_name.as_ref(), stream) let stream = self.tls_config.client.connect(dns_name.as_ref(), stream)
.map_err(|e| { .map_err(|e| {
debug!("TLS handshake with {} failed: {}", address, e); debug!("TLS handshake with {:?} failed: {}", dns_name, e);
Error::Tls(tls::Error::from(e)) Error::Tls(tls::Error::from(e))
}) })
.await?; .await?;
@ -323,9 +301,9 @@ where
EitherOutput::Second(stream) EitherOutput::Second(stream)
}; };
trace!("sending websocket handshake request to {}", address); trace!("Sending websocket handshake to {}", addr.host_port);
let mut client = handshake::Client::new(stream, &host_port, path.as_ref()); let mut client = handshake::Client::new(stream, &addr.host_port, addr.path.as_ref());
if self.use_deflate { if self.use_deflate {
client.add_extension(Box::new(Deflate::new(connection::Mode::Client))); client.add_extension(Box::new(Deflate::new(connection::Mode::Client)));
@ -341,32 +319,87 @@ where
Err(Error::Handshake(msg.into())) Err(Error::Handshake(msg.into()))
} }
handshake::ServerResponse::Accepted { .. } => { handshake::ServerResponse::Accepted { .. } => {
trace!("websocket handshake with {} successful", address); trace!("websocket handshake with {} successful", addr.host_port);
Ok(Either::Right(Connection::new(client.into_builder()))) Ok(Either::Right(Connection::new(client.into_builder())))
} }
} }
} }
} }
// Extract host, port and optionally the DNS name from the given [`Multiaddr`]. #[derive(Debug)]
fn host_and_dnsname<T>(addr: &Multiaddr) -> Result<(String, Option<webpki::DNSName>), Error<T>> { struct WsAddress {
let mut iter = addr.iter(); host_port: String,
match (iter.next(), iter.next()) { path: String,
(Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port))) => dns_name: Option<webpki::DNSName>,
Ok((format!("{}:{}", ip, port), None)), use_tls: bool,
(Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port))) => tcp_addr: Multiaddr,
Ok((format!("{}:{}", ip, port), None)), }
(Some(Protocol::Dns(h)), Some(Protocol::Tcp(port))) =>
Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))), /// Tries to parse the given `Multiaddr` into a `WsAddress` used
(Some(Protocol::Dns4(h)), Some(Protocol::Tcp(port))) => /// for dialing.
Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))), ///
(Some(Protocol::Dns6(h)), Some(Protocol::Tcp(port))) => /// Fails if the given `Multiaddr` does not represent a TCP/IP-based
Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))), /// websocket protocol stack.
_ => { fn parse_ws_dial_addr<T>(addr: Multiaddr) -> Result<WsAddress, Error<T>> {
debug!("multi-address format not supported: {}", addr); // The encapsulating protocol must be based on TCP/IP, possibly via DNS.
Err(Error::InvalidMultiaddr(addr.clone())) // We peek at it in order to learn the hostname and port to use for
// the websocket handshake.
let mut protocols = addr.iter();
let mut ip = protocols.next();
let mut tcp = protocols.next();
let (host_port, dns_name) = loop {
match (ip, tcp) {
(Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port)))
=> break (format!("{}:{}", ip, port), None),
(Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port)))
=> break (format!("{}:{}", ip, port), None),
(Some(Protocol::Dns(h)), Some(Protocol::Tcp(port))) |
(Some(Protocol::Dns4(h)), Some(Protocol::Tcp(port))) |
(Some(Protocol::Dns6(h)), Some(Protocol::Tcp(port))) |
(Some(Protocol::Dnsaddr(h)), Some(Protocol::Tcp(port)))
=> break (format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned())),
(Some(_), Some(p)) => {
ip = Some(p);
tcp = protocols.next();
}
_ => return Err(Error::InvalidMultiaddr(addr))
} }
} };
// Now consume the `Ws` / `Wss` protocol from the end of the address,
// preserving the trailing `P2p` protocol that identifies the remote,
// if any.
let mut protocols = addr.clone();
let mut p2p = None;
let (use_tls, path) = loop {
match protocols.pop() {
p@Some(Protocol::P2p(_)) => { p2p = p }
Some(Protocol::Ws(path)) => break (false, path.into_owned()),
Some(Protocol::Wss(path)) => {
if dns_name.is_none() {
debug!("Missing DNS name in WSS address: {}", addr);
return Err(Error::InvalidMultiaddr(addr))
}
break (true, path.into_owned())
}
_ => return Err(Error::InvalidMultiaddr(addr))
}
};
// The original address, stripped of the `/ws` and `/wss` protocols,
// makes up the the address for the inner TCP-based transport.
let tcp_addr = match p2p {
Some(p) => protocols.with(p),
None => protocols
};
Ok(WsAddress {
host_port,
dns_name,
path,
use_tls,
tcp_addr,
})
} }
// Given a location URL, build a new websocket [`Multiaddr`]. // Given a location URL, build a new websocket [`Multiaddr`].

@ -44,6 +44,13 @@ pub struct WsConfig<T> {
impl<T> WsConfig<T> { impl<T> WsConfig<T> {
/// Create a new websocket transport based on the given transport. /// Create a new websocket transport based on the given transport.
///
/// > **Note*: The given transport must be based on TCP/IP and should
/// > usually incorporate DNS resolution, though the latter is not
/// > strictly necessary if one wishes to only use the `Ws` protocol
/// > with known IP addresses and ports. See [`libp2p-tcp`](https://docs.rs/libp2p-tcp/)
/// > and [`libp2p-dns`](https://docs.rs/libp2p-dns) for constructing
/// > the inner transport.
pub fn new(transport: T) -> Self { pub fn new(transport: T) -> Self {
framed::WsConfig::new(transport).into() framed::WsConfig::new(transport).into()
} }
@ -187,10 +194,9 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use libp2p_core::Multiaddr; use libp2p_core::{Multiaddr, PeerId, Transport, multiaddr::Protocol};
use libp2p_tcp as tcp; use libp2p_tcp as tcp;
use futures::prelude::*; use futures::prelude::*;
use libp2p_core::{Transport, multiaddr::Protocol};
use super::WsConfig; use super::WsConfig;
#[test] #[test]
@ -230,7 +236,7 @@ mod tests {
conn.await conn.await
}; };
let outbound = ws_config.dial(addr).unwrap(); let outbound = ws_config.dial(addr.with(Protocol::P2p(PeerId::random().into()))).unwrap();
let (a, b) = futures::join!(inbound, outbound); let (a, b) = futures::join!(inbound, outbound);
a.and(b).unwrap(); a.and(b).unwrap();