// Copyright 2019 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. use crate::{protocol, PROTOCOL_NAME}; use futures::future::{BoxFuture, Either}; use futures::prelude::*; use futures_timer::Delay; use libp2p_core::upgrade::ReadyUpgrade; use libp2p_identity::PeerId; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol, StreamUpgradeError, SubstreamProtocol, }; use std::collections::VecDeque; use std::{ error::Error, fmt, io, task::{Context, Poll}, time::Duration, }; use void::Void; /// The configuration for outbound pings. #[derive(Debug, Clone)] pub struct Config { /// The timeout of an outbound ping. timeout: Duration, /// The duration between outbound pings. interval: Duration, } impl Config { /// Creates a new [`Config`] with the following default settings: /// /// * [`Config::with_interval`] 15s /// * [`Config::with_timeout`] 20s /// /// These settings have the following effect: /// /// * A ping is sent every 15 seconds on a healthy connection. /// * Every ping sent must yield a response within 20 seconds in order to /// be successful. pub fn new() -> Self { Self { timeout: Duration::from_secs(20), interval: Duration::from_secs(15), } } /// Sets the ping timeout. pub fn with_timeout(mut self, d: Duration) -> Self { self.timeout = d; self } /// Sets the ping interval. pub fn with_interval(mut self, d: Duration) -> Self { self.interval = d; self } } impl Default for Config { fn default() -> Self { Self::new() } } /// An outbound ping failure. #[derive(Debug)] pub enum Failure { /// The ping timed out, i.e. no response was received within the /// configured ping timeout. Timeout, /// The peer does not support the ping protocol. Unsupported, /// The ping failed for reasons other than a timeout. Other { error: Box, }, } impl Failure { fn other(e: impl std::error::Error + Send + 'static) -> Self { Self::Other { error: Box::new(e) } } } impl fmt::Display for Failure { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Failure::Timeout => f.write_str("Ping timeout"), Failure::Other { error } => write!(f, "Ping error: {error}"), Failure::Unsupported => write!(f, "Ping protocol not supported"), } } } impl Error for Failure { fn source(&self) -> Option<&(dyn Error + 'static)> { match self { Failure::Timeout => None, Failure::Other { error } => Some(&**error), Failure::Unsupported => None, } } } /// Protocol handler that handles pinging the remote at a regular period /// and answering ping queries. pub struct Handler { /// Configuration options. config: Config, /// The timer used for the delay to the next ping. interval: Delay, /// Outbound ping failures that are pending to be processed by `poll()`. pending_errors: VecDeque, /// The number of consecutive ping failures that occurred. /// /// Each successful ping resets this counter to 0. failures: u32, /// The outbound ping state. outbound: Option, /// The inbound pong handler, i.e. if there is an inbound /// substream, this is always a future that waits for the /// next inbound ping to be answered. inbound: Option, /// Tracks the state of our handler. state: State, /// The peer we are connected to. peer: PeerId, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum State { /// We are inactive because the other peer doesn't support ping. Inactive { /// Whether or not we've reported the missing support yet. /// /// This is used to avoid repeated events being emitted for a specific connection. reported: bool, }, /// We are actively pinging the other peer. Active, } impl Handler { /// Builds a new [`Handler`] with the given configuration. pub fn new(config: Config, peer: PeerId) -> Self { Handler { peer, config, interval: Delay::new(Duration::new(0, 0)), pending_errors: VecDeque::with_capacity(2), failures: 0, outbound: None, inbound: None, state: State::Active, } } fn on_dial_upgrade_error( &mut self, DialUpgradeError { error, .. }: DialUpgradeError< ::OutboundOpenInfo, ::OutboundProtocol, >, ) { self.outbound = None; // Request a new substream on the next `poll`. let error = match error { StreamUpgradeError::NegotiationFailed => { debug_assert_eq!(self.state, State::Active); self.state = State::Inactive { reported: false }; return; } // Note: This timeout only covers protocol negotiation. StreamUpgradeError::Timeout => Failure::Other { error: Box::new(std::io::Error::new( std::io::ErrorKind::TimedOut, "ping protocol negotiation timed out", )), }, StreamUpgradeError::Apply(e) => void::unreachable(e), StreamUpgradeError::Io(e) => Failure::Other { error: Box::new(e) }, }; self.pending_errors.push_front(error); } } impl ConnectionHandler for Handler { type FromBehaviour = Void; type ToBehaviour = Result; type Error = Void; type InboundProtocol = ReadyUpgrade; type OutboundProtocol = ReadyUpgrade; type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol, ()> { SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()) } fn on_behaviour_event(&mut self, _: Void) {} fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::No } fn poll( &mut self, cx: &mut Context<'_>, ) -> Poll< ConnectionHandlerEvent< ReadyUpgrade, (), Result, Self::Error, >, > { match self.state { State::Inactive { reported: true } => { return Poll::Pending; // nothing to do on this connection } State::Inactive { reported: false } => { self.state = State::Inactive { reported: true }; return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err( Failure::Unsupported, ))); } State::Active => {} } // Respond to inbound pings. if let Some(fut) = self.inbound.as_mut() { match fut.poll_unpin(cx) { Poll::Pending => {} Poll::Ready(Err(e)) => { log::debug!("Inbound ping error: {:?}", e); self.inbound = None; } Poll::Ready(Ok(stream)) => { log::trace!("answered inbound ping from {}", self.peer); // A ping from a remote peer has been answered, wait for the next. self.inbound = Some(protocol::recv_ping(stream).boxed()); } } } loop { // Check for outbound ping failures. if let Some(error) = self.pending_errors.pop_back() { log::debug!("Ping failure: {:?}", error); self.failures += 1; // Note: For backward-compatibility the first failure is always "free" // and silent. This allows peers who use a new substream // for each ping to have successful ping exchanges with peers // that use a single substream, since every successful ping // resets `failures` to `0`. if self.failures > 1 { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(error))); } } // Continue outbound pings. match self.outbound.take() { Some(OutboundState::Ping(mut ping)) => match ping.poll_unpin(cx) { Poll::Pending => { self.outbound = Some(OutboundState::Ping(ping)); break; } Poll::Ready(Ok((stream, rtt))) => { log::debug!("latency to {} is {}ms", self.peer, rtt.as_millis()); self.failures = 0; self.interval.reset(self.config.interval); self.outbound = Some(OutboundState::Idle(stream)); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(rtt))); } Poll::Ready(Err(e)) => { self.pending_errors.push_front(e); } }, Some(OutboundState::Idle(stream)) => match self.interval.poll_unpin(cx) { Poll::Pending => { self.outbound = Some(OutboundState::Idle(stream)); break; } Poll::Ready(()) => { self.outbound = Some(OutboundState::Ping( send_ping(stream, self.config.timeout).boxed(), )); } }, Some(OutboundState::OpenStream) => { self.outbound = Some(OutboundState::OpenStream); break; } None => { self.outbound = Some(OutboundState::OpenStream); let protocol = SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()); return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol, }); } } } Poll::Pending } fn on_connection_event( &mut self, event: ConnectionEvent< Self::InboundProtocol, Self::OutboundProtocol, Self::InboundOpenInfo, Self::OutboundOpenInfo, >, ) { match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { protocol: stream, .. }) => { self.inbound = Some(protocol::recv_ping(stream).boxed()); } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { protocol: stream, .. }) => { self.outbound = Some(OutboundState::Ping( send_ping(stream, self.config.timeout).boxed(), )); } ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { self.on_dial_upgrade_error(dial_upgrade_error) } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) | ConnectionEvent::LocalProtocolsChange(_) | ConnectionEvent::RemoteProtocolsChange(_) => {} } } } type PingFuture = BoxFuture<'static, Result<(Stream, Duration), Failure>>; type PongFuture = BoxFuture<'static, Result>; /// The current state w.r.t. outbound pings. enum OutboundState { /// A new substream is being negotiated for the ping protocol. OpenStream, /// The substream is idle, waiting to send the next ping. Idle(Stream), /// A ping is being sent and the response awaited. Ping(PingFuture), } /// A wrapper around [`protocol::send_ping`] that enforces a time out. async fn send_ping(stream: Stream, timeout: Duration) -> Result<(Stream, Duration), Failure> { let ping = protocol::send_ping(stream); futures::pin_mut!(ping); match future::select(ping, Delay::new(timeout)).await { Either::Left((Ok((stream, rtt)), _)) => Ok((stream, rtt)), Either::Left((Err(e), _)) => Err(Failure::other(e)), Either::Right(((), _)) => Err(Failure::Timeout), } }