diff --git a/protocols/ping/CHANGELOG.md b/protocols/ping/CHANGELOG.md index 6d82b843..3fb1ef6d 100644 --- a/protocols/ping/CHANGELOG.md +++ b/protocols/ping/CHANGELOG.md @@ -1,5 +1,9 @@ # 0.21.0 [unreleased] +- Refactor the ping protocol for conformity by (re)using +a single substream for outbound pings, addressing +[#1601](https://github.com/libp2p/rust-libp2p/issues/1601). + - Bump `libp2p-core` and `libp2p-swarm` dependencies. # 0.20.0 [2020-07-01] diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 78aa6453..db6eba0f 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -20,14 +20,23 @@ use crate::protocol; use futures::prelude::*; +use futures::future::BoxFuture; use libp2p_swarm::{ KeepAlive, + NegotiatedSubstream, SubstreamProtocol, ProtocolsHandler, ProtocolsHandlerUpgrErr, ProtocolsHandlerEvent }; -use std::{error::Error, io, fmt, num::NonZeroU32, pin::Pin, task::Context, task::Poll, time::Duration}; +use std::{ + error::Error, + io, + fmt, + num::NonZeroU32, + task::{Context, Poll}, + time::Duration +}; use std::collections::VecDeque; use wasm_timer::Delay; use void::Void; @@ -160,13 +169,21 @@ impl Error for PingFailure { pub struct PingHandler { /// Configuration options. config: PingConfig, - /// The timer for when to send the next ping. - next_ping: Delay, - /// The pending results from inbound or outbound pings, ready - /// to be `poll()`ed. - pending_results: VecDeque, + /// The timer used for the delay to the next ping as well as + /// the ping timeout. + timer: Delay, + /// Outbound ping failures that are pending to be processed by `poll()`. + pending_errors: VecDeque, /// The number of consecutive ping failures that occurred. + /// + /// Each successful ping resets this counter to 0. failures: u32, + /// The outbound ping state. + outbound: Option, + /// The inbound pong handler, i.e. if there is an inbound + /// substream, this is always a future that waits for the + /// next inbound ping to be answered. + inbound: Option, } impl PingHandler { @@ -174,9 +191,11 @@ impl PingHandler { pub fn new(config: PingConfig) -> Self { PingHandler { config, - next_ping: Delay::new(Duration::new(0, 0)), - pending_results: VecDeque::with_capacity(2), + timer: Delay::new(Duration::new(0, 0)), + pending_errors: VecDeque::with_capacity(2), failures: 0, + outbound: None, + inbound: None, } } } @@ -193,24 +212,25 @@ impl ProtocolsHandler for PingHandler { SubstreamProtocol::new(protocol::Ping) } - fn inject_fully_negotiated_inbound(&mut self, _: ()) { - // A ping from a remote peer has been answered. - self.pending_results.push_front(Ok(PingSuccess::Pong)); + fn inject_fully_negotiated_inbound(&mut self, stream: NegotiatedSubstream) { + self.inbound = Some(protocol::recv_ping(stream).boxed()); } - fn inject_fully_negotiated_outbound(&mut self, rtt: Duration, _info: ()) { - // A ping initiated by the local peer was answered by the remote. - self.pending_results.push_front(Ok(PingSuccess::Ping { rtt })); + fn inject_fully_negotiated_outbound(&mut self, stream: NegotiatedSubstream, (): ()) { + self.timer.reset(self.config.timeout); + self.outbound = Some(PingState::Ping(protocol::send_ping(stream).boxed())); } fn inject_event(&mut self, _: Void) {} - fn inject_dial_upgrade_error(&mut self, _info: (), error: ProtocolsHandlerUpgrErr) { - self.pending_results.push_front( - Err(match error { + fn inject_dial_upgrade_error(&mut self, _info: (), error: ProtocolsHandlerUpgrErr) { + self.outbound = None; // Request a new substream on the next `poll`. + self.pending_errors.push_front( + match error { + // Note: This timeout only covers protocol negotiation. ProtocolsHandlerUpgrErr::Timeout => PingFailure::Timeout, - e => PingFailure::Other { error: Box::new(e) } - })) + e => PingFailure::Other { error: Box::new(e) }, + }) } fn connection_keep_alive(&self) -> KeepAlive { @@ -222,117 +242,117 @@ impl ProtocolsHandler for PingHandler { } fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { - if let Some(result) = self.pending_results.pop_back() { - if let Ok(PingSuccess::Ping { .. }) = result { - self.failures = 0; - self.next_ping.reset(self.config.interval); + // Respond to inbound pings. + if let Some(fut) = self.inbound.as_mut() { + match fut.poll_unpin(cx) { + Poll::Pending => {}, + Poll::Ready(Err(e)) => { + log::debug!("Inbound ping error: {:?}", e); + self.inbound = None; + } + Poll::Ready(Ok(stream)) => { + // A ping from a remote peer has been answered, wait for the next. + self.inbound = Some(protocol::recv_ping(stream).boxed()); + return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Pong))) + } } - if let Err(e) = result { + } + + loop { + // Check for outbound ping failures. + if let Some(error) = self.pending_errors.pop_back() { + log::debug!("Ping failure: {:?}", error); + self.failures += 1; - if self.failures >= self.config.max_failures.get() { - return Poll::Ready(ProtocolsHandlerEvent::Close(e)) - } else { - return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(e))) + + // Note: For backward-compatibility, with configured + // `max_failures == 1`, the first failure is always "free" + // and silent. This allows peers who still use a new substream + // for each ping to have successful ping exchanges with peers + // that use a single substream, since every successful ping + // resets `failures` to `0`, while at the same time emitting + // events only for `max_failures - 1` failures, as before. + if self.failures > 1 || self.config.max_failures.get() > 1 { + if self.failures >= self.config.max_failures.get() { + log::debug!("Too many failures ({}). Closing connection.", self.failures); + return Poll::Ready(ProtocolsHandlerEvent::Close(error)) + } + + return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(error))) + } + } + + // Continue outbound pings. + match self.outbound.take() { + Some(PingState::Ping(mut ping)) => match ping.poll_unpin(cx) { + Poll::Pending => { + if self.timer.poll_unpin(cx).is_ready() { + self.pending_errors.push_front(PingFailure::Timeout); + } else { + self.outbound = Some(PingState::Ping(ping)); + break + } + }, + Poll::Ready(Ok((stream, rtt))) => { + self.failures = 0; + self.timer.reset(self.config.interval); + self.outbound = Some(PingState::Idle(stream)); + return Poll::Ready( + ProtocolsHandlerEvent::Custom( + Ok(PingSuccess::Ping { rtt }))) + } + Poll::Ready(Err(e)) => { + self.pending_errors.push_front(PingFailure::Other { + error: Box::new(e) + }); + } + }, + Some(PingState::Idle(stream)) => match self.timer.poll_unpin(cx) { + Poll::Pending => { + self.outbound = Some(PingState::Idle(stream)); + break + }, + Poll::Ready(Ok(())) => { + self.timer.reset(self.config.timeout); + self.outbound = Some(PingState::Ping(protocol::send_ping(stream).boxed())); + }, + Poll::Ready(Err(e)) => { + return Poll::Ready(ProtocolsHandlerEvent::Close( + PingFailure::Other { + error: Box::new(e) + })) + } + } + Some(PingState::OpenStream) => { + self.outbound = Some(PingState::OpenStream); + break + } + None => { + self.outbound = Some(PingState::OpenStream); + let protocol = SubstreamProtocol::new(protocol::Ping) + .with_timeout(self.config.timeout); + return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol, + info: (), + }) } } - return Poll::Ready(ProtocolsHandlerEvent::Custom(result)) } - match Future::poll(Pin::new(&mut self.next_ping), cx) { - Poll::Ready(Ok(())) => { - self.next_ping.reset(self.config.timeout); - let protocol = SubstreamProtocol::new(protocol::Ping) - .with_timeout(self.config.timeout); - Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol, - info: (), - }) - }, - Poll::Pending => Poll::Pending, - Poll::Ready(Err(e)) => - Poll::Ready(ProtocolsHandlerEvent::Close(PingFailure::Other { error: Box::new(e) })) - } + Poll::Pending } } -#[cfg(test)] -mod tests { - use super::*; +type PingFuture = BoxFuture<'static, Result<(NegotiatedSubstream, Duration), io::Error>>; +type PongFuture = BoxFuture<'static, Result>; - use futures::future; - use quickcheck::*; - use rand::Rng; - - impl Arbitrary for PingConfig { - fn arbitrary(g: &mut G) -> PingConfig { - PingConfig::new() - .with_timeout(Duration::from_secs(g.gen_range(0, 3600))) - .with_interval(Duration::from_secs(g.gen_range(0, 3600))) - .with_max_failures(NonZeroU32::new(g.gen_range(1, 100)).unwrap()) - } - } - - fn tick(h: &mut PingHandler) - -> ProtocolsHandlerEvent - { - async_std::task::block_on(future::poll_fn(|cx| h.poll(cx) )) - } - - #[test] - fn ping_interval() { - fn prop(cfg: PingConfig, ping_rtt: Duration) -> bool { - let mut h = PingHandler::new(cfg); - - // Send ping - match tick(&mut h) { - ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: _ } => { - // The handler must use the configured timeout. - assert_eq!(protocol.timeout(), &h.config.timeout); - } - e => panic!("Unexpected event: {:?}", e) - } - - // Receive pong - h.inject_fully_negotiated_outbound(ping_rtt, ()); - match tick(&mut h) { - ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { rtt })) => { - // The handler must report the given RTT. - assert_eq!(rtt, ping_rtt); - } - e => panic!("Unexpected event: {:?}", e) - } - - true - } - - quickcheck(prop as fn(_,_) -> _); - } - - #[test] - fn max_failures() { - let cfg = PingConfig::arbitrary(&mut StdGen::new(rand::thread_rng(), 100)); - let mut h = PingHandler::new(cfg); - for _ in 0 .. h.config.max_failures.get() - 1 { - h.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout); - match tick(&mut h) { - ProtocolsHandlerEvent::Custom(Err(PingFailure::Timeout)) => {} - e => panic!("Unexpected event: {:?}", e) - } - } - h.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout); - match tick(&mut h) { - ProtocolsHandlerEvent::Close(PingFailure::Timeout) => { - assert_eq!(h.failures, h.config.max_failures.get()); - } - e => panic!("Unexpected event: {:?}", e) - } - h.inject_fully_negotiated_outbound(Duration::from_secs(1), ()); - match tick(&mut h) { - ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { .. })) => { - // A success resets the counter for consecutive failures. - assert_eq!(h.failures, 0); - } - e => panic!("Unexpected event: {:?}", e) - } - } +/// The current state w.r.t. outbound pings. +enum PingState { + /// A new substream is being negotiated for the ping protocol. + OpenStream, + /// The substream is idle, waiting to send the next ping. + Idle(NegotiatedSubstream), + /// A ping is being sent and the response awaited. + Ping(PingFuture), } + diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index 386465e1..659ab9f0 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -28,15 +28,14 @@ //! //! The [`Ping`] struct implements the [`NetworkBehaviour`] trait. When used with a [`Swarm`], //! it will respond to inbound ping requests and as necessary periodically send outbound -//! ping requests on every established connection. If a configurable number of pings fail, -//! the connection will be closed. +//! ping requests on every established connection. If a configurable number of consecutive +//! pings fail, the connection will be closed. //! //! The `Ping` network behaviour produces [`PingEvent`]s, which may be consumed from the `Swarm` //! by an application, e.g. to collect statistics. //! -//! > **Note**: The ping protocol does not keep otherwise idle connections alive, -//! > it only adds an additional condition for terminating the connection, namely -//! > a certain number of failed ping requests. +//! > **Note**: The ping protocol does not keep otherwise idle connections alive +//! > by default, see [`PingConfig::with_keep_alive`] for changing this behaviour. //! //! [`Swarm`]: libp2p_swarm::Swarm //! [`Transport`]: libp2p_core::Transport diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index 8e7e963b..f0437985 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -18,23 +18,27 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::{future::BoxFuture, prelude::*}; +use futures::prelude::*; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use log::debug; +use libp2p_swarm::NegotiatedSubstream; use rand::{distributions, prelude::*}; use std::{io, iter, time::Duration}; +use void::Void; use wasm_timer::Instant; -/// Represents a prototype for an upgrade to handle the ping protocol. +/// The `Ping` protocol upgrade. /// -/// The protocol works the following way: +/// The ping protocol sends 32 bytes of random data in configurable +/// intervals over a single outbound substream, expecting to receive +/// the same bytes as a response. At the same time, incoming pings +/// on inbound substreams are answered by sending back the received bytes. /// -/// - Dialer sends 32 bytes of random data. -/// - Listener receives the data and sends it back. -/// - Dialer receives the data and verifies that it matches what it sent. +/// At most a single inbound and outbound substream is kept open at +/// any time. In case of a ping timeout or another error on a substream, the +/// substream is dropped. If a configurable number of consecutive +/// outbound pings fail, the connection is closed. /// -/// The dialer produces a `Duration`, which corresponds to the round-trip time -/// of the payload. +/// Successful pings report the round-trip time. /// /// > **Note**: The round-trip time of a ping may be subject to delays induced /// > by the underlying transport, e.g. in the case of TCP there is @@ -55,59 +59,60 @@ impl UpgradeInfo for Ping { } } -impl InboundUpgrade for Ping -where - TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, -{ - type Output = (); - type Error = io::Error; - type Future = BoxFuture<'static, Result<(), io::Error>>; +impl InboundUpgrade for Ping { + type Output = NegotiatedSubstream; + type Error = Void; + type Future = future::Ready>; - fn upgrade_inbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { - async move { - let mut payload = [0u8; PING_SIZE]; - while let Ok(_) = socket.read_exact(&mut payload).await { - socket.write_all(&payload).await?; - } - socket.close().await?; - Ok(()) - }.boxed() + fn upgrade_inbound(self, stream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + future::ok(stream) } } -impl OutboundUpgrade for Ping -where - TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, -{ - type Output = Duration; - type Error = io::Error; - type Future = BoxFuture<'static, Result>; +impl OutboundUpgrade for Ping { + type Output = NegotiatedSubstream; + type Error = Void; + type Future = future::Ready>; - fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { - let payload: [u8; PING_SIZE] = thread_rng().sample(distributions::Standard); - debug!("Preparing ping payload {:?}", payload); - async move { - socket.write_all(&payload).await?; - socket.close().await?; - let started = Instant::now(); - - let mut recv_payload = [0u8; 32]; - socket.read_exact(&mut recv_payload).await?; - if recv_payload == payload { - Ok(started.elapsed()) - } else { - Err(io::Error::new(io::ErrorKind::InvalidData, "Ping payload mismatch")) - } - }.boxed() + fn upgrade_outbound(self, stream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + future::ok(stream) } } +/// Sends a ping and waits for the pong. +pub async fn send_ping(mut stream: S) -> io::Result<(S, Duration)> +where + S: AsyncRead + AsyncWrite + Unpin +{ + let payload: [u8; PING_SIZE] = thread_rng().sample(distributions::Standard); + log::debug!("Preparing ping payload {:?}", payload); + stream.write_all(&payload).await?; + let started = Instant::now(); + let mut recv_payload = [0u8; PING_SIZE]; + stream.read_exact(&mut recv_payload).await?; + if recv_payload == payload { + Ok((stream, started.elapsed())) + } else { + Err(io::Error::new(io::ErrorKind::InvalidData, "Ping payload mismatch")) + } +} + +/// Waits for a ping and sends a pong. +pub async fn recv_ping(mut stream: S) -> io::Result +where + S: AsyncRead + AsyncWrite + Unpin +{ + let mut payload = [0u8; PING_SIZE]; + stream.read_exact(&mut payload).await?; + stream.write_all(&payload).await?; + stream.flush().await?; + Ok(stream) +} + #[cfg(test)] mod tests { - use super::Ping; - use futures::prelude::*; + use super::*; use libp2p_core::{ - upgrade, multiaddr::multiaddr, transport::{ Transport, @@ -134,12 +139,12 @@ mod tests { let listener_event = listener.next().await.unwrap(); let (listener_upgrade, _) = listener_event.unwrap().into_upgrade().unwrap(); let conn = listener_upgrade.await.unwrap(); - upgrade::apply_inbound(conn, Ping::default()).await.unwrap(); + recv_ping(conn).await.unwrap(); }); async_std::task::block_on(async move { let c = MemoryTransport.dial(listener_addr).unwrap().await.unwrap(); - let rtt = upgrade::apply_outbound(c, Ping::default(), upgrade::Version::V1).await.unwrap(); + let (_, rtt) = send_ping(c).await.unwrap(); assert!(rtt > Duration::from_secs(0)); }); } diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 30e8de60..1c2c2f58 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -30,64 +30,168 @@ use libp2p_core::{ }; use libp2p_ping::*; use libp2p_secio::SecioConfig; -use libp2p_swarm::Swarm; +use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_tcp::TcpConfig; use futures::{prelude::*, channel::mpsc}; -use std::{io, time::Duration}; +use quickcheck::*; +use std::{io, num::NonZeroU8, time::Duration}; #[test] -fn ping() { - let cfg = PingConfig::new().with_keep_alive(true); +fn ping_pong() { + fn prop(count: NonZeroU8) { + let cfg = PingConfig::new() + .with_keep_alive(true) + .with_interval(Duration::from_millis(10)); - let (peer1_id, trans) = mk_transport(); - let mut swarm1 = Swarm::new(trans, Ping::new(cfg.clone()), peer1_id.clone()); + let (peer1_id, trans) = mk_transport(); + let mut swarm1 = Swarm::new(trans, Ping::new(cfg.clone()), peer1_id.clone()); - let (peer2_id, trans) = mk_transport(); - let mut swarm2 = Swarm::new(trans, Ping::new(cfg), peer2_id.clone()); + let (peer2_id, trans) = mk_transport(); + let mut swarm2 = Swarm::new(trans, Ping::new(cfg), peer2_id.clone()); - let (mut tx, mut rx) = mpsc::channel::(1); + let (mut tx, mut rx) = mpsc::channel::(1); - let pid1 = peer1_id.clone(); - let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); - Swarm::listen_on(&mut swarm1, addr).unwrap(); + let pid1 = peer1_id.clone(); + let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); + Swarm::listen_on(&mut swarm1, addr).unwrap(); - let peer1 = async move { - while let Some(_) = swarm1.next().now_or_never() {} + let mut count1 = count.get(); + let mut count2 = count.get(); - for l in Swarm::listeners(&swarm1) { - tx.send(l.clone()).await.unwrap(); - } + let peer1 = async move { + while let Some(_) = swarm1.next().now_or_never() {} - loop { - match swarm1.next().await { - PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => { - return (pid1.clone(), peer, rtt) - }, - _ => {} + for l in Swarm::listeners(&swarm1) { + tx.send(l.clone()).await.unwrap(); } - } - }; - let pid2 = peer2_id.clone(); - let peer2 = async move { - Swarm::dial_addr(&mut swarm2, rx.next().await.unwrap()).unwrap(); - - loop { - match swarm2.next().await { - PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => { - return (pid2.clone(), peer, rtt) - }, - _ => {} + loop { + match swarm1.next().await { + PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => { + count1 -= 1; + if count1 == 0 { + return (pid1.clone(), peer, rtt) + } + }, + PingEvent { result: Err(e), .. } => panic!("Ping failure: {:?}", e), + _ => {} + } } - } - }; + }; - let result = future::select(Box::pin(peer1), Box::pin(peer2)); - let ((p1, p2, rtt), _) = async_std::task::block_on(result).factor_first(); - assert!(p1 == peer1_id && p2 == peer2_id || p1 == peer2_id && p2 == peer1_id); - assert!(rtt < Duration::from_millis(50)); + let pid2 = peer2_id.clone(); + let peer2 = async move { + Swarm::dial_addr(&mut swarm2, rx.next().await.unwrap()).unwrap(); + + loop { + match swarm2.next().await { + PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => { + count2 -= 1; + if count2 == 0 { + return (pid2.clone(), peer, rtt) + } + }, + PingEvent { result: Err(e), .. } => panic!("Ping failure: {:?}", e), + _ => {} + } + } + }; + + let result = future::select(Box::pin(peer1), Box::pin(peer2)); + let ((p1, p2, rtt), _) = async_std::task::block_on(result).factor_first(); + assert!(p1 == peer1_id && p2 == peer2_id || p1 == peer2_id && p2 == peer1_id); + assert!(rtt < Duration::from_millis(50)); + } + + QuickCheck::new().tests(3).quickcheck(prop as fn(_)) } + +/// Tests that the connection is closed upon a configurable +/// number of consecutive ping failures. +#[test] +fn max_failures() { + fn prop(max_failures: NonZeroU8) { + let cfg = PingConfig::new() + .with_keep_alive(true) + .with_interval(Duration::from_millis(10)) + .with_timeout(Duration::from_millis(0)) + .with_max_failures(max_failures.into()); + + let (peer1_id, trans) = mk_transport(); + let mut swarm1 = Swarm::new(trans, Ping::new(cfg.clone()), peer1_id.clone()); + + let (peer2_id, trans) = mk_transport(); + let mut swarm2 = Swarm::new(trans, Ping::new(cfg), peer2_id.clone()); + + let (mut tx, mut rx) = mpsc::channel::(1); + + let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); + Swarm::listen_on(&mut swarm1, addr).unwrap(); + + let peer1 = async move { + while let Some(_) = swarm1.next().now_or_never() {} + + for l in Swarm::listeners(&swarm1) { + tx.send(l.clone()).await.unwrap(); + } + + let mut count1: u8 = 0; + + loop { + match swarm1.next_event().await { + SwarmEvent::Behaviour(PingEvent { + result: Ok(PingSuccess::Ping { .. }), .. + }) => { + count1 = 0; // there may be an occasional success + } + SwarmEvent::Behaviour(PingEvent { + result: Err(_), .. + }) => { + count1 += 1; + } + SwarmEvent::ConnectionClosed { .. } => { + return count1 + } + _ => {} + } + } + }; + + let peer2 = async move { + Swarm::dial_addr(&mut swarm2, rx.next().await.unwrap()).unwrap(); + + let mut count2: u8 = 0; + + loop { + match swarm2.next_event().await { + SwarmEvent::Behaviour(PingEvent { + result: Ok(PingSuccess::Ping { .. }), .. + }) => { + count2 = 0; // there may be an occasional success + } + SwarmEvent::Behaviour(PingEvent { + result: Err(_), .. + }) => { + count2 += 1; + } + SwarmEvent::ConnectionClosed { .. } => { + return count2 + } + _ => {} + } + } + }; + + let future = future::join(peer1, peer2); + let (count1, count2) = async_std::task::block_on(future); + assert_eq!(u8::max(count1, count2), max_failures.get() - 1); + } + + QuickCheck::new().tests(3).quickcheck(prop as fn(_)) +} + + fn mk_transport() -> ( PeerId, Boxed<