[libp2p-dns] Implement /dnsaddr resolution. (#1931)

* Implement `/dnsaddr` support on `libp2p-dns`.

To that end, since resolving `/dnsaddr` addresses needs
"fully qualified" multiaddresses when dialing, i.e. those
that end with the `/p2p/...` protocol, we make sure that
dialing always uses such fully qualified addresses by
appending the `/p2p` protocol as necessary. As a side-effect,
this adds support for dialing peers via "fully qualified"
addresses, as an alternative to using a `PeerId` together
with a `Multiaddr` with or without the `/p2p` protocol.

* Adapt libp2p-relay.

* Update versions, changelogs and small cleanups.
This commit is contained in:
Roman Borschel
2021-03-17 10:53:19 +01:00
committed by GitHub
parent c1f75eee81
commit 45f07bf863
57 changed files with 738 additions and 309 deletions

View File

@ -64,35 +64,35 @@ atomic = "0.5.0"
bytes = "1"
futures = "0.3.1"
lazy_static = "1.2"
libp2p-core = { version = "0.27.2", path = "core", default-features = false }
libp2p-core = { version = "0.28.0", path = "core", default-features = false }
libp2p-floodsub = { version = "0.28.0", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.29.0", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.28.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.29.0", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.27.2", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.29.0", path = "transports/noise", optional = true }
libp2p-mplex = { version = "0.28.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.30.0", path = "transports/noise", optional = true }
libp2p-ping = { version = "0.28.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.27.1", path = "transports/plaintext", optional = true }
libp2p-plaintext = { version = "0.28.0", path = "transports/plaintext", optional = true }
libp2p-pnet = { version = "0.20.0", path = "transports/pnet", optional = true }
libp2p-relay = { version = "0.1.0", path = "protocols/relay", optional = true }
libp2p-request-response = { version = "0.10.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.28.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.22.0", path = "swarm-derive" }
libp2p-uds = { version = "0.27.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.27.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.30.1", path = "muxers/yamux", optional = true }
multiaddr = { package = "parity-multiaddr", version = "0.11.1", path = "misc/multiaddr" }
libp2p-uds = { version = "0.28.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.28.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.31.0", path = "muxers/yamux", optional = true }
multiaddr = { package = "parity-multiaddr", version = "0.11.2", path = "misc/multiaddr" }
parking_lot = "0.11.0"
pin-project = "1.0.0"
smallvec = "1.6.1"
wasm-timer = "0.2.4"
[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.27.1", path = "transports/deflate", optional = true }
libp2p-deflate = { version = "0.28.0", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.28.0", path = "transports/dns", optional = true, default-features = false }
libp2p-mdns = { version = "0.29.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.27.1", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.28.0", path = "transports/websocket", optional = true }
libp2p-tcp = { version = "0.28.0", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.29.0", path = "transports/websocket", optional = true }
[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }

View File

@ -1,4 +1,12 @@
# 0.27.2 [unreleased]
# 0.28.0 [unreleased]
- `Network::dial()` understands `/p2p` addresses and `Transport::dial`
gets a "fully qualified" `/p2p` address when dialing a specific peer,
whether through the `Network::peer()` API or via `Network::dial()`
with a `/p2p` address.
- `Network::dial()` and `network::Peer::dial()` return a `DialError`
on error.
- Shorten and unify `Debug` impls of public keys.

View File

@ -2,7 +2,7 @@
name = "libp2p-core"
edition = "2018"
description = "Core traits and structs of libp2p"
version = "0.27.2"
version = "0.28.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"

View File

@ -554,7 +554,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
/// Returns an iterator over all connected peers, i.e. those that have
/// at least one established connection in the pool.
pub fn iter_connected<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
pub fn iter_connected(&self) -> impl Iterator<Item = &PeerId> {
self.established.keys()
}

View File

@ -209,13 +209,14 @@ where
&self.local_peer_id
}
/// Dials a multiaddress without expecting a particular remote peer ID.
/// Dials a [`Multiaddr`] that may or may not encapsulate a
/// specific expected remote peer ID.
///
/// The given `handler` will be used to create the
/// [`Connection`](crate::connection::Connection) upon success and the
/// connection ID is returned.
pub fn dial(&mut self, address: &Multiaddr, handler: THandler)
-> Result<ConnectionId, ConnectionLimit>
-> Result<ConnectionId, DialError>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
@ -225,15 +226,32 @@ where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
// If the address ultimately encapsulates an expected peer ID, dial that peer
// such that any mismatch is detected. We do not "pop off" the `P2p` protocol
// from the address, because it may be used by the `Transport`, i.e. `P2p`
// is a protocol component that can influence any transport, like `libp2p-dns`.
if let Some(multiaddr::Protocol::P2p(ma)) = address.iter().last() {
if let Ok(peer) = PeerId::try_from(ma) {
return self.dial_peer(DialingOpts {
peer,
address: address.clone(),
handler,
remaining: Vec::new(),
})
}
}
// The address does not specify an expected peer, so just try to dial it as-is,
// accepting any peer ID that the remote identifies as.
let info = OutgoingInfo { address, peer_id: None };
match self.transport().clone().dial(address.clone()) {
Ok(f) => {
let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
self.pool.add_outgoing(f, handler, info)
self.pool.add_outgoing(f, handler, info).map_err(DialError::ConnectionLimit)
}
Err(err) => {
let f = future::err(PendingConnectionError::Transport(err));
self.pool.add_outgoing(f, handler, info)
self.pool.add_outgoing(f, handler, info).map_err(DialError::ConnectionLimit)
}
}
}
@ -430,7 +448,7 @@ where
/// Initiates a connection attempt to a known peer.
fn dial_peer(&mut self, opts: DialingOpts<PeerId, THandler>)
-> Result<ConnectionId, ConnectionLimit>
-> Result<ConnectionId, DialError>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Dial: Send + 'static,
@ -460,7 +478,7 @@ fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans>(
<THandler::Handler as ConnectionHandler>::Error>,
dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
opts: DialingOpts<PeerId, THandler>
) -> Result<ConnectionId, ConnectionLimit>
) -> Result<ConnectionId, DialError>
where
THandler: IntoConnectionHandler + Send + 'static,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
@ -478,23 +496,28 @@ where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
let result = match transport.dial(opts.address.clone()) {
// Ensure the address to dial encapsulates the `p2p` protocol for the
// targeted peer, so that the transport has a "fully qualified" address
// to work with.
let addr = p2p_addr(opts.peer, opts.address).map_err(DialError::InvalidAddress)?;
let result = match transport.dial(addr.clone()) {
Ok(fut) => {
let fut = fut.map_err(|e| PendingConnectionError::Transport(TransportError::Other(e)));
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info)
let info = OutgoingInfo { address: &addr, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info).map_err(DialError::ConnectionLimit)
},
Err(err) => {
let fut = future::err(PendingConnectionError::Transport(err));
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info)
let info = OutgoingInfo { address: &addr, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info).map_err(DialError::ConnectionLimit)
},
};
if let Ok(id) = &result {
dialing.entry(opts.peer).or_default().push(
peer::DialingState {
current: (*id, opts.address),
current: (*id, addr),
remaining: opts.remaining,
},
);
@ -668,6 +691,37 @@ impl NetworkConfig {
}
}
/// Ensures a given `Multiaddr` is a `/p2p/...` address for the given peer.
///
/// If the given address is already a `p2p` address for the given peer,
/// i.e. the last encapsulated protocol is `/p2p/<peer-id>`, this is a no-op.
///
/// If the given address is already a `p2p` address for a different peer
/// than the one given, the given `Multiaddr` is returned as an `Err`.
///
/// If the given address is not yet a `p2p` address for the given peer,
/// the `/p2p/<peer-id>` protocol is appended to the returned address.
fn p2p_addr(peer: PeerId, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
if let Some(multiaddr::Protocol::P2p(hash)) = addr.iter().last() {
if &hash != peer.as_ref() {
return Err(addr)
}
Ok(addr)
} else {
Ok(addr.with(multiaddr::Protocol::P2p(peer.into())))
}
}
/// Possible (synchronous) errors when dialing a peer.
#[derive(Clone, Debug)]
pub enum DialError {
/// The dialing attempt is rejected because of a connection limit.
ConnectionLimit(ConnectionLimit),
/// The address being dialed is invalid, e.g. if it refers to a different
/// remote peer than the one being dialed.
InvalidAddress(Multiaddr),
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -45,7 +45,7 @@ use std::{
error,
fmt,
};
use super::{Network, DialingOpts};
use super::{Network, DialingOpts, DialError};
/// The possible representations of a peer in a [`Network`], as
/// seen by the local node.
@ -210,7 +210,7 @@ where
pub fn dial<I>(self, address: Multiaddr, remaining: I, handler: THandler)
-> Result<
(ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
ConnectionLimit
DialError
>
where
I: IntoIterator<Item = Multiaddr>,
@ -219,7 +219,9 @@ where
Peer::Connected(p) => (p.peer_id, p.network),
Peer::Dialing(p) => (p.peer_id, p.network),
Peer::Disconnected(p) => (p.peer_id, p.network),
Peer::Local => return Err(ConnectionLimit { current: 0, limit: 0 })
Peer::Local => return Err(DialError::ConnectionLimit(ConnectionLimit {
current: 0, limit: 0
}))
};
let id = network.dial_peer(DialingOpts {

View File

@ -263,19 +263,14 @@ impl Drop for Listener {
/// If the address is `/memory/n`, returns the value of `n`.
fn parse_memory_addr(a: &Multiaddr) -> Result<u64, ()> {
let mut iter = a.iter();
let port = if let Some(Protocol::Memory(port)) = iter.next() {
port
} else {
return Err(());
};
if iter.next().is_some() {
return Err(());
let mut protocols = a.iter();
match protocols.next() {
Some(Protocol::Memory(port)) => match protocols.next() {
None | Some(Protocol::P2p(_)) => Ok(port),
_ => Err(())
}
_ => Err(())
}
Ok(port)
}
/// A channel represents an established, in-memory, logical connection between two endpoints.

View File

@ -25,7 +25,7 @@ use libp2p_core::multiaddr::{multiaddr, Multiaddr};
use libp2p_core::{
PeerId,
connection::PendingConnectionError,
network::{NetworkEvent, NetworkConfig, ConnectionLimits},
network::{NetworkEvent, NetworkConfig, ConnectionLimits, DialError},
};
use rand::Rng;
use std::task::Poll;
@ -47,12 +47,16 @@ fn max_outgoing() {
.expect("Unexpected connection limit.");
}
let err = network.peer(target.clone())
match network.peer(target.clone())
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
.expect_err("Unexpected dialing success.");
assert_eq!(err.current, outgoing_limit);
assert_eq!(err.limit, outgoing_limit);
.expect_err("Unexpected dialing success.")
{
DialError::ConnectionLimit(err) => {
assert_eq!(err.current, outgoing_limit);
assert_eq!(err.limit, outgoing_limit);
}
e => panic!("Unexpected error: {:?}", e),
}
let info = network.info();
assert_eq!(info.num_peers(), 0);

View File

@ -25,6 +25,7 @@ use libp2p_core::multiaddr::multiaddr;
use libp2p_core::{
PeerId,
connection::PendingConnectionError,
multiaddr::Protocol,
network::{NetworkEvent, NetworkConfig},
};
use rand::seq::SliceRandom;
@ -70,7 +71,7 @@ fn deny_incoming_connec() {
error: PendingConnectionError::Transport(_)
}) => {
assert_eq!(&peer_id, swarm1.local_peer_id());
assert_eq!(multiaddr, address);
assert_eq!(multiaddr, address.clone().with(Protocol::P2p(peer_id.into())));
return Poll::Ready(Ok(()));
},
Poll::Ready(_) => unreachable!(),
@ -162,21 +163,27 @@ fn dial_self_by_id() {
fn multiple_addresses_err() {
// Tries dialing multiple addresses, and makes sure there's one dialing error per address.
let target = PeerId::random();
let mut swarm = test_network(NetworkConfig::default());
let mut addresses = Vec::new();
for _ in 0 .. 3 {
addresses.push(multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())]);
addresses.push(multiaddr![
Ip4([0, 0, 0, 0]),
Tcp(rand::random::<u16>())
]);
}
for _ in 0 .. 5 {
addresses.push(multiaddr![Udp(rand::random::<u16>())]);
addresses.push(multiaddr![
Udp(rand::random::<u16>())
]);
}
addresses.shuffle(&mut rand::thread_rng());
let first = addresses[0].clone();
let rest = (&addresses[1..]).iter().cloned();
let target = PeerId::random();
swarm.peer(target.clone())
.dial(first, rest, TestHandler())
.unwrap();
@ -191,7 +198,7 @@ fn multiple_addresses_err() {
error: PendingConnectionError::Transport(_)
}) => {
assert_eq!(peer_id, target);
let expected = addresses.remove(0);
let expected = addresses.remove(0).with(Protocol::P2p(target.clone().into()));
assert_eq!(multiaddr, expected);
if addresses.is_empty() {
assert_eq!(attempts_remaining, 0);

View File

@ -25,6 +25,7 @@
use async_std::task;
use libp2p::{
Multiaddr,
Swarm,
PeerId,
identity,
@ -38,7 +39,14 @@ use libp2p::kad::{
QueryResult,
};
use libp2p::kad::record::store::MemoryStore;
use std::{env, error::Error, time::Duration};
use std::{env, error::Error, str::FromStr, time::Duration};
const BOOTNODES: [&'static str; 4] = [
"QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
"QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt"
];
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
@ -59,28 +67,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
let store = MemoryStore::new(local_peer_id.clone());
let mut behaviour = Kademlia::with_config(local_peer_id.clone(), store, cfg);
// TODO: the /dnsaddr/ scheme is not supported (https://github.com/libp2p/rust-libp2p/issues/967)
/*behaviour.add_address(&"QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
behaviour.add_address(&"QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
behaviour.add_address(&"QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
behaviour.add_address(&"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());*/
// Add the bootnodes to the local routing table. `libp2p-dns` built
// into the `transport` resolves the `dnsaddr` when Kademlia tries
// to dial these nodes.
let bootaddr = Multiaddr::from_str("/dnsaddr/bootstrap.libp2p.io")?;
for peer in &BOOTNODES {
behaviour.add_address(&PeerId::from_str(peer)?, bootaddr.clone());
}
// The only address that currently works.
behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse()?, "/ip4/104.131.131.82/tcp/4001".parse()?);
// The following addresses always fail signature verification, possibly due to
// RSA keys with < 2048 bits.
// behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap());
// The following addresses are permanently unreachable:
// Other(Other(A(Transport(A(Underlying(Os { code: 101, kind: Other, message: "Network is unreachable" }))))))
// behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap());
Swarm::new(transport, behaviour, local_peer_id)
};

View File

@ -1,3 +1,7 @@
# 0.11.2 [unreleased]
- Add `Multiaddr::ends_with()`.
# 0.11.1 [2021-02-15]
- Update dependencies

View File

@ -6,7 +6,7 @@ description = "Implementation of the multiaddr format"
homepage = "https://github.com/libp2p/rust-libp2p"
keywords = ["multiaddr", "ipfs"]
license = "MIT"
version = "0.11.1"
version = "0.11.2"
[features]
default = ["url"]

View File

@ -174,6 +174,16 @@ impl Multiaddr {
if replaced { Some(address) } else { None }
}
/// Checks whether the given `Multiaddr` is a suffix of this `Multiaddr`.
pub fn ends_with(&self, other: &Multiaddr) -> bool {
let n = self.bytes.len();
let m = other.bytes.len();
if n < m {
return false
}
self.bytes[(n - m) ..] == other.bytes[..]
}
}
impl fmt::Debug for Multiaddr {

View File

@ -56,6 +56,18 @@ fn push_pop_identity() {
QuickCheck::new().quickcheck(prop as fn(Ma, Proto) -> bool)
}
#[test]
fn ends_with() {
fn prop(Ma(m): Ma) {
let n = m.iter().count();
for i in 0 .. n {
let suffix = m.iter().skip(i).collect::<Multiaddr>();
assert!(m.ends_with(&suffix));
}
}
QuickCheck::new().quickcheck(prop as fn(_))
}
// Arbitrary impls

View File

@ -260,7 +260,7 @@ where
let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?;
*this.state = SeqState::SendProtocol { io, protocol }
}
_ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())),
_ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into()))
}
}

View File

@ -1,4 +1,4 @@
# 0.27.2 [unreleased]
# 0.28.0 [unreleased]
- Update dependencies.

View File

@ -2,7 +2,7 @@
name = "libp2p-mplex"
edition = "2018"
description = "Mplex multiplexing protocol for libp2p"
version = "0.27.2"
version = "0.28.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
bytes = "1"
futures = "0.3.1"
asynchronous-codec = "0.6"
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
log = "0.4"
nohash-hasher = "0.2"
parking_lot = "0.11"

View File

@ -238,7 +238,7 @@ where
num_buffered += 1;
}
Frame::Close { stream_id } => {
self.on_close(stream_id.into_local())?;
self.on_close(stream_id.into_local());
}
Frame::Reset { stream_id } => {
self.on_reset(stream_id.into_local())
@ -460,7 +460,7 @@ where
}
Frame::Close { stream_id } => {
let stream_id = stream_id.into_local();
self.on_close(stream_id)?;
self.on_close(stream_id);
if id == stream_id {
return Poll::Ready(Ok(None))
}
@ -683,7 +683,7 @@ where
}
/// Processes an inbound `Close` frame.
fn on_close(&mut self, id: LocalStreamId) -> io::Result<()> {
fn on_close(&mut self, id: LocalStreamId) {
if let Some(state) = self.substreams.remove(&id) {
match state {
SubstreamState::RecvClosed { .. } | SubstreamState::Closed { .. } => {
@ -715,8 +715,6 @@ where
trace!("{}: Ignoring `Close` for unknown substream {}. Possibly dropped earlier.",
self.id, id);
}
Ok(())
}
/// Generates the next outbound stream ID.

View File

@ -1,3 +1,7 @@
# 0.31.0 [unreleased]
- Update `libp2p-core`.
# 0.30.1 [2021-02-17]
- Update `yamux` to `0.8.1`.

View File

@ -2,7 +2,7 @@
name = "libp2p-yamux"
edition = "2018"
description = "Yamux multiplexing protocol for libp2p"
version = "0.30.1"
version = "0.31.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
parking_lot = "0.11"
thiserror = "1.0"
yamux = "0.8.1"

View File

@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
cuckoofilter = "0.5.0"
fnv = "1.0"
futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
log = "0.4"
prost = "0.7"

View File

@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
bytes = "1.0"
byteorder = "1.3.4"
fnv = "1.0.7"

View File

@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
log = "0.4.1"
prost = "0.7"

View File

@ -17,7 +17,7 @@ fnv = "1.0"
asynchronous-codec = "0.6"
futures = "0.3.1"
log = "0.4"
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
prost = "0.7"
rand = "0.7.2"

View File

@ -1072,7 +1072,11 @@ fn manual_bucket_inserts() {
let mut swarms = build_connected_nodes_with_config(3, 1, cfg);
// The peers and their addresses for which we expect `RoutablePeer` events.
let mut expected = swarms.iter().skip(2)
.map(|(a, s)| (a.clone(), Swarm::local_peer_id(s).clone()))
.map(|(a, s)| {
let pid = *Swarm::local_peer_id(s);
let addr = a.clone().with(Protocol::P2p(pid.into()));
(addr, pid)
})
.collect::<HashMap<_,_>>();
// We collect the peers for which a `RoutablePeer` event
// was received in here to check at the end of the test
@ -1087,7 +1091,7 @@ fn manual_bucket_inserts() {
Poll::Ready(Some(KademliaEvent::RoutablePeer {
peer, address
})) => {
assert_eq!(peer, expected.remove(&address).expect("Unexpected address"));
assert_eq!(peer, expected.remove(&address).expect("Missing address"));
routable.push(peer);
if expected.is_empty() {
for peer in routable.iter() {

View File

@ -16,7 +16,7 @@ dns-parser = "0.8.0"
futures = "0.3.13"
if-watch = "0.2.0"
lazy_static = "1.4.0"
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
log = "0.4.14"
rand = "0.8.3"

View File

@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
log = "0.4.1"
rand = "0.7.2"

View File

@ -14,7 +14,7 @@ asynchronous-codec = "0.6"
bytes = "1"
futures = "0.3.1"
futures-timer = "3"
libp2p-core = { version = "0.27", path = "../../core" }
libp2p-core = { version = "0.28", path = "../../core" }
libp2p-swarm = { version = "0.28", path = "../../swarm" }
log = "0.4"
pin-project = "1"

View File

@ -401,14 +401,12 @@ impl<T: Transport> Stream for RelayListener<T> {
Poll::Ready(Some(BehaviourToListenerMsg::IncomingRelayedConnection {
stream,
src_peer_id,
relay_peer_id,
relay_addr,
relay_peer_id: _
})) => {
return Poll::Ready(Some(Ok(ListenerEvent::Upgrade {
upgrade: RelayedListenerUpgrade::Relayed(Some(stream)),
local_addr: relay_addr
.with(Protocol::P2p(relay_peer_id.into()))
.with(Protocol::P2pCircuit),
local_addr: relay_addr.with(Protocol::P2pCircuit),
remote_addr: Protocol::P2p(src_peer_id.into()).into(),
})));
}

View File

@ -381,9 +381,10 @@ fn src_try_connect_to_offline_dst() {
loop {
match src_swarm.next_event().await {
SwarmEvent::UnknownPeerUnreachableAddr { address, .. }
SwarmEvent::UnreachableAddr { address, peer_id, .. }
if address == dst_addr_via_relay =>
{
assert_eq!(peer_id, dst_peer_id);
break;
}
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
@ -437,9 +438,10 @@ fn src_try_connect_to_unsupported_dst() {
loop {
match src_swarm.next_event().await {
SwarmEvent::UnknownPeerUnreachableAddr { address, .. }
SwarmEvent::UnreachableAddr { address, peer_id, .. }
if address == dst_addr_via_relay =>
{
assert_eq!(peer_id, dst_peer_id);
break;
}
SwarmEvent::ConnectionClosed { peer_id, .. } if peer_id == relay_peer_id => {}
@ -486,8 +488,10 @@ fn src_try_connect_to_offline_dst_via_offline_relay() {
// Source Node fail to reach Destination Node due to failure reaching Relay.
match src_swarm.next_event().await {
SwarmEvent::UnknownPeerUnreachableAddr { address, .. }
if address == dst_addr_via_relay => {}
SwarmEvent::UnreachableAddr { address, peer_id, .. }
if address == dst_addr_via_relay => {
assert_eq!(peer_id, dst_peer_id);
}
e => panic!("{:?}", e),
}
});
@ -573,6 +577,9 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
}
}
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
..
})) => {}
e => panic!("{:?}", e),
}
}
@ -1005,9 +1012,10 @@ fn yield_incoming_connection_through_correct_listener() {
}
match src_3_swarm.next_event().boxed().poll_unpin(cx) {
Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr { address, .. })
Poll::Ready(SwarmEvent::UnreachableAddr { address, peer_id, .. })
if address == dst_addr_via_relay_3 =>
{
assert_eq!(peer_id, dst_peer_id);
return Poll::Ready(());
}
Poll::Ready(SwarmEvent::Dialing { .. }) => {}

View File

@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
async-trait = "0.1"
bytes = "1"
futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm" }
log = "0.4.11"
lru = "0.6"

View File

@ -283,9 +283,9 @@ pub async fn development_transport(keypair: identity::Keypair)
{
let transport = {
let tcp = tcp::TcpConfig::new().nodelay(true);
let transport = dns::DnsConfig::system(tcp).await?;
let websockets = websocket::WsConfig::new(transport.clone());
transport.or_transport(websockets)
let dns_tcp = dns::DnsConfig::system(tcp).await?;
let ws_dns_tcp = websocket::WsConfig::new(dns_tcp.clone());
dns_tcp.or_transport(ws_dns_tcp)
};
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
@ -318,9 +318,9 @@ pub fn tokio_development_transport(keypair: identity::Keypair)
{
let transport = {
let tcp = tcp::TokioTcpConfig::new().nodelay(true);
let transport = dns::TokioDnsConfig::system(tcp)?;
let websockets = websocket::WsConfig::new(transport.clone());
transport.or_transport(websockets)
let dns_tcp = dns::TokioDnsConfig::system(tcp)?;
let ws_dns_tcp = websocket::WsConfig::new(dns_tcp.clone());
dns_tcp.or_transport(ws_dns_tcp)
};
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()

View File

@ -1,5 +1,9 @@
# 0.28.0 [unreleased]
- New error variant `DialError::InvalidAddress`
- `Swarm::dial_addr()` now returns a `DialError` on error.
- Remove the option for a substream-specific multistream select protocol override.
The override at this granularity is no longer deemed useful, in particular because
it can usually not be configured for existing protocols like `libp2p-kad` and others.

View File

@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
either = "1.6.0"
futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../core" }
libp2p-core = { version = "0.28.0", path = "../core" }
log = "0.4"
rand = "0.7"
smallvec = "1.6.1"

View File

@ -113,6 +113,7 @@ use libp2p_core::{
transport::{self, TransportError},
muxing::StreamMuxerBox,
network::{
self,
ConnectionLimits,
Network,
NetworkInfo,
@ -359,11 +360,11 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
}
/// Initiates a new dialing attempt to the given address.
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> {
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), DialError> {
let handler = me.behaviour.new_handler()
.into_node_handler_builder()
.with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
me.network.dial(&addr, handler).map(|_id| ())
Ok(me.network.dial(&addr, handler).map(|_id| ())?)
}
/// Initiates a new dialing attempt to the given peer.
@ -386,7 +387,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
me.network.peer(*peer_id)
.dial(first, addrs, handler)
.map(|_| ())
.map_err(DialError::ConnectionLimit)
.map_err(DialError::from)
} else {
Err(DialError::NoAddresses)
};
@ -1053,16 +1054,28 @@ pub enum DialError {
/// The configured limit for simultaneous outgoing connections
/// has been reached.
ConnectionLimit(ConnectionLimit),
/// The address given for dialing is invalid.
InvalidAddress(Multiaddr),
/// [`NetworkBehaviour::addresses_of_peer`] returned no addresses
/// for the peer to dial.
NoAddresses
}
impl From<network::DialError> for DialError {
fn from(err: network::DialError) -> DialError {
match err {
network::DialError::ConnectionLimit(l) => DialError::ConnectionLimit(l),
network::DialError::InvalidAddress(a) => DialError::InvalidAddress(a),
}
}
}
impl fmt::Display for DialError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err),
DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
DialError::InvalidAddress(a) => write!(f, "Dial error: invalid address: {}", a),
DialError::Banned => write!(f, "Dial error: peer is banned.")
}
}
@ -1072,6 +1085,7 @@ impl error::Error for DialError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
DialError::ConnectionLimit(err) => Some(err),
DialError::InvalidAddress(_) => None,
DialError::NoAddresses => None,
DialError::Banned => None
}

View File

@ -1,3 +1,7 @@
# 0.28.0 [unreleased]
- Update `libp2p-core`.
# 0.27.1 [2021-01-27]
- Ensure read buffers are initialised.

View File

@ -2,7 +2,7 @@
name = "libp2p-deflate"
edition = "2018"
description = "Deflate encryption protocol for libp2p"
version = "0.27.1"
version = "0.28.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
flate2 = "1.0"
[dev-dependencies]

View File

@ -1,5 +1,9 @@
# 0.28.0 [unreleased]
- Update `libp2p-core`.
- Add support for resolving `/dnsaddr` addresses.
- Use `trust-dns-resolver`, removing the internal thread pool and
expanding the configurability of `libp2p-dns` by largely exposing the
configuration of `trust-dns-resolver`.

View File

@ -10,11 +10,12 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]
[dependencies]
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
log = "0.4.1"
futures = "0.3.1"
trust-dns-resolver = { version = "0.20", default-features = false, features = ["system-config"] }
async-std-resolver = { version = "0.20", optional = true }
smallvec = "1.6"
[dev-dependencies]
env_logger = "0.6"

View File

@ -24,10 +24,11 @@
//! [`DnsConfig`] and `TokioDnsConfig` for use with `async-std` and `tokio`,
//! respectively.
//!
//! A [`GenDnsConfig`] is a [`Transport`] wrapper that is created around
//! A [`GenDnsConfig`] is an address-rewriting [`Transport`] wrapper around
//! an inner `Transport`. The composed transport behaves like the inner
//! transport, except that [`Transport::dial`] resolves `/dns`, `/dns4/` and
//! `/dns6/` components of a given `Multiaddr` through a DNS.
//! transport, except that [`Transport::dial`] resolves `/dns/...`, `/dns4/...`,
//! `/dns6/...` and `/dnsaddr/...` components of the given `Multiaddr` through
//! a DNS, replacing them with the resolved protocols (typically TCP/IP).
//!
//! The `async-std` feature and hence the `DnsConfig` are
//! enabled by default. Tokio users can furthermore opt-in
@ -37,14 +38,14 @@
//!
//![trust-dns-resolver]: https://docs.rs/trust-dns-resolver/latest/trust_dns_resolver/#dns-over-tls-and-dns-over-https
use futures::{prelude::*, future::BoxFuture, stream::FuturesOrdered};
use futures::{prelude::*, future::BoxFuture};
use libp2p_core::{
Transport,
multiaddr::{Protocol, Multiaddr},
transport::{TransportError, ListenerEvent}
};
use log::{debug, trace};
use std::{error, fmt, net::IpAddr};
use smallvec::SmallVec;
use std::{borrow::Cow, convert::TryFrom, error, fmt, iter, net::IpAddr, str};
#[cfg(any(feature = "async-std", feature = "tokio"))]
use std::io;
#[cfg(any(feature = "async-std", feature = "tokio"))]
@ -62,6 +63,24 @@ use async_std_resolver::{AsyncStdConnection, AsyncStdConnectionProvider};
pub use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
pub use trust_dns_resolver::error::{ResolveError, ResolveErrorKind};
/// The prefix for `dnsaddr` protocol TXT record lookups.
const DNSADDR_PREFIX: &'static str = "_dnsaddr.";
/// The maximum number of dialing attempts to resolved addresses.
const MAX_DIAL_ATTEMPTS: usize = 16;
/// The maximum number of DNS lookups when dialing.
///
/// This limit is primarily a safeguard against too many, possibly
/// even cyclic, indirections in the addresses obtained from the
/// TXT records of a `/dnsaddr`.
const MAX_DNS_LOOKUPS: usize = 32;
/// The maximum number of TXT records applicable for the address
/// being dialed that are considered for further lookups as a
/// result of a single `/dnsaddr` lookup.
const MAX_TXT_RECORDS: usize = 16;
/// A `Transport` wrapper for performing DNS lookups when dialing `Multiaddr`esses
/// using `async-std` for all async I/O.
#[cfg(feature = "async-std")]
@ -137,7 +156,7 @@ where
impl<T, C, P> Transport for GenDnsConfig<T, C, P>
where
T: Transport + Send + 'static,
T: Transport + Clone + Send + 'static,
T::Error: Send,
T::Dial: Send,
C: DnsHandle<Error = ResolveError>,
@ -171,44 +190,120 @@ where
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
// Check if there are any domain names in the address. If not, proceed
// straight away with dialing on the underlying transport.
if !addr.iter().any(|p| match p {
Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) => true,
_ => false
}) {
trace!("Pass-through address without DNS: {}", addr);
let inner_dial = self.inner.dial(addr)
.map_err(|err| err.map(DnsErr::Transport))?;
return Ok(inner_dial.map_err::<_, fn(_) -> _>(DnsErr::Transport).left_future());
}
// Asynchronlously resolve all DNS names in the address before proceeding
// with dialing on the underlying transport.
Ok(async move {
let resolver = self.resolver;
let inner = self.inner;
trace!("Resolving DNS: {}", addr);
let mut last_err = None;
let mut dns_lookups = 0;
let mut dial_attempts = 0;
// We optimise for the common case of a single DNS component
// in the address that is resolved with a single lookup.
let mut unresolved = SmallVec::<[Multiaddr; 1]>::new();
unresolved.push(addr.clone());
let resolved = addr.into_iter()
.map(|proto| resolve(proto, &resolver))
.collect::<FuturesOrdered<_>>()
.collect::<Vec<Result<Protocol<'_>, Self::Error>>>()
.await
.into_iter()
.collect::<Result<Vec<Protocol<'_>>, Self::Error>>()?
.into_iter()
.collect::<Multiaddr>();
// Resolve (i.e. replace) all DNS protocol components, initiating
// dialing attempts as soon as there is another fully resolved
// address.
while let Some(addr) = unresolved.pop() {
if let Some((i, name)) = addr.iter().enumerate().find(|(_, p)| match p {
Protocol::Dns(_) |
Protocol::Dns4(_) |
Protocol::Dns6(_) |
Protocol::Dnsaddr(_) => true,
_ => false
}) {
if dns_lookups == MAX_DNS_LOOKUPS {
log::debug!("Too many DNS lookups. Dropping unresolved {}.", addr);
last_err = Some(DnsErr::TooManyLookups);
// There may still be fully resolved addresses in `unresolved`,
// so keep going until `unresolved` is empty.
continue
}
dns_lookups += 1;
match resolve(&name, &resolver).await {
Err(e) => {
if unresolved.is_empty() {
return Err(e)
}
// If there are still unresolved addresses, there is
// a chance of success, but we track the last error.
last_err = Some(e);
}
Ok(Resolved::One(ip)) => {
log::trace!("Resolved {} -> {}", name, ip);
let addr = addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
unresolved.push(addr);
}
Ok(Resolved::Many(ips)) => {
for ip in ips {
log::trace!("Resolved {} -> {}", name, ip);
let addr = addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
unresolved.push(addr);
}
}
Ok(Resolved::Addrs(addrs)) => {
let suffix = addr.iter().skip(i + 1).collect::<Multiaddr>();
let prefix = addr.iter().take(i).collect::<Multiaddr>();
let mut n = 0;
for a in addrs {
if a.ends_with(&suffix) {
if n < MAX_TXT_RECORDS {
n += 1;
log::trace!("Resolved {} -> {}", name, a);
let addr = prefix.iter().chain(a.iter()).collect::<Multiaddr>();
unresolved.push(addr);
} else {
log::debug!("Too many TXT records. Dropping resolved {}.", a);
}
}
}
}
}
} else {
// We have a fully resolved address, so try to dial it.
log::debug!("Dialing {}", addr);
debug!("DNS resolved: {} => {}", addr, resolved);
let transport = inner.clone();
let result = match transport.dial(addr) {
Ok(out) => {
// We only count attempts that the inner transport
// actually accepted, i.e. for which it produced
// a dialing future.
dial_attempts += 1;
out.await.map_err(DnsErr::Transport)
}
Err(TransportError::MultiaddrNotSupported(a)) =>
Err(DnsErr::MultiaddrNotSupported(a)),
Err(TransportError::Other(err)) => Err(DnsErr::Transport(err))
};
match inner.dial(resolved) {
Ok(out) => out.await.map_err(DnsErr::Transport),
Err(TransportError::MultiaddrNotSupported(a)) =>
Err(DnsErr::MultiaddrNotSupported(a)),
Err(TransportError::Other(err)) => Err(DnsErr::Transport(err))
match result {
Ok(out) => return Ok(out),
Err(err) => {
log::debug!("Dial error: {:?}.", err);
if unresolved.is_empty() {
return Err(err)
}
if dial_attempts == MAX_DIAL_ATTEMPTS {
log::debug!("Aborting dialing after {} attempts.", MAX_DIAL_ATTEMPTS);
return Err(err)
}
last_err = Some(err);
}
}
}
}
// At this point, if there was at least one failed dialing
// attempt, return that error. Otherwise there were no valid DNS records
// for the given address to begin with (i.e. DNS lookups succeeded but
// produced no records relevant for the given `addr`).
Err(last_err.unwrap_or_else(||
DnsErr::ResolveError(
ResolveErrorKind::Message("No matching records found.").into())))
}.boxed().right_future())
}
@ -226,6 +321,13 @@ pub enum DnsErr<TErr> {
ResolveError(ResolveError),
/// DNS resolution was successful, but the underlying transport refused the resolved address.
MultiaddrNotSupported(Multiaddr),
/// DNS resolution involved too many lookups.
///
/// DNS resolution on dialing performs up to 32 DNS lookups. If these
/// are not sufficient to obtain a fully-resolved address, this error
/// is returned and the DNS records for the domain(s) being dialed
/// should be investigated.
TooManyLookups,
}
impl<TErr> fmt::Display for DnsErr<TErr>
@ -236,6 +338,7 @@ where TErr: fmt::Display
DnsErr::Transport(err) => write!(f, "{}", err),
DnsErr::ResolveError(err) => write!(f, "{}", err),
DnsErr::MultiaddrNotSupported(a) => write!(f, "Unsupported resolved address: {}", a),
DnsErr::TooManyLookups => write!(f, "Too many DNS lookups"),
}
}
}
@ -248,14 +351,33 @@ where TErr: error::Error + 'static
DnsErr::Transport(err) => Some(err),
DnsErr::ResolveError(err) => Some(err),
DnsErr::MultiaddrNotSupported(_) => None,
DnsErr::TooManyLookups => None,
}
}
}
/// Asynchronously resolves the domain name of a `Dns`, `Dns4` or `Dns6` protocol
/// component. If the given protocol is not a DNS component, it is returned unchanged.
fn resolve<'a, E: 'a, C, P>(proto: Protocol<'a>, resolver: &'a AsyncResolver<C,P>)
-> impl Future<Output = Result<Protocol<'a>, DnsErr<E>>> + 'a
/// The successful outcome of [`resolve`] for a given [`Protocol`].
enum Resolved<'a> {
/// The given `Protocol` has been resolved to a single `Protocol`,
/// which may be identical to the one given, in case it is not
/// a DNS protocol component.
One(Protocol<'a>),
/// The given `Protocol` has been resolved to multiple alternative
/// `Protocol`s as a result of a DNS lookup.
Many(Vec<Protocol<'a>>),
/// The given `Protocol` has been resolved to a new list of `Multiaddr`s
/// obtained from DNS TXT records representing possible alternatives.
/// These addresses may contain further DNS names that need resolving.
Addrs(Vec<Multiaddr>),
}
/// Asynchronously resolves the domain name of a `Dns`, `Dns4`, `Dns6` or `Dnsaddr` protocol
/// component. If the given protocol is of a different type, it is returned unchanged as a
/// [`Resolved::One`].
fn resolve<'a, E: 'a + Send, C, P>(
proto: &Protocol<'a>,
resolver: &'a AsyncResolver<C,P>,
) -> BoxFuture<'a, Result<Resolved<'a>, DnsErr<E>>>
where
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>,
@ -263,39 +385,105 @@ where
match proto {
Protocol::Dns(ref name) => {
resolver.lookup_ip(fqdn(name)).map(move |res| match res {
Ok(ips) => Ok(ips.into_iter()
.next()
.map(Protocol::from)
.expect("If there are no results, `Err(NoRecordsFound)` is expected.")),
Err(e) => return Err(DnsErr::ResolveError(e))
}).left_future()
Ok(ips) => {
let mut ips = ips.into_iter();
let one = ips.next()
.expect("If there are no results, `Err(NoRecordsFound)` is expected.");
if let Some(two) = ips.next() {
Ok(Resolved::Many(
iter::once(one).chain(iter::once(two))
.chain(ips)
.map(Protocol::from)
.collect()))
} else {
Ok(Resolved::One(Protocol::from(one)))
}
}
Err(e) => Err(DnsErr::ResolveError(e))
}).boxed()
}
Protocol::Dns4(ref name) => {
resolver.ipv4_lookup(fqdn(name)).map(move |res| match res {
Ok(ips) => Ok(ips.into_iter()
.map(IpAddr::from)
.next()
.map(Protocol::from)
.expect("If there are no results, `Err(NoRecordsFound)` is expected.")),
Err(e) => return Err(DnsErr::ResolveError(e))
}).left_future().left_future().right_future()
Ok(ips) => {
let mut ips = ips.into_iter();
let one = ips.next()
.expect("If there are no results, `Err(NoRecordsFound)` is expected.");
if let Some(two) = ips.next() {
Ok(Resolved::Many(
iter::once(one).chain(iter::once(two))
.chain(ips)
.map(IpAddr::from)
.map(Protocol::from)
.collect()))
} else {
Ok(Resolved::One(Protocol::from(IpAddr::from(one))))
}
}
Err(e) => Err(DnsErr::ResolveError(e))
}).boxed()
}
Protocol::Dns6(ref name) => {
resolver.ipv6_lookup(fqdn(name)).map(move |res| match res {
Ok(ips) => Ok(ips.into_iter()
.map(IpAddr::from)
.next()
.map(Protocol::from)
.expect("If there are no results, `Err(NoRecordsFound)` is expected.")),
Err(e) => return Err(DnsErr::ResolveError(e))
}).right_future().left_future().right_future()
Ok(ips) => {
let mut ips = ips.into_iter();
let one = ips.next()
.expect("If there are no results, `Err(NoRecordsFound)` is expected.");
if let Some(two) = ips.next() {
Ok(Resolved::Many(
iter::once(one).chain(iter::once(two))
.chain(ips)
.map(IpAddr::from)
.map(Protocol::from)
.collect()))
} else {
Ok(Resolved::One(Protocol::from(IpAddr::from(one))))
}
}
Err(e) => Err(DnsErr::ResolveError(e))
}).boxed()
},
proto => future::ready(Ok(proto)).right_future().right_future()
Protocol::Dnsaddr(ref name) => {
let name = Cow::Owned([DNSADDR_PREFIX, name].concat());
resolver.txt_lookup(fqdn(&name)).map(move |res| match res {
Ok(txts) => {
let mut addrs = Vec::new();
for txt in txts {
if let Some(chars) = txt.txt_data().first() {
match parse_dnsaddr_txt(chars) {
Err(e) => {
// Skip over seemingly invalid entries.
log::debug!("Invalid TXT record: {:?}", e);
}
Ok(a) => {
addrs.push(a);
}
}
}
}
Ok(Resolved::Addrs(addrs))
}
Err(e) => Err(DnsErr::ResolveError(e))
}).boxed()
}
proto => future::ready(Ok(Resolved::One(proto.clone()))).boxed()
}
}
fn fqdn(name: &std::borrow::Cow<'_, str>) -> String {
if name.ends_with(".") {
/// Parses a `<character-string>` of a `dnsaddr` TXT record.
fn parse_dnsaddr_txt(txt: &[u8]) -> io::Result<Multiaddr> {
let s = str::from_utf8(txt).map_err(invalid_data)?;
match s.strip_prefix("dnsaddr=") {
None => Err(invalid_data("Missing `dnsaddr=` prefix.")),
Some(a) => Ok(Multiaddr::try_from(a).map_err(invalid_data)?)
}
}
fn invalid_data(e: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
io::Error::new(io::ErrorKind::InvalidData, e)
}
fn fqdn(name: &Cow<'_, str>) -> String {
if name.ends_with('.') {
name.to_string()
} else {
format!("{}.", name)
@ -308,6 +496,7 @@ mod tests {
use futures::{future::BoxFuture, stream::BoxStream};
use libp2p_core::{
Transport,
PeerId,
multiaddr::{Protocol, Multiaddr},
transport::ListenerEvent,
transport::TransportError,
@ -332,17 +521,12 @@ mod tests {
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let addr = addr.iter().collect::<Vec<_>>();
assert_eq!(addr.len(), 2);
match addr[1] {
Protocol::Tcp(_) => (),
_ => panic!(),
};
match addr[0] {
Protocol::Ip4(_) => (),
Protocol::Ip6(_) => (),
_ => panic!(),
};
// Check that all DNS components have been resolved, i.e. replaced.
assert!(!addr.iter().any(|p| match p {
Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Dnsaddr(_)
=> true,
_ => false,
}));
Ok(Box::pin(future::ready(Ok(()))))
}
@ -383,6 +567,37 @@ mod tests {
.await
.unwrap();
// Success due to the DNS TXT records at _dnsaddr.bootstrap.libp2p.io.
let _ = transport
.clone()
.dial("/dnsaddr/bootstrap.libp2p.io".parse().unwrap())
.unwrap()
.await
.unwrap();
// Success due to the DNS TXT records at _dnsaddr.bootstrap.libp2p.io having
// an entry with suffix `/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN`,
// i.e. a bootnode with such a peer ID.
let _ = transport
.clone()
.dial("/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap())
.unwrap()
.await
.unwrap();
// Failure due to the DNS TXT records at _dnsaddr.libp2p.io not having
// an entry with a random `p2p` suffix.
match transport
.clone()
.dial(format!("/dnsaddr/bootstrap.libp2p.io/p2p/{}", PeerId::random()).parse().unwrap())
.unwrap()
.await
{
Err(DnsErr::ResolveError(_)) => {},
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(_) => panic!("Unexpected success.")
}
// Failure due to no records.
match transport
.clone()
@ -401,19 +616,27 @@ mod tests {
#[cfg(feature = "async-std")]
{
// Be explicit about the resolver used. At least on github CI, TXT
// type record lookups may not work with the system DNS resolver.
let config = ResolverConfig::quad9();
let opts = ResolverOpts::default();
async_std_crate::task::block_on(
DnsConfig::system(CustomTransport).then(|dns| run(dns.unwrap()))
DnsConfig::custom(CustomTransport, config, opts).then(|dns| run(dns.unwrap()))
);
}
#[cfg(feature = "tokio")]
{
// Be explicit about the resolver used. At least on github CI, TXT
// type record lookups may not work with the system DNS resolver.
let config = ResolverConfig::quad9();
let opts = ResolverOpts::default();
let rt = tokio_crate::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.unwrap();
rt.block_on(run(TokioDnsConfig::system(CustomTransport).unwrap()));
rt.block_on(run(TokioDnsConfig::custom(CustomTransport, config, opts).unwrap()));
}
}
}

View File

@ -1,3 +1,7 @@
# 0.30.0 [unreleased]
- Update `libp2p-core`.
# 0.29.0 [2021-01-12]
- Update dependencies.

View File

@ -1,7 +1,7 @@
[package]
name = "libp2p-noise"
description = "Cryptographic handshake protocol using the noise framework."
version = "0.29.0"
version = "0.30.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -12,7 +12,7 @@ bytes = "1"
curve25519-dalek = "3.0.0"
futures = "0.3.1"
lazy_static = "1.2"
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
log = "0.4"
prost = "0.7"
rand = "0.7.2"

View File

@ -1,3 +1,7 @@
# 0.28.0 [unreleased]
- Update `libp2p-core`.
# 0.27.1 [2021-02-15]
- Update dependencies.

View File

@ -2,7 +2,7 @@
name = "libp2p-plaintext"
edition = "2018"
description = "Plaintext encryption dummy protocol for libp2p"
version = "0.27.1"
version = "0.28.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
bytes = "1"
futures = "0.3.1"
asynchronous-codec = "0.6"
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
log = "0.4.8"
prost = "0.7"
unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] }

View File

@ -51,7 +51,7 @@ pub struct Remote {
}
impl HandshakeContext<Local> {
fn new(config: PlainText2Config) -> Result<Self, PlainTextError> {
fn new(config: PlainText2Config) -> Self {
let exchange = Exchange {
id: Some(config.local_public_key.clone().into_peer_id().to_bytes()),
pubkey: Some(config.local_public_key.clone().into_protobuf_encoding())
@ -59,12 +59,12 @@ impl HandshakeContext<Local> {
let mut buf = Vec::with_capacity(exchange.encoded_len());
exchange.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
Ok(Self {
Self {
config,
state: Local {
exchange_bytes: buf
}
})
}
}
fn with_remote(self, exchange_bytes: BytesMut)
@ -119,7 +119,7 @@ where
let mut framed_socket = Framed::new(socket, UviBytes::default());
trace!("starting handshake");
let context = HandshakeContext::new(config)?;
let context = HandshakeContext::new(config);
trace!("sending exchange to remote");
framed_socket.send(BytesMut::from(&context.state.exchange_bytes[..])).await?;

View File

@ -1,4 +1,8 @@
# 0.27.2 [unreleased]
# 0.28.0 [unreleased]
- Update `libp2p-core`.
- Permit `/p2p` addresses.
- Update to `if-watch-0.2`.

View File

@ -2,7 +2,7 @@
name = "libp2p-tcp"
edition = "2018"
description = "TCP/IP transport protocol for libp2p"
version = "0.27.2"
version = "0.28.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -17,7 +17,7 @@ if-watch = { version = "0.2.0", optional = true }
if-addrs = { version = "0.6.4", optional = true }
ipnet = "2.0.0"
libc = "0.2.80"
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
log = "0.4.11"
socket2 = { version = "0.4.0", features = ["all"] }
tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net"], optional = true }

View File

@ -379,7 +379,7 @@ where
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(&addr) {
let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(addr.clone()) {
sa
} else {
return Err(TransportError::MultiaddrNotSupported(addr));
@ -390,7 +390,7 @@ where
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) {
if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
return Err(TransportError::MultiaddrNotSupported(addr));
}
@ -653,21 +653,34 @@ where
}
}
// This type of logic should probably be moved into the multiaddr package
fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> {
let mut iter = addr.iter();
let proto1 = iter.next().ok_or(())?;
let proto2 = iter.next().ok_or(())?;
if iter.next().is_some() {
return Err(());
}
match (proto1, proto2) {
(Protocol::Ip4(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)),
(Protocol::Ip6(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)),
_ => Err(()),
/// Extracts a `SocketAddr` from a given `Multiaddr`.
///
/// Fails if the given `Multiaddr` does not begin with an IP
/// protocol encapsulating a TCP port.
fn multiaddr_to_socketaddr(mut addr: Multiaddr) -> Result<SocketAddr, ()> {
// "Pop" the IP address and TCP port from the end of the address,
// ignoring a `/p2p/...` suffix as well as any prefix of possibly
// outer protocols, if present.
let mut port = None;
while let Some(proto) = addr.pop() {
match proto {
Protocol::Ip4(ipv4) => match port {
Some(port) => return Ok(SocketAddr::new(ipv4.into(), port)),
None => return Err(())
},
Protocol::Ip6(ipv6) => match port {
Some(port) => return Ok(SocketAddr::new(ipv6.into(), port)),
None => return Err(())
},
Protocol::Tcp(portnum) => match port {
Some(_) => return Err(()),
None => { port = Some(portnum) }
}
Protocol::P2p(_) => {}
_ => return Err(())
}
}
Err(())
}
// Create a [`Multiaddr`] from the given IP address and port number.
@ -687,12 +700,12 @@ mod tests {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
assert!(
multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
multiaddr_to_socketaddr("/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
.is_err()
);
assert_eq!(
multiaddr_to_socketaddr(&"/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
multiaddr_to_socketaddr("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
Ok(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
12345,
@ -700,7 +713,7 @@ mod tests {
);
assert_eq!(
multiaddr_to_socketaddr(
&"/ip4/255.255.255.255/tcp/8080"
"/ip4/255.255.255.255/tcp/8080"
.parse::<Multiaddr>()
.unwrap()
),
@ -710,7 +723,7 @@ mod tests {
))
);
assert_eq!(
multiaddr_to_socketaddr(&"/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
multiaddr_to_socketaddr("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
Ok(SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
12345,
@ -718,7 +731,7 @@ mod tests {
);
assert_eq!(
multiaddr_to_socketaddr(
&"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
.parse::<Multiaddr>()
.unwrap()
),

View File

@ -1,3 +1,9 @@
# 0.28.0 [unreleased]
- Update `libp2p-core`.
- Permit `/p2p` addresses.
# 0.27.0 [2021-01-12]
- Update dependencies.

View File

@ -2,7 +2,7 @@
name = "libp2p-uds"
edition = "2018"
description = "Unix domain sockets transport for libp2p"
version = "0.27.0"
version = "0.28.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
[target.'cfg(all(unix, not(target_os = "emscripten")))'.dependencies]
async-std = { version = "1.6.2", optional = true }
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
log = "0.4.1"
futures = "0.3.1"
tokio = { version = "1.0.1", default-features = false, features = ["net"], optional = true }

View File

@ -140,23 +140,20 @@ codegen!(
/// paths.
// This type of logic should probably be moved into the multiaddr package
fn multiaddr_to_path(addr: &Multiaddr) -> Result<PathBuf, ()> {
let mut iter = addr.iter();
let path = iter.next();
if iter.next().is_some() {
return Err(());
let mut protocols = addr.iter();
match protocols.next() {
Some(Protocol::Unix(ref path)) => {
let path = PathBuf::from(path.as_ref());
if !path.is_absolute() {
return Err(())
}
match protocols.next() {
None | Some(Protocol::P2p(_)) => Ok(path),
Some(_) => Err(())
}
}
_ => Err(())
}
let out: PathBuf = match path {
Some(Protocol::Unix(ref path)) => path.as_ref().into(),
_ => return Err(())
};
if !out.is_absolute() {
return Err(());
}
Ok(out)
}
#[cfg(all(test, feature = "async-std"))]

View File

@ -1,3 +1,7 @@
# 0.28.0 [unreleased]
- Update `libp2p-core`.
# 0.27.0 [2021-01-12]
- Update dependencies.

View File

@ -1,6 +1,6 @@
[package]
name = "libp2p-wasm-ext"
version = "0.27.0"
version = "0.28.0"
authors = ["Pierre Krieger <pierre.krieger1708@gmail.com>"]
edition = "2018"
description = "Allows passing in an external transport in a WASM environment"
@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
futures = "0.3.1"
js-sys = "0.3.19"
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
parity-send-wrapper = "0.1.0"
wasm-bindgen = "0.2.42"
wasm-bindgen-futures = "0.4.4"

View File

@ -1,3 +1,9 @@
# 0.29.0 [unreleased]
- Update `libp2p-core`.
- Permit dialing `/p2p` addresses.
# 0.28.0 [2021-01-12]
- Update dependencies.

View File

@ -2,7 +2,7 @@
name = "libp2p-websocket"
edition = "2018"
description = "WebSocket transport for libp2p"
version = "0.28.0"
version = "0.29.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
futures-rustls = "0.21"
either = "1.5.3"
futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core" }
libp2p-core = { version = "0.28.0", path = "../../core" }
log = "0.4.8"
quicksink = "0.1"
rw-stream-sink = "0.2.0"

View File

@ -231,13 +231,11 @@ where
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
// Quick sanity check of the provided Multiaddr.
if let Some(Protocol::Ws(_)) | Some(Protocol::Wss(_)) = addr.iter().last() {
// ok
} else {
debug!("{} is not a websocket multiaddr", addr);
return Err(TransportError::MultiaddrNotSupported(addr))
}
let addr = match parse_ws_dial_addr(addr) {
Ok(addr) => addr,
Err(Error::InvalidMultiaddr(a)) => return Err(TransportError::MultiaddrNotSupported(a)),
Err(e) => return Err(TransportError::Other(e)),
};
// We are looping here in order to follow redirects (if any):
let mut remaining_redirects = self.max_redirects;
@ -248,11 +246,11 @@ where
match this.dial_once(addr).await {
Ok(Either::Left(redirect)) => {
if remaining_redirects == 0 {
debug!("too many redirects");
debug!("Too many redirects (> {})", self.max_redirects);
return Err(Error::TooManyRedirects)
}
remaining_redirects -= 1;
addr = location_to_multiaddr(&redirect)?
addr = parse_ws_dial_addr(location_to_multiaddr(&redirect)?)?
}
Ok(Either::Right(conn)) => return Ok(conn),
Err(e) => return Err(e)
@ -273,46 +271,26 @@ where
T: Transport,
T::Output: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
/// Attempty to dial the given address and perform a websocket handshake.
async fn dial_once(self, address: Multiaddr) -> Result<Either<String, Connection<T::Output>>, Error<T::Error>> {
trace!("dial address: {}", address);
/// Attempts to dial the given address and perform a websocket handshake.
async fn dial_once(self, addr: WsAddress) -> Result<Either<String, Connection<T::Output>>, Error<T::Error>> {
trace!("Dialing websocket address: {:?}", addr);
let (host_port, dns_name) = host_and_dnsname(&address)?;
let mut inner_addr = address.clone();
let (use_tls, path) =
match inner_addr.pop() {
Some(Protocol::Ws(path)) => (false, path),
Some(Protocol::Wss(path)) => {
if dns_name.is_none() {
debug!("no DNS name in {}", address);
return Err(Error::InvalidMultiaddr(address))
}
(true, path)
}
_ => {
debug!("{} is not a websocket multiaddr", address);
return Err(Error::InvalidMultiaddr(address))
}
};
let dial = self.transport.dial(inner_addr)
let dial = self.transport.dial(addr.tcp_addr)
.map_err(|e| match e {
TransportError::MultiaddrNotSupported(a) => Error::InvalidMultiaddr(a),
TransportError::Other(e) => Error::Transport(e)
})?;
let stream = dial.map_err(Error::Transport).await?;
trace!("connected to {}", address);
trace!("TCP connection to {} established.", addr.host_port);
let stream =
if use_tls { // begin TLS session
let dns_name = dns_name.expect("for use_tls we have checked that dns_name is some");
trace!("starting TLS handshake with {}", address);
if addr.use_tls { // begin TLS session
let dns_name = addr.dns_name.expect("for use_tls we have checked that dns_name is some");
trace!("Starting TLS handshake with {:?}", dns_name);
let stream = self.tls_config.client.connect(dns_name.as_ref(), stream)
.map_err(|e| {
debug!("TLS handshake with {} failed: {}", address, e);
debug!("TLS handshake with {:?} failed: {}", dns_name, e);
Error::Tls(tls::Error::from(e))
})
.await?;
@ -323,9 +301,9 @@ where
EitherOutput::Second(stream)
};
trace!("sending websocket handshake request to {}", address);
trace!("Sending websocket handshake to {}", addr.host_port);
let mut client = handshake::Client::new(stream, &host_port, path.as_ref());
let mut client = handshake::Client::new(stream, &addr.host_port, addr.path.as_ref());
if self.use_deflate {
client.add_extension(Box::new(Deflate::new(connection::Mode::Client)));
@ -341,32 +319,87 @@ where
Err(Error::Handshake(msg.into()))
}
handshake::ServerResponse::Accepted { .. } => {
trace!("websocket handshake with {} successful", address);
trace!("websocket handshake with {} successful", addr.host_port);
Ok(Either::Right(Connection::new(client.into_builder())))
}
}
}
}
// Extract host, port and optionally the DNS name from the given [`Multiaddr`].
fn host_and_dnsname<T>(addr: &Multiaddr) -> Result<(String, Option<webpki::DNSName>), Error<T>> {
let mut iter = addr.iter();
match (iter.next(), iter.next()) {
(Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port))) =>
Ok((format!("{}:{}", ip, port), None)),
(Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port))) =>
Ok((format!("{}:{}", ip, port), None)),
(Some(Protocol::Dns(h)), Some(Protocol::Tcp(port))) =>
Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))),
(Some(Protocol::Dns4(h)), Some(Protocol::Tcp(port))) =>
Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))),
(Some(Protocol::Dns6(h)), Some(Protocol::Tcp(port))) =>
Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))),
_ => {
debug!("multi-address format not supported: {}", addr);
Err(Error::InvalidMultiaddr(addr.clone()))
#[derive(Debug)]
struct WsAddress {
host_port: String,
path: String,
dns_name: Option<webpki::DNSName>,
use_tls: bool,
tcp_addr: Multiaddr,
}
/// Tries to parse the given `Multiaddr` into a `WsAddress` used
/// for dialing.
///
/// Fails if the given `Multiaddr` does not represent a TCP/IP-based
/// websocket protocol stack.
fn parse_ws_dial_addr<T>(addr: Multiaddr) -> Result<WsAddress, Error<T>> {
// The encapsulating protocol must be based on TCP/IP, possibly via DNS.
// We peek at it in order to learn the hostname and port to use for
// the websocket handshake.
let mut protocols = addr.iter();
let mut ip = protocols.next();
let mut tcp = protocols.next();
let (host_port, dns_name) = loop {
match (ip, tcp) {
(Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port)))
=> break (format!("{}:{}", ip, port), None),
(Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port)))
=> break (format!("{}:{}", ip, port), None),
(Some(Protocol::Dns(h)), Some(Protocol::Tcp(port))) |
(Some(Protocol::Dns4(h)), Some(Protocol::Tcp(port))) |
(Some(Protocol::Dns6(h)), Some(Protocol::Tcp(port))) |
(Some(Protocol::Dnsaddr(h)), Some(Protocol::Tcp(port)))
=> break (format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned())),
(Some(_), Some(p)) => {
ip = Some(p);
tcp = protocols.next();
}
_ => return Err(Error::InvalidMultiaddr(addr))
}
}
};
// Now consume the `Ws` / `Wss` protocol from the end of the address,
// preserving the trailing `P2p` protocol that identifies the remote,
// if any.
let mut protocols = addr.clone();
let mut p2p = None;
let (use_tls, path) = loop {
match protocols.pop() {
p@Some(Protocol::P2p(_)) => { p2p = p }
Some(Protocol::Ws(path)) => break (false, path.into_owned()),
Some(Protocol::Wss(path)) => {
if dns_name.is_none() {
debug!("Missing DNS name in WSS address: {}", addr);
return Err(Error::InvalidMultiaddr(addr))
}
break (true, path.into_owned())
}
_ => return Err(Error::InvalidMultiaddr(addr))
}
};
// The original address, stripped of the `/ws` and `/wss` protocols,
// makes up the the address for the inner TCP-based transport.
let tcp_addr = match p2p {
Some(p) => protocols.with(p),
None => protocols
};
Ok(WsAddress {
host_port,
dns_name,
path,
use_tls,
tcp_addr,
})
}
// Given a location URL, build a new websocket [`Multiaddr`].

View File

@ -44,6 +44,13 @@ pub struct WsConfig<T> {
impl<T> WsConfig<T> {
/// Create a new websocket transport based on the given transport.
///
/// > **Note*: The given transport must be based on TCP/IP and should
/// > usually incorporate DNS resolution, though the latter is not
/// > strictly necessary if one wishes to only use the `Ws` protocol
/// > with known IP addresses and ports. See [`libp2p-tcp`](https://docs.rs/libp2p-tcp/)
/// > and [`libp2p-dns`](https://docs.rs/libp2p-dns) for constructing
/// > the inner transport.
pub fn new(transport: T) -> Self {
framed::WsConfig::new(transport).into()
}
@ -187,10 +194,9 @@ where
#[cfg(test)]
mod tests {
use libp2p_core::Multiaddr;
use libp2p_core::{Multiaddr, PeerId, Transport, multiaddr::Protocol};
use libp2p_tcp as tcp;
use futures::prelude::*;
use libp2p_core::{Transport, multiaddr::Protocol};
use super::WsConfig;
#[test]
@ -230,7 +236,7 @@ mod tests {
conn.await
};
let outbound = ws_config.dial(addr).unwrap();
let outbound = ws_config.dial(addr.with(Protocol::P2p(PeerId::random().into()))).unwrap();
let (a, b) = futures::join!(inbound, outbound);
a.and(b).unwrap();