feat(swarm): expose ConnectionId and add conn duration metric

- Exposes the `ConnectionId` in the various `SwarmEvent` variants.
- Tracks connection duration in `libp2p-metrics::swarm`.

Pull-Request: #3927.
This commit is contained in:
Max Inden
2023-05-17 07:19:53 +02:00
committed by GitHub
parent a580906a88
commit cc5b346a7c
13 changed files with 159 additions and 51 deletions

5
Cargo.lock generated
View File

@ -2721,6 +2721,7 @@ dependencies = [
name = "libp2p-metrics" name = "libp2p-metrics"
version = "0.13.0" version = "0.13.0"
dependencies = [ dependencies = [
"instant",
"libp2p-core", "libp2p-core",
"libp2p-dcutr", "libp2p-dcutr",
"libp2p-gossipsub", "libp2p-gossipsub",
@ -4000,9 +4001,9 @@ dependencies = [
[[package]] [[package]]
name = "prometheus-client" name = "prometheus-client"
version = "0.21.0" version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38974b1966bd5b6c7c823a20c1e07d5b84b171db20bac601e9b529720f7299f8" checksum = "78c2f43e8969d51935d2a7284878ae053ba30034cd563f673cde37ba5205685e"
dependencies = [ dependencies = [
"dtoa", "dtoa",
"itoa", "itoa",

View File

@ -267,7 +267,7 @@ fn main() -> Result<(), Box<dyn Error>> {
} => { } => {
info!("Established connection to {:?} via {:?}", peer_id, endpoint); info!("Established connection to {:?} via {:?}", peer_id, endpoint);
} }
SwarmEvent::OutgoingConnectionError { peer_id, error } => { SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
info!("Outgoing connection error to {:?}: {:?}", peer_id, error); info!("Outgoing connection error to {:?}: {:?}", peer_id, error);
} }
_ => {} _ => {}

View File

@ -329,7 +329,10 @@ impl EventLoop {
} }
} }
SwarmEvent::IncomingConnectionError { .. } => {} SwarmEvent::IncomingConnectionError { .. } => {}
SwarmEvent::Dialing(peer_id) => eprintln!("Dialing {peer_id}"), SwarmEvent::Dialing {
peer_id: Some(peer_id),
..
} => eprintln!("Dialing {peer_id}"),
e => panic!("{e:?}"), e => panic!("{e:?}"),
} }
} }

View File

@ -22,7 +22,12 @@
- Raise MSRV to 1.65. - Raise MSRV to 1.65.
See [PR 3715]. See [PR 3715].
- Replace `libp2p_swarm_connections_closed` `Counter` with `libp2p_swarm_connections_duration` `Histogram` which additionally tracks the duration of a connection.
Note that you can use the `_count` metric of the `Histogram` as a replacement for the `Counter`.
See [PR 3927].
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3927]: https://github.com/libp2p/rust-libp2p/pull/3927
[PR 3325]: https://github.com/libp2p/rust-libp2p/pull/3325 [PR 3325]: https://github.com/libp2p/rust-libp2p/pull/3325
## 0.12.0 ## 0.12.0

View File

@ -19,6 +19,7 @@ relay = ["libp2p-relay"]
dcutr = ["libp2p-dcutr"] dcutr = ["libp2p-dcutr"]
[dependencies] [dependencies]
instant = "0.1.11"
libp2p-core = { workspace = true } libp2p-core = { workspace = true }
libp2p-dcutr = { workspace = true, optional = true } libp2p-dcutr = { workspace = true, optional = true }
libp2p-identify = { workspace = true, optional = true } libp2p-identify = { workspace = true, optional = true }
@ -27,7 +28,7 @@ libp2p-ping = { workspace = true, optional = true }
libp2p-relay = { workspace = true, optional = true } libp2p-relay = { workspace = true, optional = true }
libp2p-swarm = { workspace = true } libp2p-swarm = { workspace = true }
libp2p-identity = { workspace = true } libp2p-identity = { workspace = true }
prometheus-client = { version = "0.21.0" } prometheus-client = { version = "0.21.1"}
once_cell = "1.16.0" once_cell = "1.16.0"
[target.'cfg(not(target_os = "unknown"))'.dependencies] [target.'cfg(not(target_os = "unknown"))'.dependencies]

View File

@ -18,20 +18,25 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use crate::protocol_stack; use crate::protocol_stack;
use instant::Instant;
use libp2p_swarm::ConnectionId;
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue}; use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family; use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram}; use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::Registry; use prometheus_client::registry::{Registry, Unit};
pub(crate) struct Metrics { pub(crate) struct Metrics {
connections_incoming: Family<AddressLabels, Counter>, connections_incoming: Family<AddressLabels, Counter>,
connections_incoming_error: Family<IncomingConnectionErrorLabels, Counter>, connections_incoming_error: Family<IncomingConnectionErrorLabels, Counter>,
connections_established: Family<ConnectionEstablishedLabels, Counter>, connections_established: Family<ConnectionLabels, Counter>,
connections_establishment_duration: Family<ConnectionEstablishmentDurationLabels, Histogram>, connections_establishment_duration: Family<ConnectionLabels, Histogram>,
connections_closed: Family<ConnectionClosedLabels, Counter>, connections_duration: Family<ConnectionClosedLabels, Histogram>,
new_listen_addr: Family<AddressLabels, Counter>, new_listen_addr: Family<AddressLabels, Counter>,
expired_listen_addr: Family<AddressLabels, Counter>, expired_listen_addr: Family<AddressLabels, Counter>,
@ -41,6 +46,8 @@ pub(crate) struct Metrics {
dial_attempt: Counter, dial_attempt: Counter,
outgoing_connection_error: Family<OutgoingConnectionErrorLabels, Counter>, outgoing_connection_error: Family<OutgoingConnectionErrorLabels, Counter>,
connections: Arc<Mutex<HashMap<ConnectionId, Instant>>>,
} }
impl Metrics { impl Metrics {
@ -110,27 +117,33 @@ impl Metrics {
connections_established.clone(), connections_established.clone(),
); );
let connections_closed = Family::default(); let connections_establishment_duration = {
sub_registry.register( let constructor: fn() -> Histogram =
"connections_closed", || Histogram::new(exponential_buckets(0.01, 1.5, 20));
"Number of connections closed", Family::new_with_constructor(constructor)
connections_closed.clone(), };
);
let connections_establishment_duration = Family::new_with_constructor(
create_connection_establishment_duration_histogram as fn() -> Histogram,
);
sub_registry.register( sub_registry.register(
"connections_establishment_duration", "connections_establishment_duration",
"Time it took (locally) to establish connections", "Time it took (locally) to establish connections",
connections_establishment_duration.clone(), connections_establishment_duration.clone(),
); );
let connections_duration = {
let constructor: fn() -> Histogram =
|| Histogram::new(exponential_buckets(0.01, 3.0, 20));
Family::new_with_constructor(constructor)
};
sub_registry.register_with_unit(
"connections_establishment_duration",
"Time it took (locally) to establish connections",
Unit::Seconds,
connections_establishment_duration.clone(),
);
Self { Self {
connections_incoming, connections_incoming,
connections_incoming_error, connections_incoming_error,
connections_established, connections_established,
connections_closed,
new_listen_addr, new_listen_addr,
expired_listen_addr, expired_listen_addr,
listener_closed, listener_closed,
@ -138,6 +151,8 @@ impl Metrics {
dial_attempt, dial_attempt,
outgoing_connection_error, outgoing_connection_error,
connections_establishment_duration, connections_establishment_duration,
connections_duration,
connections: Default::default(),
} }
} }
} }
@ -149,9 +164,10 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
libp2p_swarm::SwarmEvent::ConnectionEstablished { libp2p_swarm::SwarmEvent::ConnectionEstablished {
endpoint, endpoint,
established_in: time_taken, established_in: time_taken,
connection_id,
.. ..
} => { } => {
let labels = ConnectionEstablishedLabels { let labels = ConnectionLabels {
role: endpoint.into(), role: endpoint.into(),
protocols: protocol_stack::as_string(endpoint.get_remote_address()), protocols: protocol_stack::as_string(endpoint.get_remote_address()),
}; };
@ -159,14 +175,33 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
self.connections_establishment_duration self.connections_establishment_duration
.get_or_create(&labels) .get_or_create(&labels)
.observe(time_taken.as_secs_f64()); .observe(time_taken.as_secs_f64());
self.connections
.lock()
.expect("lock not to be poisoned")
.insert(*connection_id, Instant::now());
} }
libp2p_swarm::SwarmEvent::ConnectionClosed { endpoint, .. } => { libp2p_swarm::SwarmEvent::ConnectionClosed {
self.connections_closed endpoint,
.get_or_create(&ConnectionClosedLabels { connection_id,
cause,
..
} => {
let labels = ConnectionClosedLabels {
connection: ConnectionLabels {
role: endpoint.into(), role: endpoint.into(),
protocols: protocol_stack::as_string(endpoint.get_remote_address()), protocols: protocol_stack::as_string(endpoint.get_remote_address()),
}) },
.inc(); cause: cause.as_ref().map(Into::into),
};
self.connections_duration.get_or_create(&labels).observe(
self.connections
.lock()
.expect("lock not to be poisoned")
.remove(connection_id)
.expect("closed connection to previously be established")
.elapsed()
.as_secs_f64(),
);
} }
libp2p_swarm::SwarmEvent::IncomingConnection { send_back_addr, .. } => { libp2p_swarm::SwarmEvent::IncomingConnection { send_back_addr, .. } => {
self.connections_incoming self.connections_incoming
@ -187,7 +222,7 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
}) })
.inc(); .inc();
} }
libp2p_swarm::SwarmEvent::OutgoingConnectionError { error, peer_id } => { libp2p_swarm::SwarmEvent::OutgoingConnectionError { error, peer_id, .. } => {
let peer = match peer_id { let peer = match peer_id {
Some(_) => PeerStatus::Known, Some(_) => PeerStatus::Known,
None => PeerStatus::Unknown, None => PeerStatus::Unknown,
@ -261,7 +296,7 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
libp2p_swarm::SwarmEvent::ListenerError { .. } => { libp2p_swarm::SwarmEvent::ListenerError { .. } => {
self.listener_error.inc(); self.listener_error.inc();
} }
libp2p_swarm::SwarmEvent::Dialing(_) => { libp2p_swarm::SwarmEvent::Dialing { .. } => {
self.dial_attempt.inc(); self.dial_attempt.inc();
} }
} }
@ -269,17 +304,33 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
} }
#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)] #[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct ConnectionEstablishedLabels { struct ConnectionLabels {
role: Role, role: Role,
protocols: String, protocols: String,
} }
type ConnectionEstablishmentDurationLabels = ConnectionEstablishedLabels;
#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)] #[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct ConnectionClosedLabels { struct ConnectionClosedLabels {
role: Role, cause: Option<ConnectionError>,
protocols: String, #[prometheus(flatten)]
connection: ConnectionLabels,
}
#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
enum ConnectionError {
Io,
KeepAliveTimeout,
Handler,
}
impl<E> From<&libp2p_swarm::ConnectionError<E>> for ConnectionError {
fn from(value: &libp2p_swarm::ConnectionError<E>) -> Self {
match value {
libp2p_swarm::ConnectionError::IO(_) => ConnectionError::Io,
libp2p_swarm::ConnectionError::KeepAliveTimeout => ConnectionError::KeepAliveTimeout,
libp2p_swarm::ConnectionError::Handler(_) => ConnectionError::Handler,
}
}
} }
#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)] #[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
@ -359,7 +410,3 @@ impl From<&libp2p_swarm::ListenError> for IncomingConnectionError {
} }
} }
} }
fn create_connection_establishment_duration_histogram() -> Histogram {
Histogram::new(exponential_buckets(0.01, 1.5, 20))
}

View File

@ -96,6 +96,7 @@ async fn test_dial_back() {
num_established, num_established,
concurrent_dial_errors, concurrent_dial_errors,
established_in: _, established_in: _,
connection_id: _,
} => { } => {
assert_eq!(peer_id, client_id); assert_eq!(peer_id, client_id);
assert_eq!(num_established, NonZeroU32::new(2).unwrap()); assert_eq!(num_established, NonZeroU32::new(2).unwrap());
@ -103,7 +104,10 @@ async fn test_dial_back() {
assert_eq!(address, expect_addr); assert_eq!(address, expect_addr);
break; break;
} }
SwarmEvent::Dialing(peer) => assert_eq!(peer, client_id), SwarmEvent::Dialing {
peer_id: Some(peer),
..
} => assert_eq!(peer, client_id),
SwarmEvent::NewListenAddr { .. } | SwarmEvent::ExpiredListenAddr { .. } => {} SwarmEvent::NewListenAddr { .. } | SwarmEvent::ExpiredListenAddr { .. } => {}
other => panic!("Unexpected swarm event: {other:?}."), other => panic!("Unexpected swarm event: {other:?}."),
} }
@ -143,12 +147,15 @@ async fn test_dial_error() {
loop { loop {
match server.next_swarm_event().await { match server.next_swarm_event().await {
SwarmEvent::OutgoingConnectionError { peer_id, error } => { SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
assert_eq!(peer_id.unwrap(), client_id); assert_eq!(peer_id.unwrap(), client_id);
assert!(matches!(error, DialError::Transport(_))); assert!(matches!(error, DialError::Transport(_)));
break; break;
} }
SwarmEvent::Dialing(peer) => assert_eq!(peer, client_id), SwarmEvent::Dialing {
peer_id: Some(peer),
..
} => assert_eq!(peer, client_id),
SwarmEvent::NewListenAddr { .. } | SwarmEvent::ExpiredListenAddr { .. } => {} SwarmEvent::NewListenAddr { .. } | SwarmEvent::ExpiredListenAddr { .. } => {}
other => panic!("Unexpected swarm event: {other:?}."), other => panic!("Unexpected swarm event: {other:?}."),
} }
@ -307,7 +314,10 @@ async fn test_dial_multiple_addr() {
assert_eq!(address, dial_addresses[1]); assert_eq!(address, dial_addresses[1]);
break; break;
} }
SwarmEvent::Dialing(peer) => assert_eq!(peer, client_id), SwarmEvent::Dialing {
peer_id: Some(peer),
..
} => assert_eq!(peer, client_id),
SwarmEvent::NewListenAddr { .. } | SwarmEvent::ExpiredListenAddr { .. } => {} SwarmEvent::NewListenAddr { .. } | SwarmEvent::ExpiredListenAddr { .. } => {}
other => panic!("Unexpected swarm event: {other:?}."), other => panic!("Unexpected swarm event: {other:?}."),
} }

View File

@ -193,7 +193,10 @@ async fn wait_for_reservation(
break; break;
} }
} }
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {} SwarmEvent::Dialing {
peer_id: Some(peer_id),
..
} if peer_id == relay_peer_id => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {} SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
e => panic!("{e:?}"), e => panic!("{e:?}"),
} }

View File

@ -86,7 +86,7 @@ async fn main() -> Result<()> {
let server_peer_id = loop { let server_peer_id = loop {
match swarm.next().await.unwrap() { match swarm.next().await.unwrap() {
SwarmEvent::ConnectionEstablished { peer_id, .. } => break peer_id, SwarmEvent::ConnectionEstablished { peer_id, .. } => break peer_id,
SwarmEvent::OutgoingConnectionError { peer_id, error } => { SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
bail!("Outgoing connection error to {:?}: {:?}", peer_id, error); bail!("Outgoing connection error to {:?}: {:?}", peer_id, error);
} }
e => panic!("{e:?}"), e => panic!("{e:?}"),
@ -113,7 +113,7 @@ async fn main() -> Result<()> {
} => { } => {
info!("Established connection to {:?} via {:?}", peer_id, endpoint); info!("Established connection to {:?} via {:?}", peer_id, endpoint);
} }
SwarmEvent::OutgoingConnectionError { peer_id, error } => { SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
info!("Outgoing connection error to {:?}: {:?}", peer_id, error); info!("Outgoing connection error to {:?}: {:?}", peer_id, error);
} }
SwarmEvent::Behaviour(libp2p_perf::client::Event { id: _, result }) => break result?, SwarmEvent::Behaviour(libp2p_perf::client::Event { id: _, result }) => break result?,

View File

@ -53,7 +53,7 @@ async fn perf() {
.wait(|e| match e { .wait(|e| match e {
SwarmEvent::IncomingConnection { .. } => panic!(), SwarmEvent::IncomingConnection { .. } => panic!(),
SwarmEvent::ConnectionEstablished { .. } => None, SwarmEvent::ConnectionEstablished { .. } => None,
SwarmEvent::Dialing(_) => None, SwarmEvent::Dialing { .. } => None,
SwarmEvent::Behaviour(client::Event { result, .. }) => Some(result), SwarmEvent::Behaviour(client::Event { result, .. }) => Some(result),
e => panic!("{e:?}"), e => panic!("{e:?}"),
}) })

View File

@ -222,7 +222,10 @@ async fn connection_established_to(
) { ) {
loop { loop {
match swarm.select_next_some().await { match swarm.select_next_some().await {
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {} SwarmEvent::Dialing {
peer_id: Some(peer_id),
..
} if peer_id == relay_peer_id => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {} SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
SwarmEvent::Behaviour(ClientEvent::Ping(ping::Event { peer, .. })) if peer == other => { SwarmEvent::Behaviour(ClientEvent::Ping(ping::Event { peer, .. })) if peer == other => {
break break
@ -419,7 +422,10 @@ async fn wait_for_reservation(
async fn wait_for_dial(client: &mut Swarm<Client>, remote: PeerId) -> bool { async fn wait_for_dial(client: &mut Swarm<Client>, remote: PeerId) -> bool {
loop { loop {
match client.select_next_some().await { match client.select_next_some().await {
SwarmEvent::Dialing(peer_id) if peer_id == remote => {} SwarmEvent::Dialing {
peer_id: Some(peer_id),
..
} if peer_id == remote => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == remote => return true, SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == remote => return true,
SwarmEvent::OutgoingConnectionError { peer_id, .. } if peer_id == Some(remote) => { SwarmEvent::OutgoingConnectionError { peer_id, .. } if peer_id == Some(remote) => {
return false return false

View File

@ -50,6 +50,10 @@
- Remove deprecated `NetworkBehaviourAction` type. - Remove deprecated `NetworkBehaviourAction` type.
See [PR 3919]. See [PR 3919].
- Expose `ConnectionId` on `SwarmEvent::{ConnectionEstablished,ConnectionClosed,IncomingConnection,IncomingConnectionError,OutgoingConnectionError,Dialing}`.
Also emit `SwarmEvent::Dialing` for dials with unknown `PeerId`.
See [PR 3927].
- Rename `ConnectionHandlerEvent::Custom` to `ConnectionHandlerEvent::NotifyBehaviour`. See [PR 3955]. - Rename `ConnectionHandlerEvent::Custom` to `ConnectionHandlerEvent::NotifyBehaviour`. See [PR 3955].
[PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605 [PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605
@ -64,6 +68,7 @@
[PR 3886]: https://github.com/libp2p/rust-libp2p/pull/3886 [PR 3886]: https://github.com/libp2p/rust-libp2p/pull/3886
[PR 3912]: https://github.com/libp2p/rust-libp2p/pull/3912 [PR 3912]: https://github.com/libp2p/rust-libp2p/pull/3912
[PR 3919]: https://github.com/libp2p/rust-libp2p/pull/3919 [PR 3919]: https://github.com/libp2p/rust-libp2p/pull/3919
[PR 3927]: https://github.com/libp2p/rust-libp2p/pull/3927
[PR 3955]: https://github.com/libp2p/rust-libp2p/pull/3955 [PR 3955]: https://github.com/libp2p/rust-libp2p/pull/3955
## 0.42.2 ## 0.42.2

View File

@ -187,6 +187,8 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
ConnectionEstablished { ConnectionEstablished {
/// Identity of the peer that we have connected to. /// Identity of the peer that we have connected to.
peer_id: PeerId, peer_id: PeerId,
/// Identifier of the connection.
connection_id: ConnectionId,
/// Endpoint of the connection that has been opened. /// Endpoint of the connection that has been opened.
endpoint: ConnectedPoint, endpoint: ConnectedPoint,
/// Number of established connections to this peer, including the one that has just been /// Number of established connections to this peer, including the one that has just been
@ -204,6 +206,8 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
ConnectionClosed { ConnectionClosed {
/// Identity of the peer that we have connected to. /// Identity of the peer that we have connected to.
peer_id: PeerId, peer_id: PeerId,
/// Identifier of the connection.
connection_id: ConnectionId,
/// Endpoint of the connection that has been closed. /// Endpoint of the connection that has been closed.
endpoint: ConnectedPoint, endpoint: ConnectedPoint,
/// Number of other remaining connections to this same peer. /// Number of other remaining connections to this same peer.
@ -218,6 +222,8 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
/// [`IncomingConnectionError`](SwarmEvent::IncomingConnectionError) event will later be /// [`IncomingConnectionError`](SwarmEvent::IncomingConnectionError) event will later be
/// generated for this connection. /// generated for this connection.
IncomingConnection { IncomingConnection {
/// Identifier of the connection.
connection_id: ConnectionId,
/// Local connection address. /// Local connection address.
/// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr) /// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
/// event. /// event.
@ -230,6 +236,8 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
/// This can include, for example, an error during the handshake of the encryption layer, or /// This can include, for example, an error during the handshake of the encryption layer, or
/// the connection unexpectedly closed. /// the connection unexpectedly closed.
IncomingConnectionError { IncomingConnectionError {
/// Identifier of the connection.
connection_id: ConnectionId,
/// Local connection address. /// Local connection address.
/// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr) /// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
/// event. /// event.
@ -241,6 +249,8 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
}, },
/// An error happened on an outbound connection. /// An error happened on an outbound connection.
OutgoingConnectionError { OutgoingConnectionError {
/// Identifier of the connection.
connection_id: ConnectionId,
/// If known, [`PeerId`] of the peer we tried to reach. /// If known, [`PeerId`] of the peer we tried to reach.
peer_id: Option<PeerId>, peer_id: Option<PeerId>,
/// Error that has been encountered. /// Error that has been encountered.
@ -286,7 +296,13 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
/// reported if the dialing attempt succeeds, otherwise a /// reported if the dialing attempt succeeds, otherwise a
/// [`OutgoingConnectionError`](SwarmEvent::OutgoingConnectionError) event /// [`OutgoingConnectionError`](SwarmEvent::OutgoingConnectionError) event
/// is reported. /// is reported.
Dialing(PeerId), Dialing {
/// Identity of the peer that we are connecting to.
peer_id: Option<PeerId>,
/// Identifier of the connection.
connection_id: ConnectionId,
},
} }
impl<TBehaviourOutEvent, THandlerErr> SwarmEvent<TBehaviourOutEvent, THandlerErr> { impl<TBehaviourOutEvent, THandlerErr> SwarmEvent<TBehaviourOutEvent, THandlerErr> {
@ -773,6 +789,7 @@ where
return Some(SwarmEvent::OutgoingConnectionError { return Some(SwarmEvent::OutgoingConnectionError {
peer_id: Some(peer_id), peer_id: Some(peer_id),
connection_id: id,
error: dial_error, error: dial_error,
}); });
} }
@ -801,6 +818,7 @@ where
)); ));
return Some(SwarmEvent::IncomingConnectionError { return Some(SwarmEvent::IncomingConnectionError {
connection_id: id,
send_back_addr, send_back_addr,
local_addr, local_addr,
error: listen_error, error: listen_error,
@ -856,6 +874,7 @@ where
self.supported_protocols = supported_protocols; self.supported_protocols = supported_protocols;
return Some(SwarmEvent::ConnectionEstablished { return Some(SwarmEvent::ConnectionEstablished {
peer_id, peer_id,
connection_id: id,
num_established, num_established,
endpoint, endpoint,
concurrent_dial_errors, concurrent_dial_errors,
@ -884,6 +903,7 @@ where
return Some(SwarmEvent::OutgoingConnectionError { return Some(SwarmEvent::OutgoingConnectionError {
peer_id: peer, peer_id: peer,
connection_id,
error, error,
}); });
} }
@ -904,6 +924,7 @@ where
connection_id: id, connection_id: id,
})); }));
return Some(SwarmEvent::IncomingConnectionError { return Some(SwarmEvent::IncomingConnectionError {
connection_id: id,
local_addr, local_addr,
send_back_addr, send_back_addr,
error, error,
@ -946,6 +967,7 @@ where
})); }));
return Some(SwarmEvent::ConnectionClosed { return Some(SwarmEvent::ConnectionClosed {
peer_id, peer_id,
connection_id: id,
endpoint, endpoint,
cause: error, cause: error,
num_established, num_established,
@ -1008,6 +1030,7 @@ where
})); }));
return Some(SwarmEvent::IncomingConnectionError { return Some(SwarmEvent::IncomingConnectionError {
connection_id,
local_addr, local_addr,
send_back_addr, send_back_addr,
error: listen_error, error: listen_error,
@ -1025,6 +1048,7 @@ where
); );
Some(SwarmEvent::IncomingConnection { Some(SwarmEvent::IncomingConnection {
connection_id,
local_addr, local_addr,
send_back_addr, send_back_addr,
}) })
@ -1111,10 +1135,12 @@ where
ToSwarm::GenerateEvent(event) => return Some(SwarmEvent::Behaviour(event)), ToSwarm::GenerateEvent(event) => return Some(SwarmEvent::Behaviour(event)),
ToSwarm::Dial { opts } => { ToSwarm::Dial { opts } => {
let peer_id = opts.get_or_parse_peer_id(); let peer_id = opts.get_or_parse_peer_id();
let connection_id = opts.connection_id();
if let Ok(()) = self.dial(opts) { if let Ok(()) = self.dial(opts) {
if let Ok(Some(peer_id)) = peer_id { return Some(SwarmEvent::Dialing {
return Some(SwarmEvent::Dialing(peer_id)); peer_id: peer_id.ok().flatten(),
} connection_id,
});
} }
} }
ToSwarm::NotifyHandler { ToSwarm::NotifyHandler {
@ -2432,6 +2458,7 @@ mod tests {
peer_id, peer_id,
// multiaddr, // multiaddr,
error: DialError::Transport(errors), error: DialError::Transport(errors),
..
} => { } => {
assert_eq!(target, peer_id.unwrap()); assert_eq!(target, peer_id.unwrap());