core/src/transport: Add Transport::dial_as_listener (#2363)

Allows `NetworkBehaviour` implementations to dial a peer, but instruct
the dialed connection to be upgraded as if it were the listening
endpoint.

This is needed when establishing direct connections through NATs and/or
Firewalls (hole punching). When hole punching via TCP (QUIC is different
but similar) both ends dial the other at the same time resulting in a
simultaneously opened TCP connection. To disambiguate who is the dialer
and who the listener there are two options:

1. Use the Simultaneous Open Extension of Multistream Select. See
   [sim-open] specification and [sim-open-rust] Rust implementation.

2. Disambiguate the role (dialer or listener) based on the role within
   the DCUtR [dcutr] protocol. More specifically the node initiating the
   DCUtR process will act as a listener and the other as a dialer.

This commit enables (2), i.e. enables the DCUtR protocol to specify the
role used once the connection is established.

While on the positive side (2) requires one round trip less than (1), on
the negative side (2) only works for coordinated simultaneous dials.
I.e. when a simultaneous dial happens by chance, and not coordinated via
DCUtR, the connection attempt fails when only (2) is in place.

[sim-open]: https://github.com/libp2p/specs/blob/master/connections/simopen.md
[sim-open-rust]: https://github.com/libp2p/rust-libp2p/pull/2066
[dcutr]: https://github.com/libp2p/specs/blob/master/relay/DCUtR.md
This commit is contained in:
Max Inden
2022-01-17 16:35:14 +01:00
committed by GitHub
parent bdfbceb6ee
commit 96dbfcd1ad
43 changed files with 594 additions and 106 deletions

View File

@ -22,12 +22,24 @@
- Implement `Serialize` and `Deserialize` for `PeerId` (see [PR 2408]) - Implement `Serialize` and `Deserialize` for `PeerId` (see [PR 2408])
- Allow overriding role when dialing. This option is needed for NAT and firewall
hole punching.
- Add `Transport::dial_as_listener`. As `Transport::dial` but
overrides the role of the local node on the connection . I.e. has the
local node act as a listener on the outgoing connection.
- Add `override_role` option to `DialOpts`.
See [PR 2363].
[PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339
[PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350 [PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350
[PR 2352]: https://github.com/libp2p/rust-libp2p/pull/2352 [PR 2352]: https://github.com/libp2p/rust-libp2p/pull/2352
[PR 2392]: https://github.com/libp2p/rust-libp2p/pull/2392 [PR 2392]: https://github.com/libp2p/rust-libp2p/pull/2392
[PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404 [PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404
[PR 2408]: https://github.com/libp2p/rust-libp2p/pull/2408 [PR 2408]: https://github.com/libp2p/rust-libp2p/pull/2408
[PR 2363]: https://github.com/libp2p/rust-libp2p/pull/2363
# 0.30.1 [2021-11-16] # 0.30.1 [2021-11-16]

View File

@ -97,7 +97,10 @@ pub enum PendingPoint {
/// There is no single address associated with the Dialer of a pending /// There is no single address associated with the Dialer of a pending
/// connection. Addresses are dialed in parallel. Only once the first dial /// connection. Addresses are dialed in parallel. Only once the first dial
/// is successful is the address of the connection known. /// is successful is the address of the connection known.
Dialer, Dialer {
/// Same as [`ConnectedPoint::Dialer`] `role_override`.
role_override: Endpoint,
},
/// The socket comes from a listener. /// The socket comes from a listener.
Listener { Listener {
/// Local connection address. /// Local connection address.
@ -110,7 +113,7 @@ pub enum PendingPoint {
impl From<ConnectedPoint> for PendingPoint { impl From<ConnectedPoint> for PendingPoint {
fn from(endpoint: ConnectedPoint) -> Self { fn from(endpoint: ConnectedPoint) -> Self {
match endpoint { match endpoint {
ConnectedPoint::Dialer { .. } => PendingPoint::Dialer, ConnectedPoint::Dialer { role_override, .. } => PendingPoint::Dialer { role_override },
ConnectedPoint::Listener { ConnectedPoint::Listener {
local_addr, local_addr,
send_back_addr, send_back_addr,
@ -129,6 +132,27 @@ pub enum ConnectedPoint {
Dialer { Dialer {
/// Multiaddress that was successfully dialed. /// Multiaddress that was successfully dialed.
address: Multiaddr, address: Multiaddr,
/// Whether the role of the local node on the connection should be
/// overriden. I.e. whether the local node should act as a listener on
/// the outgoing connection.
///
/// This option is needed for NAT and firewall hole punching.
///
/// - [`Endpoint::Dialer`] represents the default non-overriding option.
///
/// - [`Endpoint::Listener`] represents the overriding option.
/// Realization depends on the transport protocol. E.g. in the case of
/// TCP, both endpoints dial each other, resulting in a _simultaneous
/// open_ TCP connection. On this new connection both endpoints assume
/// to be the dialer of the connection. This is problematic during the
/// connection upgrade process where an upgrade assumes one side to be
/// the listener. With the help of this option, both peers can
/// negotiate the roles (dialer and listener) for the new connection
/// ahead of time, through some external channel, e.g. the DCUtR
/// protocol, and thus have one peer dial the other and upgrade the
/// connection as a dialer and one peer dial the other and upgrade the
/// connection _as a listener_ overriding its role.
role_override: Endpoint,
}, },
/// We received the node. /// We received the node.
Listener { Listener {
@ -179,7 +203,10 @@ impl ConnectedPoint {
/// Returns true if the connection is relayed. /// Returns true if the connection is relayed.
pub fn is_relayed(&self) -> bool { pub fn is_relayed(&self) -> bool {
match self { match self {
ConnectedPoint::Dialer { address } => address, ConnectedPoint::Dialer {
address,
role_override: _,
} => address,
ConnectedPoint::Listener { local_addr, .. } => local_addr, ConnectedPoint::Listener { local_addr, .. } => local_addr,
} }
.iter() .iter()
@ -194,7 +221,7 @@ impl ConnectedPoint {
/// not be usable to establish new connections. /// not be usable to establish new connections.
pub fn get_remote_address(&self) -> &Multiaddr { pub fn get_remote_address(&self) -> &Multiaddr {
match self { match self {
ConnectedPoint::Dialer { address } => address, ConnectedPoint::Dialer { address, .. } => address,
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
} }
} }
@ -204,7 +231,7 @@ impl ConnectedPoint {
/// For `Dialer`, this modifies `address`. For `Listener`, this modifies `send_back_addr`. /// For `Dialer`, this modifies `address`. For `Listener`, this modifies `send_back_addr`.
pub fn set_remote_address(&mut self, new_address: Multiaddr) { pub fn set_remote_address(&mut self, new_address: Multiaddr) {
match self { match self {
ConnectedPoint::Dialer { address } => *address = new_address, ConnectedPoint::Dialer { address, .. } => *address = new_address,
ConnectedPoint::Listener { send_back_addr, .. } => *send_back_addr = new_address, ConnectedPoint::Listener { send_back_addr, .. } => *send_back_addr = new_address,
} }
} }

View File

@ -491,6 +491,13 @@ mod tests {
panic!() panic!()
} }
fn dial_as_listener(
self,
_: Multiaddr,
) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}
fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> {
None None
} }
@ -542,6 +549,13 @@ mod tests {
panic!() panic!()
} }
fn dial_as_listener(
self,
_: Multiaddr,
) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}
fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> {
None None
} }

View File

@ -22,8 +22,8 @@
use crate::{ use crate::{
connection::{ connection::{
handler::{THandlerError, THandlerInEvent, THandlerOutEvent}, handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit, IncomingInfo, Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit, Endpoint,
IntoConnectionHandler, PendingConnectionError, PendingInboundConnectionError, IncomingInfo, IntoConnectionHandler, PendingConnectionError, PendingInboundConnectionError,
PendingOutboundConnectionError, PendingPoint, Substream, PendingOutboundConnectionError, PendingPoint, Substream,
}, },
muxing::StreamMuxer, muxing::StreamMuxer,
@ -460,7 +460,7 @@ where
local_addr, local_addr,
send_back_addr, send_back_addr,
}), }),
PendingPoint::Dialer => None, PendingPoint::Dialer { .. } => None,
}) })
} }
@ -535,6 +535,7 @@ where
addresses: impl Iterator<Item = Multiaddr> + Send + 'static, addresses: impl Iterator<Item = Multiaddr> + Send + 'static,
peer: Option<PeerId>, peer: Option<PeerId>,
handler: THandler, handler: THandler,
role_override: Endpoint,
dial_concurrency_factor_override: Option<NonZeroU8>, dial_concurrency_factor_override: Option<NonZeroU8>,
) -> Result<ConnectionId, DialError<THandler>> ) -> Result<ConnectionId, DialError<THandler>>
where where
@ -550,6 +551,7 @@ where
peer, peer,
addresses, addresses,
dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor), dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor),
role_override,
); );
let connection_id = self.next_connection_id(); let connection_id = self.next_connection_id();
@ -566,13 +568,15 @@ where
.boxed(), .boxed(),
); );
self.counters.inc_pending(&PendingPoint::Dialer); let endpoint = PendingPoint::Dialer { role_override };
self.counters.inc_pending(&endpoint);
self.pending.insert( self.pending.insert(
connection_id, connection_id,
PendingConnectionInfo { PendingConnectionInfo {
peer_id: peer, peer_id: peer,
handler, handler,
endpoint: PendingPoint::Dialer, endpoint: endpoint,
_drop_notifier: drop_notifier, _drop_notifier: drop_notifier,
}, },
); );
@ -745,9 +749,13 @@ where
self.counters.dec_pending(&endpoint); self.counters.dec_pending(&endpoint);
let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) { let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) {
(PendingPoint::Dialer, Some((address, errors))) => { (PendingPoint::Dialer { role_override }, Some((address, errors))) => (
(ConnectedPoint::Dialer { address }, Some(errors)) ConnectedPoint::Dialer {
} address,
role_override,
},
Some(errors),
),
( (
PendingPoint::Listener { PendingPoint::Listener {
local_addr, local_addr,
@ -761,7 +769,7 @@ where
}, },
None, None,
), ),
(PendingPoint::Dialer, None) => unreachable!( (PendingPoint::Dialer { .. }, None) => unreachable!(
"Established incoming connection via pending outgoing connection." "Established incoming connection via pending outgoing connection."
), ),
(PendingPoint::Listener { .. }, Some(_)) => unreachable!( (PendingPoint::Listener { .. }, Some(_)) => unreachable!(
@ -910,7 +918,7 @@ where
self.counters.dec_pending(&endpoint); self.counters.dec_pending(&endpoint);
match (endpoint, error) { match (endpoint, error) {
(PendingPoint::Dialer, Either::Left(error)) => { (PendingPoint::Dialer { .. }, Either::Left(error)) => {
return Poll::Ready(PoolEvent::PendingOutboundConnectionError { return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
id, id,
error, error,
@ -933,7 +941,7 @@ where
local_addr, local_addr,
}); });
} }
(PendingPoint::Dialer, Either::Right(_)) => { (PendingPoint::Dialer { .. }, Either::Right(_)) => {
unreachable!("Inbound error for outbound connection.") unreachable!("Inbound error for outbound connection.")
} }
(PendingPoint::Listener { .. }, Either::Left(_)) => { (PendingPoint::Listener { .. }, Either::Left(_)) => {
@ -1176,7 +1184,7 @@ impl ConnectionCounters {
fn inc_pending(&mut self, endpoint: &PendingPoint) { fn inc_pending(&mut self, endpoint: &PendingPoint) {
match endpoint { match endpoint {
PendingPoint::Dialer => { PendingPoint::Dialer { .. } => {
self.pending_outgoing += 1; self.pending_outgoing += 1;
} }
PendingPoint::Listener { .. } => { PendingPoint::Listener { .. } => {
@ -1191,7 +1199,7 @@ impl ConnectionCounters {
fn dec_pending(&mut self, endpoint: &PendingPoint) { fn dec_pending(&mut self, endpoint: &PendingPoint) {
match endpoint { match endpoint {
PendingPoint::Dialer => { PendingPoint::Dialer { .. } => {
self.pending_outgoing -= 1; self.pending_outgoing -= 1;
} }
PendingPoint::Listener { .. } => { PendingPoint::Listener { .. } => {

View File

@ -18,9 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
pub use crate::connection::{ConnectionCounters, ConnectionLimits};
use crate::{ use crate::{
connection::Endpoint,
transport::{Transport, TransportError}, transport::{Transport, TransportError},
Multiaddr, PeerId, Multiaddr, PeerId,
}; };
@ -63,14 +62,21 @@ where
peer: Option<PeerId>, peer: Option<PeerId>,
addresses: impl Iterator<Item = Multiaddr> + Send + 'static, addresses: impl Iterator<Item = Multiaddr> + Send + 'static,
concurrency_factor: NonZeroU8, concurrency_factor: NonZeroU8,
role_override: Endpoint,
) -> Self { ) -> Self {
let mut pending_dials = addresses.map(move |address| match p2p_addr(peer, address) { let mut pending_dials = addresses.map(move |address| match p2p_addr(peer, address) {
Ok(address) => match transport.clone().dial(address.clone()) { Ok(address) => {
let dial = match role_override {
Endpoint::Dialer => transport.clone().dial(address.clone()),
Endpoint::Listener => transport.clone().dial_as_listener(address.clone()),
};
match dial {
Ok(fut) => fut Ok(fut) => fut
.map(|r| (address, r.map_err(|e| TransportError::Other(e)))) .map(|r| (address, r.map_err(|e| TransportError::Other(e))))
.boxed(), .boxed(),
Err(err) => futures::future::ready((address, Err(err))).boxed(), Err(err) => futures::future::ready((address, Err(err))).boxed(),
}, }
}
Err(address) => futures::future::ready(( Err(address) => futures::future::ready((
address.clone(), address.clone(),
Err(TransportError::MultiaddrNotSupported(address)), Err(TransportError::MultiaddrNotSupported(address)),

View File

@ -529,6 +529,25 @@ where
} }
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
where
Self: Sized,
{
use TransportError::*;
match self {
EitherTransport::Left(a) => match a.dial_as_listener(addr) {
Ok(connec) => Ok(EitherFuture::First(connec)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::A(err))),
},
EitherTransport::Right(b) => match b.dial_as_listener(addr) {
Ok(connec) => Ok(EitherFuture::Second(connec)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::B(err))),
},
}
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
match self { match self {
EitherTransport::Left(a) => a.address_translation(server, observed), EitherTransport::Left(a) => a.address_translation(server, observed),

View File

@ -21,7 +21,7 @@
mod event; mod event;
pub mod peer; pub mod peer;
pub use crate::connection::{ConnectionCounters, ConnectionLimits}; pub use crate::connection::{ConnectionCounters, ConnectionLimits, Endpoint};
pub use event::{IncomingConnection, NetworkEvent}; pub use event::{IncomingConnection, NetworkEvent};
pub use peer::Peer; pub use peer::Peer;
@ -97,7 +97,7 @@ where
self.pool self.pool
.iter_pending_info() .iter_pending_info()
.filter(move |(_, endpoint, peer_id)| { .filter(move |(_, endpoint, peer_id)| {
matches!(endpoint, PendingPoint::Dialer) && peer_id.as_ref() == Some(&peer) matches!(endpoint, PendingPoint::Dialer { .. }) && peer_id.as_ref() == Some(&peer)
}) })
.map(|(connection_id, _, _)| connection_id) .map(|(connection_id, _, _)| connection_id)
} }
@ -206,19 +206,24 @@ where
{ {
let opts = opts.into(); let opts = opts.into();
let (peer_id, addresses, dial_concurrency_factor_override) = match opts.0 { let (peer_id, addresses, dial_concurrency_factor_override, role_override) = match opts.0 {
// Dial a known peer. // Dial a known peer.
Opts::WithPeerIdWithAddresses(WithPeerIdWithAddresses { Opts::WithPeerIdWithAddresses(WithPeerIdWithAddresses {
peer_id, peer_id,
addresses, addresses,
dial_concurrency_factor_override, dial_concurrency_factor_override,
role_override,
}) => ( }) => (
Some(peer_id), Some(peer_id),
Either::Left(addresses.into_iter()), Either::Left(addresses.into_iter()),
dial_concurrency_factor_override, dial_concurrency_factor_override,
role_override,
), ),
// Dial an unknown peer. // Dial an unknown peer.
Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress { address }) => { Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress {
address,
role_override,
}) => {
// If the address ultimately encapsulates an expected peer ID, dial that peer // If the address ultimately encapsulates an expected peer ID, dial that peer
// such that any mismatch is detected. We do not "pop off" the `P2p` protocol // such that any mismatch is detected. We do not "pop off" the `P2p` protocol
// from the address, because it may be used by the `Transport`, i.e. `P2p` // from the address, because it may be used by the `Transport`, i.e. `P2p`
@ -239,7 +244,12 @@ where
Err(_) => return Err(DialError::InvalidPeerId { handler }), Err(_) => return Err(DialError::InvalidPeerId { handler }),
}; };
(peer_id, Either::Right(std::iter::once(address)), None) (
peer_id,
Either::Right(std::iter::once(address)),
None,
role_override,
)
} }
}; };
@ -248,6 +258,7 @@ where
addresses, addresses,
peer_id, peer_id,
handler, handler,
role_override,
dial_concurrency_factor_override, dial_concurrency_factor_override,
) )
} }
@ -284,7 +295,7 @@ where
pub fn dialing_peers(&self) -> impl Iterator<Item = &PeerId> { pub fn dialing_peers(&self) -> impl Iterator<Item = &PeerId> {
self.pool self.pool
.iter_pending_info() .iter_pending_info()
.filter(|(_, endpoint, _)| matches!(endpoint, PendingPoint::Dialer)) .filter(|(_, endpoint, _)| matches!(endpoint, PendingPoint::Dialer { .. }))
.filter_map(|(_, _, peer)| peer.as_ref()) .filter_map(|(_, _, peer)| peer.as_ref())
} }
@ -627,6 +638,7 @@ impl WithPeerId {
peer_id: self.peer_id, peer_id: self.peer_id,
addresses, addresses,
dial_concurrency_factor_override: Default::default(), dial_concurrency_factor_override: Default::default(),
role_override: Endpoint::Dialer,
} }
} }
} }
@ -636,6 +648,7 @@ pub struct WithPeerIdWithAddresses {
pub(crate) peer_id: PeerId, pub(crate) peer_id: PeerId,
pub(crate) addresses: Vec<Multiaddr>, pub(crate) addresses: Vec<Multiaddr>,
pub(crate) dial_concurrency_factor_override: Option<NonZeroU8>, pub(crate) dial_concurrency_factor_override: Option<NonZeroU8>,
pub(crate) role_override: Endpoint,
} }
impl WithPeerIdWithAddresses { impl WithPeerIdWithAddresses {
@ -645,6 +658,17 @@ impl WithPeerIdWithAddresses {
self self
} }
/// Override role of local node on connection. I.e. execute the dial _as a
/// listener_.
///
/// See
/// [`ConnectedPoint::Dialer`](crate::connection::ConnectedPoint::Dialer)
/// for details.
pub fn override_role(mut self, role: Endpoint) -> Self {
self.role_override = role;
self
}
/// Build the final [`DialOpts`]. /// Build the final [`DialOpts`].
pub fn build(self) -> DialOpts { pub fn build(self) -> DialOpts {
DialOpts(Opts::WithPeerIdWithAddresses(self)) DialOpts(Opts::WithPeerIdWithAddresses(self))
@ -657,16 +681,31 @@ pub struct WithoutPeerId {}
impl WithoutPeerId { impl WithoutPeerId {
/// Specify a single address to dial the unknown peer. /// Specify a single address to dial the unknown peer.
pub fn address(self, address: Multiaddr) -> WithoutPeerIdWithAddress { pub fn address(self, address: Multiaddr) -> WithoutPeerIdWithAddress {
WithoutPeerIdWithAddress { address } WithoutPeerIdWithAddress {
address,
role_override: Endpoint::Dialer,
}
} }
} }
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
pub struct WithoutPeerIdWithAddress { pub struct WithoutPeerIdWithAddress {
pub(crate) address: Multiaddr, pub(crate) address: Multiaddr,
pub(crate) role_override: Endpoint,
} }
impl WithoutPeerIdWithAddress { impl WithoutPeerIdWithAddress {
/// Override role of local node on connection. I.e. execute the dial _as a
/// listener_.
///
/// See
/// [`ConnectedPoint::Dialer`](crate::connection::ConnectedPoint::Dialer)
/// for details.
pub fn override_role(mut self, role: Endpoint) -> Self {
self.role_override = role;
self
}
/// Build the final [`DialOpts`]. /// Build the final [`DialOpts`].
pub fn build(self) -> DialOpts { pub fn build(self) -> DialOpts {
DialOpts(Opts::WithoutPeerIdWithAddress(self)) DialOpts(Opts::WithoutPeerIdWithAddress(self))

View File

@ -25,7 +25,7 @@
//! any desired protocols. The rest of the module defines combinators for //! any desired protocols. The rest of the module defines combinators for
//! modifying a transport through composition with other transports or protocol upgrades. //! modifying a transport through composition with other transports or protocol upgrades.
use crate::ConnectedPoint; use crate::connection::ConnectedPoint;
use futures::prelude::*; use futures::prelude::*;
use multiaddr::Multiaddr; use multiaddr::Multiaddr;
use std::{error::Error, fmt}; use std::{error::Error, fmt};
@ -130,6 +130,15 @@ pub trait Transport {
where where
Self: Sized; Self: Sized;
/// As [`Transport::dial`] but has the local node act as a listener on the outgoing connection.
///
/// This option is needed for NAT and firewall hole punching.
///
/// See [`ConnectedPoint::Dialer`](crate::connection::ConnectedPoint::Dialer) for related option.
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
where
Self: Sized;
/// Performs a transport-specific mapping of an address `observed` by /// Performs a transport-specific mapping of an address `observed` by
/// a remote onto a local `listen` address to yield an address for /// a remote onto a local `listen` address to yield an address for
/// the local node that may be reachable for other peers. /// the local node that may be reachable for other peers.

View File

@ -19,9 +19,9 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::{ use crate::{
connection::{ConnectedPoint, Endpoint},
either::EitherError, either::EitherError,
transport::{ListenerEvent, Transport, TransportError}, transport::{ListenerEvent, Transport, TransportError},
ConnectedPoint,
}; };
use futures::{future::Either, prelude::*}; use futures::{future::Either, prelude::*};
use multiaddr::Multiaddr; use multiaddr::Multiaddr;
@ -76,7 +76,32 @@ where
.map_err(|err| err.map(EitherError::A))?; .map_err(|err| err.map(EitherError::A))?;
let future = AndThenFuture { let future = AndThenFuture {
inner: Either::Left(Box::pin(dialed_fut)), inner: Either::Left(Box::pin(dialed_fut)),
args: Some((self.fun, ConnectedPoint::Dialer { address: addr })), args: Some((
self.fun,
ConnectedPoint::Dialer {
address: addr,
role_override: Endpoint::Dialer,
},
)),
_marker: PhantomPinned,
};
Ok(future)
}
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let dialed_fut = self
.transport
.dial_as_listener(addr.clone())
.map_err(|err| err.map(EitherError::A))?;
let future = AndThenFuture {
inner: Either::Left(Box::pin(dialed_fut)),
args: Some((
self.fun,
ConnectedPoint::Dialer {
address: addr,
role_override: Endpoint::Listener,
},
)),
_marker: PhantomPinned, _marker: PhantomPinned,
}; };
Ok(future) Ok(future)

View File

@ -52,6 +52,7 @@ type ListenerUpgrade<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
trait Abstract<O> { trait Abstract<O> {
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O>, TransportError<io::Error>>; fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O>, TransportError<io::Error>>;
fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>; fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
fn dial_as_listener(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>; fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
} }
@ -85,6 +86,13 @@ where
Ok(Box::pin(fut) as Dial<_>) Ok(Box::pin(fut) as Dial<_>)
} }
fn dial_as_listener(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>> {
let fut = Transport::dial_as_listener(self.clone(), addr)
.map(|r| r.map_err(box_err))
.map_err(|e| e.map(box_err))?;
Ok(Box::pin(fut) as Dial<_>)
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
Transport::address_translation(self, server, observed) Transport::address_translation(self, server, observed)
} }
@ -119,6 +127,10 @@ impl<O> Transport for Boxed<O> {
self.inner.dial(addr) self.inner.dial(addr)
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.inner.dial_as_listener(addr)
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed) self.inner.address_translation(server, observed)
} }

View File

@ -83,6 +83,26 @@ where
Err(TransportError::MultiaddrNotSupported(addr)) Err(TransportError::MultiaddrNotSupported(addr))
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let addr = match self.0.dial_as_listener(addr) {
Ok(connec) => return Ok(EitherFuture::First(connec)),
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
Err(TransportError::Other(err)) => {
return Err(TransportError::Other(EitherError::A(err)))
}
};
let addr = match self.1.dial_as_listener(addr) {
Ok(connec) => return Ok(EitherFuture::Second(connec)),
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
Err(TransportError::Other(err)) => {
return Err(TransportError::Other(EitherError::B(err)))
}
};
Err(TransportError::MultiaddrNotSupported(addr))
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
if let Some(addr) = self.0.address_translation(server, observed) { if let Some(addr) = self.0.address_translation(server, observed) {
Some(addr) Some(addr)

View File

@ -70,6 +70,10 @@ impl<TOut> Transport for DummyTransport<TOut> {
Err(TransportError::MultiaddrNotSupported(addr)) Err(TransportError::MultiaddrNotSupported(addr))
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
Err(TransportError::MultiaddrNotSupported(addr))
}
fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
None None
} }

View File

@ -19,8 +19,8 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::{ use crate::{
connection::{ConnectedPoint, Endpoint},
transport::{ListenerEvent, Transport, TransportError}, transport::{ListenerEvent, Transport, TransportError},
ConnectedPoint,
}; };
use futures::prelude::*; use futures::prelude::*;
use multiaddr::Multiaddr; use multiaddr::Multiaddr;
@ -60,7 +60,22 @@ where
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> { fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let future = self.transport.dial(addr.clone())?; let future = self.transport.dial(addr.clone())?;
let p = ConnectedPoint::Dialer { address: addr }; let p = ConnectedPoint::Dialer {
address: addr,
role_override: Endpoint::Dialer,
};
Ok(MapFuture {
inner: future,
args: Some((self.fun, p)),
})
}
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let future = self.transport.dial_as_listener(addr.clone())?;
let p = ConnectedPoint::Dialer {
address: addr,
role_override: Endpoint::Listener,
};
Ok(MapFuture { Ok(MapFuture {
inner: future, inner: future,
args: Some((self.fun, p)), args: Some((self.fun, p)),

View File

@ -68,6 +68,17 @@ where
} }
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let map = self.map;
match self.transport.dial_as_listener(addr) {
Ok(future) => Ok(MapErrDial {
inner: future,
map: Some(map),
}),
Err(err) => Err(err.map(map)),
}
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed) self.transport.address_translation(server, observed)
} }

View File

@ -205,6 +205,10 @@ impl Transport for MemoryTransport {
DialFuture::new(port).ok_or(TransportError::Other(MemoryTransportError::Unreachable)) DialFuture::new(port).ok_or(TransportError::Other(MemoryTransportError::Unreachable))
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<DialFuture, TransportError<Self::Error>> {
self.dial(addr)
}
fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
None None
} }

View File

@ -75,6 +75,14 @@ where
} }
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
if let Some(inner) = self.0 {
inner.dial_as_listener(addr)
} else {
Err(TransportError::MultiaddrNotSupported(addr))
}
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
if let Some(inner) = &self.0 { if let Some(inner) = &self.0 {
inner.address_translation(server, observed) inner.address_translation(server, observed)

View File

@ -109,6 +109,17 @@ where
}) })
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let dial = self
.inner
.dial_as_listener(addr)
.map_err(|err| err.map(TransportTimeoutError::Other))?;
Ok(Timeout {
inner: dial,
timer: Delay::new(self.outgoing_timeout),
})
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed) self.inner.address_translation(server, observed)
} }

View File

@ -23,6 +23,7 @@
pub use crate::upgrade::Version; pub use crate::upgrade::Version;
use crate::{ use crate::{
connection::ConnectedPoint,
muxing::{StreamMuxer, StreamMuxerBox}, muxing::{StreamMuxer, StreamMuxerBox},
transport::{ transport::{
and_then::AndThen, boxed::boxed, timeout::TransportTimeout, ListenerEvent, Transport, and_then::AndThen, boxed::boxed, timeout::TransportTimeout, ListenerEvent, Transport,
@ -32,7 +33,7 @@ use crate::{
self, apply_inbound, apply_outbound, InboundUpgrade, InboundUpgradeApply, OutboundUpgrade, self, apply_inbound, apply_outbound, InboundUpgrade, InboundUpgradeApply, OutboundUpgrade,
OutboundUpgradeApply, UpgradeError, OutboundUpgradeApply, UpgradeError,
}, },
ConnectedPoint, Negotiated, PeerId, Negotiated, PeerId,
}; };
use futures::{prelude::*, ready}; use futures::{prelude::*, ready};
use multiaddr::Multiaddr; use multiaddr::Multiaddr;
@ -340,6 +341,10 @@ where
self.0.dial(addr) self.0.dial(addr)
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.0.dial_as_listener(addr)
}
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> { fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
self.0.listen_on(addr) self.0.listen_on(addr)
} }
@ -393,6 +398,17 @@ where
}) })
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let future = self
.inner
.dial_as_listener(addr)
.map_err(|err| err.map(TransportUpgradeError::Transport))?;
Ok(DialUpgradeFuture {
future: Box::pin(future),
upgrade: future::Either::Left(Some(self.upgrade)),
})
}
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> { fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
let stream = self let stream = self
.inner .inner

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError}; use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError};
use crate::{ConnectedPoint, Negotiated}; use crate::{connection::ConnectedPoint, Negotiated};
use futures::{future::Either, prelude::*}; use futures::{future::Either, prelude::*};
use log::debug; use log::debug;
use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture}; use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture};
@ -27,6 +27,7 @@ use std::{iter, mem, pin::Pin, task::Context, task::Poll};
pub use multistream_select::Version; pub use multistream_select::Version;
// TODO: Still needed?
/// Applies an upgrade to the inbound and outbound direction of a connection or substream. /// Applies an upgrade to the inbound and outbound direction of a connection or substream.
pub fn apply<C, U>( pub fn apply<C, U>(
conn: C, conn: C,
@ -38,11 +39,12 @@ where
C: AsyncRead + AsyncWrite + Unpin, C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>, U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>,
{ {
if cp.is_listener() { match cp {
Either::Left(apply_inbound(conn, up)) ConnectedPoint::Dialer { role_override, .. } if role_override.is_dialer() => {
} else {
Either::Right(apply_outbound(conn, up, v)) Either::Right(apply_outbound(conn, up, v))
} }
_ => Either::Left(apply_inbound(conn, up)),
}
} }
/// Tries to perform an upgrade on an inbound connection or substream. /// Tries to perform an upgrade on an inbound connection or substream.

View File

@ -120,7 +120,7 @@ fn concurrent_dialing() {
.. ..
}) => { }) => {
match connection.endpoint() { match connection.endpoint() {
ConnectedPoint::Dialer { address } => { ConnectedPoint::Dialer { address, .. } => {
assert_eq!( assert_eq!(
*address, *address,
accepted_addr accepted_addr

View File

@ -30,7 +30,7 @@ use futures_timer::Delay;
use instant::Instant; use instant::Instant;
use libp2p_core::{ use libp2p_core::{
connection::{ConnectionId, ListenerId}, connection::{ConnectionId, ListenerId},
ConnectedPoint, Multiaddr, PeerId, ConnectedPoint, Endpoint, Multiaddr, PeerId,
}; };
use libp2p_request_response::{ use libp2p_request_response::{
handler::RequestResponseHandlerEvent, ProtocolSupport, RequestId, RequestResponse, handler::RequestResponseHandlerEvent, ProtocolSupport, RequestId, RequestResponse,
@ -315,12 +315,23 @@ impl NetworkBehaviour for Behaviour {
connections.insert(*conn, addr); connections.insert(*conn, addr);
match endpoint { match endpoint {
ConnectedPoint::Dialer { address } => { ConnectedPoint::Dialer {
address,
role_override: Endpoint::Dialer,
} => {
if let Some(event) = self.as_server().on_outbound_connection(peer, address) { if let Some(event) = self.as_server().on_outbound_connection(peer, address) {
self.pending_out_events self.pending_out_events
.push_back(Event::InboundProbe(event)); .push_back(Event::InboundProbe(event));
} }
} }
ConnectedPoint::Dialer {
address: _,
role_override: Endpoint::Listener,
} => {
// Outgoing connection was dialed as a listener. In other words outgoing connection
// was dialed as part of a hole punch. `libp2p-autonat` never attempts to hole
// punch, thus this connection has not been requested by this [`NetworkBehaviour`].
}
ConnectedPoint::Listener { .. } => self.as_client().on_inbound_connection(), ConnectedPoint::Listener { .. } => self.as_client().on_inbound_connection(),
} }
} }

View File

@ -30,7 +30,7 @@ use libp2p::{
use libp2p_autonat::{ use libp2p_autonat::{
Behaviour, Config, Event, InboundProbeError, InboundProbeEvent, ResponseError, Behaviour, Config, Event, InboundProbeError, InboundProbeEvent, ResponseError,
}; };
use libp2p_core::ConnectedPoint; use libp2p_core::{ConnectedPoint, Endpoint};
use libp2p_swarm::DialError; use libp2p_swarm::DialError;
use std::{num::NonZeroU32, time::Duration}; use std::{num::NonZeroU32, time::Duration};
@ -191,7 +191,11 @@ async fn test_dial_back() {
match server.select_next_some().await { match server.select_next_some().await {
SwarmEvent::ConnectionEstablished { SwarmEvent::ConnectionEstablished {
peer_id, peer_id,
endpoint: ConnectedPoint::Dialer { address }, endpoint:
ConnectedPoint::Dialer {
address,
role_override: Endpoint::Dialer,
},
num_established, num_established,
concurrent_dial_errors, concurrent_dial_errors,
} => { } => {
@ -399,7 +403,11 @@ async fn test_dial_multiple_addr() {
match server.select_next_some().await { match server.select_next_some().await {
SwarmEvent::ConnectionEstablished { SwarmEvent::ConnectionEstablished {
peer_id, peer_id,
endpoint: ConnectedPoint::Dialer { address }, endpoint:
ConnectedPoint::Dialer {
address,
role_override: Endpoint::Dialer,
},
concurrent_dial_errors, concurrent_dial_errors,
.. ..
} => { } => {

View File

@ -38,6 +38,7 @@ mod tests {
use crate::subscription_filter::WhitelistSubscriptionFilter; use crate::subscription_filter::WhitelistSubscriptionFilter;
use crate::transform::{DataTransform, IdentityTransform}; use crate::transform::{DataTransform, IdentityTransform};
use crate::types::FastMessageId; use crate::types::FastMessageId;
use libp2p_core::Endpoint;
use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
@ -189,7 +190,10 @@ mod tests {
&peer, &peer,
&ConnectionId::new(0), &ConnectionId::new(0),
&if outbound { &if outbound {
ConnectedPoint::Dialer { address } ConnectedPoint::Dialer {
address,
role_override: Endpoint::Dialer,
}
} else { } else {
ConnectedPoint::Listener { ConnectedPoint::Listener {
local_addr: Multiaddr::empty(), local_addr: Multiaddr::empty(),
@ -534,6 +538,7 @@ mod tests {
&ConnectionId::new(1), &ConnectionId::new(1),
&ConnectedPoint::Dialer { &ConnectedPoint::Dialer {
address: "/ip4/127.0.0.1".parse::<Multiaddr>().unwrap(), address: "/ip4/127.0.0.1".parse::<Multiaddr>().unwrap(),
role_override: Endpoint::Dialer,
}, },
None, None,
); );
@ -4075,6 +4080,7 @@ mod tests {
&ConnectionId::new(0), &ConnectionId::new(0),
&ConnectedPoint::Dialer { &ConnectedPoint::Dialer {
address: addr.clone(), address: addr.clone(),
role_override: Endpoint::Dialer,
}, },
None, None,
); );
@ -4094,6 +4100,7 @@ mod tests {
&ConnectionId::new(0), &ConnectionId::new(0),
&ConnectedPoint::Dialer { &ConnectedPoint::Dialer {
address: addr2.clone(), address: addr2.clone(),
role_override: Endpoint::Dialer,
}, },
None, None,
); );
@ -4122,6 +4129,7 @@ mod tests {
&ConnectionId::new(0), &ConnectionId::new(0),
&ConnectedPoint::Dialer { &ConnectedPoint::Dialer {
address: addr.clone(), address: addr.clone(),
role_override: Endpoint::Dialer,
}, },
None, None,
); );

View File

@ -228,7 +228,7 @@ impl NetworkBehaviour for Identify {
failed_addresses: Option<&Vec<Multiaddr>>, failed_addresses: Option<&Vec<Multiaddr>>,
) { ) {
let addr = match endpoint { let addr = match endpoint {
ConnectedPoint::Dialer { address } => address.clone(), ConnectedPoint::Dialer { address, .. } => address.clone(),
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(), ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(),
}; };

View File

@ -1968,7 +1968,7 @@ where
// since the remote address on an inbound connection may be specific // since the remote address on an inbound connection may be specific
// to that connection (e.g. typically the TCP port numbers). // to that connection (e.g. typically the TCP port numbers).
let address = match endpoint { let address = match endpoint {
ConnectedPoint::Dialer { address } => Some(address), ConnectedPoint::Dialer { address, .. } => Some(address),
ConnectedPoint::Listener { .. } => None, ConnectedPoint::Listener { .. } => None,
}; };
self.connection_updated(source, address, NodeStatus::Connected); self.connection_updated(source, address, NodeStatus::Connected);

View File

@ -33,7 +33,7 @@ use libp2p_core::{
multiaddr::{multiaddr, Multiaddr, Protocol}, multiaddr::{multiaddr, Multiaddr, Protocol},
multihash::{Code, Multihash, MultihashDigest}, multihash::{Code, Multihash, MultihashDigest},
transport::MemoryTransport, transport::MemoryTransport,
upgrade, PeerId, Transport, upgrade, Endpoint, PeerId, Transport,
}; };
use libp2p_noise as noise; use libp2p_noise as noise;
use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm::{Swarm, SwarmEvent};
@ -1287,6 +1287,7 @@ fn network_behaviour_inject_address_change() {
let endpoint = ConnectedPoint::Dialer { let endpoint = ConnectedPoint::Dialer {
address: old_address.clone(), address: old_address.clone(),
role_override: Endpoint::Dialer,
}; };
// Mimick a connection being established. // Mimick a connection being established.
@ -1316,9 +1317,11 @@ fn network_behaviour_inject_address_change() {
&connection_id, &connection_id,
&ConnectedPoint::Dialer { &ConnectedPoint::Dialer {
address: old_address.clone(), address: old_address.clone(),
role_override: Endpoint::Dialer,
}, },
&ConnectedPoint::Dialer { &ConnectedPoint::Dialer {
address: new_address.clone(), address: new_address.clone(),
role_override: Endpoint::Dialer,
}, },
); );

View File

@ -30,7 +30,7 @@ use libp2p_core::{
use libp2p_mplex as mplex; use libp2p_mplex as mplex;
use libp2p_noise as noise; use libp2p_noise as noise;
use libp2p_ping as ping; use libp2p_ping as ping;
use libp2p_swarm::{dial_opts::DialOpts, DummyBehaviour, KeepAlive, Swarm, SwarmEvent}; use libp2p_swarm::{DummyBehaviour, KeepAlive, Swarm, SwarmEvent};
use libp2p_tcp::TcpConfig; use libp2p_tcp::TcpConfig;
use libp2p_yamux as yamux; use libp2p_yamux as yamux;
use quickcheck::*; use quickcheck::*;

View File

@ -25,6 +25,7 @@ use futures::channel::oneshot;
use futures::future::{BoxFuture, Future, FutureExt}; use futures::future::{BoxFuture, Future, FutureExt};
use futures::sink::SinkExt; use futures::sink::SinkExt;
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
use libp2p_core::connection::Endpoint;
use libp2p_core::either::{EitherError, EitherFuture, EitherOutput}; use libp2p_core::either::{EitherError, EitherFuture, EitherOutput};
use libp2p_core::multiaddr::{Multiaddr, Protocol}; use libp2p_core::multiaddr::{Multiaddr, Protocol};
use libp2p_core::transport::{ListenerEvent, TransportError}; use libp2p_core::transport::{ListenerEvent, TransportError};
@ -220,15 +221,41 @@ impl<T: Transport + Clone> Transport for RelayTransport<T> {
} }
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> { fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.do_dial(addr, Endpoint::Dialer)
}
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.do_dial(addr, Endpoint::Listener)
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner_transport.address_translation(server, observed)
}
}
impl<T: Transport + Clone> RelayTransport<T> {
fn do_dial(
self,
addr: Multiaddr,
role_override: Endpoint,
) -> Result<<Self as Transport>::Dial, TransportError<<Self as Transport>::Error>> {
match parse_relayed_multiaddr(addr)? { match parse_relayed_multiaddr(addr)? {
// Address does not contain circuit relay protocol. Use inner transport. // Address does not contain circuit relay protocol. Use inner transport.
Err(addr) => match self.inner_transport.dial(addr) { Err(addr) => {
let dial = match role_override {
Endpoint::Dialer => self.inner_transport.dial(addr),
Endpoint::Listener => self.inner_transport.dial_as_listener(addr),
};
match dial {
Ok(dialer) => Ok(EitherFuture::First(dialer)), Ok(dialer) => Ok(EitherFuture::First(dialer)),
Err(TransportError::MultiaddrNotSupported(addr)) => { Err(TransportError::MultiaddrNotSupported(addr)) => {
Err(TransportError::MultiaddrNotSupported(addr)) Err(TransportError::MultiaddrNotSupported(addr))
} }
Err(TransportError::Other(err)) => Err(TransportError::Other(EitherError::A(err))), Err(TransportError::Other(err)) => {
}, Err(TransportError::Other(EitherError::A(err)))
}
}
}
// Address does contain circuit relay protocol. Dial destination via relay. // Address does contain circuit relay protocol. Dial destination via relay.
Ok(RelayedMultiaddr { Ok(RelayedMultiaddr {
relay_peer_id, relay_peer_id,
@ -263,10 +290,6 @@ impl<T: Transport + Clone> Transport for RelayTransport<T> {
} }
} }
} }
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner_transport.address_translation(server, observed)
}
} }
#[derive(Default)] #[derive(Default)]

View File

@ -197,6 +197,17 @@ impl Transport for ClientTransport {
.boxed()) .boxed())
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
where
Self: Sized,
{
// [`Transport::dial_as_listener`] is used for NAT and firewall
// traversal. One would coordinate such traversal via a previously
// established relayed connection, but never using a relayed connection
// itself.
return Err(TransportError::MultiaddrNotSupported(addr));
}
fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
None None
} }

View File

@ -1264,6 +1264,10 @@ impl<T: Transport> Transport for Firewall<T> {
self.0.dial(addr) self.0.dial(addr)
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.0.dial_as_listener(addr)
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.0.address_translation(server, observed) self.0.address_translation(server, observed)
} }

View File

@ -29,9 +29,7 @@ use libp2p::core::upgrade::SelectUpgrade;
use libp2p::core::{identity, Multiaddr, PeerId, Transport}; use libp2p::core::{identity, Multiaddr, PeerId, Transport};
use libp2p::mplex::MplexConfig; use libp2p::mplex::MplexConfig;
use libp2p::noise::{Keypair, NoiseConfig, X25519Spec}; use libp2p::noise::{Keypair, NoiseConfig, X25519Spec};
use libp2p::swarm::{ use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent};
dial_opts::DialOpts, AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent,
};
use libp2p::yamux::YamuxConfig; use libp2p::yamux::YamuxConfig;
use std::fmt::Debug; use std::fmt::Debug;
use std::time::Duration; use std::time::Duration;

View File

@ -26,7 +26,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use libp2p_core::identity; use libp2p_core::identity;
use libp2p_rendezvous as rendezvous; use libp2p_rendezvous as rendezvous;
use libp2p_swarm::{dial_opts::DialOpts, DialError, Swarm, SwarmEvent}; use libp2p_swarm::{DialError, Swarm, SwarmEvent};
use std::convert::TryInto; use std::convert::TryInto;
use std::time::Duration; use std::time::Duration;

View File

@ -599,7 +599,7 @@ where
new: &ConnectedPoint, new: &ConnectedPoint,
) { ) {
let new_address = match new { let new_address = match new {
ConnectedPoint::Dialer { address } => Some(address.clone()), ConnectedPoint::Dialer { address, .. } => Some(address.clone()),
ConnectedPoint::Listener { .. } => None, ConnectedPoint::Listener { .. } => None,
}; };
let connections = self let connections = self
@ -631,7 +631,7 @@ where
_errors: Option<&Vec<Multiaddr>>, _errors: Option<&Vec<Multiaddr>>,
) { ) {
let address = match endpoint { let address = match endpoint {
ConnectedPoint::Dialer { address } => Some(address.clone()), ConnectedPoint::Dialer { address, .. } => Some(address.clone()),
ConnectedPoint::Listener { .. } => None, ConnectedPoint::Listener { .. } => None,
}; };
self.connected self.connected

View File

@ -89,6 +89,13 @@ where
.map(move |fut| BandwidthFuture { inner: fut, sinks }) .map(move |fut| BandwidthFuture { inner: fut, sinks })
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let sinks = self.sinks;
self.inner
.dial_as_listener(addr)
.map(move |fut| BandwidthFuture { inner: fut, sinks })
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed) self.inner.address_translation(server, observed)
} }

View File

@ -14,9 +14,13 @@
- Implement `swarm::NetworkBehaviour` on `either::Either` (see [PR 2370]). - Implement `swarm::NetworkBehaviour` on `either::Either` (see [PR 2370]).
- Enable overriding _dial concurrency factor_ per dial via - Allow overriding _dial concurrency factor_ per dial via
`DialOpts::override_dial_concurrency_factor`. See [PR 2404]. `DialOpts::override_dial_concurrency_factor`. See [PR 2404].
- Allow overriding role when dialing through `override_role` option on
`DialOpts`. This option is needed for NAT and firewall hole punching. See [PR
2363].
[PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339
[PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350 [PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350
[PR 2362]: https://github.com/libp2p/rust-libp2p/pull/2362 [PR 2362]: https://github.com/libp2p/rust-libp2p/pull/2362
@ -24,6 +28,7 @@
[PR 2375]: https://github.com/libp2p/rust-libp2p/pull/2375 [PR 2375]: https://github.com/libp2p/rust-libp2p/pull/2375
[PR 2378]: https://github.com/libp2p/rust-libp2p/pull/2378 [PR 2378]: https://github.com/libp2p/rust-libp2p/pull/2378
[PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404 [PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404
[PR 2363]: https://github.com/libp2p/rust-libp2p/pull/2363
# 0.32.0 [2021-11-16] # 0.32.0 [2021-11-16]

View File

@ -19,6 +19,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use libp2p_core::connection::Endpoint;
use libp2p_core::{Multiaddr, PeerId}; use libp2p_core::{Multiaddr, PeerId};
use std::num::NonZeroU8; use std::num::NonZeroU8;
@ -51,6 +52,7 @@ impl DialOpts {
WithPeerId { WithPeerId {
peer_id, peer_id,
condition: Default::default(), condition: Default::default(),
role_override: Endpoint::Dialer,
dial_concurrency_factor_override: Default::default(), dial_concurrency_factor_override: Default::default(),
} }
} }
@ -108,6 +110,7 @@ pub(super) enum Opts {
pub struct WithPeerId { pub struct WithPeerId {
pub(crate) peer_id: PeerId, pub(crate) peer_id: PeerId,
pub(crate) condition: PeerCondition, pub(crate) condition: PeerCondition,
pub(crate) role_override: Endpoint,
pub(crate) dial_concurrency_factor_override: Option<NonZeroU8>, pub(crate) dial_concurrency_factor_override: Option<NonZeroU8>,
} }
@ -132,10 +135,22 @@ impl WithPeerId {
condition: self.condition, condition: self.condition,
addresses, addresses,
extend_addresses_through_behaviour: false, extend_addresses_through_behaviour: false,
role_override: self.role_override,
dial_concurrency_factor_override: self.dial_concurrency_factor_override, dial_concurrency_factor_override: self.dial_concurrency_factor_override,
} }
} }
/// Override role of local node on connection. I.e. execute the dial _as a
/// listener_.
///
/// See
/// [`ConnectedPoint::Dialer`](libp2p_core::connection::ConnectedPoint::Dialer)
/// for details.
pub fn override_role(mut self) -> Self {
self.role_override = Endpoint::Listener;
self
}
/// Build the final [`DialOpts`]. /// Build the final [`DialOpts`].
/// ///
/// Addresses to dial the peer are retrieved via /// Addresses to dial the peer are retrieved via
@ -151,6 +166,7 @@ pub struct WithPeerIdWithAddresses {
pub(crate) condition: PeerCondition, pub(crate) condition: PeerCondition,
pub(crate) addresses: Vec<Multiaddr>, pub(crate) addresses: Vec<Multiaddr>,
pub(crate) extend_addresses_through_behaviour: bool, pub(crate) extend_addresses_through_behaviour: bool,
pub(crate) role_override: Endpoint,
pub(crate) dial_concurrency_factor_override: Option<NonZeroU8>, pub(crate) dial_concurrency_factor_override: Option<NonZeroU8>,
} }
@ -168,6 +184,17 @@ impl WithPeerIdWithAddresses {
self self
} }
/// Override role of local node on connection. I.e. execute the dial _as a
/// listener_.
///
/// See
/// [`ConnectedPoint::Dialer`](libp2p_core::connection::ConnectedPoint::Dialer)
/// for details.
pub fn override_role(mut self) -> Self {
self.role_override = Endpoint::Listener;
self
}
/// Override /// Override
/// [`NetworkConfig::with_dial_concurrency_factor`](libp2p_core::network::NetworkConfig::with_dial_concurrency_factor). /// [`NetworkConfig::with_dial_concurrency_factor`](libp2p_core::network::NetworkConfig::with_dial_concurrency_factor).
pub fn override_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self { pub fn override_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
@ -187,16 +214,30 @@ pub struct WithoutPeerId {}
impl WithoutPeerId { impl WithoutPeerId {
/// Specify a single address to dial the unknown peer. /// Specify a single address to dial the unknown peer.
pub fn address(self, address: Multiaddr) -> WithoutPeerIdWithAddress { pub fn address(self, address: Multiaddr) -> WithoutPeerIdWithAddress {
WithoutPeerIdWithAddress { address } WithoutPeerIdWithAddress {
address,
role_override: Endpoint::Dialer,
}
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub struct WithoutPeerIdWithAddress { pub struct WithoutPeerIdWithAddress {
pub(crate) address: Multiaddr, pub(crate) address: Multiaddr,
pub(crate) role_override: Endpoint,
} }
impl WithoutPeerIdWithAddress { impl WithoutPeerIdWithAddress {
/// Override role of local node on connection. I.e. execute the dial _as a
/// listener_.
///
/// See
/// [`ConnectedPoint::Dialer`](libp2p_core::connection::ConnectedPoint::Dialer)
/// for details.
pub fn override_role(mut self) -> Self {
self.role_override = Endpoint::Listener;
self
}
/// Build the final [`DialOpts`]. /// Build the final [`DialOpts`].
pub fn build(self) -> DialOpts { pub fn build(self) -> DialOpts {
DialOpts(Opts::WithoutPeerIdWithAddress(self)) DialOpts(Opts::WithoutPeerIdWithAddress(self))

View File

@ -368,11 +368,13 @@ where
dial_opts::Opts::WithPeerId(dial_opts::WithPeerId { dial_opts::Opts::WithPeerId(dial_opts::WithPeerId {
peer_id, peer_id,
condition, condition,
role_override,
dial_concurrency_factor_override, dial_concurrency_factor_override,
}) })
| dial_opts::Opts::WithPeerIdWithAddresses(dial_opts::WithPeerIdWithAddresses { | dial_opts::Opts::WithPeerIdWithAddresses(dial_opts::WithPeerIdWithAddresses {
peer_id, peer_id,
condition, condition,
role_override,
dial_concurrency_factor_override, dial_concurrency_factor_override,
.. ..
}) => { }) => {
@ -439,7 +441,9 @@ where
addresses addresses
}; };
let mut opts = libp2p_core::DialOpts::peer_id(peer_id).addresses(addresses); let mut opts = libp2p_core::DialOpts::peer_id(peer_id)
.addresses(addresses)
.override_role(role_override);
if let Some(f) = dial_concurrency_factor_override { if let Some(f) = dial_concurrency_factor_override {
opts = opts.override_dial_concurrency_factor(f); opts = opts.override_dial_concurrency_factor(f);
@ -450,8 +454,10 @@ where
// Dial an unknown peer. // Dial an unknown peer.
dial_opts::Opts::WithoutPeerIdWithAddress(dial_opts::WithoutPeerIdWithAddress { dial_opts::Opts::WithoutPeerIdWithAddress(dial_opts::WithoutPeerIdWithAddress {
address, address,
role_override,
}) => libp2p_core::DialOpts::unknown_peer_id() }) => libp2p_core::DialOpts::unknown_peer_id()
.address(address) .address(address)
.override_role(role_override)
.build(), .build(),
}; };

View File

@ -58,6 +58,7 @@
use async_std_resolver::{AsyncStdConnection, AsyncStdConnectionProvider}; use async_std_resolver::{AsyncStdConnection, AsyncStdConnectionProvider};
use futures::{future::BoxFuture, prelude::*}; use futures::{future::BoxFuture, prelude::*};
use libp2p_core::{ use libp2p_core::{
connection::Endpoint,
multiaddr::{Multiaddr, Protocol}, multiaddr::{Multiaddr, Protocol},
transport::{ListenerEvent, TransportError}, transport::{ListenerEvent, TransportError},
Transport, Transport,
@ -211,6 +212,31 @@ where
} }
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> { fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.do_dial(addr, Endpoint::Dialer)
}
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.do_dial(addr, Endpoint::Listener)
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
}
}
impl<T, C, P> GenDnsConfig<T, C, P>
where
T: Transport + Clone + Send + 'static,
T::Error: Send,
T::Dial: Send,
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>,
{
fn do_dial(
self,
addr: Multiaddr,
role_override: Endpoint,
) -> Result<<Self as Transport>::Dial, TransportError<<Self as Transport>::Error>> {
// Asynchronlously resolve all DNS names in the address before proceeding // Asynchronlously resolve all DNS names in the address before proceeding
// with dialing on the underlying transport. // with dialing on the underlying transport.
Ok(async move { Ok(async move {
@ -293,7 +319,11 @@ where
log::debug!("Dialing {}", addr); log::debug!("Dialing {}", addr);
let transport = inner.clone(); let transport = inner.clone();
let result = match transport.dial(addr) { let dial = match role_override {
Endpoint::Dialer => transport.dial(addr),
Endpoint::Listener => transport.dial_as_listener(addr),
};
let result = match dial {
Ok(out) => { Ok(out) => {
// We only count attempts that the inner transport // We only count attempts that the inner transport
// actually accepted, i.e. for which it produced // actually accepted, i.e. for which it produced
@ -338,10 +368,6 @@ where
.boxed() .boxed()
.right_future()) .right_future())
} }
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
}
} }
/// The possible errors of a [`GenDnsConfig`] wrapped transport. /// The possible errors of a [`GenDnsConfig`] wrapped transport.
@ -579,6 +605,13 @@ mod tests {
Ok(Box::pin(future::ready(Ok(())))) Ok(Box::pin(future::ready(Ok(()))))
} }
fn dial_as_listener(
self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
self.dial(addr)
}
fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> {
None None
} }

View File

@ -405,6 +405,10 @@ where
Ok(Box::pin(self.do_dial(socket_addr))) Ok(Box::pin(self.do_dial(socket_addr)))
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.dial(addr)
}
/// When port reuse is disabled and hence ephemeral local ports are /// When port reuse is disabled and hence ephemeral local ports are
/// used for outgoing connections, the returned address is the /// used for outgoing connections, the returned address is the
/// `observed` address with the port replaced by the port of the /// `observed` address with the port replaced by the port of the

View File

@ -105,6 +105,7 @@ impl Transport for $uds_config {
} }
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> { fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
// TODO: Should we dial at all?
if let Ok(path) = multiaddr_to_path(&addr) { if let Ok(path) = multiaddr_to_path(&addr) {
debug!("Dialing {}", addr); debug!("Dialing {}", addr);
Ok(async move { <$unix_stream>::connect(&path).await }.boxed()) Ok(async move { <$unix_stream>::connect(&path).await }.boxed())
@ -113,6 +114,10 @@ impl Transport for $uds_config {
} }
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.dial(addr)
}
fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
None None
} }

View File

@ -33,7 +33,11 @@
//! //!
use futures::{future::Ready, prelude::*}; use futures::{future::Ready, prelude::*};
use libp2p_core::{transport::ListenerEvent, transport::TransportError, Multiaddr, Transport}; use libp2p_core::{
connection::Endpoint,
transport::{ListenerEvent, TransportError},
Multiaddr, Transport,
};
use parity_send_wrapper::SendWrapper; use parity_send_wrapper::SendWrapper;
use std::{collections::VecDeque, error, fmt, io, mem, pin::Pin, task::Context, task::Poll}; use std::{collections::VecDeque, error, fmt, io, mem, pin::Pin, task::Context, task::Poll};
use wasm_bindgen::{prelude::*, JsCast}; use wasm_bindgen::{prelude::*, JsCast};
@ -61,7 +65,11 @@ pub mod ffi {
/// If the multiaddress is not supported, you should return an instance of `Error` whose /// If the multiaddress is not supported, you should return an instance of `Error` whose
/// `name` property has been set to the string `"NotSupportedError"`. /// `name` property has been set to the string `"NotSupportedError"`.
#[wasm_bindgen(method, catch)] #[wasm_bindgen(method, catch)]
pub fn dial(this: &Transport, multiaddr: &str) -> Result<js_sys::Promise, JsValue>; pub fn dial(
this: &Transport,
multiaddr: &str,
_role_override: bool,
) -> Result<js_sys::Promise, JsValue>;
/// Start listening on the given multiaddress. /// Start listening on the given multiaddress.
/// ///
@ -148,6 +156,29 @@ impl ExtTransport {
inner: SendWrapper::new(transport), inner: SendWrapper::new(transport),
} }
} }
fn do_dial(
self,
addr: Multiaddr,
role_override: Endpoint,
) -> Result<<Self as Transport>::Dial, TransportError<<Self as Transport>::Error>> {
let promise = self
.inner
.dial(
&addr.to_string(),
matches!(role_override, Endpoint::Listener),
)
.map_err(|err| {
if is_not_supported_error(&err) {
TransportError::MultiaddrNotSupported(addr)
} else {
TransportError::Other(JsErr::from(err))
}
})?;
Ok(Dial {
inner: SendWrapper::new(promise.into()),
})
}
} }
impl fmt::Debug for ExtTransport { impl fmt::Debug for ExtTransport {
@ -187,18 +218,18 @@ impl Transport for ExtTransport {
}) })
} }
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> { fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
let promise = self.inner.dial(&addr.to_string()).map_err(|err| { where
if is_not_supported_error(&err) { Self: Sized,
TransportError::MultiaddrNotSupported(addr) {
} else { self.do_dial(addr, Endpoint::Dialer)
TransportError::Other(JsErr::from(err))
} }
})?;
Ok(Dial { fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
inner: SendWrapper::new(promise.into()), where
}) Self: Sized,
{
self.do_dial(addr, Endpoint::Listener)
} }
fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {

View File

@ -23,6 +23,7 @@ use either::Either;
use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream}; use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream};
use futures_rustls::{client, rustls, server}; use futures_rustls::{client, rustls, server};
use libp2p_core::{ use libp2p_core::{
connection::Endpoint,
either::EitherOutput, either::EitherOutput,
multiaddr::{Multiaddr, Protocol}, multiaddr::{Multiaddr, Protocol},
transport::{ListenerEvent, TransportError}, transport::{ListenerEvent, TransportError},
@ -245,6 +246,32 @@ where
} }
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> { fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.do_dial(addr, Endpoint::Dialer)
}
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.do_dial(addr, Endpoint::Listener)
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed)
}
}
impl<T> WsConfig<T>
where
T: Transport + Send + Clone + 'static,
T::Error: Send + 'static,
T::Dial: Send + 'static,
T::Listener: Send + 'static,
T::ListenerUpgrade: Send + 'static,
T::Output: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
fn do_dial(
self,
addr: Multiaddr,
role_override: Endpoint,
) -> Result<<Self as Transport>::Dial, TransportError<<Self as Transport>::Error>> {
let addr = match parse_ws_dial_addr(addr) { let addr = match parse_ws_dial_addr(addr) {
Ok(addr) => addr, Ok(addr) => addr,
Err(Error::InvalidMultiaddr(a)) => { Err(Error::InvalidMultiaddr(a)) => {
@ -259,7 +286,7 @@ where
let future = async move { let future = async move {
loop { loop {
let this = self.clone(); let this = self.clone();
match this.dial_once(addr).await { match this.dial_once(addr, role_override).await {
Ok(Either::Left(redirect)) => { Ok(Either::Left(redirect)) => {
if remaining_redirects == 0 { if remaining_redirects == 0 {
debug!("Too many redirects (> {})", self.max_redirects); debug!("Too many redirects (> {})", self.max_redirects);
@ -276,25 +303,19 @@ where
Ok(Box::pin(future)) Ok(Box::pin(future))
} }
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed)
}
}
impl<T> WsConfig<T>
where
T: Transport,
T::Output: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
/// Attempts to dial the given address and perform a websocket handshake. /// Attempts to dial the given address and perform a websocket handshake.
async fn dial_once( async fn dial_once(
self, self,
addr: WsAddress, addr: WsAddress,
role_override: Endpoint,
) -> Result<Either<String, Connection<T::Output>>, Error<T::Error>> { ) -> Result<Either<String, Connection<T::Output>>, Error<T::Error>> {
trace!("Dialing websocket address: {:?}", addr); trace!("Dialing websocket address: {:?}", addr);
let dial = self.transport.dial(addr.tcp_addr).map_err(|e| match e { let dial = match role_override {
Endpoint::Dialer => self.transport.dial(addr.tcp_addr),
Endpoint::Listener => self.transport.dial_as_listener(addr.tcp_addr),
}
.map_err(|e| match e {
TransportError::MultiaddrNotSupported(a) => Error::InvalidMultiaddr(a), TransportError::MultiaddrNotSupported(a) => Error::InvalidMultiaddr(a),
TransportError::Other(e) => Error::Transport(e), TransportError::Other(e) => Error::Transport(e),
})?; })?;

View File

@ -28,12 +28,13 @@ use error::Error;
use framed::{Connection, Incoming}; use framed::{Connection, Incoming};
use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream}; use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream};
use libp2p_core::{ use libp2p_core::{
connection::ConnectedPoint,
multiaddr::Multiaddr, multiaddr::Multiaddr,
transport::{ transport::{
map::{MapFuture, MapStream}, map::{MapFuture, MapStream},
ListenerEvent, TransportError, ListenerEvent, TransportError,
}, },
ConnectedPoint, Transport, Transport,
}; };
use rw_stream_sink::RwStreamSink; use rw_stream_sink::RwStreamSink;
use std::{ use std::{
@ -129,6 +130,12 @@ where
.dial(addr) .dial(addr)
} }
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.transport
.map(wrap_connection as WrapperFn<T::Output>)
.dial_as_listener(addr)
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> { fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed) self.transport.address_translation(server, observed)
} }