time to establish connection (#3134)

Implementing #2745 , adding  a metric to break down time from connection pending to connection established, per protocol stack.

````
$curl -s http://127.0.0.1:42183/metrics | grep nt_duration
# HELP libp2p_swarm_connection_establishment_duration Time it took (locally) to finish establishing connections.
# TYPE libp2p_swarm_connection_establishment_duration histogram
libp2p_swarm_connection_establishment_duration_sum{role="Listener",protocols="/ip4/tcp"} 0.007
libp2p_swarm_connection_establishment_duration_count{role="Listener",protocols="/ip4/tcp"} 1
libp2p_swarm_connection_establishment_duration_bucket{role="Listener",protocols="/ip4/tcp",le="0.001"} 0
libp2p_swarm_connection_establishment_duration_bucket{role="Listener",protocols="/ip4/tcp",le="0.002"} 0
libp2p_swarm_connection_establishment_duration_bucket{role="Listener",protocols="/ip4/tcp",le="0.004"} 0
libp2p_swarm_connection_establishment_duration_bucket{role="Listener",protocols="/ip4/tcp",le="0.008"} 1
libp2p_swarm_connection_establishment_duration_bucket{role="Listener",protocols="/ip4/tcp",le="0.016"} 1
libp2p_swarm_connection_establishment_duration_bucket{role="Listener",protocols="/ip4/tcp",le="0.032"} 1
libp2p_swarm_connection_establishment_duration_bucket{role="Listener",protocols="/ip4/tcp",le="0.064"} 1
libp2p_swarm_connection_establishment_duration_bucket{role="Listener",protocols="/ip4/tcp",le="0.128"} 1
libp2p_swarm_connection_establishment_duration_bucket{role="Listener",protocols="/ip4/tcp",le="0.256"} 1
libp2p_swarm_connection_establishment_duration_bucket{role="Listener",protocols="/ip4/tcp",le="0.512"} 1
libp2p_swarm_connection_establishment_duration_bucket{role="Listener",protocols="/ip4/tcp",le="+Inf"} 1

lbl@chomp:~lbl
$curl -s http://127.0.0.1:34283/metrics | grep nt_duration
# HELP libp2p_swarm_connection_establishment_duration Time it took (locally) to finish establishing connections.
# TYPE libp2p_swarm_connection_establishment_duration histogram
libp2p_swarm_connection_establishment_duration_sum{role="Dialer",protocols="/ip4/tcp"} 0.009
libp2p_swarm_connection_establishment_duration_count{role="Dialer",protocols="/ip4/tcp"} 1
libp2p_swarm_connection_establishment_duration_bucket{role="Dialer",protocols="/ip4/tcp",le="0.001"} 0
libp2p_swarm_connection_establishment_duration_bucket{role="Dialer",protocols="/ip4/tcp",le="0.002"} 0
libp2p_swarm_connection_establishment_duration_bucket{role="Dialer",protocols="/ip4/tcp",le="0.004"} 0
libp2p_swarm_connection_establishment_duration_bucket{role="Dialer",protocols="/ip4/tcp",le="0.008"} 0
libp2p_swarm_connection_establishment_duration_bucket{role="Dialer",protocols="/ip4/tcp",le="0.016"} 1
libp2p_swarm_connection_establishment_duration_bucket{role="Dialer",protocols="/ip4/tcp",le="0.032"} 1
libp2p_swarm_connection_establishment_duration_bucket{role="Dialer",protocols="/ip4/tcp",le="0.064"} 1
libp2p_swarm_connection_establishment_duration_bucket{role="Dialer",protocols="/ip4/tcp",le="0.128"} 1
libp2p_swarm_connection_establishment_duration_bucket{role="Dialer",protocols="/ip4/tcp",le="0.256"} 1
libp2p_swarm_connection_establishment_duration_bucket{role="Dialer",protocols="/ip4/tcp",le="0.512"} 1
libp2p_swarm_connection_establishment_duration_bucket{role="Dialer",protocols="/ip4/tcp",le="+Inf"} 1
````
This commit is contained in:
John Turpish
2022-12-12 09:40:36 -05:00
committed by GitHub
parent 63ffc7fb0f
commit f8f19baad0
6 changed files with 58 additions and 10 deletions

View File

@ -1,5 +1,7 @@
# 0.12.0 [unreleased] # 0.12.0 [unreleased]
- Add `connections_establishment_duration` metric. See [PR 3134].
- Update to `libp2p-dcutr` `v0.9.0`. - Update to `libp2p-dcutr` `v0.9.0`.
- Update to `libp2p-ping` `v0.42.0`. - Update to `libp2p-ping` `v0.42.0`.
@ -12,6 +14,8 @@
- Update to `libp2p-swarm` `v0.42.0`. - Update to `libp2p-swarm` `v0.42.0`.
[PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134/
# 0.11.0 # 0.11.0
- Update to `libp2p-dcutr` `v0.8.0`. - Update to `libp2p-dcutr` `v0.8.0`.

View File

@ -20,8 +20,11 @@
use crate::protocol_stack; use crate::protocol_stack;
use prometheus_client::encoding::text::Encode; use prometheus_client::encoding::text::Encode;
use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::{
use prometheus_client::metrics::family::Family; counter::Counter,
family::Family,
histogram::{exponential_buckets, Histogram},
};
use prometheus_client::registry::Registry; use prometheus_client::registry::Registry;
pub struct Metrics { pub struct Metrics {
@ -29,6 +32,7 @@ pub struct Metrics {
connections_incoming_error: Family<IncomingConnectionErrorLabels, Counter>, connections_incoming_error: Family<IncomingConnectionErrorLabels, Counter>,
connections_established: Family<ConnectionEstablishedLabels, Counter>, connections_established: Family<ConnectionEstablishedLabels, Counter>,
connections_establishment_duration: Family<ConnectionEstablishmentDurationLabels, Histogram>,
connections_closed: Family<ConnectionClosedLabels, Counter>, connections_closed: Family<ConnectionClosedLabels, Counter>,
new_listen_addr: Family<AddressLabels, Counter>, new_listen_addr: Family<AddressLabels, Counter>,
@ -123,6 +127,15 @@ impl Metrics {
Box::new(connections_closed.clone()), Box::new(connections_closed.clone()),
); );
let connections_establishment_duration = Family::new_with_constructor(
create_connection_establishment_duration_histogram as fn() -> Histogram,
);
sub_registry.register(
"connections_establishment_duration",
"Time it took (locally) to establish connections",
Box::new(connections_establishment_duration.clone()),
);
Self { Self {
connections_incoming, connections_incoming,
connections_incoming_error, connections_incoming_error,
@ -135,6 +148,7 @@ impl Metrics {
dial_attempt, dial_attempt,
outgoing_connection_error, outgoing_connection_error,
connected_to_banned_peer, connected_to_banned_peer,
connections_establishment_duration,
} }
} }
} }
@ -143,13 +157,19 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) { fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
match event { match event {
libp2p_swarm::SwarmEvent::Behaviour(_) => {} libp2p_swarm::SwarmEvent::Behaviour(_) => {}
libp2p_swarm::SwarmEvent::ConnectionEstablished { endpoint, .. } => { libp2p_swarm::SwarmEvent::ConnectionEstablished {
self.connections_established endpoint,
.get_or_create(&ConnectionEstablishedLabels { established_in: time_taken,
role: endpoint.into(), ..
protocols: protocol_stack::as_string(endpoint.get_remote_address()), } => {
}) let labels = ConnectionEstablishedLabels {
.inc(); role: endpoint.into(),
protocols: protocol_stack::as_string(endpoint.get_remote_address()),
};
self.connections_established.get_or_create(&labels).inc();
self.connections_establishment_duration
.get_or_create(&labels)
.observe(time_taken.as_secs_f64());
} }
libp2p_swarm::SwarmEvent::ConnectionClosed { endpoint, .. } => { libp2p_swarm::SwarmEvent::ConnectionClosed { endpoint, .. } => {
self.connections_closed self.connections_closed
@ -279,6 +299,8 @@ struct ConnectionEstablishedLabels {
protocols: String, protocols: String,
} }
type ConnectionEstablishmentDurationLabels = ConnectionEstablishedLabels;
#[derive(Encode, Hash, Clone, Eq, PartialEq)] #[derive(Encode, Hash, Clone, Eq, PartialEq)]
struct ConnectionClosedLabels { struct ConnectionClosedLabels {
role: Role, role: Role,
@ -372,3 +394,7 @@ impl<TTransErr> From<&libp2p_swarm::PendingInboundConnectionError<TTransErr>>
} }
} }
} }
fn create_connection_establishment_duration_histogram() -> Histogram {
Histogram::new(exponential_buckets(1e-3, 2., 10))
}

View File

@ -195,6 +195,7 @@ async fn test_dial_back() {
}, },
num_established, num_established,
concurrent_dial_errors, concurrent_dial_errors,
established_in: _,
} => { } => {
assert_eq!(peer_id, client_id); assert_eq!(peer_id, client_id);
assert_eq!(num_established, NonZeroU32::new(2).unwrap()); assert_eq!(num_established, NonZeroU32::new(2).unwrap());

View File

@ -2,7 +2,10 @@
- Removed deprecated Swarm constructors. For transition notes see [0.41.0](#0.41.0). See [PR 3170]. - Removed deprecated Swarm constructors. For transition notes see [0.41.0](#0.41.0). See [PR 3170].
- Add `estblished_in` to `SwarmEvent::ConnectionEstablished`. See [PR 3134].
[PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170 [PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170
[PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134
# 0.41.1 # 0.41.1

View File

@ -38,6 +38,7 @@ use futures::{
ready, ready,
stream::FuturesUnordered, stream::FuturesUnordered,
}; };
use instant::Instant;
use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint}; use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint};
use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt};
use std::{ use std::{
@ -195,6 +196,8 @@ struct PendingConnection<THandler> {
endpoint: PendingPoint, endpoint: PendingPoint,
/// When dropped, notifies the task which then knows to terminate. /// When dropped, notifies the task which then knows to terminate.
abort_notifier: Option<oneshot::Sender<Void>>, abort_notifier: Option<oneshot::Sender<Void>>,
/// The moment we became aware of this possible connection, useful for timing metrics.
accepted_at: Instant,
} }
impl<THandler> PendingConnection<THandler> { impl<THandler> PendingConnection<THandler> {
@ -237,6 +240,8 @@ where
/// Addresses are dialed in parallel. Contains the addresses and errors /// Addresses are dialed in parallel. Contains the addresses and errors
/// of dial attempts that failed before the one successful dial. /// of dial attempts that failed before the one successful dial.
concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<TTrans::Error>)>>, concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<TTrans::Error>)>>,
/// How long it took to establish this connection.
established_in: std::time::Duration,
}, },
/// An established connection was closed. /// An established connection was closed.
@ -493,6 +498,7 @@ where
handler, handler,
endpoint, endpoint,
abort_notifier: Some(abort_notifier), abort_notifier: Some(abort_notifier),
accepted_at: Instant::now(),
}, },
); );
Ok(connection_id) Ok(connection_id)
@ -540,6 +546,7 @@ where
handler, handler,
endpoint: endpoint.into(), endpoint: endpoint.into(),
abort_notifier: Some(abort_notifier), abort_notifier: Some(abort_notifier),
accepted_at: Instant::now(),
}, },
); );
Ok(connection_id) Ok(connection_id)
@ -634,6 +641,7 @@ where
handler, handler,
endpoint, endpoint,
abort_notifier: _, abort_notifier: _,
accepted_at,
} = self } = self
.pending .pending
.remove(&id) .remove(&id)
@ -783,13 +791,14 @@ where
) )
.boxed(), .boxed(),
); );
let established_in = accepted_at.elapsed();
return Poll::Ready(PoolEvent::ConnectionEstablished { return Poll::Ready(PoolEvent::ConnectionEstablished {
peer_id: obtained_peer_id, peer_id: obtained_peer_id,
endpoint, endpoint,
id, id,
other_established_connection_ids, other_established_connection_ids,
concurrent_dial_errors, concurrent_dial_errors,
established_in,
}); });
} }
task::PendingConnectionEvent::PendingFailed { id, error } => { task::PendingConnectionEvent::PendingFailed { id, error } => {
@ -798,6 +807,7 @@ where
handler, handler,
endpoint, endpoint,
abort_notifier: _, abort_notifier: _,
accepted_at: _, // Ignoring the time it took for the connection to fail.
}) = self.pending.remove(&id) }) = self.pending.remove(&id)
{ {
self.counters.dec_pending(&endpoint); self.counters.dec_pending(&endpoint);

View File

@ -191,6 +191,8 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
/// Addresses are dialed concurrently. Contains the addresses and errors /// Addresses are dialed concurrently. Contains the addresses and errors
/// of dial attempts that failed before the one successful dial. /// of dial attempts that failed before the one successful dial.
concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>, concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
/// How long it took to establish this connection
established_in: std::time::Duration,
}, },
/// A connection with the given peer has been closed, /// A connection with the given peer has been closed,
/// possibly as a result of an error. /// possibly as a result of an error.
@ -808,6 +810,7 @@ where
endpoint, endpoint,
other_established_connection_ids, other_established_connection_ids,
concurrent_dial_errors, concurrent_dial_errors,
established_in,
} => { } => {
if self.banned_peers.contains(&peer_id) { if self.banned_peers.contains(&peer_id) {
// Mark the connection for the banned peer as banned, thus withholding any // Mark the connection for the banned peer as banned, thus withholding any
@ -848,6 +851,7 @@ where
num_established, num_established,
endpoint, endpoint,
concurrent_dial_errors, concurrent_dial_errors,
established_in,
}); });
} }
} }