[ping] Add missing flush after write. (#1770)

After sending the ping and before awaiting the pong,
the stream must be flushed to guarantee the data is
sent before awaiting the reply.
This commit is contained in:
Roman Borschel
2020-09-28 10:57:02 +02:00
committed by GitHub
parent 0b18b864f2
commit f66f40bcd7
4 changed files with 45 additions and 12 deletions

View File

@ -1,3 +1,10 @@
# 0.22.1 [unreleased]
- Ensure the outbound ping is flushed before awaiting
the response. Otherwise the behaviour depends on
implementation details of the stream muxer used.
The current behaviour resulted in stalls with Mplex.
# 0.22.0 [2020-09-09]
- Update `libp2p-swarm` and `libp2p-core`.

View File

@ -2,7 +2,7 @@
name = "libp2p-ping"
edition = "2018"
description = "Ping protocol for libp2p"
version = "0.22.0"
version = "0.22.1"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -23,4 +23,5 @@ async-std = "1.6.2"
libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] }
libp2p-noise = { path = "../../protocols/noise" }
libp2p-yamux = { path = "../../muxers/yamux" }
libp2p-mplex = { path = "../../muxers/mplex" }
quickcheck = "0.9.0"

View File

@ -87,8 +87,10 @@ where
let payload: [u8; PING_SIZE] = thread_rng().sample(distributions::Standard);
log::debug!("Preparing ping payload {:?}", payload);
stream.write_all(&payload).await?;
stream.flush().await?;
let started = Instant::now();
let mut recv_payload = [0u8; PING_SIZE];
log::debug!("Awaiting pong for {:?}", payload);
stream.read_exact(&mut recv_payload).await?;
if recv_payload == payload {
Ok((stream, started.elapsed()))
@ -103,7 +105,9 @@ where
S: AsyncRead + AsyncWrite + Unpin
{
let mut payload = [0u8; PING_SIZE];
log::debug!("Waiting for ping ...");
stream.read_exact(&mut payload).await?;
log::debug!("Sending pong for {:?}", payload);
stream.write_all(&payload).await?;
stream.flush().await?;
Ok(stream)

View File

@ -28,25 +28,28 @@ use libp2p_core::{
transport::{Transport, boxed::Boxed},
upgrade
};
use libp2p_mplex as mplex;
use libp2p_noise as noise;
use libp2p_ping::*;
use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_tcp::TcpConfig;
use libp2p_yamux as yamux;
use futures::{prelude::*, channel::mpsc};
use quickcheck::*;
use rand::prelude::*;
use std::{io, num::NonZeroU8, time::Duration};
#[test]
fn ping_pong() {
fn prop(count: NonZeroU8) {
fn prop(count: NonZeroU8, muxer: MuxerChoice) {
let cfg = PingConfig::new()
.with_keep_alive(true)
.with_interval(Duration::from_millis(10));
let (peer1_id, trans) = mk_transport();
let (peer1_id, trans) = mk_transport(muxer);
let mut swarm1 = Swarm::new(trans, Ping::new(cfg.clone()), peer1_id.clone());
let (peer2_id, trans) = mk_transport();
let (peer2_id, trans) = mk_transport(muxer);
let mut swarm2 = Swarm::new(trans, Ping::new(cfg), peer2_id.clone());
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
@ -103,25 +106,24 @@ fn ping_pong() {
assert!(rtt < Duration::from_millis(50));
}
QuickCheck::new().tests(3).quickcheck(prop as fn(_))
QuickCheck::new().tests(10).quickcheck(prop as fn(_,_))
}
/// Tests that the connection is closed upon a configurable
/// number of consecutive ping failures.
#[test]
fn max_failures() {
fn prop(max_failures: NonZeroU8) {
fn prop(max_failures: NonZeroU8, muxer: MuxerChoice) {
let cfg = PingConfig::new()
.with_keep_alive(true)
.with_interval(Duration::from_millis(10))
.with_timeout(Duration::from_millis(0))
.with_max_failures(max_failures.into());
let (peer1_id, trans) = mk_transport();
let (peer1_id, trans) = mk_transport(muxer);
let mut swarm1 = Swarm::new(trans, Ping::new(cfg.clone()), peer1_id.clone());
let (peer2_id, trans) = mk_transport();
let (peer2_id, trans) = mk_transport(muxer);
let mut swarm2 = Swarm::new(trans, Ping::new(cfg), peer2_id.clone());
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
@ -188,11 +190,11 @@ fn max_failures() {
assert_eq!(u8::max(count1, count2), max_failures.get() - 1);
}
QuickCheck::new().tests(3).quickcheck(prop as fn(_))
QuickCheck::new().tests(10).quickcheck(prop as fn(_,_))
}
fn mk_transport() -> (
fn mk_transport(muxer: MuxerChoice) -> (
PeerId,
Boxed<
(PeerId, StreamMuxerBox),
@ -202,13 +204,32 @@ fn mk_transport() -> (
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = id_keys.public().into_peer_id();
let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys).unwrap();
let transport = TcpConfig::new()
.nodelay(true)
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(libp2p_yamux::Config::default())
.multiplex(match muxer {
MuxerChoice::Yamux =>
upgrade::EitherUpgrade::A(yamux::Config::default()),
MuxerChoice::Mplex =>
upgrade::EitherUpgrade::B(mplex::MplexConfig::default()),
})
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed();
(peer_id, transport)
}
#[derive(Debug, Copy, Clone)]
enum MuxerChoice {
Mplex,
Yamux,
}
impl Arbitrary for MuxerChoice {
fn arbitrary<G: Gen>(g: &mut G) -> MuxerChoice {
*[MuxerChoice::Mplex, MuxerChoice::Yamux].choose(g).unwrap()
}
}