feat(quic): implement hole punching

Implement `Transport::dial_as_listener` for QUIC as specified by the [DCUtR spec](https://github.com/libp2p/specs/blob/master/relay/DCUtR.md).

To facilitate hole punching in QUIC, one side needs to send random UDP packets to establish a mapping in the routing table of the NAT device. If successful, our listener will emit a new inbound connection. This connection needs to then be sent to the dialing task. We achieve this by storing a `HashMap` of hole punch attempts indexed by the remote's `SocketAddr`. A matching incoming connection is then sent via a oneshot channel to the dialing task which continues with upgrading the connection.

Related #2883.

Pull-Request: #3964.
This commit is contained in:
Arpan Kapoor 2023-06-13 17:12:18 +05:30 committed by GitHub
parent 2a6311f473
commit cf3e4c6860
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 293 additions and 78 deletions

2
Cargo.lock generated
View File

@ -1190,6 +1190,7 @@ dependencies = [
"futures", "futures",
"futures-timer", "futures-timer",
"libp2p", "libp2p",
"libp2p-quic",
"log", "log",
] ]
@ -4112,6 +4113,7 @@ dependencies = [
"env_logger 0.10.0", "env_logger 0.10.0",
"futures", "futures",
"libp2p", "libp2p",
"libp2p-quic",
] ]
[[package]] [[package]]

View File

@ -11,4 +11,5 @@ env_logger = "0.10.0"
futures = "0.3.28" futures = "0.3.28"
futures-timer = "3.0" futures-timer = "3.0"
libp2p = { path = "../../libp2p", features = ["async-std", "dns", "dcutr", "identify", "macros", "noise", "ping", "relay", "rendezvous", "tcp", "tokio", "yamux"] } libp2p = { path = "../../libp2p", features = ["async-std", "dns", "dcutr", "identify", "macros", "noise", "ping", "relay", "rendezvous", "tcp", "tokio", "yamux"] }
libp2p-quic = { path = "../../transports/quic", features = ["async-std"] }
log = "0.4" log = "0.4"

View File

@ -23,13 +23,14 @@
use clap::Parser; use clap::Parser;
use futures::{ use futures::{
executor::{block_on, ThreadPool}, executor::{block_on, ThreadPool},
future::FutureExt, future::{Either, FutureExt},
stream::StreamExt, stream::StreamExt,
}; };
use libp2p::{ use libp2p::{
core::{ core::{
multiaddr::{Multiaddr, Protocol}, multiaddr::{Multiaddr, Protocol},
transport::{OrTransport, Transport}, muxing::StreamMuxerBox,
transport::Transport,
upgrade, upgrade,
}, },
dcutr, dcutr,
@ -38,9 +39,9 @@ use libp2p::{
swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent},
tcp, yamux, PeerId, tcp, yamux, PeerId,
}; };
use libp2p_quic as quic;
use log::info; use log::info;
use std::error::Error; use std::error::Error;
use std::net::Ipv4Addr;
use std::str::FromStr; use std::str::FromStr;
#[derive(Debug, Parser)] #[derive(Debug, Parser)]
@ -91,19 +92,26 @@ fn main() -> Result<(), Box<dyn Error>> {
let (relay_transport, client) = relay::client::new(local_peer_id); let (relay_transport, client) = relay::client::new(local_peer_id);
let transport = OrTransport::new( let transport = {
relay_transport, let relay_tcp_quic_transport = relay_transport
block_on(DnsConfig::system(tcp::async_io::Transport::new( .or_transport(tcp::async_io::Transport::new(
tcp::Config::default().port_reuse(true), tcp::Config::default().port_reuse(true),
))) ))
.unwrap(), .upgrade(upgrade::Version::V1)
) .authenticate(noise::Config::new(&local_key).unwrap())
.upgrade(upgrade::Version::V1Lazy) .multiplex(yamux::Config::default())
.authenticate( .or_transport(quic::async_std::Transport::new(quic::Config::new(
noise::Config::new(&local_key).expect("Signing libp2p-noise static DH keypair failed."), &local_key,
) )));
.multiplex(yamux::Config::default())
.boxed(); block_on(DnsConfig::system(relay_tcp_quic_transport))
.unwrap()
.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.boxed()
};
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "Event")] #[behaviour(to_swarm = "Event")]
@ -164,11 +172,10 @@ fn main() -> Result<(), Box<dyn Error>> {
.build(); .build();
swarm swarm
.listen_on( .listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap())
Multiaddr::empty() .unwrap();
.with("0.0.0.0".parse::<Ipv4Addr>().unwrap().into()) swarm
.with(Protocol::Tcp(0)), .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
)
.unwrap(); .unwrap();
// Wait to listen on all interfaces. // Wait to listen on all interfaces.

View File

@ -12,3 +12,4 @@ async-trait = "0.1"
env_logger = "0.10.0" env_logger = "0.10.0"
futures = "0.3.28" futures = "0.3.28"
libp2p = { path = "../../libp2p", features = ["async-std", "noise", "macros", "ping", "tcp", "identify", "yamux", "relay"] } libp2p = { path = "../../libp2p", features = ["async-std", "noise", "macros", "ping", "tcp", "identify", "yamux", "relay"] }
libp2p-quic = { path = "../../transports/quic", features = ["async-std"] }

View File

@ -22,10 +22,11 @@
#![doc = include_str!("../README.md")] #![doc = include_str!("../README.md")]
use clap::Parser; use clap::Parser;
use futures::executor::block_on;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use futures::{executor::block_on, future::Either};
use libp2p::{ use libp2p::{
core::multiaddr::Protocol, core::multiaddr::Protocol,
core::muxing::StreamMuxerBox,
core::upgrade, core::upgrade,
core::{Multiaddr, Transport}, core::{Multiaddr, Transport},
identify, identity, identify, identity,
@ -34,6 +35,7 @@ use libp2p::{
swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent},
tcp, tcp,
}; };
use libp2p_quic as quic;
use std::error::Error; use std::error::Error;
use std::net::{Ipv4Addr, Ipv6Addr}; use std::net::{Ipv4Addr, Ipv6Addr};
@ -50,12 +52,21 @@ fn main() -> Result<(), Box<dyn Error>> {
let tcp_transport = tcp::async_io::Transport::default(); let tcp_transport = tcp::async_io::Transport::default();
let transport = tcp_transport let tcp_transport = tcp_transport
.upgrade(upgrade::Version::V1Lazy) .upgrade(upgrade::Version::V1Lazy)
.authenticate( .authenticate(
noise::Config::new(&local_key).expect("Signing libp2p-noise static DH keypair failed."), noise::Config::new(&local_key).expect("Signing libp2p-noise static DH keypair failed."),
) )
.multiplex(libp2p::yamux::Config::default()) .multiplex(libp2p::yamux::Config::default());
let quic_transport = quic::async_std::Transport::new(quic::Config::new(&local_key));
let transport = quic_transport
.or_transport(tcp_transport)
.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.boxed(); .boxed();
let behaviour = Behaviour { let behaviour = Behaviour {
@ -70,13 +81,22 @@ fn main() -> Result<(), Box<dyn Error>> {
let mut swarm = SwarmBuilder::without_executor(transport, behaviour, local_peer_id).build(); let mut swarm = SwarmBuilder::without_executor(transport, behaviour, local_peer_id).build();
// Listen on all interfaces // Listen on all interfaces
let listen_addr = Multiaddr::empty() let listen_addr_tcp = Multiaddr::empty()
.with(match opt.use_ipv6 { .with(match opt.use_ipv6 {
Some(true) => Protocol::from(Ipv6Addr::UNSPECIFIED), Some(true) => Protocol::from(Ipv6Addr::UNSPECIFIED),
_ => Protocol::from(Ipv4Addr::UNSPECIFIED), _ => Protocol::from(Ipv4Addr::UNSPECIFIED),
}) })
.with(Protocol::Tcp(opt.port)); .with(Protocol::Tcp(opt.port));
swarm.listen_on(listen_addr)?; swarm.listen_on(listen_addr_tcp)?;
let listen_addr_quic = Multiaddr::empty()
.with(match opt.use_ipv6 {
Some(true) => Protocol::from(Ipv6Addr::UNSPECIFIED),
_ => Protocol::from(Ipv4Addr::UNSPECIFIED),
})
.with(Protocol::Udp(opt.port))
.with(Protocol::QuicV1);
swarm.listen_on(listen_addr_quic)?;
block_on(async { block_on(async {
loop { loop {

View File

@ -3,7 +3,10 @@
- Raise MSRV to 1.65. - Raise MSRV to 1.65.
See [PR 3715]. See [PR 3715].
- Add hole punching support by implementing `Transport::dial_as_listener`. See [PR 3964].
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3964]: https://github.com/libp2p/rust-libp2p/pull/3964
## 0.7.0-alpha.3 ## 0.7.0-alpha.3

View File

@ -23,7 +23,7 @@ quinn-proto = { version = "0.10.1", default-features = false, features = ["tls-r
rand = "0.8.5" rand = "0.8.5"
rustls = { version = "0.21.1", default-features = false } rustls = { version = "0.21.1", default-features = false }
thiserror = "1.0.40" thiserror = "1.0.40"
tokio = { version = "1.28.2", default-features = false, features = ["net", "rt"], optional = true } tokio = { version = "1.28.2", default-features = false, features = ["net", "rt", "time"], optional = true }
[features] [features]
tokio = ["dep:tokio", "if-watch/tokio"] tokio = ["dep:tokio", "if-watch/tokio"]

View File

@ -279,6 +279,13 @@ impl Channel {
Ok(Ok(())) Ok(Ok(()))
} }
pub(crate) async fn send(&mut self, to_endpoint: ToEndpoint) -> Result<(), Disconnected> {
self.to_endpoint
.send(to_endpoint)
.await
.map_err(|_| Disconnected {})
}
/// Send a message to inform the [`Driver`] about an /// Send a message to inform the [`Driver`] about an
/// event caused by the owner of this [`Channel`] dropping. /// event caused by the owner of this [`Channel`] dropping.
/// This clones the sender to the endpoint to guarantee delivery. /// This clones the sender to the endpoint to guarantee delivery.

View File

@ -0,0 +1,47 @@
use std::{net::SocketAddr, time::Duration};
use futures::future::Either;
use rand::{distributions, Rng};
use crate::{
endpoint::{self, ToEndpoint},
Error, Provider,
};
pub(crate) async fn hole_puncher<P: Provider>(
endpoint_channel: endpoint::Channel,
remote_addr: SocketAddr,
timeout_duration: Duration,
) -> Error {
let punch_holes_future = punch_holes::<P>(endpoint_channel, remote_addr);
futures::pin_mut!(punch_holes_future);
match futures::future::select(P::sleep(timeout_duration), punch_holes_future).await {
Either::Left(_) => Error::HandshakeTimedOut,
Either::Right((hole_punch_err, _)) => hole_punch_err,
}
}
async fn punch_holes<P: Provider>(
mut endpoint_channel: endpoint::Channel,
remote_addr: SocketAddr,
) -> Error {
loop {
let sleep_duration = Duration::from_millis(rand::thread_rng().gen_range(10..=200));
P::sleep(sleep_duration).await;
let random_udp_packet = ToEndpoint::SendUdpPacket(quinn_proto::Transmit {
destination: remote_addr,
ecn: None,
contents: rand::thread_rng()
.sample_iter(distributions::Standard)
.take(64)
.collect(),
segment_size: None,
src_ip: None,
});
if endpoint_channel.send(random_udp_packet).await.is_err() {
return Error::EndpointDriverCrashed;
}
}
}

View File

@ -59,9 +59,12 @@
mod connection; mod connection;
mod endpoint; mod endpoint;
mod hole_punching;
mod provider; mod provider;
mod transport; mod transport;
use std::net::SocketAddr;
pub use connection::{Connecting, Connection, Substream}; pub use connection::{Connecting, Connection, Substream};
pub use endpoint::Config; pub use endpoint::Config;
#[cfg(feature = "async-std")] #[cfg(feature = "async-std")]
@ -94,6 +97,14 @@ pub enum Error {
/// The [`Connecting`] future timed out. /// The [`Connecting`] future timed out.
#[error("Handshake with the remote timed out.")] #[error("Handshake with the remote timed out.")]
HandshakeTimedOut, HandshakeTimedOut,
/// Error when `Transport::dial_as_listener` is called without an active listener.
#[error("Tried to dial as listener without an active listener.")]
NoActiveListenerForDialAsListener,
/// Error when holepunching for a remote is already in progress
#[error("Already punching hole for {0}).")]
HolePunchInProgress(SocketAddr),
} }
/// Dialing a remote peer failed. /// Dialing a remote peer failed.

View File

@ -18,12 +18,13 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::Future; use futures::{future::BoxFuture, Future};
use if_watch::IfEvent; use if_watch::IfEvent;
use std::{ use std::{
io, io,
net::SocketAddr, net::SocketAddr,
task::{Context, Poll}, task::{Context, Poll},
time::Duration,
}; };
#[cfg(feature = "async-std")] #[cfg(feature = "async-std")]
@ -74,4 +75,7 @@ pub trait Provider: Unpin + Send + Sized + 'static {
watcher: &mut Self::IfWatcher, watcher: &mut Self::IfWatcher,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<io::Result<IfEvent>>; ) -> Poll<io::Result<IfEvent>>;
/// Sleep for specified amount of time.
fn sleep(duration: Duration) -> BoxFuture<'static, ()>;
} }

View File

@ -26,6 +26,7 @@ use std::{
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
time::Duration,
}; };
use crate::GenTransport; use crate::GenTransport;
@ -104,6 +105,10 @@ impl super::Provider for Provider {
) -> Poll<io::Result<if_watch::IfEvent>> { ) -> Poll<io::Result<if_watch::IfEvent>> {
watcher.poll_if_event(cx) watcher.poll_if_event(cx)
} }
fn sleep(duration: Duration) -> BoxFuture<'static, ()> {
async_std::task::sleep(duration).boxed()
}
} }
type ReceiveStreamItem = ( type ReceiveStreamItem = (

View File

@ -18,11 +18,12 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::{ready, Future}; use futures::{future::BoxFuture, ready, Future, FutureExt};
use std::{ use std::{
io, io,
net::SocketAddr, net::SocketAddr,
task::{Context, Poll}, task::{Context, Poll},
time::Duration,
}; };
use tokio::{io::ReadBuf, net::UdpSocket}; use tokio::{io::ReadBuf, net::UdpSocket};
@ -95,4 +96,8 @@ impl super::Provider for Provider {
) -> Poll<io::Result<if_watch::IfEvent>> { ) -> Poll<io::Result<if_watch::IfEvent>> {
watcher.poll_if_event(cx) watcher.poll_if_event(cx)
} }
fn sleep(duration: Duration) -> BoxFuture<'static, ()> {
tokio::time::sleep(duration).boxed()
}
} }

View File

@ -19,11 +19,12 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::endpoint::{Config, QuinnConfig, ToEndpoint}; use crate::endpoint::{Config, QuinnConfig, ToEndpoint};
use crate::hole_punching::hole_puncher;
use crate::provider::Provider; use crate::provider::Provider;
use crate::{endpoint, Connecting, Connection, Error}; use crate::{endpoint, Connecting, Connection, Error};
use futures::channel::{mpsc, oneshot}; use futures::channel::{mpsc, oneshot};
use futures::future::BoxFuture; use futures::future::{BoxFuture, Either};
use futures::ready; use futures::ready;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use futures::{prelude::*, stream::SelectAll}; use futures::{prelude::*, stream::SelectAll};
@ -73,6 +74,8 @@ pub struct GenTransport<P: Provider> {
dialer: HashMap<SocketFamily, Dialer>, dialer: HashMap<SocketFamily, Dialer>,
/// Waker to poll the transport again when a new dialer or listener is added. /// Waker to poll the transport again when a new dialer or listener is added.
waker: Option<Waker>, waker: Option<Waker>,
/// Holepunching attempts
hole_punch_attempts: HashMap<SocketAddr, oneshot::Sender<Connecting>>,
} }
impl<P: Provider> GenTransport<P> { impl<P: Provider> GenTransport<P> {
@ -88,6 +91,49 @@ impl<P: Provider> GenTransport<P> {
dialer: HashMap::new(), dialer: HashMap::new(),
waker: None, waker: None,
support_draft_29, support_draft_29,
hole_punch_attempts: Default::default(),
}
}
fn remote_multiaddr_to_socketaddr(
&self,
addr: Multiaddr,
) -> Result<
(SocketAddr, ProtocolVersion, Option<PeerId>),
TransportError<<Self as Transport>::Error>,
> {
let (socket_addr, version, peer_id) = multiaddr_to_socketaddr(&addr, self.support_draft_29)
.ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?;
if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
return Err(TransportError::MultiaddrNotSupported(addr));
}
Ok((socket_addr, version, peer_id))
}
fn eligible_listener(&mut self, socket_addr: &SocketAddr) -> Option<&mut Listener<P>> {
let mut listeners: Vec<_> = self
.listeners
.iter_mut()
.filter(|l| {
if l.is_closed {
return false;
}
let listen_addr = l.endpoint_channel.socket_addr();
SocketFamily::is_same(&listen_addr.ip(), &socket_addr.ip())
&& listen_addr.ip().is_loopback() == socket_addr.ip().is_loopback()
})
.collect();
match listeners.len() {
0 => None,
1 => listeners.pop(),
_ => {
// Pick any listener to use for dialing.
// We hash the socket address to achieve determinism.
let mut hasher = DefaultHasher::new();
socket_addr.hash(&mut hasher);
let index = hasher.finish() as usize % listeners.len();
Some(listeners.swap_remove(index))
}
} }
} }
} }
@ -103,8 +149,9 @@ impl<P: Provider> Transport for GenTransport<P> {
listener_id: ListenerId, listener_id: ListenerId,
addr: Multiaddr, addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> { ) -> Result<(), TransportError<Self::Error>> {
let (socket_addr, version) = multiaddr_to_socketaddr(&addr, self.support_draft_29) let (socket_addr, version, _peer_id) =
.ok_or(TransportError::MultiaddrNotSupported(addr))?; multiaddr_to_socketaddr(&addr, self.support_draft_29)
.ok_or(TransportError::MultiaddrNotSupported(addr))?;
let listener = Listener::new( let listener = Listener::new(
listener_id, listener_id,
socket_addr, socket_addr,
@ -147,27 +194,12 @@ impl<P: Provider> Transport for GenTransport<P> {
} }
fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> { fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let (socket_addr, version) = multiaddr_to_socketaddr(&addr, self.support_draft_29) let (socket_addr, version, _peer_id) = self.remote_multiaddr_to_socketaddr(addr)?;
.ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?;
if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
return Err(TransportError::MultiaddrNotSupported(addr));
}
let mut listeners = self let handshake_timeout = self.handshake_timeout;
.listeners
.iter_mut()
.filter(|l| {
if l.is_closed {
return false;
}
let listen_addr = l.endpoint_channel.socket_addr();
SocketFamily::is_same(&listen_addr.ip(), &socket_addr.ip())
&& listen_addr.ip().is_loopback() == socket_addr.ip().is_loopback()
})
.collect::<Vec<_>>();
let dialer_state = match listeners.len() { let dialer_state = match self.eligible_listener(&socket_addr) {
0 => { None => {
// No listener. Get or create an explicit dialer. // No listener. Get or create an explicit dialer.
let socket_family = socket_addr.ip().into(); let socket_family = socket_addr.ip().into();
let dialer = match self.dialer.entry(socket_family) { let dialer = match self.dialer.entry(socket_family) {
@ -181,28 +213,61 @@ impl<P: Provider> Transport for GenTransport<P> {
}; };
&mut dialer.state &mut dialer.state
} }
1 => &mut listeners[0].dialer_state, Some(listener) => &mut listener.dialer_state,
_ => {
// Pick any listener to use for dialing.
// We hash the socket address to achieve determinism.
let mut hasher = DefaultHasher::new();
socket_addr.hash(&mut hasher);
let index = hasher.finish() as usize % listeners.len();
&mut listeners[index].dialer_state
}
}; };
Ok(dialer_state.new_dial(socket_addr, self.handshake_timeout, version)) Ok(dialer_state.new_dial(socket_addr, handshake_timeout, version))
} }
fn dial_as_listener( fn dial_as_listener(
&mut self, &mut self,
addr: Multiaddr, addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> { ) -> Result<Self::Dial, TransportError<Self::Error>> {
// TODO: As the listener of a QUIC hole punch, we need to send a random UDP packet to the let (socket_addr, _version, peer_id) = self.remote_multiaddr_to_socketaddr(addr.clone())?;
// `addr`. See DCUtR specification below. let peer_id = peer_id.ok_or(TransportError::MultiaddrNotSupported(addr))?;
//
// https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol let endpoint_channel = self
Err(TransportError::MultiaddrNotSupported(addr)) .eligible_listener(&socket_addr)
.ok_or(TransportError::Other(
Error::NoActiveListenerForDialAsListener,
))?
.endpoint_channel
.clone();
let hole_puncher = hole_puncher::<P>(endpoint_channel, socket_addr, self.handshake_timeout);
let (sender, receiver) = oneshot::channel();
match self.hole_punch_attempts.entry(socket_addr) {
Entry::Occupied(mut sender_entry) => {
// Stale senders, i.e. from failed hole punches are not removed.
// Thus, we can just overwrite a stale sender.
if !sender_entry.get().is_canceled() {
return Err(TransportError::Other(Error::HolePunchInProgress(
socket_addr,
)));
}
sender_entry.insert(sender);
}
Entry::Vacant(entry) => {
entry.insert(sender);
}
};
Ok(Box::pin(async move {
futures::pin_mut!(hole_puncher);
match futures::future::select(receiver, hole_puncher).await {
Either::Left((message, _)) => {
let (inbound_peer_id, connection) = message
.expect("hole punch connection sender is never dropped before receiver")
.await?;
if inbound_peer_id != peer_id {
log::warn!("expected inbound connection from {socket_addr} to resolve to {peer_id} but got {inbound_peer_id}");
}
Ok((inbound_peer_id, connection))
}
Either::Right((hole_punch_err, _)) => Err(hole_punch_err),
}
}))
} }
fn poll( fn poll(
@ -222,8 +287,37 @@ impl<P: Provider> Transport for GenTransport<P> {
self.dialer.remove(&key); self.dialer.remove(&key);
} }
if let Poll::Ready(Some(ev)) = self.listeners.poll_next_unpin(cx) { while let Poll::Ready(Some(ev)) = self.listeners.poll_next_unpin(cx) {
return Poll::Ready(ev); match ev {
TransportEvent::Incoming {
listener_id,
mut upgrade,
local_addr,
send_back_addr,
} => {
let socket_addr =
multiaddr_to_socketaddr(&send_back_addr, self.support_draft_29)
.unwrap()
.0;
if let Some(sender) = self.hole_punch_attempts.remove(&socket_addr) {
match sender.send(upgrade) {
Ok(()) => continue,
Err(timed_out_holepunch) => {
upgrade = timed_out_holepunch;
}
}
}
return Poll::Ready(TransportEvent::Incoming {
listener_id,
upgrade,
local_addr,
send_back_addr,
});
}
_ => return Poll::Ready(ev),
}
} }
self.waker = Some(cx.waker().clone()); self.waker = Some(cx.waker().clone());
@ -594,15 +688,18 @@ fn ip_to_listenaddr(
fn multiaddr_to_socketaddr( fn multiaddr_to_socketaddr(
addr: &Multiaddr, addr: &Multiaddr,
support_draft_29: bool, support_draft_29: bool,
) -> Option<(SocketAddr, ProtocolVersion)> { ) -> Option<(SocketAddr, ProtocolVersion, Option<PeerId>)> {
let mut iter = addr.iter(); let mut iter = addr.iter();
let proto1 = iter.next()?; let proto1 = iter.next()?;
let proto2 = iter.next()?; let proto2 = iter.next()?;
let proto3 = iter.next()?; let proto3 = iter.next()?;
let mut peer_id = None;
for proto in iter { for proto in iter {
match proto { match proto {
Protocol::P2p(_) => {} // Ignore a `/p2p/...` prefix of possibly outer protocols, if present. Protocol::P2p(id) => {
peer_id = Some(id);
}
_ => return None, _ => return None,
} }
} }
@ -614,10 +711,10 @@ fn multiaddr_to_socketaddr(
match (proto1, proto2) { match (proto1, proto2) {
(Protocol::Ip4(ip), Protocol::Udp(port)) => { (Protocol::Ip4(ip), Protocol::Udp(port)) => {
Some((SocketAddr::new(ip.into(), port), version)) Some((SocketAddr::new(ip.into(), port), version, peer_id))
} }
(Protocol::Ip6(ip), Protocol::Udp(port)) => { (Protocol::Ip6(ip), Protocol::Udp(port)) => {
Some((SocketAddr::new(ip.into(), port), version)) Some((SocketAddr::new(ip.into(), port), version, peer_id))
} }
_ => None, _ => None,
} }
@ -691,7 +788,8 @@ mod test {
), ),
Some(( Some((
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345,), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345,),
ProtocolVersion::V1 ProtocolVersion::V1,
None
)) ))
); );
assert_eq!( assert_eq!(
@ -703,7 +801,8 @@ mod test {
), ),
Some(( Some((
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 8080,), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 8080,),
ProtocolVersion::V1 ProtocolVersion::V1,
None
)) ))
); );
assert_eq!( assert_eq!(
@ -715,7 +814,7 @@ mod test {
Some((SocketAddr::new( Some((SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
55148, 55148,
), ProtocolVersion::V1)) ), ProtocolVersion::V1, Some("12D3KooW9xk7Zp1gejwfwNpfm6L9zH5NL4Bx5rm94LRYJJHJuARZ".parse().unwrap())))
); );
assert_eq!( assert_eq!(
multiaddr_to_socketaddr( multiaddr_to_socketaddr(
@ -724,7 +823,8 @@ mod test {
), ),
Some(( Some((
SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345,), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345,),
ProtocolVersion::V1 ProtocolVersion::V1,
None
)) ))
); );
assert_eq!( assert_eq!(
@ -741,7 +841,8 @@ mod test {
)), )),
8080, 8080,
), ),
ProtocolVersion::V1 ProtocolVersion::V1,
None
)) ))
); );
@ -757,7 +858,8 @@ mod test {
), ),
Some(( Some((
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234,), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234,),
ProtocolVersion::Draft29 ProtocolVersion::Draft29,
None
)) ))
); );
} }