feat(swarm)!: introduce ListenError (#3375)

In case an error happens for an outgoing connection, `Pool` reports an `OutgoingConnectionError`. This one is mapped to a `DialError` and reported via `SwarmEvent::OutgoingConnectionError` and `FromSwarm::DialFailure`.

For incoming connections, we didn't quite do the same thing. For one, `SwarmEvent::IncomingConnectionError` directly contained a `PendingInboundConnectionError`. Two, `FromSwarm::ListenFailure` did not include an error at all.

With this patch, we now introduce a `ListenError` enum which we use in `SwarmEvent::IncomingConnectionError` and we pass a reference to it along in `FromSwarm::ListenFailure`.
This commit is contained in:
Thomas Eizinger 2023-01-27 10:23:55 +11:00 committed by GitHub
parent e55202200c
commit d1336a7d81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 124 additions and 43 deletions

View File

@ -214,37 +214,35 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
match error { match error {
libp2p_core::transport::TransportError::MultiaddrNotSupported( libp2p_core::transport::TransportError::MultiaddrNotSupported(
_, _,
) => record( ) => {
OutgoingConnectionErrorError::TransportMultiaddrNotSupported, record(OutgoingConnectionError::TransportMultiaddrNotSupported)
), }
libp2p_core::transport::TransportError::Other(_) => { libp2p_core::transport::TransportError::Other(_) => {
record(OutgoingConnectionErrorError::TransportOther) record(OutgoingConnectionError::TransportOther)
} }
}; };
} }
} }
libp2p_swarm::DialError::Banned => record(OutgoingConnectionErrorError::Banned), libp2p_swarm::DialError::Banned => record(OutgoingConnectionError::Banned),
libp2p_swarm::DialError::ConnectionLimit(_) => { libp2p_swarm::DialError::ConnectionLimit(_) => {
record(OutgoingConnectionErrorError::ConnectionLimit) record(OutgoingConnectionError::ConnectionLimit)
} }
libp2p_swarm::DialError::LocalPeerId { .. } => { libp2p_swarm::DialError::LocalPeerId { .. } => {
record(OutgoingConnectionErrorError::LocalPeerId) record(OutgoingConnectionError::LocalPeerId)
} }
libp2p_swarm::DialError::NoAddresses => { libp2p_swarm::DialError::NoAddresses => {
record(OutgoingConnectionErrorError::NoAddresses) record(OutgoingConnectionError::NoAddresses)
} }
libp2p_swarm::DialError::DialPeerConditionFalse(_) => { libp2p_swarm::DialError::DialPeerConditionFalse(_) => {
record(OutgoingConnectionErrorError::DialPeerConditionFalse) record(OutgoingConnectionError::DialPeerConditionFalse)
}
libp2p_swarm::DialError::Aborted => {
record(OutgoingConnectionErrorError::Aborted)
} }
libp2p_swarm::DialError::Aborted => record(OutgoingConnectionError::Aborted),
libp2p_swarm::DialError::InvalidPeerId { .. } => { libp2p_swarm::DialError::InvalidPeerId { .. } => {
record(OutgoingConnectionErrorError::InvalidPeerId) record(OutgoingConnectionError::InvalidPeerId)
} }
libp2p_swarm::DialError::WrongPeerId { .. } => { libp2p_swarm::DialError::WrongPeerId { .. } => {
record(OutgoingConnectionErrorError::WrongPeerId) record(OutgoingConnectionError::WrongPeerId)
} }
}; };
} }
@ -325,7 +323,7 @@ impl From<&libp2p_core::ConnectedPoint> for Role {
#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)] #[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct OutgoingConnectionErrorLabels { struct OutgoingConnectionErrorLabels {
peer: PeerStatus, peer: PeerStatus,
error: OutgoingConnectionErrorError, error: OutgoingConnectionError,
} }
#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Copy, Debug)] #[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Copy, Debug)]
@ -335,7 +333,7 @@ enum PeerStatus {
} }
#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)] #[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
enum OutgoingConnectionErrorError { enum OutgoingConnectionError {
Banned, Banned,
ConnectionLimit, ConnectionLimit,
LocalPeerId, LocalPeerId,
@ -350,12 +348,12 @@ enum OutgoingConnectionErrorError {
#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)] #[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct IncomingConnectionErrorLabels { struct IncomingConnectionErrorLabels {
error: PendingInboundConnectionError, error: IncomingConnectionError,
protocols: String, protocols: String,
} }
#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)] #[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
enum PendingInboundConnectionError { enum IncomingConnectionError {
WrongPeerId, WrongPeerId,
LocalPeerId, LocalPeerId,
TransportErrorMultiaddrNotSupported, TransportErrorMultiaddrNotSupported,
@ -364,27 +362,21 @@ enum PendingInboundConnectionError {
ConnectionLimit, ConnectionLimit,
} }
impl From<&libp2p_swarm::PendingInboundConnectionError> for PendingInboundConnectionError { impl From<&libp2p_swarm::ListenError> for IncomingConnectionError {
fn from(error: &libp2p_swarm::PendingInboundConnectionError) -> Self { fn from(error: &libp2p_swarm::ListenError) -> Self {
match error { match error {
libp2p_swarm::PendingInboundConnectionError::WrongPeerId { .. } => { libp2p_swarm::ListenError::WrongPeerId { .. } => IncomingConnectionError::WrongPeerId,
PendingInboundConnectionError::WrongPeerId libp2p_swarm::ListenError::ConnectionLimit(_) => {
IncomingConnectionError::ConnectionLimit
} }
libp2p_swarm::PendingInboundConnectionError::LocalPeerId { .. } => { libp2p_swarm::ListenError::LocalPeerId { .. } => IncomingConnectionError::LocalPeerId,
PendingInboundConnectionError::LocalPeerId libp2p_swarm::ListenError::Transport(
}
libp2p_swarm::PendingInboundConnectionError::ConnectionLimit(_) => {
PendingInboundConnectionError::ConnectionLimit
}
libp2p_swarm::PendingInboundConnectionError::Transport(
libp2p_core::transport::TransportError::MultiaddrNotSupported(_), libp2p_core::transport::TransportError::MultiaddrNotSupported(_),
) => PendingInboundConnectionError::TransportErrorMultiaddrNotSupported, ) => IncomingConnectionError::TransportErrorMultiaddrNotSupported,
libp2p_swarm::PendingInboundConnectionError::Transport( libp2p_swarm::ListenError::Transport(
libp2p_core::transport::TransportError::Other(_), libp2p_core::transport::TransportError::Other(_),
) => PendingInboundConnectionError::TransportErrorOther, ) => IncomingConnectionError::TransportErrorOther,
libp2p_swarm::PendingInboundConnectionError::Aborted => { libp2p_swarm::ListenError::Aborted => IncomingConnectionError::Aborted,
PendingInboundConnectionError::Aborted
}
} }
} }
} }

View File

@ -344,6 +344,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
self.#i.on_swarm_event(#from_swarm::ListenFailure(#listen_failure { self.#i.on_swarm_event(#from_swarm::ListenFailure(#listen_failure {
local_addr, local_addr,
send_back_addr, send_back_addr,
error,
handler, handler,
})); }));
}, },
@ -351,6 +352,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
self.#enum_n.on_swarm_event(#from_swarm::ListenFailure(#listen_failure { self.#enum_n.on_swarm_event(#from_swarm::ListenFailure(#listen_failure {
local_addr, local_addr,
send_back_addr, send_back_addr,
error,
handler, handler,
})); }));
}, },
@ -747,7 +749,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
#dial_failure { peer_id, handler: handlers, error }) #dial_failure { peer_id, handler: handlers, error })
=> { #(#on_dial_failure_stmts)* } => { #(#on_dial_failure_stmts)* }
#from_swarm::ListenFailure( #from_swarm::ListenFailure(
#listen_failure { local_addr, send_back_addr, handler: handlers }) #listen_failure { local_addr, send_back_addr, handler: handlers, error })
=> { #(#on_listen_failure_stmts)* } => { #(#on_listen_failure_stmts)* }
#from_swarm::NewListener( #from_swarm::NewListener(
#new_listener { listener_id }) #new_listener { listener_id })

View File

@ -33,6 +33,9 @@
This was never constructed. This was never constructed.
See [PR 3374]. See [PR 3374].
- Introduce `ListenError` and use it within `SwarmEvent::IncomingConnectionError`.
See [PR 3375].
[PR 3364]: https://github.com/libp2p/rust-libp2p/pull/3364 [PR 3364]: https://github.com/libp2p/rust-libp2p/pull/3364
[PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170 [PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170
[PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134 [PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134
@ -44,6 +47,7 @@
[PR 3377]: https://github.com/libp2p/rust-libp2p/pull/3377 [PR 3377]: https://github.com/libp2p/rust-libp2p/pull/3377
[PR 3373]: https://github.com/libp2p/rust-libp2p/pull/3373 [PR 3373]: https://github.com/libp2p/rust-libp2p/pull/3373
[PR 3374]: https://github.com/libp2p/rust-libp2p/pull/3374 [PR 3374]: https://github.com/libp2p/rust-libp2p/pull/3374
[PR 3375]: https://github.com/libp2p/rust-libp2p/pull/3375
# 0.41.1 # 0.41.1

View File

@ -29,7 +29,7 @@ pub use listen_addresses::ListenAddresses;
use crate::connection::ConnectionId; use crate::connection::ConnectionId;
use crate::dial_opts::DialOpts; use crate::dial_opts::DialOpts;
use crate::handler::{ConnectionHandler, IntoConnectionHandler}; use crate::handler::{ConnectionHandler, IntoConnectionHandler};
use crate::{AddressRecord, AddressScore, DialError, THandlerOutEvent}; use crate::{AddressRecord, AddressScore, DialError, ListenError, THandlerOutEvent};
use libp2p_core::{transport::ListenerId, ConnectedPoint, Multiaddr, PeerId}; use libp2p_core::{transport::ListenerId, ConnectedPoint, Multiaddr, PeerId};
use std::{task::Context, task::Poll}; use std::{task::Context, task::Poll};
@ -742,6 +742,7 @@ pub struct DialFailure<'a, Handler> {
pub struct ListenFailure<'a, Handler> { pub struct ListenFailure<'a, Handler> {
pub local_addr: &'a Multiaddr, pub local_addr: &'a Multiaddr,
pub send_back_addr: &'a Multiaddr, pub send_back_addr: &'a Multiaddr,
pub error: &'a ListenError,
pub handler: Handler, pub handler: Handler,
} }
@ -870,10 +871,12 @@ impl<'a, Handler: IntoConnectionHandler> FromSwarm<'a, Handler> {
FromSwarm::ListenFailure(ListenFailure { FromSwarm::ListenFailure(ListenFailure {
local_addr, local_addr,
send_back_addr, send_back_addr,
error,
handler, handler,
}) => Some(FromSwarm::ListenFailure(ListenFailure { }) => Some(FromSwarm::ListenFailure(ListenFailure {
local_addr, local_addr,
send_back_addr, send_back_addr,
error,
handler: map_into_handler(handler)?, handler: map_into_handler(handler)?,
})), })),
FromSwarm::NewListener(NewListener { listener_id }) => { FromSwarm::NewListener(NewListener { listener_id }) => {

View File

@ -376,7 +376,7 @@ impl<'a> IncomingInfo<'a> {
} }
/// Information about a connection limit. /// Information about a connection limit.
#[derive(Debug, Clone)] #[derive(Debug, Clone, Copy)]
pub struct ConnectionLimit { pub struct ConnectionLimit {
/// The maximum number of connections. /// The maximum number of connections.
pub limit: u32, pub limit: u32,

View File

@ -221,7 +221,7 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
/// Address used to send back data to the remote. /// Address used to send back data to the remote.
send_back_addr: Multiaddr, send_back_addr: Multiaddr,
}, },
/// An error happened on a connection during its initial handshake. /// An error happened on an inbound connection during its initial handshake.
/// ///
/// This can include, for example, an error during the handshake of the encryption layer, or /// This can include, for example, an error during the handshake of the encryption layer, or
/// the connection unexpectedly closed. /// the connection unexpectedly closed.
@ -233,9 +233,9 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
/// Address used to send back data to the remote. /// Address used to send back data to the remote.
send_back_addr: Multiaddr, send_back_addr: Multiaddr,
/// The error that happened. /// The error that happened.
error: PendingInboundConnectionError, error: ListenError,
}, },
/// Outgoing connection attempt failed. /// An error happened on an outbound connection.
OutgoingConnectionError { OutgoingConnectionError {
/// If known, [`PeerId`] of the peer we tried to reach. /// If known, [`PeerId`] of the peer we tried to reach.
peer_id: Option<PeerId>, peer_id: Option<PeerId>,
@ -850,11 +850,14 @@ where
error, error,
handler, handler,
} => { } => {
let error = error.into();
log::debug!("Incoming connection failed: {:?}", error); log::debug!("Incoming connection failed: {:?}", error);
self.behaviour self.behaviour
.on_swarm_event(FromSwarm::ListenFailure(ListenFailure { .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr: &local_addr, local_addr: &local_addr,
send_back_addr: &send_back_addr, send_back_addr: &send_back_addr,
error: &error,
handler, handler,
})); }));
return Some(SwarmEvent::IncomingConnectionError { return Some(SwarmEvent::IncomingConnectionError {
@ -970,10 +973,13 @@ where
}); });
} }
Err((connection_limit, handler)) => { Err((connection_limit, handler)) => {
let error = ListenError::ConnectionLimit(connection_limit);
self.behaviour self.behaviour
.on_swarm_event(FromSwarm::ListenFailure(ListenFailure { .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr: &local_addr, local_addr: &local_addr,
send_back_addr: &send_back_addr, send_back_addr: &send_back_addr,
error: &error,
handler, handler,
})); }));
log::warn!("Incoming connection rejected: {:?}", connection_limit); log::warn!("Incoming connection rejected: {:?}", connection_limit);
@ -1572,7 +1578,7 @@ where
} }
} }
/// The possible failures of dialing. /// Possible errors when trying to establish or upgrade an outbound connection.
#[derive(Debug)] #[derive(Debug)]
pub enum DialError { pub enum DialError {
/// The peer is currently banned. /// The peer is currently banned.
@ -1681,6 +1687,80 @@ impl error::Error for DialError {
} }
} }
/// Possible errors when upgrading an inbound connection.
#[derive(Debug)]
pub enum ListenError {
/// The configured limit for simultaneous outgoing connections
/// has been reached.
ConnectionLimit(ConnectionLimit),
/// Pending connection attempt has been aborted.
Aborted,
/// The peer identity obtained on the connection did not match the one that was expected.
WrongPeerId {
obtained: PeerId,
endpoint: ConnectedPoint,
},
/// The peer identity obtained on the connection did not match the one that was expected.
LocalPeerId { endpoint: ConnectedPoint },
/// An error occurred while negotiating the transport protocol(s) on a connection.
Transport(TransportError<io::Error>),
}
impl From<PendingInboundConnectionError> for ListenError {
fn from(error: PendingInboundConnectionError) -> Self {
match error {
PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
PendingInboundConnectionError::ConnectionLimit(inner) => {
ListenError::ConnectionLimit(inner)
}
PendingInboundConnectionError::Aborted => ListenError::Aborted,
PendingInboundConnectionError::WrongPeerId { obtained, endpoint } => {
ListenError::WrongPeerId { obtained, endpoint }
}
PendingInboundConnectionError::LocalPeerId { endpoint } => {
ListenError::LocalPeerId { endpoint }
}
}
}
}
impl fmt::Display for ListenError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ListenError::ConnectionLimit(_) => write!(f, "Listen error"),
ListenError::Aborted => write!(
f,
"Listen error: Pending connection attempt has been aborted."
),
ListenError::WrongPeerId { obtained, endpoint } => write!(
f,
"Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
),
ListenError::Transport(_) => {
write!(f, "Listen error: Failed to negotiate transport protocol(s)")
}
ListenError::LocalPeerId { endpoint } => {
write!(
f,
"Listen error: Pending connection: Local peer ID at {endpoint:?}."
)
}
}
}
}
impl error::Error for ListenError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
ListenError::ConnectionLimit(err) => Some(err),
ListenError::WrongPeerId { .. } => None,
ListenError::Transport(err) => Some(err),
ListenError::Aborted => None,
ListenError::LocalPeerId { .. } => None,
}
}
}
/// Information about the connections obtained by [`Swarm::network_info()`]. /// Information about the connections obtained by [`Swarm::network_info()`].
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct NetworkInfo { pub struct NetworkInfo {
@ -2327,7 +2407,7 @@ mod tests {
network_1_established = true; network_1_established = true;
} }
Poll::Ready(Some(SwarmEvent::IncomingConnectionError { Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
error: PendingConnectionError::ConnectionLimit(err), error: ListenError::ConnectionLimit(err),
.. ..
})) => { })) => {
assert_eq!(err.limit, limit); assert_eq!(err.limit, limit);