mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-13 18:11:22 +00:00
[ping] Refactor the ping protocol for conformity. (#1692)
* Refactor the ping protocol. Such that pings are sent over a single substream, as it is done in other libp2p implementations. Note that, since each peer sends its pings over a single, dedicated substream, every peer that participates in the protocol has effectively two open substreams. * Cleanup * Update ping changelog. * Update protocols/ping/src/protocol.rs Co-authored-by: Max Inden <mail@max-inden.de> Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
@ -20,14 +20,23 @@
|
||||
|
||||
use crate::protocol;
|
||||
use futures::prelude::*;
|
||||
use futures::future::BoxFuture;
|
||||
use libp2p_swarm::{
|
||||
KeepAlive,
|
||||
NegotiatedSubstream,
|
||||
SubstreamProtocol,
|
||||
ProtocolsHandler,
|
||||
ProtocolsHandlerUpgrErr,
|
||||
ProtocolsHandlerEvent
|
||||
};
|
||||
use std::{error::Error, io, fmt, num::NonZeroU32, pin::Pin, task::Context, task::Poll, time::Duration};
|
||||
use std::{
|
||||
error::Error,
|
||||
io,
|
||||
fmt,
|
||||
num::NonZeroU32,
|
||||
task::{Context, Poll},
|
||||
time::Duration
|
||||
};
|
||||
use std::collections::VecDeque;
|
||||
use wasm_timer::Delay;
|
||||
use void::Void;
|
||||
@ -160,13 +169,21 @@ impl Error for PingFailure {
|
||||
pub struct PingHandler {
|
||||
/// Configuration options.
|
||||
config: PingConfig,
|
||||
/// The timer for when to send the next ping.
|
||||
next_ping: Delay,
|
||||
/// The pending results from inbound or outbound pings, ready
|
||||
/// to be `poll()`ed.
|
||||
pending_results: VecDeque<PingResult>,
|
||||
/// The timer used for the delay to the next ping as well as
|
||||
/// the ping timeout.
|
||||
timer: Delay,
|
||||
/// Outbound ping failures that are pending to be processed by `poll()`.
|
||||
pending_errors: VecDeque<PingFailure>,
|
||||
/// The number of consecutive ping failures that occurred.
|
||||
///
|
||||
/// Each successful ping resets this counter to 0.
|
||||
failures: u32,
|
||||
/// The outbound ping state.
|
||||
outbound: Option<PingState>,
|
||||
/// The inbound pong handler, i.e. if there is an inbound
|
||||
/// substream, this is always a future that waits for the
|
||||
/// next inbound ping to be answered.
|
||||
inbound: Option<PongFuture>,
|
||||
}
|
||||
|
||||
impl PingHandler {
|
||||
@ -174,9 +191,11 @@ impl PingHandler {
|
||||
pub fn new(config: PingConfig) -> Self {
|
||||
PingHandler {
|
||||
config,
|
||||
next_ping: Delay::new(Duration::new(0, 0)),
|
||||
pending_results: VecDeque::with_capacity(2),
|
||||
timer: Delay::new(Duration::new(0, 0)),
|
||||
pending_errors: VecDeque::with_capacity(2),
|
||||
failures: 0,
|
||||
outbound: None,
|
||||
inbound: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -193,24 +212,25 @@ impl ProtocolsHandler for PingHandler {
|
||||
SubstreamProtocol::new(protocol::Ping)
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_inbound(&mut self, _: ()) {
|
||||
// A ping from a remote peer has been answered.
|
||||
self.pending_results.push_front(Ok(PingSuccess::Pong));
|
||||
fn inject_fully_negotiated_inbound(&mut self, stream: NegotiatedSubstream) {
|
||||
self.inbound = Some(protocol::recv_ping(stream).boxed());
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(&mut self, rtt: Duration, _info: ()) {
|
||||
// A ping initiated by the local peer was answered by the remote.
|
||||
self.pending_results.push_front(Ok(PingSuccess::Ping { rtt }));
|
||||
fn inject_fully_negotiated_outbound(&mut self, stream: NegotiatedSubstream, (): ()) {
|
||||
self.timer.reset(self.config.timeout);
|
||||
self.outbound = Some(PingState::Ping(protocol::send_ping(stream).boxed()));
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, _: Void) {}
|
||||
|
||||
fn inject_dial_upgrade_error(&mut self, _info: (), error: ProtocolsHandlerUpgrErr<io::Error>) {
|
||||
self.pending_results.push_front(
|
||||
Err(match error {
|
||||
fn inject_dial_upgrade_error(&mut self, _info: (), error: ProtocolsHandlerUpgrErr<Void>) {
|
||||
self.outbound = None; // Request a new substream on the next `poll`.
|
||||
self.pending_errors.push_front(
|
||||
match error {
|
||||
// Note: This timeout only covers protocol negotiation.
|
||||
ProtocolsHandlerUpgrErr::Timeout => PingFailure::Timeout,
|
||||
e => PingFailure::Other { error: Box::new(e) }
|
||||
}))
|
||||
e => PingFailure::Other { error: Box::new(e) },
|
||||
})
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
@ -222,117 +242,117 @@ impl ProtocolsHandler for PingHandler {
|
||||
}
|
||||
|
||||
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ProtocolsHandlerEvent<protocol::Ping, (), PingResult, Self::Error>> {
|
||||
if let Some(result) = self.pending_results.pop_back() {
|
||||
if let Ok(PingSuccess::Ping { .. }) = result {
|
||||
self.failures = 0;
|
||||
self.next_ping.reset(self.config.interval);
|
||||
// Respond to inbound pings.
|
||||
if let Some(fut) = self.inbound.as_mut() {
|
||||
match fut.poll_unpin(cx) {
|
||||
Poll::Pending => {},
|
||||
Poll::Ready(Err(e)) => {
|
||||
log::debug!("Inbound ping error: {:?}", e);
|
||||
self.inbound = None;
|
||||
}
|
||||
Poll::Ready(Ok(stream)) => {
|
||||
// A ping from a remote peer has been answered, wait for the next.
|
||||
self.inbound = Some(protocol::recv_ping(stream).boxed());
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Pong)))
|
||||
}
|
||||
}
|
||||
if let Err(e) = result {
|
||||
}
|
||||
|
||||
loop {
|
||||
// Check for outbound ping failures.
|
||||
if let Some(error) = self.pending_errors.pop_back() {
|
||||
log::debug!("Ping failure: {:?}", error);
|
||||
|
||||
self.failures += 1;
|
||||
if self.failures >= self.config.max_failures.get() {
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(e))
|
||||
} else {
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(e)))
|
||||
|
||||
// Note: For backward-compatibility, with configured
|
||||
// `max_failures == 1`, the first failure is always "free"
|
||||
// and silent. This allows peers who still use a new substream
|
||||
// for each ping to have successful ping exchanges with peers
|
||||
// that use a single substream, since every successful ping
|
||||
// resets `failures` to `0`, while at the same time emitting
|
||||
// events only for `max_failures - 1` failures, as before.
|
||||
if self.failures > 1 || self.config.max_failures.get() > 1 {
|
||||
if self.failures >= self.config.max_failures.get() {
|
||||
log::debug!("Too many failures ({}). Closing connection.", self.failures);
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(error))
|
||||
}
|
||||
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(error)))
|
||||
}
|
||||
}
|
||||
|
||||
// Continue outbound pings.
|
||||
match self.outbound.take() {
|
||||
Some(PingState::Ping(mut ping)) => match ping.poll_unpin(cx) {
|
||||
Poll::Pending => {
|
||||
if self.timer.poll_unpin(cx).is_ready() {
|
||||
self.pending_errors.push_front(PingFailure::Timeout);
|
||||
} else {
|
||||
self.outbound = Some(PingState::Ping(ping));
|
||||
break
|
||||
}
|
||||
},
|
||||
Poll::Ready(Ok((stream, rtt))) => {
|
||||
self.failures = 0;
|
||||
self.timer.reset(self.config.interval);
|
||||
self.outbound = Some(PingState::Idle(stream));
|
||||
return Poll::Ready(
|
||||
ProtocolsHandlerEvent::Custom(
|
||||
Ok(PingSuccess::Ping { rtt })))
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
self.pending_errors.push_front(PingFailure::Other {
|
||||
error: Box::new(e)
|
||||
});
|
||||
}
|
||||
},
|
||||
Some(PingState::Idle(stream)) => match self.timer.poll_unpin(cx) {
|
||||
Poll::Pending => {
|
||||
self.outbound = Some(PingState::Idle(stream));
|
||||
break
|
||||
},
|
||||
Poll::Ready(Ok(())) => {
|
||||
self.timer.reset(self.config.timeout);
|
||||
self.outbound = Some(PingState::Ping(protocol::send_ping(stream).boxed()));
|
||||
},
|
||||
Poll::Ready(Err(e)) => {
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(
|
||||
PingFailure::Other {
|
||||
error: Box::new(e)
|
||||
}))
|
||||
}
|
||||
}
|
||||
Some(PingState::OpenStream) => {
|
||||
self.outbound = Some(PingState::OpenStream);
|
||||
break
|
||||
}
|
||||
None => {
|
||||
self.outbound = Some(PingState::OpenStream);
|
||||
let protocol = SubstreamProtocol::new(protocol::Ping)
|
||||
.with_timeout(self.config.timeout);
|
||||
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol,
|
||||
info: (),
|
||||
})
|
||||
}
|
||||
}
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(result))
|
||||
}
|
||||
|
||||
match Future::poll(Pin::new(&mut self.next_ping), cx) {
|
||||
Poll::Ready(Ok(())) => {
|
||||
self.next_ping.reset(self.config.timeout);
|
||||
let protocol = SubstreamProtocol::new(protocol::Ping)
|
||||
.with_timeout(self.config.timeout);
|
||||
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol,
|
||||
info: (),
|
||||
})
|
||||
},
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Err(e)) =>
|
||||
Poll::Ready(ProtocolsHandlerEvent::Close(PingFailure::Other { error: Box::new(e) }))
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
type PingFuture = BoxFuture<'static, Result<(NegotiatedSubstream, Duration), io::Error>>;
|
||||
type PongFuture = BoxFuture<'static, Result<NegotiatedSubstream, io::Error>>;
|
||||
|
||||
use futures::future;
|
||||
use quickcheck::*;
|
||||
use rand::Rng;
|
||||
|
||||
impl Arbitrary for PingConfig {
|
||||
fn arbitrary<G: Gen>(g: &mut G) -> PingConfig {
|
||||
PingConfig::new()
|
||||
.with_timeout(Duration::from_secs(g.gen_range(0, 3600)))
|
||||
.with_interval(Duration::from_secs(g.gen_range(0, 3600)))
|
||||
.with_max_failures(NonZeroU32::new(g.gen_range(1, 100)).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
fn tick(h: &mut PingHandler)
|
||||
-> ProtocolsHandlerEvent<protocol::Ping, (), PingResult, PingFailure>
|
||||
{
|
||||
async_std::task::block_on(future::poll_fn(|cx| h.poll(cx) ))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ping_interval() {
|
||||
fn prop(cfg: PingConfig, ping_rtt: Duration) -> bool {
|
||||
let mut h = PingHandler::new(cfg);
|
||||
|
||||
// Send ping
|
||||
match tick(&mut h) {
|
||||
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: _ } => {
|
||||
// The handler must use the configured timeout.
|
||||
assert_eq!(protocol.timeout(), &h.config.timeout);
|
||||
}
|
||||
e => panic!("Unexpected event: {:?}", e)
|
||||
}
|
||||
|
||||
// Receive pong
|
||||
h.inject_fully_negotiated_outbound(ping_rtt, ());
|
||||
match tick(&mut h) {
|
||||
ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { rtt })) => {
|
||||
// The handler must report the given RTT.
|
||||
assert_eq!(rtt, ping_rtt);
|
||||
}
|
||||
e => panic!("Unexpected event: {:?}", e)
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
quickcheck(prop as fn(_,_) -> _);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn max_failures() {
|
||||
let cfg = PingConfig::arbitrary(&mut StdGen::new(rand::thread_rng(), 100));
|
||||
let mut h = PingHandler::new(cfg);
|
||||
for _ in 0 .. h.config.max_failures.get() - 1 {
|
||||
h.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout);
|
||||
match tick(&mut h) {
|
||||
ProtocolsHandlerEvent::Custom(Err(PingFailure::Timeout)) => {}
|
||||
e => panic!("Unexpected event: {:?}", e)
|
||||
}
|
||||
}
|
||||
h.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout);
|
||||
match tick(&mut h) {
|
||||
ProtocolsHandlerEvent::Close(PingFailure::Timeout) => {
|
||||
assert_eq!(h.failures, h.config.max_failures.get());
|
||||
}
|
||||
e => panic!("Unexpected event: {:?}", e)
|
||||
}
|
||||
h.inject_fully_negotiated_outbound(Duration::from_secs(1), ());
|
||||
match tick(&mut h) {
|
||||
ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { .. })) => {
|
||||
// A success resets the counter for consecutive failures.
|
||||
assert_eq!(h.failures, 0);
|
||||
}
|
||||
e => panic!("Unexpected event: {:?}", e)
|
||||
}
|
||||
}
|
||||
/// The current state w.r.t. outbound pings.
|
||||
enum PingState {
|
||||
/// A new substream is being negotiated for the ping protocol.
|
||||
OpenStream,
|
||||
/// The substream is idle, waiting to send the next ping.
|
||||
Idle(NegotiatedSubstream),
|
||||
/// A ping is being sent and the response awaited.
|
||||
Ping(PingFuture),
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user