feat(dcutr): keep connection alive while we are using it

Similar to #3876, we now compute `connection_keep_alive` based on whether we are still using the connection, applied to the `dcutr` protocol.

Related: https://github.com/libp2p/rust-libp2p/issues/3844.

Pull-Request: #3960.
This commit is contained in:
Thomas Coratger 2023-06-04 09:43:01 +02:00 committed by GitHub
parent 75edcfcdb0
commit a4450d41c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 93 additions and 190 deletions

View File

@ -4,8 +4,11 @@
See [PR 3715]. See [PR 3715].
- Remove deprecated items. See [PR 3700]. - Remove deprecated items. See [PR 3700].
- Keep connection alive while we are using it. See [PR 3960].
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3700]: https://github.com/libp2p/rust-libp2p/pull/3700 [PR 3700]: https://github.com/libp2p/rust-libp2p/pull/3700
[PR 3960]: https://github.com/libp2p/rust-libp2p/pull/3960
## 0.9.1 ## 0.9.1

View File

@ -26,9 +26,11 @@ use libp2p_core::connection::ConnectedPoint;
use libp2p_core::multiaddr::Protocol; use libp2p_core::multiaddr::Protocol;
use libp2p_core::{Endpoint, Multiaddr}; use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId; use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm};
use libp2p_swarm::dial_opts::{self, DialOpts}; use libp2p_swarm::dial_opts::{self, DialOpts};
use libp2p_swarm::{dummy, ConnectionDenied, ConnectionId, THandler, THandlerOutEvent}; use libp2p_swarm::{
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, THandler, THandlerOutEvent,
};
use libp2p_swarm::{ use libp2p_swarm::{
ExternalAddresses, NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError, ExternalAddresses, NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError,
THandlerInEvent, ToSwarm, THandlerInEvent, ToSwarm,
@ -38,7 +40,7 @@ use std::task::{Context, Poll};
use thiserror::Error; use thiserror::Error;
use void::Void; use void::Void;
const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3; pub(crate) const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3;
/// The events produced by the [`Behaviour`]. /// The events produced by the [`Behaviour`].
#[derive(Debug)] #[derive(Debug)]
@ -107,51 +109,6 @@ impl Behaviour {
.collect() .collect()
} }
fn on_connection_established(
&mut self,
ConnectionEstablished {
peer_id,
connection_id,
endpoint: connected_point,
..
}: ConnectionEstablished,
) {
if connected_point.is_relayed() {
if connected_point.is_listener() && !self.direct_connections.contains_key(&peer_id) {
// TODO: Try dialing the remote peer directly. Specification:
//
// > The protocol starts with the completion of a relay connection from A to B. Upon
// observing the new connection, the inbound peer (here B) checks the addresses
// advertised by A via identify. If that set includes public addresses, then A may
// be reachable by a direct connection, in which case B attempts a unilateral
// connection upgrade by initiating a direct connection to A.
//
// https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol
self.queued_events.extend([
ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: Either::Left(handler::relayed::Command::Connect {
obs_addrs: self.observed_addresses(),
}),
},
ToSwarm::GenerateEvent(Event::InitiatedDirectConnectionUpgrade {
remote_peer_id: peer_id,
local_relayed_addr: match connected_point {
ConnectedPoint::Listener { local_addr, .. } => local_addr.clone(),
ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."),
},
}),
]);
}
} else {
self.direct_connections
.entry(peer_id)
.or_default()
.insert(connection_id);
}
}
fn on_dial_failure( fn on_dial_failure(
&mut self, &mut self,
DialFailure { DialFailure {
@ -188,22 +145,15 @@ impl Behaviour {
self.queued_events.push_back(ToSwarm::NotifyHandler { self.queued_events.push_back(ToSwarm::NotifyHandler {
handler: NotifyHandler::One(relayed_connection_id), handler: NotifyHandler::One(relayed_connection_id),
peer_id, peer_id,
event: Either::Left(handler::relayed::Command::Connect { event: Either::Left(handler::relayed::Command::Connect),
obs_addrs: self.observed_addresses(),
}),
}) })
} else { } else {
self.queued_events.extend([ self.queued_events.extend([ToSwarm::GenerateEvent(
ToSwarm::NotifyHandler { Event::DirectConnectionUpgradeFailed {
peer_id,
handler: NotifyHandler::One(relayed_connection_id),
event: Either::Left(handler::relayed::Command::UpgradeFinishedDontKeepAlive),
},
ToSwarm::GenerateEvent(Event::DirectConnectionUpgradeFailed {
remote_peer_id: peer_id, remote_peer_id: peer_id,
error: Error::Dial, error: Error::Dial,
}), },
]); )]);
} }
} }
@ -239,18 +189,32 @@ impl NetworkBehaviour for Behaviour {
fn handle_established_inbound_connection( fn handle_established_inbound_connection(
&mut self, &mut self,
connection_id: ConnectionId, connection_id: ConnectionId,
_peer: PeerId, peer: PeerId,
local_addr: &Multiaddr, local_addr: &Multiaddr,
remote_addr: &Multiaddr, remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> { ) -> Result<THandler<Self>, ConnectionDenied> {
if is_relayed(local_addr) { if is_relayed(local_addr) {
return Ok(Either::Left(handler::relayed::Handler::new( let connected_point = ConnectedPoint::Listener {
ConnectedPoint::Listener { local_addr: local_addr.clone(),
local_addr: local_addr.clone(), send_back_addr: remote_addr.clone(),
send_back_addr: remote_addr.clone(), };
let mut handler =
handler::relayed::Handler::new(connected_point, self.observed_addresses());
handler.on_behaviour_event(handler::relayed::Command::Connect);
self.queued_events.extend([ToSwarm::GenerateEvent(
Event::InitiatedDirectConnectionUpgrade {
remote_peer_id: peer,
local_relayed_addr: local_addr.clone(),
}, },
))); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound. )]);
return Ok(Either::Left(handler)); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound.
} }
self.direct_connections
.entry(peer)
.or_default()
.insert(connection_id);
assert!( assert!(
self.direct_to_relayed_connections self.direct_to_relayed_connections
@ -275,9 +239,15 @@ impl NetworkBehaviour for Behaviour {
address: addr.clone(), address: addr.clone(),
role_override, role_override,
}, },
self.observed_addresses(),
))); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound. ))); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound.
} }
self.direct_connections
.entry(peer)
.or_default()
.insert(connection_id);
// Whether this is a connection requested by this behaviour. // Whether this is a connection requested by this behaviour.
if let Some(&relayed_connection_id) = self.direct_to_relayed_connections.get(&connection_id) if let Some(&relayed_connection_id) = self.direct_to_relayed_connections.get(&connection_id)
{ {
@ -290,16 +260,11 @@ impl NetworkBehaviour for Behaviour {
); );
} }
self.queued_events.extend([ self.queued_events.extend([ToSwarm::GenerateEvent(
ToSwarm::NotifyHandler { Event::DirectConnectionUpgradeSucceeded {
peer_id: peer,
handler: NotifyHandler::One(relayed_connection_id),
event: Either::Left(handler::relayed::Command::UpgradeFinishedDontKeepAlive),
},
ToSwarm::GenerateEvent(Event::DirectConnectionUpgradeSucceeded {
remote_peer_id: peer, remote_peer_id: peer,
}), },
]); )]);
} }
Ok(Either::Right(dummy::ConnectionHandler)) Ok(Either::Right(dummy::ConnectionHandler))
@ -323,24 +288,13 @@ impl NetworkBehaviour for Behaviour {
}; };
match handler_event { match handler_event {
Either::Left(handler::relayed::Event::InboundConnectRequest { Either::Left(handler::relayed::Event::InboundConnectRequest { remote_addr }) => {
inbound_connect, self.queued_events.extend([ToSwarm::GenerateEvent(
remote_addr, Event::RemoteInitiatedDirectConnectionUpgrade {
}) => {
self.queued_events.extend([
ToSwarm::NotifyHandler {
handler: NotifyHandler::One(relayed_connection_id),
peer_id: event_source,
event: Either::Left(handler::relayed::Command::AcceptInboundConnect {
inbound_connect,
obs_addrs: self.observed_addresses(),
}),
},
ToSwarm::GenerateEvent(Event::RemoteInitiatedDirectConnectionUpgrade {
remote_peer_id: event_source, remote_peer_id: event_source,
remote_relayed_addr: remote_addr, remote_relayed_addr: remote_addr,
}), },
]); )]);
} }
Either::Left(handler::relayed::Event::InboundNegotiationFailed { error }) => { Either::Left(handler::relayed::Event::InboundNegotiationFailed { error }) => {
self.queued_events.push_back(ToSwarm::GenerateEvent( self.queued_events.push_back(ToSwarm::GenerateEvent(
@ -407,14 +361,12 @@ impl NetworkBehaviour for Behaviour {
self.external_addresses.on_swarm_event(&event); self.external_addresses.on_swarm_event(&event);
match event { match event {
FromSwarm::ConnectionEstablished(connection_established) => {
self.on_connection_established(connection_established)
}
FromSwarm::ConnectionClosed(connection_closed) => { FromSwarm::ConnectionClosed(connection_closed) => {
self.on_connection_closed(connection_closed) self.on_connection_closed(connection_closed)
} }
FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure), FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
FromSwarm::AddressChange(_) FromSwarm::AddressChange(_)
| FromSwarm::ConnectionEstablished(_)
| FromSwarm::ListenFailure(_) | FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_) | FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_) | FromSwarm::NewListenAddr(_)

View File

@ -20,11 +20,11 @@
//! [`ConnectionHandler`] handling relayed connection potentially upgraded to a direct connection. //! [`ConnectionHandler`] handling relayed connection potentially upgraded to a direct connection.
use crate::behaviour_impl::MAX_NUMBER_OF_UPGRADE_ATTEMPTS;
use crate::protocol; use crate::protocol;
use either::Either; use either::Either;
use futures::future; use futures::future;
use futures::future::{BoxFuture, FutureExt}; use futures::future::{BoxFuture, FutureExt};
use instant::Instant;
use libp2p_core::multiaddr::Multiaddr; use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::upgrade::DeniedUpgrade;
use libp2p_core::ConnectedPoint; use libp2p_core::ConnectedPoint;
@ -36,48 +36,16 @@ use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol, ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol,
}; };
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fmt;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration;
#[derive(Debug)]
pub enum Command { pub enum Command {
Connect { Connect,
obs_addrs: Vec<Multiaddr>,
},
AcceptInboundConnect {
obs_addrs: Vec<Multiaddr>,
inbound_connect: Box<protocol::inbound::PendingConnect>,
},
/// Upgrading the relayed connection to a direct connection either failed for good or succeeded.
/// There is no need to keep the relayed connection alive for the sake of upgrading to a direct
/// connection.
UpgradeFinishedDontKeepAlive,
}
impl fmt::Debug for Command {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Command::Connect { obs_addrs } => f
.debug_struct("Command::Connect")
.field("obs_addrs", obs_addrs)
.finish(),
Command::AcceptInboundConnect {
obs_addrs,
inbound_connect: _,
} => f
.debug_struct("Command::AcceptInboundConnect")
.field("obs_addrs", obs_addrs)
.finish(),
Command::UpgradeFinishedDontKeepAlive => f
.debug_struct("Command::UpgradeFinishedDontKeepAlive")
.finish(),
}
}
} }
#[derive(Debug)]
pub enum Event { pub enum Event {
InboundConnectRequest { InboundConnectRequest {
inbound_connect: Box<protocol::inbound::PendingConnect>,
remote_addr: Multiaddr, remote_addr: Multiaddr,
}, },
InboundNegotiationFailed { InboundNegotiationFailed {
@ -92,36 +60,6 @@ pub enum Event {
}, },
} }
impl fmt::Debug for Event {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Event::InboundConnectRequest {
inbound_connect: _,
remote_addr,
} => f
.debug_struct("Event::InboundConnectRequest")
.field("remote_addrs", remote_addr)
.finish(),
Event::InboundNegotiationFailed { error } => f
.debug_struct("Event::InboundNegotiationFailed")
.field("error", error)
.finish(),
Event::InboundConnectNegotiated(addrs) => f
.debug_tuple("Event::InboundConnectNegotiated")
.field(addrs)
.finish(),
Event::OutboundNegotiationFailed { error } => f
.debug_struct("Event::OutboundNegotiationFailed")
.field("error", error)
.finish(),
Event::OutboundConnectNegotiated { remote_addrs } => f
.debug_struct("Event::OutboundConnectNegotiated")
.field("remote_addrs", remote_addrs)
.finish(),
}
}
}
pub struct Handler { pub struct Handler {
endpoint: ConnectedPoint, endpoint: ConnectedPoint,
/// A pending fatal error that results in the connection being closed. /// A pending fatal error that results in the connection being closed.
@ -142,17 +80,22 @@ pub struct Handler {
/// Inbound connect, accepted by the behaviour, pending completion. /// Inbound connect, accepted by the behaviour, pending completion.
inbound_connect: inbound_connect:
Option<BoxFuture<'static, Result<Vec<Multiaddr>, protocol::inbound::UpgradeError>>>, Option<BoxFuture<'static, Result<Vec<Multiaddr>, protocol::inbound::UpgradeError>>>,
keep_alive: KeepAlive,
/// The addresses we will send to the other party for hole-punching attempts.
holepunch_candidates: Vec<Multiaddr>,
attempts: u8,
} }
impl Handler { impl Handler {
pub fn new(endpoint: ConnectedPoint) -> Self { pub fn new(endpoint: ConnectedPoint, holepunch_candidates: Vec<Multiaddr>) -> Self {
Self { Self {
endpoint, endpoint,
pending_error: Default::default(), pending_error: Default::default(),
queued_events: Default::default(), queued_events: Default::default(),
inbound_connect: Default::default(), inbound_connect: Default::default(),
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(30)), holepunch_candidates,
attempts: 0,
} }
} }
@ -167,17 +110,29 @@ impl Handler {
) { ) {
match output { match output {
future::Either::Left(inbound_connect) => { future::Either::Left(inbound_connect) => {
if self
.inbound_connect
.replace(
inbound_connect
.accept(self.holepunch_candidates.clone())
.boxed(),
)
.is_some()
{
log::warn!(
"New inbound connect stream while still upgrading previous one. \
Replacing previous with new.",
);
}
let remote_addr = match &self.endpoint { let remote_addr = match &self.endpoint {
ConnectedPoint::Dialer { address, role_override: _ } => address.clone(), ConnectedPoint::Dialer { address, role_override: _ } => address.clone(),
ConnectedPoint::Listener { ..} => unreachable!("`<Handler as ConnectionHandler>::listen_protocol` denies all incoming substreams as a listener."), ConnectedPoint::Listener { ..} => unreachable!("`<Handler as ConnectionHandler>::listen_protocol` denies all incoming substreams as a listener."),
}; };
self.queued_events self.queued_events
.push_back(ConnectionHandlerEvent::NotifyBehaviour( .push_back(ConnectionHandlerEvent::NotifyBehaviour(
Event::InboundConnectRequest { Event::InboundConnectRequest { remote_addr },
inbound_connect: Box::new(inbound_connect),
remote_addr,
},
)); ));
self.attempts += 1;
} }
// A connection listener denies all incoming substreams, thus none can ever be fully negotiated. // A connection listener denies all incoming substreams, thus none can ever be fully negotiated.
future::Either::Right(output) => void::unreachable(output), future::Either::Right(output) => void::unreachable(output),
@ -226,8 +181,6 @@ impl Handler {
<Self as ConnectionHandler>::OutboundProtocol, <Self as ConnectionHandler>::OutboundProtocol,
>, >,
) { ) {
self.keep_alive = KeepAlive::No;
match error { match error {
StreamUpgradeError::Timeout => { StreamUpgradeError::Timeout => {
self.queued_events self.queued_events
@ -286,38 +239,33 @@ impl ConnectionHandler for Handler {
fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
match event { match event {
Command::Connect { obs_addrs } => { Command::Connect => {
self.queued_events self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new( protocol: SubstreamProtocol::new(
protocol::outbound::Upgrade::new(obs_addrs), protocol::outbound::Upgrade::new(self.holepunch_candidates.clone()),
(), (),
), ),
}); });
} self.attempts += 1;
Command::AcceptInboundConnect {
inbound_connect,
obs_addrs,
} => {
if self
.inbound_connect
.replace(inbound_connect.accept(obs_addrs).boxed())
.is_some()
{
log::warn!(
"New inbound connect stream while still upgrading previous one. \
Replacing previous with new.",
);
}
}
Command::UpgradeFinishedDontKeepAlive => {
self.keep_alive = KeepAlive::No;
} }
} }
} }
fn connection_keep_alive(&self) -> KeepAlive { fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive if !self.queued_events.is_empty() {
return KeepAlive::Yes;
}
if self.inbound_connect.is_some() {
return KeepAlive::Yes;
}
if self.attempts < MAX_NUMBER_OF_UPGRADE_ATTEMPTS {
return KeepAlive::Yes;
}
KeepAlive::No
} }
fn poll( fn poll(