mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-14 18:41:22 +00:00
[libp2p-dns] Implement /dnsaddr
resolution. (#1931)
* Implement `/dnsaddr` support on `libp2p-dns`. To that end, since resolving `/dnsaddr` addresses needs "fully qualified" multiaddresses when dialing, i.e. those that end with the `/p2p/...` protocol, we make sure that dialing always uses such fully qualified addresses by appending the `/p2p` protocol as necessary. As a side-effect, this adds support for dialing peers via "fully qualified" addresses, as an alternative to using a `PeerId` together with a `Multiaddr` with or without the `/p2p` protocol. * Adapt libp2p-relay. * Update versions, changelogs and small cleanups.
This commit is contained in:
22
Cargo.toml
22
Cargo.toml
@ -64,35 +64,35 @@ atomic = "0.5.0"
|
||||
bytes = "1"
|
||||
futures = "0.3.1"
|
||||
lazy_static = "1.2"
|
||||
libp2p-core = { version = "0.27.2", path = "core", default-features = false }
|
||||
libp2p-core = { version = "0.28.0", path = "core", default-features = false }
|
||||
libp2p-floodsub = { version = "0.28.0", path = "protocols/floodsub", optional = true }
|
||||
libp2p-gossipsub = { version = "0.29.0", path = "./protocols/gossipsub", optional = true }
|
||||
libp2p-identify = { version = "0.28.0", path = "protocols/identify", optional = true }
|
||||
libp2p-kad = { version = "0.29.0", path = "protocols/kad", optional = true }
|
||||
libp2p-mplex = { version = "0.27.2", path = "muxers/mplex", optional = true }
|
||||
libp2p-noise = { version = "0.29.0", path = "transports/noise", optional = true }
|
||||
libp2p-mplex = { version = "0.28.0", path = "muxers/mplex", optional = true }
|
||||
libp2p-noise = { version = "0.30.0", path = "transports/noise", optional = true }
|
||||
libp2p-ping = { version = "0.28.0", path = "protocols/ping", optional = true }
|
||||
libp2p-plaintext = { version = "0.27.1", path = "transports/plaintext", optional = true }
|
||||
libp2p-plaintext = { version = "0.28.0", path = "transports/plaintext", optional = true }
|
||||
libp2p-pnet = { version = "0.20.0", path = "transports/pnet", optional = true }
|
||||
libp2p-relay = { version = "0.1.0", path = "protocols/relay", optional = true }
|
||||
libp2p-request-response = { version = "0.10.0", path = "protocols/request-response", optional = true }
|
||||
libp2p-swarm = { version = "0.28.0", path = "swarm" }
|
||||
libp2p-swarm-derive = { version = "0.22.0", path = "swarm-derive" }
|
||||
libp2p-uds = { version = "0.27.0", path = "transports/uds", optional = true }
|
||||
libp2p-wasm-ext = { version = "0.27.0", path = "transports/wasm-ext", default-features = false, optional = true }
|
||||
libp2p-yamux = { version = "0.30.1", path = "muxers/yamux", optional = true }
|
||||
multiaddr = { package = "parity-multiaddr", version = "0.11.1", path = "misc/multiaddr" }
|
||||
libp2p-uds = { version = "0.28.0", path = "transports/uds", optional = true }
|
||||
libp2p-wasm-ext = { version = "0.28.0", path = "transports/wasm-ext", default-features = false, optional = true }
|
||||
libp2p-yamux = { version = "0.31.0", path = "muxers/yamux", optional = true }
|
||||
multiaddr = { package = "parity-multiaddr", version = "0.11.2", path = "misc/multiaddr" }
|
||||
parking_lot = "0.11.0"
|
||||
pin-project = "1.0.0"
|
||||
smallvec = "1.6.1"
|
||||
wasm-timer = "0.2.4"
|
||||
|
||||
[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
|
||||
libp2p-deflate = { version = "0.27.1", path = "transports/deflate", optional = true }
|
||||
libp2p-deflate = { version = "0.28.0", path = "transports/deflate", optional = true }
|
||||
libp2p-dns = { version = "0.28.0", path = "transports/dns", optional = true, default-features = false }
|
||||
libp2p-mdns = { version = "0.29.0", path = "protocols/mdns", optional = true }
|
||||
libp2p-tcp = { version = "0.27.1", path = "transports/tcp", default-features = false, optional = true }
|
||||
libp2p-websocket = { version = "0.28.0", path = "transports/websocket", optional = true }
|
||||
libp2p-tcp = { version = "0.28.0", path = "transports/tcp", default-features = false, optional = true }
|
||||
libp2p-websocket = { version = "0.29.0", path = "transports/websocket", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
async-std = { version = "1.6.2", features = ["attributes"] }
|
||||
|
@ -1,4 +1,12 @@
|
||||
# 0.27.2 [unreleased]
|
||||
# 0.28.0 [unreleased]
|
||||
|
||||
- `Network::dial()` understands `/p2p` addresses and `Transport::dial`
|
||||
gets a "fully qualified" `/p2p` address when dialing a specific peer,
|
||||
whether through the `Network::peer()` API or via `Network::dial()`
|
||||
with a `/p2p` address.
|
||||
|
||||
- `Network::dial()` and `network::Peer::dial()` return a `DialError`
|
||||
on error.
|
||||
|
||||
- Shorten and unify `Debug` impls of public keys.
|
||||
|
||||
|
@ -2,7 +2,7 @@
|
||||
name = "libp2p-core"
|
||||
edition = "2018"
|
||||
description = "Core traits and structs of libp2p"
|
||||
version = "0.27.2"
|
||||
version = "0.28.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
license = "MIT"
|
||||
repository = "https://github.com/libp2p/rust-libp2p"
|
||||
|
@ -554,7 +554,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
|
||||
|
||||
/// Returns an iterator over all connected peers, i.e. those that have
|
||||
/// at least one established connection in the pool.
|
||||
pub fn iter_connected<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
|
||||
pub fn iter_connected(&self) -> impl Iterator<Item = &PeerId> {
|
||||
self.established.keys()
|
||||
}
|
||||
|
||||
|
@ -209,13 +209,14 @@ where
|
||||
&self.local_peer_id
|
||||
}
|
||||
|
||||
/// Dials a multiaddress without expecting a particular remote peer ID.
|
||||
/// Dials a [`Multiaddr`] that may or may not encapsulate a
|
||||
/// specific expected remote peer ID.
|
||||
///
|
||||
/// The given `handler` will be used to create the
|
||||
/// [`Connection`](crate::connection::Connection) upon success and the
|
||||
/// connection ID is returned.
|
||||
pub fn dial(&mut self, address: &Multiaddr, handler: THandler)
|
||||
-> Result<ConnectionId, ConnectionLimit>
|
||||
-> Result<ConnectionId, DialError>
|
||||
where
|
||||
TTrans: Transport<Output = (PeerId, TMuxer)>,
|
||||
TTrans::Error: Send + 'static,
|
||||
@ -225,15 +226,32 @@ where
|
||||
TInEvent: Send + 'static,
|
||||
TOutEvent: Send + 'static,
|
||||
{
|
||||
// If the address ultimately encapsulates an expected peer ID, dial that peer
|
||||
// such that any mismatch is detected. We do not "pop off" the `P2p` protocol
|
||||
// from the address, because it may be used by the `Transport`, i.e. `P2p`
|
||||
// is a protocol component that can influence any transport, like `libp2p-dns`.
|
||||
if let Some(multiaddr::Protocol::P2p(ma)) = address.iter().last() {
|
||||
if let Ok(peer) = PeerId::try_from(ma) {
|
||||
return self.dial_peer(DialingOpts {
|
||||
peer,
|
||||
address: address.clone(),
|
||||
handler,
|
||||
remaining: Vec::new(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// The address does not specify an expected peer, so just try to dial it as-is,
|
||||
// accepting any peer ID that the remote identifies as.
|
||||
let info = OutgoingInfo { address, peer_id: None };
|
||||
match self.transport().clone().dial(address.clone()) {
|
||||
Ok(f) => {
|
||||
let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
|
||||
self.pool.add_outgoing(f, handler, info)
|
||||
self.pool.add_outgoing(f, handler, info).map_err(DialError::ConnectionLimit)
|
||||
}
|
||||
Err(err) => {
|
||||
let f = future::err(PendingConnectionError::Transport(err));
|
||||
self.pool.add_outgoing(f, handler, info)
|
||||
self.pool.add_outgoing(f, handler, info).map_err(DialError::ConnectionLimit)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -430,7 +448,7 @@ where
|
||||
|
||||
/// Initiates a connection attempt to a known peer.
|
||||
fn dial_peer(&mut self, opts: DialingOpts<PeerId, THandler>)
|
||||
-> Result<ConnectionId, ConnectionLimit>
|
||||
-> Result<ConnectionId, DialError>
|
||||
where
|
||||
TTrans: Transport<Output = (PeerId, TMuxer)>,
|
||||
TTrans::Dial: Send + 'static,
|
||||
@ -460,7 +478,7 @@ fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans>(
|
||||
<THandler::Handler as ConnectionHandler>::Error>,
|
||||
dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
|
||||
opts: DialingOpts<PeerId, THandler>
|
||||
) -> Result<ConnectionId, ConnectionLimit>
|
||||
) -> Result<ConnectionId, DialError>
|
||||
where
|
||||
THandler: IntoConnectionHandler + Send + 'static,
|
||||
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
|
||||
@ -478,23 +496,28 @@ where
|
||||
TInEvent: Send + 'static,
|
||||
TOutEvent: Send + 'static,
|
||||
{
|
||||
let result = match transport.dial(opts.address.clone()) {
|
||||
// Ensure the address to dial encapsulates the `p2p` protocol for the
|
||||
// targeted peer, so that the transport has a "fully qualified" address
|
||||
// to work with.
|
||||
let addr = p2p_addr(opts.peer, opts.address).map_err(DialError::InvalidAddress)?;
|
||||
|
||||
let result = match transport.dial(addr.clone()) {
|
||||
Ok(fut) => {
|
||||
let fut = fut.map_err(|e| PendingConnectionError::Transport(TransportError::Other(e)));
|
||||
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
|
||||
pool.add_outgoing(fut, opts.handler, info)
|
||||
let info = OutgoingInfo { address: &addr, peer_id: Some(&opts.peer) };
|
||||
pool.add_outgoing(fut, opts.handler, info).map_err(DialError::ConnectionLimit)
|
||||
},
|
||||
Err(err) => {
|
||||
let fut = future::err(PendingConnectionError::Transport(err));
|
||||
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
|
||||
pool.add_outgoing(fut, opts.handler, info)
|
||||
let info = OutgoingInfo { address: &addr, peer_id: Some(&opts.peer) };
|
||||
pool.add_outgoing(fut, opts.handler, info).map_err(DialError::ConnectionLimit)
|
||||
},
|
||||
};
|
||||
|
||||
if let Ok(id) = &result {
|
||||
dialing.entry(opts.peer).or_default().push(
|
||||
peer::DialingState {
|
||||
current: (*id, opts.address),
|
||||
current: (*id, addr),
|
||||
remaining: opts.remaining,
|
||||
},
|
||||
);
|
||||
@ -668,6 +691,37 @@ impl NetworkConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Ensures a given `Multiaddr` is a `/p2p/...` address for the given peer.
|
||||
///
|
||||
/// If the given address is already a `p2p` address for the given peer,
|
||||
/// i.e. the last encapsulated protocol is `/p2p/<peer-id>`, this is a no-op.
|
||||
///
|
||||
/// If the given address is already a `p2p` address for a different peer
|
||||
/// than the one given, the given `Multiaddr` is returned as an `Err`.
|
||||
///
|
||||
/// If the given address is not yet a `p2p` address for the given peer,
|
||||
/// the `/p2p/<peer-id>` protocol is appended to the returned address.
|
||||
fn p2p_addr(peer: PeerId, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
|
||||
if let Some(multiaddr::Protocol::P2p(hash)) = addr.iter().last() {
|
||||
if &hash != peer.as_ref() {
|
||||
return Err(addr)
|
||||
}
|
||||
Ok(addr)
|
||||
} else {
|
||||
Ok(addr.with(multiaddr::Protocol::P2p(peer.into())))
|
||||
}
|
||||
}
|
||||
|
||||
/// Possible (synchronous) errors when dialing a peer.
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum DialError {
|
||||
/// The dialing attempt is rejected because of a connection limit.
|
||||
ConnectionLimit(ConnectionLimit),
|
||||
/// The address being dialed is invalid, e.g. if it refers to a different
|
||||
/// remote peer than the one being dialed.
|
||||
InvalidAddress(Multiaddr),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
@ -45,7 +45,7 @@ use std::{
|
||||
error,
|
||||
fmt,
|
||||
};
|
||||
use super::{Network, DialingOpts};
|
||||
use super::{Network, DialingOpts, DialError};
|
||||
|
||||
/// The possible representations of a peer in a [`Network`], as
|
||||
/// seen by the local node.
|
||||
@ -210,7 +210,7 @@ where
|
||||
pub fn dial<I>(self, address: Multiaddr, remaining: I, handler: THandler)
|
||||
-> Result<
|
||||
(ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
|
||||
ConnectionLimit
|
||||
DialError
|
||||
>
|
||||
where
|
||||
I: IntoIterator<Item = Multiaddr>,
|
||||
@ -219,7 +219,9 @@ where
|
||||
Peer::Connected(p) => (p.peer_id, p.network),
|
||||
Peer::Dialing(p) => (p.peer_id, p.network),
|
||||
Peer::Disconnected(p) => (p.peer_id, p.network),
|
||||
Peer::Local => return Err(ConnectionLimit { current: 0, limit: 0 })
|
||||
Peer::Local => return Err(DialError::ConnectionLimit(ConnectionLimit {
|
||||
current: 0, limit: 0
|
||||
}))
|
||||
};
|
||||
|
||||
let id = network.dial_peer(DialingOpts {
|
||||
|
@ -263,19 +263,14 @@ impl Drop for Listener {
|
||||
|
||||
/// If the address is `/memory/n`, returns the value of `n`.
|
||||
fn parse_memory_addr(a: &Multiaddr) -> Result<u64, ()> {
|
||||
let mut iter = a.iter();
|
||||
|
||||
let port = if let Some(Protocol::Memory(port)) = iter.next() {
|
||||
port
|
||||
} else {
|
||||
return Err(());
|
||||
};
|
||||
|
||||
if iter.next().is_some() {
|
||||
return Err(());
|
||||
let mut protocols = a.iter();
|
||||
match protocols.next() {
|
||||
Some(Protocol::Memory(port)) => match protocols.next() {
|
||||
None | Some(Protocol::P2p(_)) => Ok(port),
|
||||
_ => Err(())
|
||||
}
|
||||
_ => Err(())
|
||||
}
|
||||
|
||||
Ok(port)
|
||||
}
|
||||
|
||||
/// A channel represents an established, in-memory, logical connection between two endpoints.
|
||||
|
@ -25,7 +25,7 @@ use libp2p_core::multiaddr::{multiaddr, Multiaddr};
|
||||
use libp2p_core::{
|
||||
PeerId,
|
||||
connection::PendingConnectionError,
|
||||
network::{NetworkEvent, NetworkConfig, ConnectionLimits},
|
||||
network::{NetworkEvent, NetworkConfig, ConnectionLimits, DialError},
|
||||
};
|
||||
use rand::Rng;
|
||||
use std::task::Poll;
|
||||
@ -47,12 +47,16 @@ fn max_outgoing() {
|
||||
.expect("Unexpected connection limit.");
|
||||
}
|
||||
|
||||
let err = network.peer(target.clone())
|
||||
match network.peer(target.clone())
|
||||
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
|
||||
.expect_err("Unexpected dialing success.");
|
||||
|
||||
assert_eq!(err.current, outgoing_limit);
|
||||
assert_eq!(err.limit, outgoing_limit);
|
||||
.expect_err("Unexpected dialing success.")
|
||||
{
|
||||
DialError::ConnectionLimit(err) => {
|
||||
assert_eq!(err.current, outgoing_limit);
|
||||
assert_eq!(err.limit, outgoing_limit);
|
||||
}
|
||||
e => panic!("Unexpected error: {:?}", e),
|
||||
}
|
||||
|
||||
let info = network.info();
|
||||
assert_eq!(info.num_peers(), 0);
|
||||
|
@ -25,6 +25,7 @@ use libp2p_core::multiaddr::multiaddr;
|
||||
use libp2p_core::{
|
||||
PeerId,
|
||||
connection::PendingConnectionError,
|
||||
multiaddr::Protocol,
|
||||
network::{NetworkEvent, NetworkConfig},
|
||||
};
|
||||
use rand::seq::SliceRandom;
|
||||
@ -70,7 +71,7 @@ fn deny_incoming_connec() {
|
||||
error: PendingConnectionError::Transport(_)
|
||||
}) => {
|
||||
assert_eq!(&peer_id, swarm1.local_peer_id());
|
||||
assert_eq!(multiaddr, address);
|
||||
assert_eq!(multiaddr, address.clone().with(Protocol::P2p(peer_id.into())));
|
||||
return Poll::Ready(Ok(()));
|
||||
},
|
||||
Poll::Ready(_) => unreachable!(),
|
||||
@ -162,21 +163,27 @@ fn dial_self_by_id() {
|
||||
fn multiple_addresses_err() {
|
||||
// Tries dialing multiple addresses, and makes sure there's one dialing error per address.
|
||||
|
||||
let target = PeerId::random();
|
||||
|
||||
let mut swarm = test_network(NetworkConfig::default());
|
||||
|
||||
let mut addresses = Vec::new();
|
||||
for _ in 0 .. 3 {
|
||||
addresses.push(multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())]);
|
||||
addresses.push(multiaddr![
|
||||
Ip4([0, 0, 0, 0]),
|
||||
Tcp(rand::random::<u16>())
|
||||
]);
|
||||
}
|
||||
for _ in 0 .. 5 {
|
||||
addresses.push(multiaddr![Udp(rand::random::<u16>())]);
|
||||
addresses.push(multiaddr![
|
||||
Udp(rand::random::<u16>())
|
||||
]);
|
||||
}
|
||||
addresses.shuffle(&mut rand::thread_rng());
|
||||
|
||||
let first = addresses[0].clone();
|
||||
let rest = (&addresses[1..]).iter().cloned();
|
||||
|
||||
let target = PeerId::random();
|
||||
swarm.peer(target.clone())
|
||||
.dial(first, rest, TestHandler())
|
||||
.unwrap();
|
||||
@ -191,7 +198,7 @@ fn multiple_addresses_err() {
|
||||
error: PendingConnectionError::Transport(_)
|
||||
}) => {
|
||||
assert_eq!(peer_id, target);
|
||||
let expected = addresses.remove(0);
|
||||
let expected = addresses.remove(0).with(Protocol::P2p(target.clone().into()));
|
||||
assert_eq!(multiaddr, expected);
|
||||
if addresses.is_empty() {
|
||||
assert_eq!(attempts_remaining, 0);
|
||||
|
@ -25,6 +25,7 @@
|
||||
|
||||
use async_std::task;
|
||||
use libp2p::{
|
||||
Multiaddr,
|
||||
Swarm,
|
||||
PeerId,
|
||||
identity,
|
||||
@ -38,7 +39,14 @@ use libp2p::kad::{
|
||||
QueryResult,
|
||||
};
|
||||
use libp2p::kad::record::store::MemoryStore;
|
||||
use std::{env, error::Error, time::Duration};
|
||||
use std::{env, error::Error, str::FromStr, time::Duration};
|
||||
|
||||
const BOOTNODES: [&'static str; 4] = [
|
||||
"QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
|
||||
"QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
|
||||
"QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
|
||||
"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt"
|
||||
];
|
||||
|
||||
#[async_std::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
@ -59,28 +67,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||
let store = MemoryStore::new(local_peer_id.clone());
|
||||
let mut behaviour = Kademlia::with_config(local_peer_id.clone(), store, cfg);
|
||||
|
||||
// TODO: the /dnsaddr/ scheme is not supported (https://github.com/libp2p/rust-libp2p/issues/967)
|
||||
/*behaviour.add_address(&"QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
|
||||
behaviour.add_address(&"QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
|
||||
behaviour.add_address(&"QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
|
||||
behaviour.add_address(&"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());*/
|
||||
// Add the bootnodes to the local routing table. `libp2p-dns` built
|
||||
// into the `transport` resolves the `dnsaddr` when Kademlia tries
|
||||
// to dial these nodes.
|
||||
let bootaddr = Multiaddr::from_str("/dnsaddr/bootstrap.libp2p.io")?;
|
||||
for peer in &BOOTNODES {
|
||||
behaviour.add_address(&PeerId::from_str(peer)?, bootaddr.clone());
|
||||
}
|
||||
|
||||
// The only address that currently works.
|
||||
behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse()?, "/ip4/104.131.131.82/tcp/4001".parse()?);
|
||||
|
||||
// The following addresses always fail signature verification, possibly due to
|
||||
// RSA keys with < 2048 bits.
|
||||
// behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap());
|
||||
// behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap());
|
||||
// behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap());
|
||||
// behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap());
|
||||
|
||||
// The following addresses are permanently unreachable:
|
||||
// Other(Other(A(Transport(A(Underlying(Os { code: 101, kind: Other, message: "Network is unreachable" }))))))
|
||||
// behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap());
|
||||
// behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap());
|
||||
// behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap());
|
||||
// behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap());
|
||||
Swarm::new(transport, behaviour, local_peer_id)
|
||||
};
|
||||
|
||||
|
@ -1,3 +1,7 @@
|
||||
# 0.11.2 [unreleased]
|
||||
|
||||
- Add `Multiaddr::ends_with()`.
|
||||
|
||||
# 0.11.1 [2021-02-15]
|
||||
|
||||
- Update dependencies
|
||||
|
@ -6,7 +6,7 @@ description = "Implementation of the multiaddr format"
|
||||
homepage = "https://github.com/libp2p/rust-libp2p"
|
||||
keywords = ["multiaddr", "ipfs"]
|
||||
license = "MIT"
|
||||
version = "0.11.1"
|
||||
version = "0.11.2"
|
||||
|
||||
[features]
|
||||
default = ["url"]
|
||||
|
@ -174,6 +174,16 @@ impl Multiaddr {
|
||||
|
||||
if replaced { Some(address) } else { None }
|
||||
}
|
||||
|
||||
/// Checks whether the given `Multiaddr` is a suffix of this `Multiaddr`.
|
||||
pub fn ends_with(&self, other: &Multiaddr) -> bool {
|
||||
let n = self.bytes.len();
|
||||
let m = other.bytes.len();
|
||||
if n < m {
|
||||
return false
|
||||
}
|
||||
self.bytes[(n - m) ..] == other.bytes[..]
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Multiaddr {
|
||||
|
@ -56,6 +56,18 @@ fn push_pop_identity() {
|
||||
QuickCheck::new().quickcheck(prop as fn(Ma, Proto) -> bool)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ends_with() {
|
||||
fn prop(Ma(m): Ma) {
|
||||
let n = m.iter().count();
|
||||
for i in 0 .. n {
|
||||
let suffix = m.iter().skip(i).collect::<Multiaddr>();
|
||||
assert!(m.ends_with(&suffix));
|
||||
}
|
||||
}
|
||||
QuickCheck::new().quickcheck(prop as fn(_))
|
||||
}
|
||||
|
||||
|
||||
// Arbitrary impls
|
||||
|
||||
|
@ -260,7 +260,7 @@ where
|
||||
let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?;
|
||||
*this.state = SeqState::SendProtocol { io, protocol }
|
||||
}
|
||||
_ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())),
|
||||
_ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into()))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
# 0.27.2 [unreleased]
|
||||
# 0.28.0 [unreleased]
|
||||
|
||||
- Update dependencies.
|
||||
|
||||
|
@ -2,7 +2,7 @@
|
||||
name = "libp2p-mplex"
|
||||
edition = "2018"
|
||||
description = "Mplex multiplexing protocol for libp2p"
|
||||
version = "0.27.2"
|
||||
version = "0.28.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
license = "MIT"
|
||||
repository = "https://github.com/libp2p/rust-libp2p"
|
||||
@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
bytes = "1"
|
||||
futures = "0.3.1"
|
||||
asynchronous-codec = "0.6"
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
log = "0.4"
|
||||
nohash-hasher = "0.2"
|
||||
parking_lot = "0.11"
|
||||
|
@ -238,7 +238,7 @@ where
|
||||
num_buffered += 1;
|
||||
}
|
||||
Frame::Close { stream_id } => {
|
||||
self.on_close(stream_id.into_local())?;
|
||||
self.on_close(stream_id.into_local());
|
||||
}
|
||||
Frame::Reset { stream_id } => {
|
||||
self.on_reset(stream_id.into_local())
|
||||
@ -460,7 +460,7 @@ where
|
||||
}
|
||||
Frame::Close { stream_id } => {
|
||||
let stream_id = stream_id.into_local();
|
||||
self.on_close(stream_id)?;
|
||||
self.on_close(stream_id);
|
||||
if id == stream_id {
|
||||
return Poll::Ready(Ok(None))
|
||||
}
|
||||
@ -683,7 +683,7 @@ where
|
||||
}
|
||||
|
||||
/// Processes an inbound `Close` frame.
|
||||
fn on_close(&mut self, id: LocalStreamId) -> io::Result<()> {
|
||||
fn on_close(&mut self, id: LocalStreamId) {
|
||||
if let Some(state) = self.substreams.remove(&id) {
|
||||
match state {
|
||||
SubstreamState::RecvClosed { .. } | SubstreamState::Closed { .. } => {
|
||||
@ -715,8 +715,6 @@ where
|
||||
trace!("{}: Ignoring `Close` for unknown substream {}. Possibly dropped earlier.",
|
||||
self.id, id);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Generates the next outbound stream ID.
|
||||
|
@ -1,3 +1,7 @@
|
||||
# 0.31.0 [unreleased]
|
||||
|
||||
- Update `libp2p-core`.
|
||||
|
||||
# 0.30.1 [2021-02-17]
|
||||
|
||||
- Update `yamux` to `0.8.1`.
|
||||
|
@ -2,7 +2,7 @@
|
||||
name = "libp2p-yamux"
|
||||
edition = "2018"
|
||||
description = "Yamux multiplexing protocol for libp2p"
|
||||
version = "0.30.1"
|
||||
version = "0.31.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
license = "MIT"
|
||||
repository = "https://github.com/libp2p/rust-libp2p"
|
||||
@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
|
||||
[dependencies]
|
||||
futures = "0.3.1"
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
parking_lot = "0.11"
|
||||
thiserror = "1.0"
|
||||
yamux = "0.8.1"
|
||||
|
@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
cuckoofilter = "0.5.0"
|
||||
fnv = "1.0"
|
||||
futures = "0.3.1"
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
|
||||
log = "0.4"
|
||||
prost = "0.7"
|
||||
|
@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
|
||||
[dependencies]
|
||||
libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
bytes = "1.0"
|
||||
byteorder = "1.3.4"
|
||||
fnv = "1.0.7"
|
||||
|
@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
|
||||
[dependencies]
|
||||
futures = "0.3.1"
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
|
||||
log = "0.4.1"
|
||||
prost = "0.7"
|
||||
|
@ -17,7 +17,7 @@ fnv = "1.0"
|
||||
asynchronous-codec = "0.6"
|
||||
futures = "0.3.1"
|
||||
log = "0.4"
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
|
||||
prost = "0.7"
|
||||
rand = "0.7.2"
|
||||
|
@ -1072,7 +1072,11 @@ fn manual_bucket_inserts() {
|
||||
let mut swarms = build_connected_nodes_with_config(3, 1, cfg);
|
||||
// The peers and their addresses for which we expect `RoutablePeer` events.
|
||||
let mut expected = swarms.iter().skip(2)
|
||||
.map(|(a, s)| (a.clone(), Swarm::local_peer_id(s).clone()))
|
||||
.map(|(a, s)| {
|
||||
let pid = *Swarm::local_peer_id(s);
|
||||
let addr = a.clone().with(Protocol::P2p(pid.into()));
|
||||
(addr, pid)
|
||||
})
|
||||
.collect::<HashMap<_,_>>();
|
||||
// We collect the peers for which a `RoutablePeer` event
|
||||
// was received in here to check at the end of the test
|
||||
@ -1087,7 +1091,7 @@ fn manual_bucket_inserts() {
|
||||
Poll::Ready(Some(KademliaEvent::RoutablePeer {
|
||||
peer, address
|
||||
})) => {
|
||||
assert_eq!(peer, expected.remove(&address).expect("Unexpected address"));
|
||||
assert_eq!(peer, expected.remove(&address).expect("Missing address"));
|
||||
routable.push(peer);
|
||||
if expected.is_empty() {
|
||||
for peer in routable.iter() {
|
||||
|
@ -16,7 +16,7 @@ dns-parser = "0.8.0"
|
||||
futures = "0.3.13"
|
||||
if-watch = "0.2.0"
|
||||
lazy_static = "1.4.0"
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
|
||||
log = "0.4.14"
|
||||
rand = "0.8.3"
|
||||
|
@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
|
||||
[dependencies]
|
||||
futures = "0.3.1"
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
|
||||
log = "0.4.1"
|
||||
rand = "0.7.2"
|
||||
|
@ -14,7 +14,7 @@ asynchronous-codec = "0.6"
|
||||
bytes = "1"
|
||||
futures = "0.3.1"
|
||||
futures-timer = "3"
|
||||
libp2p-core = { version = "0.27", path = "../../core" }
|
||||
libp2p-core = { version = "0.28", path = "../../core" }
|
||||
libp2p-swarm = { version = "0.28", path = "../../swarm" }
|
||||
log = "0.4"
|
||||
pin-project = "1"
|
||||
|
@ -401,14 +401,12 @@ impl<T: Transport> Stream for RelayListener<T> {
|
||||
Poll::Ready(Some(BehaviourToListenerMsg::IncomingRelayedConnection {
|
||||
stream,
|
||||
src_peer_id,
|
||||
relay_peer_id,
|
||||
relay_addr,
|
||||
relay_peer_id: _
|
||||
})) => {
|
||||
return Poll::Ready(Some(Ok(ListenerEvent::Upgrade {
|
||||
upgrade: RelayedListenerUpgrade::Relayed(Some(stream)),
|
||||
local_addr: relay_addr
|
||||
.with(Protocol::P2p(relay_peer_id.into()))
|
||||
.with(Protocol::P2pCircuit),
|
||||
local_addr: relay_addr.with(Protocol::P2pCircuit),
|
||||
remote_addr: Protocol::P2p(src_peer_id.into()).into(),
|
||||
})));
|
||||
}
|
||||
|
@ -381,9 +381,10 @@ fn src_try_connect_to_offline_dst() {
|
||||
|
||||
loop {
|
||||
match src_swarm.next_event().await {
|
||||
SwarmEvent::UnknownPeerUnreachableAddr { address, .. }
|
||||
SwarmEvent::UnreachableAddr { address, peer_id, .. }
|
||||
if address == dst_addr_via_relay =>
|
||||
{
|
||||
assert_eq!(peer_id, dst_peer_id);
|
||||
break;
|
||||
}
|
||||
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
||||
@ -437,9 +438,10 @@ fn src_try_connect_to_unsupported_dst() {
|
||||
|
||||
loop {
|
||||
match src_swarm.next_event().await {
|
||||
SwarmEvent::UnknownPeerUnreachableAddr { address, .. }
|
||||
SwarmEvent::UnreachableAddr { address, peer_id, .. }
|
||||
if address == dst_addr_via_relay =>
|
||||
{
|
||||
assert_eq!(peer_id, dst_peer_id);
|
||||
break;
|
||||
}
|
||||
SwarmEvent::ConnectionClosed { peer_id, .. } if peer_id == relay_peer_id => {}
|
||||
@ -486,8 +488,10 @@ fn src_try_connect_to_offline_dst_via_offline_relay() {
|
||||
|
||||
// Source Node fail to reach Destination Node due to failure reaching Relay.
|
||||
match src_swarm.next_event().await {
|
||||
SwarmEvent::UnknownPeerUnreachableAddr { address, .. }
|
||||
if address == dst_addr_via_relay => {}
|
||||
SwarmEvent::UnreachableAddr { address, peer_id, .. }
|
||||
if address == dst_addr_via_relay => {
|
||||
assert_eq!(peer_id, dst_peer_id);
|
||||
}
|
||||
e => panic!("{:?}", e),
|
||||
}
|
||||
});
|
||||
@ -573,6 +577,9 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
|
||||
}
|
||||
}
|
||||
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
||||
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
|
||||
..
|
||||
})) => {}
|
||||
e => panic!("{:?}", e),
|
||||
}
|
||||
}
|
||||
@ -1005,9 +1012,10 @@ fn yield_incoming_connection_through_correct_listener() {
|
||||
}
|
||||
|
||||
match src_3_swarm.next_event().boxed().poll_unpin(cx) {
|
||||
Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr { address, .. })
|
||||
Poll::Ready(SwarmEvent::UnreachableAddr { address, peer_id, .. })
|
||||
if address == dst_addr_via_relay_3 =>
|
||||
{
|
||||
assert_eq!(peer_id, dst_peer_id);
|
||||
return Poll::Ready(());
|
||||
}
|
||||
Poll::Ready(SwarmEvent::Dialing { .. }) => {}
|
||||
|
@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
async-trait = "0.1"
|
||||
bytes = "1"
|
||||
futures = "0.3.1"
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
|
||||
log = "0.4.11"
|
||||
lru = "0.6"
|
||||
|
12
src/lib.rs
12
src/lib.rs
@ -283,9 +283,9 @@ pub async fn development_transport(keypair: identity::Keypair)
|
||||
{
|
||||
let transport = {
|
||||
let tcp = tcp::TcpConfig::new().nodelay(true);
|
||||
let transport = dns::DnsConfig::system(tcp).await?;
|
||||
let websockets = websocket::WsConfig::new(transport.clone());
|
||||
transport.or_transport(websockets)
|
||||
let dns_tcp = dns::DnsConfig::system(tcp).await?;
|
||||
let ws_dns_tcp = websocket::WsConfig::new(dns_tcp.clone());
|
||||
dns_tcp.or_transport(ws_dns_tcp)
|
||||
};
|
||||
|
||||
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
|
||||
@ -318,9 +318,9 @@ pub fn tokio_development_transport(keypair: identity::Keypair)
|
||||
{
|
||||
let transport = {
|
||||
let tcp = tcp::TokioTcpConfig::new().nodelay(true);
|
||||
let transport = dns::TokioDnsConfig::system(tcp)?;
|
||||
let websockets = websocket::WsConfig::new(transport.clone());
|
||||
transport.or_transport(websockets)
|
||||
let dns_tcp = dns::TokioDnsConfig::system(tcp)?;
|
||||
let ws_dns_tcp = websocket::WsConfig::new(dns_tcp.clone());
|
||||
dns_tcp.or_transport(ws_dns_tcp)
|
||||
};
|
||||
|
||||
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
|
||||
|
@ -1,5 +1,9 @@
|
||||
# 0.28.0 [unreleased]
|
||||
|
||||
- New error variant `DialError::InvalidAddress`
|
||||
|
||||
- `Swarm::dial_addr()` now returns a `DialError` on error.
|
||||
|
||||
- Remove the option for a substream-specific multistream select protocol override.
|
||||
The override at this granularity is no longer deemed useful, in particular because
|
||||
it can usually not be configured for existing protocols like `libp2p-kad` and others.
|
||||
|
@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
[dependencies]
|
||||
either = "1.6.0"
|
||||
futures = "0.3.1"
|
||||
libp2p-core = { version = "0.27.0", path = "../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../core" }
|
||||
log = "0.4"
|
||||
rand = "0.7"
|
||||
smallvec = "1.6.1"
|
||||
|
@ -113,6 +113,7 @@ use libp2p_core::{
|
||||
transport::{self, TransportError},
|
||||
muxing::StreamMuxerBox,
|
||||
network::{
|
||||
self,
|
||||
ConnectionLimits,
|
||||
Network,
|
||||
NetworkInfo,
|
||||
@ -359,11 +360,11 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
|
||||
}
|
||||
|
||||
/// Initiates a new dialing attempt to the given address.
|
||||
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> {
|
||||
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), DialError> {
|
||||
let handler = me.behaviour.new_handler()
|
||||
.into_node_handler_builder()
|
||||
.with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
|
||||
me.network.dial(&addr, handler).map(|_id| ())
|
||||
Ok(me.network.dial(&addr, handler).map(|_id| ())?)
|
||||
}
|
||||
|
||||
/// Initiates a new dialing attempt to the given peer.
|
||||
@ -386,7 +387,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
|
||||
me.network.peer(*peer_id)
|
||||
.dial(first, addrs, handler)
|
||||
.map(|_| ())
|
||||
.map_err(DialError::ConnectionLimit)
|
||||
.map_err(DialError::from)
|
||||
} else {
|
||||
Err(DialError::NoAddresses)
|
||||
};
|
||||
@ -1053,16 +1054,28 @@ pub enum DialError {
|
||||
/// The configured limit for simultaneous outgoing connections
|
||||
/// has been reached.
|
||||
ConnectionLimit(ConnectionLimit),
|
||||
/// The address given for dialing is invalid.
|
||||
InvalidAddress(Multiaddr),
|
||||
/// [`NetworkBehaviour::addresses_of_peer`] returned no addresses
|
||||
/// for the peer to dial.
|
||||
NoAddresses
|
||||
}
|
||||
|
||||
impl From<network::DialError> for DialError {
|
||||
fn from(err: network::DialError) -> DialError {
|
||||
match err {
|
||||
network::DialError::ConnectionLimit(l) => DialError::ConnectionLimit(l),
|
||||
network::DialError::InvalidAddress(a) => DialError::InvalidAddress(a),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for DialError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err),
|
||||
DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
|
||||
DialError::InvalidAddress(a) => write!(f, "Dial error: invalid address: {}", a),
|
||||
DialError::Banned => write!(f, "Dial error: peer is banned.")
|
||||
}
|
||||
}
|
||||
@ -1072,6 +1085,7 @@ impl error::Error for DialError {
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
match self {
|
||||
DialError::ConnectionLimit(err) => Some(err),
|
||||
DialError::InvalidAddress(_) => None,
|
||||
DialError::NoAddresses => None,
|
||||
DialError::Banned => None
|
||||
}
|
||||
|
@ -1,3 +1,7 @@
|
||||
# 0.28.0 [unreleased]
|
||||
|
||||
- Update `libp2p-core`.
|
||||
|
||||
# 0.27.1 [2021-01-27]
|
||||
|
||||
- Ensure read buffers are initialised.
|
||||
|
@ -2,7 +2,7 @@
|
||||
name = "libp2p-deflate"
|
||||
edition = "2018"
|
||||
description = "Deflate encryption protocol for libp2p"
|
||||
version = "0.27.1"
|
||||
version = "0.28.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
license = "MIT"
|
||||
repository = "https://github.com/libp2p/rust-libp2p"
|
||||
@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
|
||||
[dependencies]
|
||||
futures = "0.3.1"
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
flate2 = "1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
|
@ -1,5 +1,9 @@
|
||||
# 0.28.0 [unreleased]
|
||||
|
||||
- Update `libp2p-core`.
|
||||
|
||||
- Add support for resolving `/dnsaddr` addresses.
|
||||
|
||||
- Use `trust-dns-resolver`, removing the internal thread pool and
|
||||
expanding the configurability of `libp2p-dns` by largely exposing the
|
||||
configuration of `trust-dns-resolver`.
|
||||
|
@ -10,11 +10,12 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
|
||||
[dependencies]
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
log = "0.4.1"
|
||||
futures = "0.3.1"
|
||||
trust-dns-resolver = { version = "0.20", default-features = false, features = ["system-config"] }
|
||||
async-std-resolver = { version = "0.20", optional = true }
|
||||
smallvec = "1.6"
|
||||
|
||||
[dev-dependencies]
|
||||
env_logger = "0.6"
|
||||
|
@ -24,10 +24,11 @@
|
||||
//! [`DnsConfig`] and `TokioDnsConfig` for use with `async-std` and `tokio`,
|
||||
//! respectively.
|
||||
//!
|
||||
//! A [`GenDnsConfig`] is a [`Transport`] wrapper that is created around
|
||||
//! A [`GenDnsConfig`] is an address-rewriting [`Transport`] wrapper around
|
||||
//! an inner `Transport`. The composed transport behaves like the inner
|
||||
//! transport, except that [`Transport::dial`] resolves `/dns`, `/dns4/` and
|
||||
//! `/dns6/` components of a given `Multiaddr` through a DNS.
|
||||
//! transport, except that [`Transport::dial`] resolves `/dns/...`, `/dns4/...`,
|
||||
//! `/dns6/...` and `/dnsaddr/...` components of the given `Multiaddr` through
|
||||
//! a DNS, replacing them with the resolved protocols (typically TCP/IP).
|
||||
//!
|
||||
//! The `async-std` feature and hence the `DnsConfig` are
|
||||
//! enabled by default. Tokio users can furthermore opt-in
|
||||
@ -37,14 +38,14 @@
|
||||
//!
|
||||
//![trust-dns-resolver]: https://docs.rs/trust-dns-resolver/latest/trust_dns_resolver/#dns-over-tls-and-dns-over-https
|
||||
|
||||
use futures::{prelude::*, future::BoxFuture, stream::FuturesOrdered};
|
||||
use futures::{prelude::*, future::BoxFuture};
|
||||
use libp2p_core::{
|
||||
Transport,
|
||||
multiaddr::{Protocol, Multiaddr},
|
||||
transport::{TransportError, ListenerEvent}
|
||||
};
|
||||
use log::{debug, trace};
|
||||
use std::{error, fmt, net::IpAddr};
|
||||
use smallvec::SmallVec;
|
||||
use std::{borrow::Cow, convert::TryFrom, error, fmt, iter, net::IpAddr, str};
|
||||
#[cfg(any(feature = "async-std", feature = "tokio"))]
|
||||
use std::io;
|
||||
#[cfg(any(feature = "async-std", feature = "tokio"))]
|
||||
@ -62,6 +63,24 @@ use async_std_resolver::{AsyncStdConnection, AsyncStdConnectionProvider};
|
||||
pub use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
|
||||
pub use trust_dns_resolver::error::{ResolveError, ResolveErrorKind};
|
||||
|
||||
/// The prefix for `dnsaddr` protocol TXT record lookups.
|
||||
const DNSADDR_PREFIX: &'static str = "_dnsaddr.";
|
||||
|
||||
/// The maximum number of dialing attempts to resolved addresses.
|
||||
const MAX_DIAL_ATTEMPTS: usize = 16;
|
||||
|
||||
/// The maximum number of DNS lookups when dialing.
|
||||
///
|
||||
/// This limit is primarily a safeguard against too many, possibly
|
||||
/// even cyclic, indirections in the addresses obtained from the
|
||||
/// TXT records of a `/dnsaddr`.
|
||||
const MAX_DNS_LOOKUPS: usize = 32;
|
||||
|
||||
/// The maximum number of TXT records applicable for the address
|
||||
/// being dialed that are considered for further lookups as a
|
||||
/// result of a single `/dnsaddr` lookup.
|
||||
const MAX_TXT_RECORDS: usize = 16;
|
||||
|
||||
/// A `Transport` wrapper for performing DNS lookups when dialing `Multiaddr`esses
|
||||
/// using `async-std` for all async I/O.
|
||||
#[cfg(feature = "async-std")]
|
||||
@ -137,7 +156,7 @@ where
|
||||
|
||||
impl<T, C, P> Transport for GenDnsConfig<T, C, P>
|
||||
where
|
||||
T: Transport + Send + 'static,
|
||||
T: Transport + Clone + Send + 'static,
|
||||
T::Error: Send,
|
||||
T::Dial: Send,
|
||||
C: DnsHandle<Error = ResolveError>,
|
||||
@ -171,44 +190,120 @@ where
|
||||
}
|
||||
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
|
||||
// Check if there are any domain names in the address. If not, proceed
|
||||
// straight away with dialing on the underlying transport.
|
||||
if !addr.iter().any(|p| match p {
|
||||
Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) => true,
|
||||
_ => false
|
||||
}) {
|
||||
trace!("Pass-through address without DNS: {}", addr);
|
||||
let inner_dial = self.inner.dial(addr)
|
||||
.map_err(|err| err.map(DnsErr::Transport))?;
|
||||
return Ok(inner_dial.map_err::<_, fn(_) -> _>(DnsErr::Transport).left_future());
|
||||
}
|
||||
|
||||
// Asynchronlously resolve all DNS names in the address before proceeding
|
||||
// with dialing on the underlying transport.
|
||||
Ok(async move {
|
||||
let resolver = self.resolver;
|
||||
let inner = self.inner;
|
||||
|
||||
trace!("Resolving DNS: {}", addr);
|
||||
let mut last_err = None;
|
||||
let mut dns_lookups = 0;
|
||||
let mut dial_attempts = 0;
|
||||
// We optimise for the common case of a single DNS component
|
||||
// in the address that is resolved with a single lookup.
|
||||
let mut unresolved = SmallVec::<[Multiaddr; 1]>::new();
|
||||
unresolved.push(addr.clone());
|
||||
|
||||
let resolved = addr.into_iter()
|
||||
.map(|proto| resolve(proto, &resolver))
|
||||
.collect::<FuturesOrdered<_>>()
|
||||
.collect::<Vec<Result<Protocol<'_>, Self::Error>>>()
|
||||
.await
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<Protocol<'_>>, Self::Error>>()?
|
||||
.into_iter()
|
||||
.collect::<Multiaddr>();
|
||||
// Resolve (i.e. replace) all DNS protocol components, initiating
|
||||
// dialing attempts as soon as there is another fully resolved
|
||||
// address.
|
||||
while let Some(addr) = unresolved.pop() {
|
||||
if let Some((i, name)) = addr.iter().enumerate().find(|(_, p)| match p {
|
||||
Protocol::Dns(_) |
|
||||
Protocol::Dns4(_) |
|
||||
Protocol::Dns6(_) |
|
||||
Protocol::Dnsaddr(_) => true,
|
||||
_ => false
|
||||
}) {
|
||||
if dns_lookups == MAX_DNS_LOOKUPS {
|
||||
log::debug!("Too many DNS lookups. Dropping unresolved {}.", addr);
|
||||
last_err = Some(DnsErr::TooManyLookups);
|
||||
// There may still be fully resolved addresses in `unresolved`,
|
||||
// so keep going until `unresolved` is empty.
|
||||
continue
|
||||
}
|
||||
dns_lookups += 1;
|
||||
match resolve(&name, &resolver).await {
|
||||
Err(e) => {
|
||||
if unresolved.is_empty() {
|
||||
return Err(e)
|
||||
}
|
||||
// If there are still unresolved addresses, there is
|
||||
// a chance of success, but we track the last error.
|
||||
last_err = Some(e);
|
||||
}
|
||||
Ok(Resolved::One(ip)) => {
|
||||
log::trace!("Resolved {} -> {}", name, ip);
|
||||
let addr = addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
|
||||
unresolved.push(addr);
|
||||
}
|
||||
Ok(Resolved::Many(ips)) => {
|
||||
for ip in ips {
|
||||
log::trace!("Resolved {} -> {}", name, ip);
|
||||
let addr = addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
|
||||
unresolved.push(addr);
|
||||
}
|
||||
}
|
||||
Ok(Resolved::Addrs(addrs)) => {
|
||||
let suffix = addr.iter().skip(i + 1).collect::<Multiaddr>();
|
||||
let prefix = addr.iter().take(i).collect::<Multiaddr>();
|
||||
let mut n = 0;
|
||||
for a in addrs {
|
||||
if a.ends_with(&suffix) {
|
||||
if n < MAX_TXT_RECORDS {
|
||||
n += 1;
|
||||
log::trace!("Resolved {} -> {}", name, a);
|
||||
let addr = prefix.iter().chain(a.iter()).collect::<Multiaddr>();
|
||||
unresolved.push(addr);
|
||||
} else {
|
||||
log::debug!("Too many TXT records. Dropping resolved {}.", a);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// We have a fully resolved address, so try to dial it.
|
||||
log::debug!("Dialing {}", addr);
|
||||
|
||||
debug!("DNS resolved: {} => {}", addr, resolved);
|
||||
let transport = inner.clone();
|
||||
let result = match transport.dial(addr) {
|
||||
Ok(out) => {
|
||||
// We only count attempts that the inner transport
|
||||
// actually accepted, i.e. for which it produced
|
||||
// a dialing future.
|
||||
dial_attempts += 1;
|
||||
out.await.map_err(DnsErr::Transport)
|
||||
}
|
||||
Err(TransportError::MultiaddrNotSupported(a)) =>
|
||||
Err(DnsErr::MultiaddrNotSupported(a)),
|
||||
Err(TransportError::Other(err)) => Err(DnsErr::Transport(err))
|
||||
};
|
||||
|
||||
match inner.dial(resolved) {
|
||||
Ok(out) => out.await.map_err(DnsErr::Transport),
|
||||
Err(TransportError::MultiaddrNotSupported(a)) =>
|
||||
Err(DnsErr::MultiaddrNotSupported(a)),
|
||||
Err(TransportError::Other(err)) => Err(DnsErr::Transport(err))
|
||||
match result {
|
||||
Ok(out) => return Ok(out),
|
||||
Err(err) => {
|
||||
log::debug!("Dial error: {:?}.", err);
|
||||
if unresolved.is_empty() {
|
||||
return Err(err)
|
||||
}
|
||||
if dial_attempts == MAX_DIAL_ATTEMPTS {
|
||||
log::debug!("Aborting dialing after {} attempts.", MAX_DIAL_ATTEMPTS);
|
||||
return Err(err)
|
||||
}
|
||||
last_err = Some(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// At this point, if there was at least one failed dialing
|
||||
// attempt, return that error. Otherwise there were no valid DNS records
|
||||
// for the given address to begin with (i.e. DNS lookups succeeded but
|
||||
// produced no records relevant for the given `addr`).
|
||||
Err(last_err.unwrap_or_else(||
|
||||
DnsErr::ResolveError(
|
||||
ResolveErrorKind::Message("No matching records found.").into())))
|
||||
}.boxed().right_future())
|
||||
}
|
||||
|
||||
@ -226,6 +321,13 @@ pub enum DnsErr<TErr> {
|
||||
ResolveError(ResolveError),
|
||||
/// DNS resolution was successful, but the underlying transport refused the resolved address.
|
||||
MultiaddrNotSupported(Multiaddr),
|
||||
/// DNS resolution involved too many lookups.
|
||||
///
|
||||
/// DNS resolution on dialing performs up to 32 DNS lookups. If these
|
||||
/// are not sufficient to obtain a fully-resolved address, this error
|
||||
/// is returned and the DNS records for the domain(s) being dialed
|
||||
/// should be investigated.
|
||||
TooManyLookups,
|
||||
}
|
||||
|
||||
impl<TErr> fmt::Display for DnsErr<TErr>
|
||||
@ -236,6 +338,7 @@ where TErr: fmt::Display
|
||||
DnsErr::Transport(err) => write!(f, "{}", err),
|
||||
DnsErr::ResolveError(err) => write!(f, "{}", err),
|
||||
DnsErr::MultiaddrNotSupported(a) => write!(f, "Unsupported resolved address: {}", a),
|
||||
DnsErr::TooManyLookups => write!(f, "Too many DNS lookups"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -248,14 +351,33 @@ where TErr: error::Error + 'static
|
||||
DnsErr::Transport(err) => Some(err),
|
||||
DnsErr::ResolveError(err) => Some(err),
|
||||
DnsErr::MultiaddrNotSupported(_) => None,
|
||||
DnsErr::TooManyLookups => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Asynchronously resolves the domain name of a `Dns`, `Dns4` or `Dns6` protocol
|
||||
/// component. If the given protocol is not a DNS component, it is returned unchanged.
|
||||
fn resolve<'a, E: 'a, C, P>(proto: Protocol<'a>, resolver: &'a AsyncResolver<C,P>)
|
||||
-> impl Future<Output = Result<Protocol<'a>, DnsErr<E>>> + 'a
|
||||
/// The successful outcome of [`resolve`] for a given [`Protocol`].
|
||||
enum Resolved<'a> {
|
||||
/// The given `Protocol` has been resolved to a single `Protocol`,
|
||||
/// which may be identical to the one given, in case it is not
|
||||
/// a DNS protocol component.
|
||||
One(Protocol<'a>),
|
||||
/// The given `Protocol` has been resolved to multiple alternative
|
||||
/// `Protocol`s as a result of a DNS lookup.
|
||||
Many(Vec<Protocol<'a>>),
|
||||
/// The given `Protocol` has been resolved to a new list of `Multiaddr`s
|
||||
/// obtained from DNS TXT records representing possible alternatives.
|
||||
/// These addresses may contain further DNS names that need resolving.
|
||||
Addrs(Vec<Multiaddr>),
|
||||
}
|
||||
|
||||
/// Asynchronously resolves the domain name of a `Dns`, `Dns4`, `Dns6` or `Dnsaddr` protocol
|
||||
/// component. If the given protocol is of a different type, it is returned unchanged as a
|
||||
/// [`Resolved::One`].
|
||||
fn resolve<'a, E: 'a + Send, C, P>(
|
||||
proto: &Protocol<'a>,
|
||||
resolver: &'a AsyncResolver<C,P>,
|
||||
) -> BoxFuture<'a, Result<Resolved<'a>, DnsErr<E>>>
|
||||
where
|
||||
C: DnsHandle<Error = ResolveError>,
|
||||
P: ConnectionProvider<Conn = C>,
|
||||
@ -263,39 +385,105 @@ where
|
||||
match proto {
|
||||
Protocol::Dns(ref name) => {
|
||||
resolver.lookup_ip(fqdn(name)).map(move |res| match res {
|
||||
Ok(ips) => Ok(ips.into_iter()
|
||||
.next()
|
||||
.map(Protocol::from)
|
||||
.expect("If there are no results, `Err(NoRecordsFound)` is expected.")),
|
||||
Err(e) => return Err(DnsErr::ResolveError(e))
|
||||
}).left_future()
|
||||
Ok(ips) => {
|
||||
let mut ips = ips.into_iter();
|
||||
let one = ips.next()
|
||||
.expect("If there are no results, `Err(NoRecordsFound)` is expected.");
|
||||
if let Some(two) = ips.next() {
|
||||
Ok(Resolved::Many(
|
||||
iter::once(one).chain(iter::once(two))
|
||||
.chain(ips)
|
||||
.map(Protocol::from)
|
||||
.collect()))
|
||||
} else {
|
||||
Ok(Resolved::One(Protocol::from(one)))
|
||||
}
|
||||
}
|
||||
Err(e) => Err(DnsErr::ResolveError(e))
|
||||
}).boxed()
|
||||
}
|
||||
Protocol::Dns4(ref name) => {
|
||||
resolver.ipv4_lookup(fqdn(name)).map(move |res| match res {
|
||||
Ok(ips) => Ok(ips.into_iter()
|
||||
.map(IpAddr::from)
|
||||
.next()
|
||||
.map(Protocol::from)
|
||||
.expect("If there are no results, `Err(NoRecordsFound)` is expected.")),
|
||||
Err(e) => return Err(DnsErr::ResolveError(e))
|
||||
}).left_future().left_future().right_future()
|
||||
Ok(ips) => {
|
||||
let mut ips = ips.into_iter();
|
||||
let one = ips.next()
|
||||
.expect("If there are no results, `Err(NoRecordsFound)` is expected.");
|
||||
if let Some(two) = ips.next() {
|
||||
Ok(Resolved::Many(
|
||||
iter::once(one).chain(iter::once(two))
|
||||
.chain(ips)
|
||||
.map(IpAddr::from)
|
||||
.map(Protocol::from)
|
||||
.collect()))
|
||||
} else {
|
||||
Ok(Resolved::One(Protocol::from(IpAddr::from(one))))
|
||||
}
|
||||
}
|
||||
Err(e) => Err(DnsErr::ResolveError(e))
|
||||
}).boxed()
|
||||
}
|
||||
Protocol::Dns6(ref name) => {
|
||||
resolver.ipv6_lookup(fqdn(name)).map(move |res| match res {
|
||||
Ok(ips) => Ok(ips.into_iter()
|
||||
.map(IpAddr::from)
|
||||
.next()
|
||||
.map(Protocol::from)
|
||||
.expect("If there are no results, `Err(NoRecordsFound)` is expected.")),
|
||||
Err(e) => return Err(DnsErr::ResolveError(e))
|
||||
}).right_future().left_future().right_future()
|
||||
Ok(ips) => {
|
||||
let mut ips = ips.into_iter();
|
||||
let one = ips.next()
|
||||
.expect("If there are no results, `Err(NoRecordsFound)` is expected.");
|
||||
if let Some(two) = ips.next() {
|
||||
Ok(Resolved::Many(
|
||||
iter::once(one).chain(iter::once(two))
|
||||
.chain(ips)
|
||||
.map(IpAddr::from)
|
||||
.map(Protocol::from)
|
||||
.collect()))
|
||||
} else {
|
||||
Ok(Resolved::One(Protocol::from(IpAddr::from(one))))
|
||||
}
|
||||
}
|
||||
Err(e) => Err(DnsErr::ResolveError(e))
|
||||
}).boxed()
|
||||
},
|
||||
proto => future::ready(Ok(proto)).right_future().right_future()
|
||||
Protocol::Dnsaddr(ref name) => {
|
||||
let name = Cow::Owned([DNSADDR_PREFIX, name].concat());
|
||||
resolver.txt_lookup(fqdn(&name)).map(move |res| match res {
|
||||
Ok(txts) => {
|
||||
let mut addrs = Vec::new();
|
||||
for txt in txts {
|
||||
if let Some(chars) = txt.txt_data().first() {
|
||||
match parse_dnsaddr_txt(chars) {
|
||||
Err(e) => {
|
||||
// Skip over seemingly invalid entries.
|
||||
log::debug!("Invalid TXT record: {:?}", e);
|
||||
}
|
||||
Ok(a) => {
|
||||
addrs.push(a);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Resolved::Addrs(addrs))
|
||||
}
|
||||
Err(e) => Err(DnsErr::ResolveError(e))
|
||||
}).boxed()
|
||||
}
|
||||
proto => future::ready(Ok(Resolved::One(proto.clone()))).boxed()
|
||||
}
|
||||
}
|
||||
|
||||
fn fqdn(name: &std::borrow::Cow<'_, str>) -> String {
|
||||
if name.ends_with(".") {
|
||||
/// Parses a `<character-string>` of a `dnsaddr` TXT record.
|
||||
fn parse_dnsaddr_txt(txt: &[u8]) -> io::Result<Multiaddr> {
|
||||
let s = str::from_utf8(txt).map_err(invalid_data)?;
|
||||
match s.strip_prefix("dnsaddr=") {
|
||||
None => Err(invalid_data("Missing `dnsaddr=` prefix.")),
|
||||
Some(a) => Ok(Multiaddr::try_from(a).map_err(invalid_data)?)
|
||||
}
|
||||
}
|
||||
|
||||
fn invalid_data(e: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
|
||||
io::Error::new(io::ErrorKind::InvalidData, e)
|
||||
}
|
||||
|
||||
fn fqdn(name: &Cow<'_, str>) -> String {
|
||||
if name.ends_with('.') {
|
||||
name.to_string()
|
||||
} else {
|
||||
format!("{}.", name)
|
||||
@ -308,6 +496,7 @@ mod tests {
|
||||
use futures::{future::BoxFuture, stream::BoxStream};
|
||||
use libp2p_core::{
|
||||
Transport,
|
||||
PeerId,
|
||||
multiaddr::{Protocol, Multiaddr},
|
||||
transport::ListenerEvent,
|
||||
transport::TransportError,
|
||||
@ -332,17 +521,12 @@ mod tests {
|
||||
}
|
||||
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
|
||||
let addr = addr.iter().collect::<Vec<_>>();
|
||||
assert_eq!(addr.len(), 2);
|
||||
match addr[1] {
|
||||
Protocol::Tcp(_) => (),
|
||||
_ => panic!(),
|
||||
};
|
||||
match addr[0] {
|
||||
Protocol::Ip4(_) => (),
|
||||
Protocol::Ip6(_) => (),
|
||||
_ => panic!(),
|
||||
};
|
||||
// Check that all DNS components have been resolved, i.e. replaced.
|
||||
assert!(!addr.iter().any(|p| match p {
|
||||
Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Dnsaddr(_)
|
||||
=> true,
|
||||
_ => false,
|
||||
}));
|
||||
Ok(Box::pin(future::ready(Ok(()))))
|
||||
}
|
||||
|
||||
@ -383,6 +567,37 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Success due to the DNS TXT records at _dnsaddr.bootstrap.libp2p.io.
|
||||
let _ = transport
|
||||
.clone()
|
||||
.dial("/dnsaddr/bootstrap.libp2p.io".parse().unwrap())
|
||||
.unwrap()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Success due to the DNS TXT records at _dnsaddr.bootstrap.libp2p.io having
|
||||
// an entry with suffix `/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN`,
|
||||
// i.e. a bootnode with such a peer ID.
|
||||
let _ = transport
|
||||
.clone()
|
||||
.dial("/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap())
|
||||
.unwrap()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Failure due to the DNS TXT records at _dnsaddr.libp2p.io not having
|
||||
// an entry with a random `p2p` suffix.
|
||||
match transport
|
||||
.clone()
|
||||
.dial(format!("/dnsaddr/bootstrap.libp2p.io/p2p/{}", PeerId::random()).parse().unwrap())
|
||||
.unwrap()
|
||||
.await
|
||||
{
|
||||
Err(DnsErr::ResolveError(_)) => {},
|
||||
Err(e) => panic!("Unexpected error: {:?}", e),
|
||||
Ok(_) => panic!("Unexpected success.")
|
||||
}
|
||||
|
||||
// Failure due to no records.
|
||||
match transport
|
||||
.clone()
|
||||
@ -401,19 +616,27 @@ mod tests {
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
{
|
||||
// Be explicit about the resolver used. At least on github CI, TXT
|
||||
// type record lookups may not work with the system DNS resolver.
|
||||
let config = ResolverConfig::quad9();
|
||||
let opts = ResolverOpts::default();
|
||||
async_std_crate::task::block_on(
|
||||
DnsConfig::system(CustomTransport).then(|dns| run(dns.unwrap()))
|
||||
DnsConfig::custom(CustomTransport, config, opts).then(|dns| run(dns.unwrap()))
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio")]
|
||||
{
|
||||
// Be explicit about the resolver used. At least on github CI, TXT
|
||||
// type record lookups may not work with the system DNS resolver.
|
||||
let config = ResolverConfig::quad9();
|
||||
let opts = ResolverOpts::default();
|
||||
let rt = tokio_crate::runtime::Builder::new_current_thread()
|
||||
.enable_io()
|
||||
.enable_time()
|
||||
.build()
|
||||
.unwrap();
|
||||
rt.block_on(run(TokioDnsConfig::system(CustomTransport).unwrap()));
|
||||
rt.block_on(run(TokioDnsConfig::custom(CustomTransport, config, opts).unwrap()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,3 +1,7 @@
|
||||
# 0.30.0 [unreleased]
|
||||
|
||||
- Update `libp2p-core`.
|
||||
|
||||
# 0.29.0 [2021-01-12]
|
||||
|
||||
- Update dependencies.
|
||||
|
@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "libp2p-noise"
|
||||
description = "Cryptographic handshake protocol using the noise framework."
|
||||
version = "0.29.0"
|
||||
version = "0.30.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
license = "MIT"
|
||||
repository = "https://github.com/libp2p/rust-libp2p"
|
||||
@ -12,7 +12,7 @@ bytes = "1"
|
||||
curve25519-dalek = "3.0.0"
|
||||
futures = "0.3.1"
|
||||
lazy_static = "1.2"
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
log = "0.4"
|
||||
prost = "0.7"
|
||||
rand = "0.7.2"
|
||||
|
@ -1,3 +1,7 @@
|
||||
# 0.28.0 [unreleased]
|
||||
|
||||
- Update `libp2p-core`.
|
||||
|
||||
# 0.27.1 [2021-02-15]
|
||||
|
||||
- Update dependencies.
|
||||
|
@ -2,7 +2,7 @@
|
||||
name = "libp2p-plaintext"
|
||||
edition = "2018"
|
||||
description = "Plaintext encryption dummy protocol for libp2p"
|
||||
version = "0.27.1"
|
||||
version = "0.28.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
license = "MIT"
|
||||
repository = "https://github.com/libp2p/rust-libp2p"
|
||||
@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
bytes = "1"
|
||||
futures = "0.3.1"
|
||||
asynchronous-codec = "0.6"
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
log = "0.4.8"
|
||||
prost = "0.7"
|
||||
unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] }
|
||||
|
@ -51,7 +51,7 @@ pub struct Remote {
|
||||
}
|
||||
|
||||
impl HandshakeContext<Local> {
|
||||
fn new(config: PlainText2Config) -> Result<Self, PlainTextError> {
|
||||
fn new(config: PlainText2Config) -> Self {
|
||||
let exchange = Exchange {
|
||||
id: Some(config.local_public_key.clone().into_peer_id().to_bytes()),
|
||||
pubkey: Some(config.local_public_key.clone().into_protobuf_encoding())
|
||||
@ -59,12 +59,12 @@ impl HandshakeContext<Local> {
|
||||
let mut buf = Vec::with_capacity(exchange.encoded_len());
|
||||
exchange.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
|
||||
|
||||
Ok(Self {
|
||||
Self {
|
||||
config,
|
||||
state: Local {
|
||||
exchange_bytes: buf
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn with_remote(self, exchange_bytes: BytesMut)
|
||||
@ -119,7 +119,7 @@ where
|
||||
let mut framed_socket = Framed::new(socket, UviBytes::default());
|
||||
|
||||
trace!("starting handshake");
|
||||
let context = HandshakeContext::new(config)?;
|
||||
let context = HandshakeContext::new(config);
|
||||
|
||||
trace!("sending exchange to remote");
|
||||
framed_socket.send(BytesMut::from(&context.state.exchange_bytes[..])).await?;
|
||||
|
@ -1,4 +1,8 @@
|
||||
# 0.27.2 [unreleased]
|
||||
# 0.28.0 [unreleased]
|
||||
|
||||
- Update `libp2p-core`.
|
||||
|
||||
- Permit `/p2p` addresses.
|
||||
|
||||
- Update to `if-watch-0.2`.
|
||||
|
||||
|
@ -2,7 +2,7 @@
|
||||
name = "libp2p-tcp"
|
||||
edition = "2018"
|
||||
description = "TCP/IP transport protocol for libp2p"
|
||||
version = "0.27.2"
|
||||
version = "0.28.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
license = "MIT"
|
||||
repository = "https://github.com/libp2p/rust-libp2p"
|
||||
@ -17,7 +17,7 @@ if-watch = { version = "0.2.0", optional = true }
|
||||
if-addrs = { version = "0.6.4", optional = true }
|
||||
ipnet = "2.0.0"
|
||||
libc = "0.2.80"
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
log = "0.4.11"
|
||||
socket2 = { version = "0.4.0", features = ["all"] }
|
||||
tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net"], optional = true }
|
||||
|
@ -379,7 +379,7 @@ where
|
||||
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
|
||||
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
|
||||
let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(&addr) {
|
||||
let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(addr.clone()) {
|
||||
sa
|
||||
} else {
|
||||
return Err(TransportError::MultiaddrNotSupported(addr));
|
||||
@ -390,7 +390,7 @@ where
|
||||
}
|
||||
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
|
||||
let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
|
||||
let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) {
|
||||
if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
|
||||
return Err(TransportError::MultiaddrNotSupported(addr));
|
||||
}
|
||||
@ -653,21 +653,34 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// This type of logic should probably be moved into the multiaddr package
|
||||
fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> {
|
||||
let mut iter = addr.iter();
|
||||
let proto1 = iter.next().ok_or(())?;
|
||||
let proto2 = iter.next().ok_or(())?;
|
||||
|
||||
if iter.next().is_some() {
|
||||
return Err(());
|
||||
}
|
||||
|
||||
match (proto1, proto2) {
|
||||
(Protocol::Ip4(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)),
|
||||
(Protocol::Ip6(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)),
|
||||
_ => Err(()),
|
||||
/// Extracts a `SocketAddr` from a given `Multiaddr`.
|
||||
///
|
||||
/// Fails if the given `Multiaddr` does not begin with an IP
|
||||
/// protocol encapsulating a TCP port.
|
||||
fn multiaddr_to_socketaddr(mut addr: Multiaddr) -> Result<SocketAddr, ()> {
|
||||
// "Pop" the IP address and TCP port from the end of the address,
|
||||
// ignoring a `/p2p/...` suffix as well as any prefix of possibly
|
||||
// outer protocols, if present.
|
||||
let mut port = None;
|
||||
while let Some(proto) = addr.pop() {
|
||||
match proto {
|
||||
Protocol::Ip4(ipv4) => match port {
|
||||
Some(port) => return Ok(SocketAddr::new(ipv4.into(), port)),
|
||||
None => return Err(())
|
||||
},
|
||||
Protocol::Ip6(ipv6) => match port {
|
||||
Some(port) => return Ok(SocketAddr::new(ipv6.into(), port)),
|
||||
None => return Err(())
|
||||
},
|
||||
Protocol::Tcp(portnum) => match port {
|
||||
Some(_) => return Err(()),
|
||||
None => { port = Some(portnum) }
|
||||
}
|
||||
Protocol::P2p(_) => {}
|
||||
_ => return Err(())
|
||||
}
|
||||
}
|
||||
Err(())
|
||||
}
|
||||
|
||||
// Create a [`Multiaddr`] from the given IP address and port number.
|
||||
@ -687,12 +700,12 @@ mod tests {
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
|
||||
|
||||
assert!(
|
||||
multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
|
||||
multiaddr_to_socketaddr("/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
|
||||
.is_err()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
multiaddr_to_socketaddr(&"/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
|
||||
multiaddr_to_socketaddr("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
|
||||
Ok(SocketAddr::new(
|
||||
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
||||
12345,
|
||||
@ -700,7 +713,7 @@ mod tests {
|
||||
);
|
||||
assert_eq!(
|
||||
multiaddr_to_socketaddr(
|
||||
&"/ip4/255.255.255.255/tcp/8080"
|
||||
"/ip4/255.255.255.255/tcp/8080"
|
||||
.parse::<Multiaddr>()
|
||||
.unwrap()
|
||||
),
|
||||
@ -710,7 +723,7 @@ mod tests {
|
||||
))
|
||||
);
|
||||
assert_eq!(
|
||||
multiaddr_to_socketaddr(&"/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
|
||||
multiaddr_to_socketaddr("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
|
||||
Ok(SocketAddr::new(
|
||||
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
|
||||
12345,
|
||||
@ -718,7 +731,7 @@ mod tests {
|
||||
);
|
||||
assert_eq!(
|
||||
multiaddr_to_socketaddr(
|
||||
&"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
|
||||
"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
|
||||
.parse::<Multiaddr>()
|
||||
.unwrap()
|
||||
),
|
||||
|
@ -1,3 +1,9 @@
|
||||
# 0.28.0 [unreleased]
|
||||
|
||||
- Update `libp2p-core`.
|
||||
|
||||
- Permit `/p2p` addresses.
|
||||
|
||||
# 0.27.0 [2021-01-12]
|
||||
|
||||
- Update dependencies.
|
||||
|
@ -2,7 +2,7 @@
|
||||
name = "libp2p-uds"
|
||||
edition = "2018"
|
||||
description = "Unix domain sockets transport for libp2p"
|
||||
version = "0.27.0"
|
||||
version = "0.28.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
license = "MIT"
|
||||
repository = "https://github.com/libp2p/rust-libp2p"
|
||||
@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
|
||||
[target.'cfg(all(unix, not(target_os = "emscripten")))'.dependencies]
|
||||
async-std = { version = "1.6.2", optional = true }
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
log = "0.4.1"
|
||||
futures = "0.3.1"
|
||||
tokio = { version = "1.0.1", default-features = false, features = ["net"], optional = true }
|
||||
|
@ -140,23 +140,20 @@ codegen!(
|
||||
/// paths.
|
||||
// This type of logic should probably be moved into the multiaddr package
|
||||
fn multiaddr_to_path(addr: &Multiaddr) -> Result<PathBuf, ()> {
|
||||
let mut iter = addr.iter();
|
||||
let path = iter.next();
|
||||
|
||||
if iter.next().is_some() {
|
||||
return Err(());
|
||||
let mut protocols = addr.iter();
|
||||
match protocols.next() {
|
||||
Some(Protocol::Unix(ref path)) => {
|
||||
let path = PathBuf::from(path.as_ref());
|
||||
if !path.is_absolute() {
|
||||
return Err(())
|
||||
}
|
||||
match protocols.next() {
|
||||
None | Some(Protocol::P2p(_)) => Ok(path),
|
||||
Some(_) => Err(())
|
||||
}
|
||||
}
|
||||
_ => Err(())
|
||||
}
|
||||
|
||||
let out: PathBuf = match path {
|
||||
Some(Protocol::Unix(ref path)) => path.as_ref().into(),
|
||||
_ => return Err(())
|
||||
};
|
||||
|
||||
if !out.is_absolute() {
|
||||
return Err(());
|
||||
}
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
#[cfg(all(test, feature = "async-std"))]
|
||||
|
@ -1,3 +1,7 @@
|
||||
# 0.28.0 [unreleased]
|
||||
|
||||
- Update `libp2p-core`.
|
||||
|
||||
# 0.27.0 [2021-01-12]
|
||||
|
||||
- Update dependencies.
|
||||
|
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "libp2p-wasm-ext"
|
||||
version = "0.27.0"
|
||||
version = "0.28.0"
|
||||
authors = ["Pierre Krieger <pierre.krieger1708@gmail.com>"]
|
||||
edition = "2018"
|
||||
description = "Allows passing in an external transport in a WASM environment"
|
||||
@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
[dependencies]
|
||||
futures = "0.3.1"
|
||||
js-sys = "0.3.19"
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
parity-send-wrapper = "0.1.0"
|
||||
wasm-bindgen = "0.2.42"
|
||||
wasm-bindgen-futures = "0.4.4"
|
||||
|
@ -1,3 +1,9 @@
|
||||
# 0.29.0 [unreleased]
|
||||
|
||||
- Update `libp2p-core`.
|
||||
|
||||
- Permit dialing `/p2p` addresses.
|
||||
|
||||
# 0.28.0 [2021-01-12]
|
||||
|
||||
- Update dependencies.
|
||||
|
@ -2,7 +2,7 @@
|
||||
name = "libp2p-websocket"
|
||||
edition = "2018"
|
||||
description = "WebSocket transport for libp2p"
|
||||
version = "0.28.0"
|
||||
version = "0.29.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
license = "MIT"
|
||||
repository = "https://github.com/libp2p/rust-libp2p"
|
||||
@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
futures-rustls = "0.21"
|
||||
either = "1.5.3"
|
||||
futures = "0.3.1"
|
||||
libp2p-core = { version = "0.27.0", path = "../../core" }
|
||||
libp2p-core = { version = "0.28.0", path = "../../core" }
|
||||
log = "0.4.8"
|
||||
quicksink = "0.1"
|
||||
rw-stream-sink = "0.2.0"
|
||||
|
@ -231,13 +231,11 @@ where
|
||||
}
|
||||
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
|
||||
// Quick sanity check of the provided Multiaddr.
|
||||
if let Some(Protocol::Ws(_)) | Some(Protocol::Wss(_)) = addr.iter().last() {
|
||||
// ok
|
||||
} else {
|
||||
debug!("{} is not a websocket multiaddr", addr);
|
||||
return Err(TransportError::MultiaddrNotSupported(addr))
|
||||
}
|
||||
let addr = match parse_ws_dial_addr(addr) {
|
||||
Ok(addr) => addr,
|
||||
Err(Error::InvalidMultiaddr(a)) => return Err(TransportError::MultiaddrNotSupported(a)),
|
||||
Err(e) => return Err(TransportError::Other(e)),
|
||||
};
|
||||
|
||||
// We are looping here in order to follow redirects (if any):
|
||||
let mut remaining_redirects = self.max_redirects;
|
||||
@ -248,11 +246,11 @@ where
|
||||
match this.dial_once(addr).await {
|
||||
Ok(Either::Left(redirect)) => {
|
||||
if remaining_redirects == 0 {
|
||||
debug!("too many redirects");
|
||||
debug!("Too many redirects (> {})", self.max_redirects);
|
||||
return Err(Error::TooManyRedirects)
|
||||
}
|
||||
remaining_redirects -= 1;
|
||||
addr = location_to_multiaddr(&redirect)?
|
||||
addr = parse_ws_dial_addr(location_to_multiaddr(&redirect)?)?
|
||||
}
|
||||
Ok(Either::Right(conn)) => return Ok(conn),
|
||||
Err(e) => return Err(e)
|
||||
@ -273,46 +271,26 @@ where
|
||||
T: Transport,
|
||||
T::Output: AsyncRead + AsyncWrite + Send + Unpin + 'static
|
||||
{
|
||||
/// Attempty to dial the given address and perform a websocket handshake.
|
||||
async fn dial_once(self, address: Multiaddr) -> Result<Either<String, Connection<T::Output>>, Error<T::Error>> {
|
||||
trace!("dial address: {}", address);
|
||||
/// Attempts to dial the given address and perform a websocket handshake.
|
||||
async fn dial_once(self, addr: WsAddress) -> Result<Either<String, Connection<T::Output>>, Error<T::Error>> {
|
||||
trace!("Dialing websocket address: {:?}", addr);
|
||||
|
||||
let (host_port, dns_name) = host_and_dnsname(&address)?;
|
||||
|
||||
let mut inner_addr = address.clone();
|
||||
|
||||
let (use_tls, path) =
|
||||
match inner_addr.pop() {
|
||||
Some(Protocol::Ws(path)) => (false, path),
|
||||
Some(Protocol::Wss(path)) => {
|
||||
if dns_name.is_none() {
|
||||
debug!("no DNS name in {}", address);
|
||||
return Err(Error::InvalidMultiaddr(address))
|
||||
}
|
||||
(true, path)
|
||||
}
|
||||
_ => {
|
||||
debug!("{} is not a websocket multiaddr", address);
|
||||
return Err(Error::InvalidMultiaddr(address))
|
||||
}
|
||||
};
|
||||
|
||||
let dial = self.transport.dial(inner_addr)
|
||||
let dial = self.transport.dial(addr.tcp_addr)
|
||||
.map_err(|e| match e {
|
||||
TransportError::MultiaddrNotSupported(a) => Error::InvalidMultiaddr(a),
|
||||
TransportError::Other(e) => Error::Transport(e)
|
||||
})?;
|
||||
|
||||
let stream = dial.map_err(Error::Transport).await?;
|
||||
trace!("connected to {}", address);
|
||||
trace!("TCP connection to {} established.", addr.host_port);
|
||||
|
||||
let stream =
|
||||
if use_tls { // begin TLS session
|
||||
let dns_name = dns_name.expect("for use_tls we have checked that dns_name is some");
|
||||
trace!("starting TLS handshake with {}", address);
|
||||
if addr.use_tls { // begin TLS session
|
||||
let dns_name = addr.dns_name.expect("for use_tls we have checked that dns_name is some");
|
||||
trace!("Starting TLS handshake with {:?}", dns_name);
|
||||
let stream = self.tls_config.client.connect(dns_name.as_ref(), stream)
|
||||
.map_err(|e| {
|
||||
debug!("TLS handshake with {} failed: {}", address, e);
|
||||
debug!("TLS handshake with {:?} failed: {}", dns_name, e);
|
||||
Error::Tls(tls::Error::from(e))
|
||||
})
|
||||
.await?;
|
||||
@ -323,9 +301,9 @@ where
|
||||
EitherOutput::Second(stream)
|
||||
};
|
||||
|
||||
trace!("sending websocket handshake request to {}", address);
|
||||
trace!("Sending websocket handshake to {}", addr.host_port);
|
||||
|
||||
let mut client = handshake::Client::new(stream, &host_port, path.as_ref());
|
||||
let mut client = handshake::Client::new(stream, &addr.host_port, addr.path.as_ref());
|
||||
|
||||
if self.use_deflate {
|
||||
client.add_extension(Box::new(Deflate::new(connection::Mode::Client)));
|
||||
@ -341,32 +319,87 @@ where
|
||||
Err(Error::Handshake(msg.into()))
|
||||
}
|
||||
handshake::ServerResponse::Accepted { .. } => {
|
||||
trace!("websocket handshake with {} successful", address);
|
||||
trace!("websocket handshake with {} successful", addr.host_port);
|
||||
Ok(Either::Right(Connection::new(client.into_builder())))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Extract host, port and optionally the DNS name from the given [`Multiaddr`].
|
||||
fn host_and_dnsname<T>(addr: &Multiaddr) -> Result<(String, Option<webpki::DNSName>), Error<T>> {
|
||||
let mut iter = addr.iter();
|
||||
match (iter.next(), iter.next()) {
|
||||
(Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port))) =>
|
||||
Ok((format!("{}:{}", ip, port), None)),
|
||||
(Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port))) =>
|
||||
Ok((format!("{}:{}", ip, port), None)),
|
||||
(Some(Protocol::Dns(h)), Some(Protocol::Tcp(port))) =>
|
||||
Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))),
|
||||
(Some(Protocol::Dns4(h)), Some(Protocol::Tcp(port))) =>
|
||||
Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))),
|
||||
(Some(Protocol::Dns6(h)), Some(Protocol::Tcp(port))) =>
|
||||
Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))),
|
||||
_ => {
|
||||
debug!("multi-address format not supported: {}", addr);
|
||||
Err(Error::InvalidMultiaddr(addr.clone()))
|
||||
#[derive(Debug)]
|
||||
struct WsAddress {
|
||||
host_port: String,
|
||||
path: String,
|
||||
dns_name: Option<webpki::DNSName>,
|
||||
use_tls: bool,
|
||||
tcp_addr: Multiaddr,
|
||||
}
|
||||
|
||||
/// Tries to parse the given `Multiaddr` into a `WsAddress` used
|
||||
/// for dialing.
|
||||
///
|
||||
/// Fails if the given `Multiaddr` does not represent a TCP/IP-based
|
||||
/// websocket protocol stack.
|
||||
fn parse_ws_dial_addr<T>(addr: Multiaddr) -> Result<WsAddress, Error<T>> {
|
||||
// The encapsulating protocol must be based on TCP/IP, possibly via DNS.
|
||||
// We peek at it in order to learn the hostname and port to use for
|
||||
// the websocket handshake.
|
||||
let mut protocols = addr.iter();
|
||||
let mut ip = protocols.next();
|
||||
let mut tcp = protocols.next();
|
||||
let (host_port, dns_name) = loop {
|
||||
match (ip, tcp) {
|
||||
(Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port)))
|
||||
=> break (format!("{}:{}", ip, port), None),
|
||||
(Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port)))
|
||||
=> break (format!("{}:{}", ip, port), None),
|
||||
(Some(Protocol::Dns(h)), Some(Protocol::Tcp(port))) |
|
||||
(Some(Protocol::Dns4(h)), Some(Protocol::Tcp(port))) |
|
||||
(Some(Protocol::Dns6(h)), Some(Protocol::Tcp(port))) |
|
||||
(Some(Protocol::Dnsaddr(h)), Some(Protocol::Tcp(port)))
|
||||
=> break (format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned())),
|
||||
(Some(_), Some(p)) => {
|
||||
ip = Some(p);
|
||||
tcp = protocols.next();
|
||||
}
|
||||
_ => return Err(Error::InvalidMultiaddr(addr))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Now consume the `Ws` / `Wss` protocol from the end of the address,
|
||||
// preserving the trailing `P2p` protocol that identifies the remote,
|
||||
// if any.
|
||||
let mut protocols = addr.clone();
|
||||
let mut p2p = None;
|
||||
let (use_tls, path) = loop {
|
||||
match protocols.pop() {
|
||||
p@Some(Protocol::P2p(_)) => { p2p = p }
|
||||
Some(Protocol::Ws(path)) => break (false, path.into_owned()),
|
||||
Some(Protocol::Wss(path)) => {
|
||||
if dns_name.is_none() {
|
||||
debug!("Missing DNS name in WSS address: {}", addr);
|
||||
return Err(Error::InvalidMultiaddr(addr))
|
||||
}
|
||||
break (true, path.into_owned())
|
||||
}
|
||||
_ => return Err(Error::InvalidMultiaddr(addr))
|
||||
}
|
||||
};
|
||||
|
||||
// The original address, stripped of the `/ws` and `/wss` protocols,
|
||||
// makes up the the address for the inner TCP-based transport.
|
||||
let tcp_addr = match p2p {
|
||||
Some(p) => protocols.with(p),
|
||||
None => protocols
|
||||
};
|
||||
|
||||
Ok(WsAddress {
|
||||
host_port,
|
||||
dns_name,
|
||||
path,
|
||||
use_tls,
|
||||
tcp_addr,
|
||||
})
|
||||
}
|
||||
|
||||
// Given a location URL, build a new websocket [`Multiaddr`].
|
||||
|
@ -44,6 +44,13 @@ pub struct WsConfig<T> {
|
||||
|
||||
impl<T> WsConfig<T> {
|
||||
/// Create a new websocket transport based on the given transport.
|
||||
///
|
||||
/// > **Note*: The given transport must be based on TCP/IP and should
|
||||
/// > usually incorporate DNS resolution, though the latter is not
|
||||
/// > strictly necessary if one wishes to only use the `Ws` protocol
|
||||
/// > with known IP addresses and ports. See [`libp2p-tcp`](https://docs.rs/libp2p-tcp/)
|
||||
/// > and [`libp2p-dns`](https://docs.rs/libp2p-dns) for constructing
|
||||
/// > the inner transport.
|
||||
pub fn new(transport: T) -> Self {
|
||||
framed::WsConfig::new(transport).into()
|
||||
}
|
||||
@ -187,10 +194,9 @@ where
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use libp2p_core::Multiaddr;
|
||||
use libp2p_core::{Multiaddr, PeerId, Transport, multiaddr::Protocol};
|
||||
use libp2p_tcp as tcp;
|
||||
use futures::prelude::*;
|
||||
use libp2p_core::{Transport, multiaddr::Protocol};
|
||||
use super::WsConfig;
|
||||
|
||||
#[test]
|
||||
@ -230,7 +236,7 @@ mod tests {
|
||||
conn.await
|
||||
};
|
||||
|
||||
let outbound = ws_config.dial(addr).unwrap();
|
||||
let outbound = ws_config.dial(addr.with(Protocol::P2p(PeerId::random().into()))).unwrap();
|
||||
|
||||
let (a, b) = futures::join!(inbound, outbound);
|
||||
a.and(b).unwrap();
|
||||
|
Reference in New Issue
Block a user