core/: Concurrent dial attempts (#2248)

Concurrently dial address candidates within a single dial attempt.

Main motivation for this feature is to increase success rate on hole punching
(see https://github.com/libp2p/rust-libp2p/issues/1896#issuecomment-885894496
for details). Though, as a nice side effect, as one would expect, it does
improve connection establishment time.

Cleanups and fixes done along the way:

- Merge `pool.rs` and `manager.rs`.

- Instead of manually implementing state machines in `task.rs` use
  `async/await`.

- Fix bug where `NetworkBehaviour::inject_connection_closed` is called without a
  previous `NetworkBehaviour::inject_connection_established` (see
  https://github.com/libp2p/rust-libp2p/issues/2242).

- Return handler to behaviour on incoming connection limit error. Missed in
  https://github.com/libp2p/rust-libp2p/issues/2242.
This commit is contained in:
Max Inden
2021-10-14 18:05:07 +02:00
committed by GitHub
parent c0d7d4a9eb
commit 40c5335e3b
36 changed files with 2242 additions and 2319 deletions

View File

@ -78,12 +78,12 @@ use libp2p_core::{
connection::{
ConnectedPoint, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit,
EstablishedConnection, IntoConnectionHandler, ListenerId, PendingConnectionError,
Substream,
PendingInboundConnectionError, PendingOutboundConnectionError, Substream,
},
muxing::StreamMuxerBox,
network::{
self, peer::ConnectedPeer, ConnectionLimits, DialAttemptsRemaining, Network, NetworkConfig,
NetworkEvent, NetworkInfo,
self, peer::ConnectedPeer, ConnectionLimits, Network, NetworkConfig, NetworkEvent,
NetworkInfo,
},
transport::{self, TransportError},
upgrade::ProtocolName,
@ -93,7 +93,7 @@ use protocols_handler::{NodeHandlerWrapperBuilder, NodeHandlerWrapperError};
use registry::{AddressIntoIter, Addresses};
use smallvec::SmallVec;
use std::collections::HashSet;
use std::num::{NonZeroU32, NonZeroUsize};
use std::num::{NonZeroU32, NonZeroU8, NonZeroUsize};
use std::{
error, fmt, io,
pin::Pin,
@ -141,6 +141,10 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
/// Number of established connections to this peer, including the one that has just been
/// opened.
num_established: NonZeroU32,
/// [`Some`] when the new connection is an outgoing connection.
/// Addresses are dialed concurrently. Contains the addresses and errors
/// of dial attempts that failed before the one successful dial.
concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
},
/// A connection with the given peer has been closed,
/// possibly as a result of an error.
@ -181,7 +185,14 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
/// Address used to send back data to the remote.
send_back_addr: Multiaddr,
/// The error that happened.
error: PendingConnectionError<io::Error>,
error: PendingInboundConnectionError<io::Error>,
},
/// Outgoing connection attempt failed.
OutgoingConnectionError {
/// If known, [`PeerId`] of the peer we tried to reach.
peer_id: Option<PeerId>,
/// Error that has been encountered.
error: DialError,
},
/// We connected to a peer, but we immediately closed the connection because that peer is banned.
BannedPeer {
@ -190,26 +201,6 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
/// Endpoint of the connection that has been closed.
endpoint: ConnectedPoint,
},
/// Tried to dial an address but it ended up being unreachaable.
UnreachableAddr {
/// `PeerId` that we were trying to reach.
peer_id: PeerId,
/// Address that we failed to reach.
address: Multiaddr,
/// Error that has been encountered.
error: PendingConnectionError<io::Error>,
/// Number of remaining connection attempts that are being tried for this peer.
attempts_remaining: u32,
},
/// Tried to dial an address but it ended up being unreachaable.
/// Contrary to `UnreachableAddr`, we don't know the identity of the peer that we were trying
/// to reach.
UnknownPeerUnreachableAddr {
/// Address that we failed to reach.
address: Multiaddr,
/// Error that has been encountered.
error: PendingConnectionError<io::Error>,
},
/// One of our listeners has reported a new local listening address.
NewListenAddr {
/// The listener that is listening on the new address.
@ -245,10 +236,10 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
},
/// A new dialing attempt has been initiated.
///
/// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished)
/// event is reported if the dialing attempt succeeds, otherwise a
/// [`UnreachableAddr`](SwarmEvent::UnreachableAddr) event is reported
/// with `attempts_remaining` equal to 0.
/// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) event is
/// reported if the dialing attempt succeeds, otherwise a
/// [`OutgoingConnectionError`](SwarmEvent::OutgoingConnectionError) event
/// is reported with `attempts_remaining` equal to 0.
Dialing(PeerId),
}
@ -364,38 +355,36 @@ where
if self.banned_peers.contains(peer_id) {
let error = DialError::Banned;
self.behaviour
.inject_dial_failure(peer_id, handler, error.clone());
.inject_dial_failure(Some(*peer_id), handler, &error);
return Err(error);
}
let self_listening = &self.listened_addrs;
let self_listening = self.listened_addrs.clone();
let mut addrs = self
.behaviour
.addresses_of_peer(peer_id)
.into_iter()
.filter(|a| !self_listening.contains(a));
.filter(move |a| !self_listening.contains(a))
.peekable();
let first = match addrs.next() {
Some(first) => first,
None => {
let error = DialError::NoAddresses;
self.behaviour
.inject_dial_failure(peer_id, handler, error.clone());
return Err(error);
}
if addrs.peek().is_none() {
let error = DialError::NoAddresses;
self.behaviour
.inject_dial_failure(Some(*peer_id), handler, &error);
return Err(error);
};
let handler = handler
.into_node_handler_builder()
.with_substream_upgrade_protocol_override(self.substream_upgrade_protocol_override);
match self.network.peer(*peer_id).dial(first, addrs, handler) {
match self.network.peer(*peer_id).dial(addrs, handler) {
Ok(_connection_id) => Ok(()),
Err(error) => {
let (error, handler) = DialError::from_network_dial_error(error);
self.behaviour.inject_dial_failure(
peer_id,
Some(*peer_id),
handler.into_protocols_handler(),
error.clone(),
&error,
);
Err(error)
}
@ -553,6 +542,7 @@ where
Poll::Ready(NetworkEvent::ConnectionEstablished {
connection,
num_established,
concurrent_dial_errors,
}) => {
let peer_id = connection.peer_id();
let endpoint = connection.endpoint().clone();
@ -565,15 +555,20 @@ where
return Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint });
} else {
log::debug!(
"Connection established: {:?}; Total (peer): {}.",
connection.connected(),
"Connection established: {:?} {:?}; Total (peer): {}.",
connection.peer_id(),
connection.endpoint(),
num_established
);
let endpoint = connection.endpoint().clone();
let failed_addresses = concurrent_dial_errors
.as_ref()
.map(|es| es.iter().map(|(a, _)| a).cloned().collect());
this.behaviour.inject_connection_established(
&peer_id,
&connection.id(),
&endpoint,
failed_addresses.as_ref(),
);
if num_established.get() == 1 {
this.behaviour.inject_connected(&peer_id);
@ -582,6 +577,7 @@ where
peer_id,
num_established,
endpoint,
concurrent_dial_errors,
});
}
}
@ -625,13 +621,22 @@ where
);
let local_addr = connection.local_addr.clone();
let send_back_addr = connection.send_back_addr.clone();
if let Err(e) = this.network.accept(connection, handler) {
log::warn!("Incoming connection rejected: {:?}", e);
match this.network.accept(connection, handler) {
Ok(_connection_id) => {
return Poll::Ready(SwarmEvent::IncomingConnection {
local_addr,
send_back_addr,
});
}
Err((connection_limit, handler)) => {
this.behaviour.inject_listen_failure(
&local_addr,
&send_back_addr,
handler.into_protocols_handler(),
);
log::warn!("Incoming connection rejected: {:?}", connection_limit);
}
}
return Poll::Ready(SwarmEvent::IncomingConnection {
local_addr,
send_back_addr,
});
}
Poll::Ready(NetworkEvent::NewListenerAddress {
listener_id,
@ -711,53 +716,39 @@ where
}
Poll::Ready(NetworkEvent::DialError {
peer_id,
multiaddr,
error,
attempts_remaining,
handler,
}) => {
this.behaviour
.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error);
let error = error.into();
let num_remaining: u32;
match attempts_remaining {
DialAttemptsRemaining::Some(n) => {
num_remaining = n.into();
}
DialAttemptsRemaining::None(handler) => {
num_remaining = 0;
this.behaviour.inject_dial_failure(
&peer_id,
handler.into_protocols_handler(),
DialError::UnreachableAddr(multiaddr.clone()),
);
}
}
log::debug!(
"Connection attempt to {:?} via {:?} failed with {:?}. Attempts remaining: {}.",
peer_id, multiaddr, error, num_remaining,
this.behaviour.inject_dial_failure(
Some(peer_id),
handler.into_protocols_handler(),
&error,
);
return Poll::Ready(SwarmEvent::UnreachableAddr {
log::debug!(
"Connection attempt to {:?} failed with {:?}.",
peer_id,
address: multiaddr,
error,
attempts_remaining: num_remaining,
);
return Poll::Ready(SwarmEvent::OutgoingConnectionError {
peer_id: Some(peer_id),
error,
});
}
Poll::Ready(NetworkEvent::UnknownPeerDialError {
multiaddr, error, ..
}) => {
log::debug!(
"Connection attempt to address {:?} of unknown peer failed with {:?}",
multiaddr,
error
Poll::Ready(NetworkEvent::UnknownPeerDialError { error, handler }) => {
log::debug!("Connection attempt to unknown peer failed with {:?}", error);
let error = error.into();
this.behaviour.inject_dial_failure(
None,
handler.into_protocols_handler(),
&error,
);
this.behaviour
.inject_addr_reach_failure(None, &multiaddr, &error);
return Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr {
address: multiaddr,
error,
return Poll::Ready(SwarmEvent::OutgoingConnectionError {
peer_id: None,
error: error,
});
}
}
@ -827,29 +818,16 @@ where
return Poll::Ready(SwarmEvent::Dialing(peer_id));
}
} else {
// Even if the condition for a _new_ dialing attempt is not met,
// we always add any potentially new addresses of the peer to an
// ongoing dialing attempt, if there is one.
log::trace!(
"Condition for new dialing attempt to {:?} not met: {:?}",
peer_id,
condition
);
let self_listening = &this.listened_addrs;
if let Some(mut peer) = this.network.peer(peer_id).into_dialing() {
let addrs = this.behaviour.addresses_of_peer(peer.id());
let mut attempt = peer.some_attempt();
for a in addrs {
if !self_listening.contains(&a) {
attempt.add_address(a);
}
}
}
this.behaviour.inject_dial_failure(
&peer_id,
Some(peer_id),
handler,
DialError::DialPeerConditionFalse(condition),
&DialError::DialPeerConditionFalse(condition),
);
}
}
@ -962,6 +940,7 @@ fn notify_any<'a, TTrans, THandler, TBehaviour>(
) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
where
TTrans: Transport,
TTrans::Error: Send + 'static,
TBehaviour: NetworkBehaviour,
THandler: IntoConnectionHandler,
THandler::Handler: ConnectionHandler<
@ -1138,6 +1117,12 @@ where
self
}
/// Number of addresses concurrently dialed for a single outbound connection attempt.
pub fn dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
self.network_config = self.network_config.with_dial_concurrency_factor(factor);
self
}
/// Configures the connection limits.
pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self {
self.network_config = self.network_config.with_connection_limits(limits);
@ -1201,17 +1186,13 @@ where
}
/// The possible failures of dialing.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum DialError {
/// The peer is currently banned.
Banned,
/// The configured limit for simultaneous outgoing connections
/// has been reached.
ConnectionLimit(ConnectionLimit),
/// The address given for dialing is invalid.
InvalidAddress(Multiaddr),
/// Tried to dial an address but it ended up being unreachaable.
UnreachableAddr(Multiaddr),
/// The peer being dialed is the local peer and thus the dial was aborted.
LocalPeerId,
/// [`NetworkBehaviour::addresses_of_peer`] returned no addresses
@ -1219,6 +1200,15 @@ pub enum DialError {
NoAddresses,
/// The provided [`DialPeerCondition`] evaluated to false and thus the dial was aborted.
DialPeerConditionFalse(DialPeerCondition),
/// Pending connection attempt has been aborted.
Aborted,
/// The peer identity obtained on the connection did not
/// match the one that was expected or is otherwise invalid.
InvalidPeerId,
/// An I/O error occurred on the connection.
ConnectionIo(io::Error),
/// An error occurred while negotiating the transport protocol(s) on a connection.
Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
}
impl DialError {
@ -1227,22 +1217,29 @@ impl DialError {
network::DialError::ConnectionLimit { limit, handler } => {
(DialError::ConnectionLimit(limit), handler)
}
network::DialError::InvalidAddress { address, handler } => {
(DialError::InvalidAddress(address), handler)
}
network::DialError::LocalPeerId { handler } => (DialError::LocalPeerId, handler),
}
}
}
impl From<PendingOutboundConnectionError<io::Error>> for DialError {
fn from(error: PendingOutboundConnectionError<io::Error>) -> Self {
match error {
PendingConnectionError::ConnectionLimit(limit) => DialError::ConnectionLimit(limit),
PendingConnectionError::Aborted => DialError::Aborted,
PendingConnectionError::InvalidPeerId => DialError::InvalidPeerId,
PendingConnectionError::IO(e) => DialError::ConnectionIo(e),
PendingConnectionError::Transport(e) => DialError::Transport(e),
}
}
}
impl fmt::Display for DialError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err),
DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
DialError::LocalPeerId => write!(f, "Dial error: tried to dial local peer id."),
DialError::InvalidAddress(a) => write!(f, "Dial error: invalid address: {}", a),
DialError::UnreachableAddr(a) => write!(f, "Dial error: unreachable address: {}", a),
DialError::Banned => write!(f, "Dial error: peer is banned."),
DialError::DialPeerConditionFalse(c) => {
write!(
@ -1251,6 +1248,16 @@ impl fmt::Display for DialError {
c
)
}
DialError::Aborted => write!(
f,
"Dial error: Pending connection attempt has been aborted."
),
DialError::InvalidPeerId => write!(f, "Dial error: Invalid peer ID."),
DialError::ConnectionIo(e) => write!(
f,
"Dial error: An I/O error occurred on the connection: {:?}.", e
),
DialError::Transport(e) => write!(f, "An error occurred while negotiating the transport protocol(s) on a connection: {:?}.", e),
}
}
}
@ -1259,12 +1266,14 @@ impl error::Error for DialError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
DialError::ConnectionLimit(err) => Some(err),
DialError::InvalidAddress(_) => None,
DialError::UnreachableAddr(_) => None,
DialError::LocalPeerId => None,
DialError::NoAddresses => None,
DialError::Banned => None,
DialError::DialPeerConditionFalse(_) => None,
DialError::Aborted => None,
DialError::InvalidPeerId => None,
DialError::ConnectionIo(_) => None,
DialError::Transport(_) => None,
}
}
}