feat(swarm)!: allow NetworkBehaviours to manage connections

Previously, a `ConnectionHandler` was immediately requested from the `NetworkBehaviour` as soon as a new dial was initiated or a new incoming connection accepted.

With this patch, we delay the creation of the handler until the connection is actually established and fully upgraded, i.e authenticated and multiplexed.

As a consequence, `NetworkBehaviour::new_handler` is now deprecated in favor of a new set of callbacks:

- `NetworkBehaviour::handle_pending_inbound_connection`
- `NetworkBehaviour::handle_pending_outbound_connection`
- `NetworkBehaviour::handle_established_inbound_connection`
- `NetworkBehaviour::handle_established_outbound_connection`

All callbacks are fallible, allowing the `NetworkBehaviour` to abort the connection either immediately or after it is fully established. All callbacks also receive a `ConnectionId` parameter which uniquely identifies the connection. For example, in case a `NetworkBehaviour` issues a dial via `NetworkBehaviourAction::Dial`, it can unambiguously detect this dial in these lifecycle callbacks via the `ConnectionId`.

Finally, `NetworkBehaviour::handle_pending_outbound_connection` also replaces `NetworkBehaviour::addresses_of_peer` by allowing the behaviour to return more addresses to be used for the dial.

Resolves #2824.

Pull-Request: #3254.
This commit is contained in:
Thomas Eizinger
2023-02-24 10:43:33 +11:00
committed by GitHub
parent 794b2a23d0
commit 19a554965f
42 changed files with 1543 additions and 540 deletions

View File

@ -85,18 +85,21 @@ pub mod derive_prelude {
pub use crate::behaviour::NewListenAddr;
pub use crate::behaviour::NewListener;
pub use crate::connection::ConnectionId;
pub use crate::ConnectionDenied;
pub use crate::ConnectionHandler;
pub use crate::ConnectionHandlerSelect;
pub use crate::DialError;
pub use crate::IntoConnectionHandler;
pub use crate::IntoConnectionHandlerSelect;
pub use crate::NetworkBehaviour;
pub use crate::NetworkBehaviourAction;
pub use crate::PollParameters;
pub use crate::THandler;
pub use crate::THandlerInEvent;
pub use crate::THandlerOutEvent;
pub use either::Either;
pub use futures::prelude as futures;
pub use libp2p_core::transport::ListenerId;
pub use libp2p_core::ConnectedPoint;
pub use libp2p_core::Endpoint;
pub use libp2p_core::Multiaddr;
pub use libp2p_core::PeerId;
}
@ -110,15 +113,18 @@ pub use behaviour::{
pub use connection::pool::{ConnectionCounters, ConnectionLimits};
pub use connection::{ConnectionError, ConnectionId, ConnectionLimit};
pub use executor::Executor;
#[allow(deprecated)]
pub use handler::IntoConnectionHandler;
pub use handler::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr,
IntoConnectionHandler, IntoConnectionHandlerSelect, KeepAlive, OneShotHandler,
OneShotHandlerConfig, SubstreamProtocol,
IntoConnectionHandlerSelect, KeepAlive, OneShotHandler, OneShotHandlerConfig,
SubstreamProtocol,
};
#[cfg(feature = "macros")]
pub use libp2p_swarm_derive::NetworkBehaviour;
pub use registry::{AddAddressResult, AddressRecord, AddressScore};
use crate::handler::UpgradeInfoSend;
use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent};
use connection::IncomingInfo;
use connection::{
@ -133,8 +139,7 @@ use libp2p_core::{
multihash::Multihash,
muxing::StreamMuxerBox,
transport::{self, ListenerId, TransportError, TransportEvent},
upgrade::ProtocolName,
Endpoint, Multiaddr, Negotiated, PeerId, Transport,
Endpoint, Multiaddr, Negotiated, PeerId, ProtocolName, Transport,
};
use registry::{AddressIntoIter, Addresses};
use smallvec::SmallVec;
@ -146,7 +151,6 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use upgrade::UpgradeInfoSend as _;
/// Substream for which a protocol has been chosen.
///
@ -159,20 +163,19 @@ type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::OutEvent
/// [`ConnectionHandler`] of the [`NetworkBehaviour`] for all the protocols the [`NetworkBehaviour`]
/// supports.
type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;
#[allow(deprecated)]
pub type THandler<TBehaviour> =
<<TBehaviour as NetworkBehaviour>::ConnectionHandler as IntoConnectionHandler>::Handler;
/// Custom event that can be received by the [`ConnectionHandler`] of the
/// [`NetworkBehaviour`].
pub type THandlerInEvent<TBehaviour> =
<<THandler<TBehaviour> as IntoConnectionHandler>::Handler as ConnectionHandler>::InEvent;
pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::InEvent;
/// Custom event that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
pub type THandlerOutEvent<TBehaviour> =
<<THandler<TBehaviour> as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent;
pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::OutEvent;
/// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
type THandlerErr<TBehaviour> =
<<THandler<TBehaviour> as IntoConnectionHandler>::Handler as ConnectionHandler>::Error;
pub type THandlerErr<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::Error;
/// Event generated by the `Swarm`.
#[derive(Debug)]
@ -542,21 +545,46 @@ where
}
let addresses = {
let mut addresses = dial_opts.get_addresses();
let mut addresses_from_opts = dial_opts.get_addresses();
if let Some(peer_id) = peer_id {
if dial_opts.extend_addresses_through_behaviour() {
addresses.extend(self.behaviour.addresses_of_peer(&peer_id));
match self.behaviour.handle_pending_outbound_connection(
connection_id,
peer_id,
addresses_from_opts.as_slice(),
dial_opts.role_override(),
) {
Ok(addresses) => {
if dial_opts.extend_addresses_through_behaviour() {
addresses_from_opts.extend(addresses)
} else {
let num_addresses = addresses.len();
if num_addresses > 0 {
log::debug!("discarding {num_addresses} addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection {connection_id:?}")
}
}
}
Err(cause) => {
let error = DialError::Denied { cause };
self.behaviour
.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
error: &error,
connection_id,
}));
return Err(error);
}
}
let mut unique_addresses = HashSet::new();
addresses.retain(|addr| {
addresses_from_opts.retain(|addr| {
!self.listened_addrs.values().flatten().any(|a| a == addr)
&& unique_addresses.insert(addr.clone())
});
if addresses.is_empty() {
if addresses_from_opts.is_empty() {
let error = DialError::NoAddresses;
self.behaviour
.on_swarm_event(FromSwarm::DialFailure(DialFailure {
@ -567,7 +595,7 @@ where
return Err(error);
};
addresses
addresses_from_opts
};
let dials = addresses
@ -756,15 +784,76 @@ where
established_in,
} => {
if self.banned_peers.contains(&peer_id) {
self.pool.close_connection(connection);
return Some(SwarmEvent::BannedPeer { peer_id, endpoint });
}
let handler = self
.behaviour
.new_handler()
.into_handler(&peer_id, &endpoint);
let handler = match endpoint.clone() {
ConnectedPoint::Dialer {
address,
role_override,
} => {
match self.behaviour.handle_established_outbound_connection(
id,
peer_id,
&address,
role_override,
) {
Ok(handler) => handler,
Err(cause) => {
let dial_error = DialError::Denied { cause };
self.behaviour.on_swarm_event(FromSwarm::DialFailure(
DialFailure {
connection_id: id,
error: &dial_error,
peer_id: Some(peer_id),
},
));
return Some(SwarmEvent::OutgoingConnectionError {
peer_id: Some(peer_id),
error: dial_error,
});
}
}
}
ConnectedPoint::Listener {
local_addr,
send_back_addr,
} => {
match self.behaviour.handle_established_inbound_connection(
id,
peer_id,
&local_addr,
&send_back_addr,
) {
Ok(handler) => handler,
Err(cause) => {
let listen_error = ListenError::Denied { cause };
self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
ListenFailure {
local_addr: &local_addr,
send_back_addr: &send_back_addr,
error: &listen_error,
connection_id: id,
},
));
return Some(SwarmEvent::IncomingConnectionError {
send_back_addr,
local_addr,
error: listen_error,
});
}
}
}
};
let supported_protocols = handler
.listen_protocol()
.upgrade()
.protocol_info()
.map(|p| p.protocol_name().to_owned())
.collect();
let other_established_connection_ids = self
.pool
.iter_established_connections_of_peer(&peer_id)
@ -802,6 +891,7 @@ where
other_established: other_established_connection_ids.len(),
},
));
self.supported_protocols = supported_protocols;
return Some(SwarmEvent::ConnectionEstablished {
peer_id,
num_established,
@ -938,6 +1028,31 @@ where
} => {
let connection_id = ConnectionId::next();
match self.behaviour.handle_pending_inbound_connection(
connection_id,
&local_addr,
&send_back_addr,
) {
Ok(()) => {}
Err(cause) => {
let listen_error = ListenError::Denied { cause };
self.behaviour
.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr: &local_addr,
send_back_addr: &send_back_addr,
error: &listen_error,
connection_id,
}));
return Some(SwarmEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error: listen_error,
});
}
}
match self.pool.add_incoming(
upgrade,
IncomingInfo {
@ -1277,8 +1392,7 @@ fn notify_any<THandler, TBehaviour>(
) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
where
TBehaviour: NetworkBehaviour,
THandler: IntoConnectionHandler,
THandler::Handler: ConnectionHandler<
THandler: ConnectionHandler<
InEvent = THandlerInEvent<TBehaviour>,
OutEvent = THandlerOutEvent<TBehaviour>,
>,
@ -1532,21 +1646,13 @@ where
}
/// Builds a `Swarm` with the current configuration.
pub fn build(mut self) -> Swarm<TBehaviour> {
let supported_protocols = self
.behaviour
.new_handler()
.inbound_protocol()
.protocol_info()
.map(|info| info.protocol_name().to_vec())
.collect();
pub fn build(self) -> Swarm<TBehaviour> {
Swarm {
local_peer_id: self.local_peer_id,
transport: self.transport,
pool: Pool::new(self.local_peer_id, self.pool_config, self.connection_limits),
behaviour: self.behaviour,
supported_protocols,
supported_protocols: Default::default(),
listened_addrs: HashMap::new(),
external_addrs: Addresses::default(),
banned_peers: HashSet::new(),
@ -1564,7 +1670,9 @@ pub enum DialError {
/// has been reached.
ConnectionLimit(ConnectionLimit),
/// The peer identity obtained on the connection matches the local peer.
LocalPeerId { endpoint: ConnectedPoint },
LocalPeerId {
endpoint: ConnectedPoint,
},
/// [`NetworkBehaviour::addresses_of_peer`] returned no addresses
/// for the peer to dial.
NoAddresses,
@ -1580,6 +1688,9 @@ pub enum DialError {
obtained: PeerId,
endpoint: ConnectedPoint,
},
Denied {
cause: ConnectionDenied,
},
/// An error occurred while negotiating the transport protocol(s) on a connection.
Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
}
@ -1634,6 +1745,9 @@ impl fmt::Display for DialError {
Ok(())
}
DialError::Denied { .. } => {
write!(f, "Dial error")
}
}
}
}
@ -1660,6 +1774,7 @@ impl error::Error for DialError {
DialError::InvalidPeerId { .. } => None,
DialError::WrongPeerId { .. } => None,
DialError::Transport(_) => None,
DialError::Denied { cause } => Some(cause),
}
}
}
@ -1677,8 +1792,13 @@ pub enum ListenError {
obtained: PeerId,
endpoint: ConnectedPoint,
},
/// The peer identity obtained on the connection did not match the one that was expected.
LocalPeerId { endpoint: ConnectedPoint },
/// The connection was dropped because it resolved to our own [`PeerId`].
LocalPeerId {
endpoint: ConnectedPoint,
},
Denied {
cause: ConnectionDenied,
},
/// An error occurred while negotiating the transport protocol(s) on a connection.
Transport(TransportError<io::Error>),
}
@ -1716,11 +1836,11 @@ impl fmt::Display for ListenError {
ListenError::Transport(_) => {
write!(f, "Listen error: Failed to negotiate transport protocol(s)")
}
ListenError::Denied { .. } => {
write!(f, "Listen error")
}
ListenError::LocalPeerId { endpoint } => {
write!(
f,
"Listen error: Pending connection: Local peer ID at {endpoint:?}."
)
write!(f, "Listen error: Local peer ID at {endpoint:?}.")
}
}
}
@ -1733,11 +1853,37 @@ impl error::Error for ListenError {
ListenError::WrongPeerId { .. } => None,
ListenError::Transport(err) => Some(err),
ListenError::Aborted => None,
ListenError::Denied { cause } => Some(cause),
ListenError::LocalPeerId { .. } => None,
}
}
}
#[derive(Debug)]
pub struct ConnectionDenied {
inner: Box<dyn error::Error + Send + Sync + 'static>,
}
impl ConnectionDenied {
pub fn new(cause: impl error::Error + Send + Sync + 'static) -> Self {
Self {
inner: Box::new(cause),
}
}
}
impl fmt::Display for ConnectionDenied {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "connection denied")
}
}
impl error::Error for ConnectionDenied {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
Some(self.inner.as_ref())
}
}
/// Information about the connections obtained by [`Swarm::network_info()`].
#[derive(Clone, Debug)]
pub struct NetworkInfo {
@ -1845,7 +1991,7 @@ mod tests {
) -> bool
where
TBehaviour: NetworkBehaviour,
<<TBehaviour::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent: Clone,
THandlerOutEvent<TBehaviour>: Clone,
{
swarm1
.behaviour()
@ -1865,7 +2011,7 @@ mod tests {
) -> bool
where
TBehaviour: NetworkBehaviour,
<<TBehaviour::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent: Clone
THandlerOutEvent<TBehaviour>: Clone,
{
swarm1
.behaviour()
@ -1963,10 +2109,10 @@ mod tests {
// connection. Check that it was not reported to the behaviour of the
// banning swarm.
assert_eq!(
swarm2.behaviour.on_connection_established.len(),
s2_expected_conns,
"No additional closed connections should be reported for the banned peer"
);
swarm2.behaviour.on_connection_established.len(),
s2_expected_conns,
"No additional closed connections should be reported for the banned peer"
);
// Setup to test that the banned connection is not reported upon closing
// even if the peer is unbanned.