mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-18 20:41:25 +00:00
Remove libp2p-ping keep-alive functionality. (#1067)
* Fix connection & handler shutdown when using `KeepAlive::Now`. Delay::new(Instant::now()) is never immediately ready, resulting in `KeepAlive::Now` having no effect, since the delay is re-created on every execution of `poll()` in the `NodeHandlerWrapper`. It can also send the node handler into a busy-loop, since every newly created Delay will trigger a task wakeup, which creates a new Delay with Instant::now(), and so forth. The use of `Delay::new(Instant::now())` for "immediate" connection shutdown is therefore removed here entirely. An important assumption is thereby that as long as the node handler has non-empty `negotiating_in` and `negotiating_out`, the handler is not dependent on such a Delay for task wakeup. * Correction to the libp2p-ping connection timeout. The current connection timeout is always short of one `interval`, because the "countdown" begins with the last received or sent pong (depending on the policy). In effect, the current default config has a connection timeout of 5 seconds (20 - 15) from the point when a ping is sent. Instead, the "countdown" of the connection timeout should always begin with the next scheduled ping. That also makes all configurations valid, avoiding pitfalls. The important properties of the ping handler are now checked to hold for all configurations, in particular: * The next ping must be scheduled no earlier than the ping interval and no later than the connection timeout. * The "countdown" for the connection timeout starts on the next ping, i.e. the full connection timeout remains at the instant when the next ping is sent. * Do not keep connections alive. The ping protocol is not supposed to keep otherwise idle connections alive, only to add an additional condition for terminating them in the form of a configurable number of consecutive failed ping requests. In this context, the `PingPolicy` does not seem useful any longer.
This commit is contained in:
@ -27,7 +27,7 @@ use libp2p_core::protocols_handler::{
|
||||
ProtocolsHandler,
|
||||
ProtocolsHandlerUpgrErr,
|
||||
};
|
||||
use std::{io, num::NonZeroU32, time::{Duration, Instant}};
|
||||
use std::{error::Error, io, fmt, num::NonZeroU32, time::{Duration, Instant}};
|
||||
use std::collections::VecDeque;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_timer::Delay;
|
||||
@ -44,9 +44,7 @@ pub struct PingConfig {
|
||||
/// The maximum number of failed outbound pings before the associated
|
||||
/// connection is deemed unhealthy, indicating to the `Swarm` that it
|
||||
/// should be closed.
|
||||
max_timeouts: NonZeroU32,
|
||||
/// The policy for outbound pings.
|
||||
policy: PingPolicy
|
||||
max_failures: NonZeroU32,
|
||||
}
|
||||
|
||||
impl PingConfig {
|
||||
@ -54,36 +52,20 @@ impl PingConfig {
|
||||
///
|
||||
/// * [`PingConfig::with_interval`] 15s
|
||||
/// * [`PingConfig::with_timeout`] 20s
|
||||
/// * [`PingConfig::with_max_timeouts`] 1
|
||||
/// * [`PingConfig::with_policy`] [`PingPolicy::Always`]
|
||||
/// * [`PingConfig::with_max_failures`] 1
|
||||
///
|
||||
/// These settings have the following effect:
|
||||
///
|
||||
/// * A ping is sent every 15 seconds on a healthy connection.
|
||||
/// * Every ping sent must yield a response within 20 seconds in order to
|
||||
/// be successful.
|
||||
/// * The duration of a single ping timeout without sending or receiving
|
||||
/// a pong is sufficient for the connection to be subject to being closed,
|
||||
/// i.e. the connection timeout is reset to 20 seconds from the current
|
||||
/// [`Instant`] upon a sent or received pong.
|
||||
///
|
||||
/// In general, every successfully sent or received pong resets the connection
|
||||
/// timeout, which is defined by
|
||||
/// ```raw
|
||||
/// max_timeouts * timeout
|
||||
/// ```
|
||||
/// relative to the current [`Instant`].
|
||||
///
|
||||
/// A sensible configuration should thus obey the invariant:
|
||||
/// ```raw
|
||||
/// max_timeouts * timeout > interval
|
||||
/// ```
|
||||
/// * A single ping failure is sufficient for the connection to be subject
|
||||
/// to being closed.
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
timeout: Duration::from_secs(20),
|
||||
interval: Duration::from_secs(15),
|
||||
max_timeouts: NonZeroU32::new(1).expect("1 != 0"),
|
||||
policy: PingPolicy::Always
|
||||
max_failures: NonZeroU32::new(1).expect("1 != 0"),
|
||||
}
|
||||
}
|
||||
|
||||
@ -99,49 +81,12 @@ impl PingConfig {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the number of successive ping timeouts upon which the remote
|
||||
/// Sets the maximum number of consecutive ping failures upon which the remote
|
||||
/// peer is considered unreachable and the connection closed.
|
||||
///
|
||||
/// > **Note**: Successful inbound pings from the remote peer can keep
|
||||
/// > the connection alive, even if outbound pings fail. I.e.
|
||||
/// > the connection is closed after `ping_timeout * max_timeouts`
|
||||
/// > only if in addition to failing outbound pings, no ping from
|
||||
/// > the remote is received within that time window.
|
||||
pub fn with_max_timeouts(mut self, n: NonZeroU32) -> Self {
|
||||
self.max_timeouts = n;
|
||||
pub fn with_max_failures(mut self, n: NonZeroU32) -> Self {
|
||||
self.max_failures = n;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the [`PingPolicy`] to use for outbound pings.
|
||||
pub fn with_policy(mut self, p: PingPolicy) -> Self {
|
||||
self.policy = p;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// The `PingPolicy` determines under what conditions an outbound ping
|
||||
/// is sent w.r.t. inbound pings.
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub enum PingPolicy {
|
||||
/// Always send a ping in the configured interval, regardless
|
||||
/// of received pings.
|
||||
///
|
||||
/// This policy is appropriate e.g. if continuous measurement of
|
||||
/// the RTT to the remote is desired.
|
||||
Always,
|
||||
/// Only send a ping as necessary to keep the connection alive.
|
||||
///
|
||||
/// This policy resets the local ping timer whenever an inbound ping
|
||||
/// is received, effectively letting the peer with the lower ping
|
||||
/// frequency drive the ping protocol. Hence, to avoid superfluous ping
|
||||
/// traffic, the ping interval of the peers should ideally not be the
|
||||
/// same when using this policy, e.g. through randomization.
|
||||
///
|
||||
/// This policy is appropriate if the ping protocol is only used
|
||||
/// as an application-layer connection keep-alive, without a need
|
||||
/// for measuring the round-trip times on both peers, as it tries
|
||||
/// to keep the ping traffic to a minimum.
|
||||
KeepAlive
|
||||
}
|
||||
|
||||
/// The result of an inbound or outbound ping.
|
||||
@ -168,6 +113,24 @@ pub enum PingFailure {
|
||||
Other { error: Box<dyn std::error::Error + Send + 'static> }
|
||||
}
|
||||
|
||||
/// User-facing rendering of a [`PingFailure`]: a fixed message for a timeout,
/// and the wrapped error's own `Display` output prefixed with `"Ping error: "`
/// for every other failure.
impl fmt::Display for PingFailure {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
PingFailure::Timeout => f.write_str("Ping timeout"),
|
||||
PingFailure::Other { error } => write!(f, "Ping error: {}", error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Makes [`PingFailure`] usable as a standard error and exposes its cause:
/// a timeout has no underlying error, while `Other` reports the boxed error
/// it carries.
impl Error for PingFailure {
|
||||
fn source(&self) -> Option<&(dyn Error + 'static)> {
|
||||
match self {
|
||||
PingFailure::Timeout => None,
|
||||
// `error` is bound by reference to the `Box`; `&**error` reborrows the
// boxed trait object itself so it can be returned as `&dyn Error`.
PingFailure::Other { error } => Some(&**error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Protocol handler that handles pinging the remote at a regular period
|
||||
/// and answering ping queries.
|
||||
///
|
||||
@ -177,33 +140,22 @@ pub struct PingHandler<TSubstream> {
|
||||
config: PingConfig,
|
||||
/// The timer for when to send the next ping.
|
||||
next_ping: Delay,
|
||||
/// The connection timeout.
|
||||
connection_timeout: Duration,
|
||||
/// The current keep-alive, i.e. until when the connection that
|
||||
/// the handler operates on should be kept alive.
|
||||
connection_keepalive: KeepAlive,
|
||||
/// The pending results from inbound or outbound pings, ready
|
||||
/// to be `poll()`ed.
|
||||
pending_results: VecDeque<(Instant, PingResult)>,
|
||||
pending_results: VecDeque<PingResult>,
|
||||
/// The number of consecutive ping failures that occurred.
|
||||
failures: u32,
|
||||
_marker: std::marker::PhantomData<TSubstream>
|
||||
}
|
||||
|
||||
impl<TSubstream> PingHandler<TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
{
|
||||
impl<TSubstream> PingHandler<TSubstream> {
|
||||
/// Builds a new `PingHandler` with the given configuration.
|
||||
pub fn new(config: PingConfig) -> Self {
|
||||
let now = Instant::now();
|
||||
let connection_timeout = config.timeout * config.max_timeouts.get();
|
||||
let connection_keepalive = KeepAlive::Until(now + connection_timeout);
|
||||
let next_ping = Delay::new(now);
|
||||
PingHandler {
|
||||
config,
|
||||
next_ping,
|
||||
connection_timeout,
|
||||
connection_keepalive,
|
||||
next_ping: Delay::new(Instant::now()),
|
||||
pending_results: VecDeque::with_capacity(2),
|
||||
failures: 0,
|
||||
_marker: std::marker::PhantomData
|
||||
}
|
||||
}
|
||||
@ -215,7 +167,7 @@ where
|
||||
{
|
||||
type InEvent = Void;
|
||||
type OutEvent = PingResult;
|
||||
type Error = ProtocolsHandlerUpgrErr<io::Error>;
|
||||
type Error = PingFailure;
|
||||
type Substream = TSubstream;
|
||||
type InboundProtocol = protocol::Ping;
|
||||
type OutboundProtocol = protocol::Ping;
|
||||
@ -227,39 +179,45 @@ where
|
||||
|
||||
fn inject_fully_negotiated_inbound(&mut self, _: ()) {
|
||||
// A ping from a remote peer has been answered.
|
||||
self.pending_results.push_front((Instant::now(), Ok(PingSuccess::Pong)));
|
||||
self.pending_results.push_front(Ok(PingSuccess::Pong));
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(&mut self, rtt: Duration, _info: ()) {
|
||||
// A ping initiated by the local peer was answered by the remote.
|
||||
self.pending_results.push_front((Instant::now(), Ok(PingSuccess::Ping { rtt })));
|
||||
self.pending_results.push_front(Ok(PingSuccess::Ping { rtt }));
|
||||
}
|
||||
|
||||
// Intentionally a no-op: this handler declares `InEvent = Void`, so there is
// nothing to act on (presumably `Void` is the uninhabited type — confirm
// against the file's imports).
fn inject_event(&mut self, _: Void) {}
|
||||
|
||||
fn inject_dial_upgrade_error(&mut self, _info: (), error: Self::Error) {
|
||||
fn inject_dial_upgrade_error(&mut self, _info: (), error: ProtocolsHandlerUpgrErr<io::Error>) {
|
||||
self.pending_results.push_front(
|
||||
(Instant::now(), Err(match error {
|
||||
Err(match error {
|
||||
ProtocolsHandlerUpgrErr::Timeout => PingFailure::Timeout,
|
||||
e => PingFailure::Other { error: Box::new(e) }
|
||||
})))
|
||||
}))
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
self.connection_keepalive
|
||||
// Returning `Now` indicates that, as far as this handler is concerned,
|
||||
// the connection may be closed. I.e. the ping handler does not keep
|
||||
// the connection alive, it merely adds another condition (failed pings)
|
||||
// for terminating it.
|
||||
KeepAlive::Now
|
||||
}
|
||||
|
||||
fn poll(&mut self) -> Poll<ProtocolsHandlerEvent<protocol::Ping, (), PingResult>, Self::Error> {
|
||||
if let Some((instant, result)) = self.pending_results.pop_back() {
|
||||
if result.is_ok() {
|
||||
self.connection_keepalive = KeepAlive::Until(instant + self.connection_timeout);
|
||||
let reset = match result {
|
||||
Ok(PingSuccess::Ping { .. }) => true,
|
||||
Ok(PingSuccess::Pong) => self.config.policy == PingPolicy::KeepAlive,
|
||||
_ => false
|
||||
};
|
||||
if reset {
|
||||
self.next_ping.reset(instant + self.config.interval);
|
||||
if let Some(result) = self.pending_results.pop_back() {
|
||||
if let Ok(PingSuccess::Ping { .. }) = result {
|
||||
let next_ping = Instant::now() + self.config.interval;
|
||||
self.failures = 0;
|
||||
self.next_ping.reset(next_ping);
|
||||
}
|
||||
if let Err(e) = result {
|
||||
self.failures += 1;
|
||||
if self.failures >= self.config.max_failures.get() {
|
||||
return Err(e)
|
||||
} else {
|
||||
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(Err(e))))
|
||||
}
|
||||
}
|
||||
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(result)))
|
||||
@ -276,7 +234,101 @@ where
|
||||
}))
|
||||
},
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Err(_) => Err(ProtocolsHandlerUpgrErr::Timer)
|
||||
Err(e) => Err(PingFailure::Other { error: Box::new(e) })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use futures::future;
|
||||
use quickcheck::*;
|
||||
use rand::Rng;
|
||||
use tokio_tcp::TcpStream;
|
||||
use tokio::runtime::current_thread::Runtime;
|
||||
|
||||
// Quickcheck generator for `PingConfig`, built through the public builder
// methods. Timeout and interval are whole seconds drawn from
// `gen_range(0, 3600)`; the failure limit is drawn from `gen_range(1, 100)`.
impl Arbitrary for PingConfig {
|
||||
fn arbitrary<G: Gen>(g: &mut G) -> PingConfig {
|
||||
PingConfig::new()
|
||||
.with_timeout(Duration::from_secs(g.gen_range(0, 3600)))
|
||||
.with_interval(Duration::from_secs(g.gen_range(0, 3600)))
|
||||
// The range starts at 1, so `NonZeroU32::new` is expected to return
// `Some` and the `unwrap` not to panic.
.with_max_failures(NonZeroU32::new(g.gen_range(1, 100)).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
/// Test helper: polls the handler through `future::poll_fn` on a newly created
/// `current_thread` runtime and returns whatever `block_on` produces, i.e. the
/// handler's next event or its `PingFailure`.
///
/// A new runtime is built on every call; `Runtime::new()` is unwrapped, so a
/// failure to create it panics the test.
fn tick(h: &mut PingHandler<TcpStream>) -> Result<
|
||||
ProtocolsHandlerEvent<protocol::Ping, (), PingResult>,
|
||||
PingFailure
|
||||
> {
|
||||
Runtime::new().unwrap().block_on(future::poll_fn(|| h.poll() ))
|
||||
}
|
||||
|
||||
/// Property test (run through `quickcheck`) for the scheduling of outbound
/// pings: for an arbitrary `PingConfig` and RTT, the first `tick` must request
/// an outbound ping substream with the configured timeout, and after that ping
/// succeeds the next ping must be at least `interval` away.
#[test]
|
||||
fn ping_interval() {
|
||||
fn prop(cfg: PingConfig, ping_rtt: Duration) -> bool {
|
||||
let mut h = PingHandler::<TcpStream>::new(cfg);
|
||||
|
||||
// The first ping is scheduled "immediately".
|
||||
let start = h.next_ping.deadline();
|
||||
assert!(start <= Instant::now());
|
||||
|
||||
// Send ping
|
||||
match tick(&mut h) {
|
||||
Ok(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: _ }) => {
|
||||
// The handler must use the configured timeout.
|
||||
assert_eq!(protocol.timeout(), &h.config.timeout);
|
||||
// The next ping must be scheduled no earlier than the ping timeout.
|
||||
assert!(h.next_ping.deadline() >= start + h.config.timeout);
|
||||
}
|
||||
e => panic!("Unexpected event: {:?}", e)
|
||||
}
|
||||
|
||||
// Captured before the pong is injected, so `now + interval` is a lower
// bound for any deadline the handler computes afterwards.
let now = Instant::now();
|
||||
|
||||
// Receive pong
|
||||
h.inject_fully_negotiated_outbound(ping_rtt, ());
|
||||
match tick(&mut h) {
|
||||
Ok(ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { rtt }))) => {
|
||||
// The handler must report the given RTT.
|
||||
assert_eq!(rtt, ping_rtt);
|
||||
// The next ping must be scheduled no earlier than the ping interval.
|
||||
assert!(now + h.config.interval <= h.next_ping.deadline());
|
||||
}
|
||||
e => panic!("Unexpected event: {:?}", e)
|
||||
}
|
||||
// Every check above is an assertion that panics on failure, so reaching
// this point means the property held for this input.
true
|
||||
}
|
||||
|
||||
// `prop` is passed as a plain `fn` pointer taking the two generated arguments.
quickcheck(prop as fn(_,_) -> _);
|
||||
}
|
||||
|
||||
/// Checks the consecutive-failure limit on a randomly generated configuration:
/// the first `max_failures - 1` dial-upgrade timeouts are reported as
/// `Custom(Err(PingFailure::Timeout))` events, the next one makes the handler
/// return `Err(PingFailure::Timeout)`, and a subsequent successful ping resets
/// `failures` to 0.
#[test]
|
||||
fn max_failures() {
|
||||
let cfg = PingConfig::arbitrary(&mut StdGen::new(rand::thread_rng(), 100));
|
||||
let mut h = PingHandler::<TcpStream>::new(cfg);
|
||||
// `max_failures` is a `NonZeroU32`, so the subtraction cannot underflow.
// Failures below the limit must surface as ordinary events.
for _ in 0 .. h.config.max_failures.get() - 1 {
|
||||
h.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout);
|
||||
match tick(&mut h) {
|
||||
Ok(ProtocolsHandlerEvent::Custom(Err(PingFailure::Timeout))) => {}
|
||||
e => panic!("Unexpected event: {:?}", e)
|
||||
}
|
||||
}
|
||||
// This failure reaches the limit and must be returned as the handler's error.
h.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout);
|
||||
match tick(&mut h) {
|
||||
Err(PingFailure::Timeout) => {
|
||||
assert_eq!(h.failures, h.config.max_failures.get());
|
||||
}
|
||||
e => panic!("Unexpected event: {:?}", e)
|
||||
}
|
||||
h.inject_fully_negotiated_outbound(Duration::from_secs(1), ());
|
||||
match tick(&mut h) {
|
||||
Ok(ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { .. }))) => {
|
||||
// A success resets the counter for consecutive failures.
|
||||
assert_eq!(h.failures, 0);
|
||||
}
|
||||
e => panic!("Unexpected event: {:?}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user