libp2p-ping improvements. (#1049)

* libp2p-ping improvements.

  * re #950: Removes use of the `OneShotHandler`, but still sending each
    ping over a new substream, as seems to be intentional since #828.

  * re #842: Adds an integration test that exercises the ping behaviour through
    a Swarm, requiring the RTT to be below a threshold. This requires disabling
    Nagle's algorithm as it can interact badly with delayed ACKs (and has been
    observed to do so in the context of the new ping example and integration test).

  * re #864: Control of the inbound and outbound (sub)stream protocol upgrade
    timeouts has been moved from the `NodeHandlerWrapperBuilder` to the
    `ProtocolsHandler`. That may also alleviate the need for a custom timeout
    on an `OutboundSubstreamRequest` as a `ProtocolsHandler` is now free to
    adjust these timeouts over time.

Other changes:

  * A new ping example.
  * Documentation improvements.

* More documentation improvements.

* Add PingPolicy and ensure no event is dropped.

* Remove inbound_timeout/outbound_timeout.

As per review comment, the inbound timeout is now configured
as part of the `listen_protocol` and the outbound timeout as
part of the `OutboundSubstreamRequest`.

* Simplify and generalise.

Generalise `ListenProtocol` to `SubstreamProtocol`, reusing it in
the context of `ProtocolsHandlerEvent::OutboundSubstreamRequest`.

* Doc comments for SubstreamProtocol.

* Adapt to changes in master.

* Relax upper bound for ping integration test rtt.

For "slow" CI build machines?
This commit is contained in:
Roman Borschel
2019-04-16 15:57:29 +02:00
committed by GitHub
parent 9b6336672b
commit bee5c58b27
22 changed files with 897 additions and 382 deletions

View File

@ -21,9 +21,9 @@
use futures::{prelude::*, future, try_ready};
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, upgrade::Negotiated};
use log::debug;
use rand::{distributions::Standard, prelude::*, rngs::EntropyRng};
use rand::{distributions, prelude::*};
use std::{io, iter, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::{io as nio, AsyncRead, AsyncWrite};
/// Represents a prototype for an upgrade to handle the ping protocol.
///
@ -33,8 +33,14 @@ use tokio_io::{AsyncRead, AsyncWrite};
/// - Listener receives the data and sends it back.
/// - Dialer receives the data and verifies that it matches what it sent.
///
/// The dialer produces a `Duration`, which corresponds to the time between when we flushed the
/// substream and when we received back the payload.
/// The dialer produces a `Duration`, which corresponds to the round-trip time
/// of the payload.
///
/// > **Note**: The round-trip time of a ping may be subject to delays induced
/// > by the underlying transport, e.g. in the case of TCP there is
/// > Nagle's algorithm, delayed acks and similar configuration options
/// > which can affect latencies especially on otherwise low-volume
/// > connections.
#[derive(Default, Debug, Copy, Clone)]
pub struct Ping;
@ -47,20 +53,33 @@ impl UpgradeInfo for Ping {
}
}
type RecvPing<T> = nio::ReadExact<Negotiated<T>, [u8; 32]>;
type SendPong<T> = nio::WriteAll<Negotiated<T>, [u8; 32]>;
type Flush<T> = nio::Flush<Negotiated<T>>;
type Shutdown<T> = nio::Shutdown<Negotiated<T>>;
impl<TSocket> InboundUpgrade<TSocket> for Ping
where
TSocket: AsyncRead + AsyncWrite,
{
type Output = ();
type Error = io::Error;
type Future = future::Map<future::AndThen<future::AndThen<future::AndThen<tokio_io::io::ReadExact<Negotiated<TSocket>, [u8; 32]>, tokio_io::io::WriteAll<Negotiated<TSocket>, [u8; 32]>, fn((Negotiated<TSocket>, [u8; 32])) -> tokio_io::io::WriteAll<Negotiated<TSocket>, [u8; 32]>>, tokio_io::io::Flush<Negotiated<TSocket>>, fn((Negotiated<TSocket>, [u8; 32])) -> tokio_io::io::Flush<Negotiated<TSocket>>>, tokio_io::io::Shutdown<Negotiated<TSocket>>, fn(Negotiated<TSocket>) -> tokio_io::io::Shutdown<Negotiated<TSocket>>>, fn(Negotiated<TSocket>) -> ()>;
type Future = future::Map<
future::AndThen<
future::AndThen<
future::AndThen<
RecvPing<TSocket>,
SendPong<TSocket>, fn((Negotiated<TSocket>, [u8; 32])) -> SendPong<TSocket>>,
Flush<TSocket>, fn((Negotiated<TSocket>, [u8; 32])) -> Flush<TSocket>>,
Shutdown<TSocket>, fn(Negotiated<TSocket>) -> Shutdown<TSocket>>,
fn(Negotiated<TSocket>) -> ()>;
#[inline]
fn upgrade_inbound(self, socket: Negotiated<TSocket>, _: Self::Info) -> Self::Future {
tokio_io::io::read_exact(socket, [0; 32])
.and_then::<fn(_) -> _, _>(|(socket, buffer)| tokio_io::io::write_all(socket, buffer))
.and_then::<fn(_) -> _, _>(|(socket, _)| tokio_io::io::flush(socket))
.and_then::<fn(_) -> _, _>(|socket| tokio_io::io::shutdown(socket))
nio::read_exact(socket, [0; 32])
.and_then::<fn(_) -> _, _>(|(sock, buf)| nio::write_all(sock, buf))
.and_then::<fn(_) -> _, _>(|(sock, _)| nio::flush(sock))
.and_then::<fn(_) -> _, _>(|sock| nio::shutdown(sock))
.map(|_| ())
}
}
@ -75,143 +94,135 @@ where
#[inline]
fn upgrade_outbound(self, socket: Negotiated<TSocket>, _: Self::Info) -> Self::Future {
let payload: [u8; 32] = EntropyRng::default().sample(Standard);
debug!("Preparing for ping with payload {:?}", payload);
let payload: [u8; 32] = thread_rng().sample(distributions::Standard);
debug!("Preparing ping payload {:?}", payload);
PingDialer {
inner: PingDialerInner::Write {
inner: tokio_io::io::write_all(socket, payload),
state: PingDialerState::Write {
inner: nio::write_all(socket, payload),
},
}
}
}
/// Sends a ping and receives a pong.
///
/// Implements `Future`. Finishes when the pong has arrived and has been verified.
/// A `PingDialer` is a future that sends a ping and expects to receive a pong.
pub struct PingDialer<TSocket> {
inner: PingDialerInner<TSocket>,
state: PingDialerState<TSocket>
}
enum PingDialerInner<TSocket> {
enum PingDialerState<TSocket> {
Write {
inner: tokio_io::io::WriteAll<TSocket, [u8; 32]>,
inner: nio::WriteAll<TSocket, [u8; 32]>,
},
Flush {
inner: tokio_io::io::Flush<TSocket>,
ping_payload: [u8; 32],
inner: nio::Flush<TSocket>,
payload: [u8; 32],
},
Read {
inner: tokio_io::io::ReadExact<TSocket, [u8; 32]>,
ping_payload: [u8; 32],
inner: nio::ReadExact<TSocket, [u8; 32]>,
payload: [u8; 32],
started: Instant,
},
Shutdown {
inner: tokio_io::io::Shutdown<TSocket>,
ping_time: Duration,
inner: nio::Shutdown<TSocket>,
rtt: Duration,
},
}
impl<TSocket> Future for PingDialer<TSocket>
where TSocket: AsyncRead + AsyncWrite,
where
TSocket: AsyncRead + AsyncWrite,
{
type Item = Duration;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let new_state = match self.inner {
PingDialerInner::Write { ref mut inner } => {
let (socket, ping_payload) = try_ready!(inner.poll());
PingDialerInner::Flush {
inner: tokio_io::io::flush(socket),
ping_payload,
self.state = match self.state {
PingDialerState::Write { ref mut inner } => {
let (socket, payload) = try_ready!(inner.poll());
PingDialerState::Flush {
inner: nio::flush(socket),
payload,
}
},
PingDialerInner::Flush { ref mut inner, ping_payload } => {
PingDialerState::Flush { ref mut inner, payload } => {
let socket = try_ready!(inner.poll());
let started = Instant::now();
PingDialerInner::Read {
inner: tokio_io::io::read_exact(socket, [0; 32]),
ping_payload,
PingDialerState::Read {
inner: nio::read_exact(socket, [0; 32]),
payload,
started,
}
},
PingDialerInner::Read { ref mut inner, ping_payload, started } => {
let (socket, obtained) = try_ready!(inner.poll());
let ping_time = started.elapsed();
if obtained != ping_payload {
return Err(io::Error::new(io::ErrorKind::InvalidData,
"Received ping payload doesn't match expected"));
PingDialerState::Read { ref mut inner, payload, started } => {
let (socket, payload_received) = try_ready!(inner.poll());
let rtt = started.elapsed();
if payload_received != payload {
return Err(io::Error::new(
io::ErrorKind::InvalidData, "Ping payload mismatch"));
}
PingDialerInner::Shutdown {
inner: tokio_io::io::shutdown(socket),
ping_time,
PingDialerState::Shutdown {
inner: nio::shutdown(socket),
rtt,
}
},
PingDialerInner::Shutdown { ref mut inner, ping_time } => {
let _ = try_ready!(inner.poll());
return Ok(Async::Ready(ping_time));
PingDialerState::Shutdown { ref mut inner, rtt } => {
try_ready!(inner.poll());
return Ok(Async::Ready(rtt));
},
};
self.inner = new_state;
}
}
}
}
/// Enum to merge the output of `Ping` for the dialer and listener.
#[derive(Debug, Copy, Clone)]
pub enum PingOutput {
/// Received a ping and sent back a pong.
Pong,
/// Sent a ping and received back a pong. Contains the ping time.
Ping(Duration),
}
impl From<Duration> for PingOutput {
#[inline]
fn from(duration: Duration) -> PingOutput {
PingOutput::Ping(duration)
}
}
impl From<()> for PingOutput {
#[inline]
fn from(_: ()) -> PingOutput {
PingOutput::Pong
}
}
#[cfg(test)]
mod tests {
use tokio_tcp::{TcpListener, TcpStream};
use super::Ping;
use futures::{Future, Stream};
use libp2p_core::upgrade;
// TODO: rewrite tests with the MemoryTransport
use futures::prelude::*;
use libp2p_core::{
upgrade,
multiaddr::multiaddr,
transport::{
Transport,
ListenerEvent,
memory::MemoryTransport
}
};
use rand::{thread_rng, Rng};
use std::time::Duration;
#[test]
fn ping_pong() {
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let listener_addr = listener.local_addr().unwrap();
let mem_addr = multiaddr![Memory(thread_rng().gen::<u64>())];
let mut listener = MemoryTransport.listen_on(mem_addr).unwrap();
let listener_addr =
if let Ok(Async::Ready(Some(ListenerEvent::NewAddress(a)))) = listener.poll() {
a
} else {
panic!("MemoryTransport not listening on an address!");
};
let server = listener
.incoming()
.into_future()
.map_err(|(e, _)| e)
.and_then(|(c, _)| {
upgrade::apply_inbound(c.unwrap(), Ping::default()).map_err(|_| panic!())
.and_then(|(listener_event, _)| {
let (listener_upgrade, _) = listener_event.unwrap().into_upgrade().unwrap();
let conn = listener_upgrade.wait().unwrap();
upgrade::apply_inbound(conn, Ping::default())
.map_err(|e| panic!(e))
});
let client = TcpStream::connect(&listener_addr)
let client = MemoryTransport.dial(listener_addr).unwrap()
.and_then(|c| {
upgrade::apply_outbound(c, Ping::default()).map_err(|_| panic!())
})
.map(|_| ());
upgrade::apply_outbound(c, Ping::default())
.map_err(|e| panic!(e))
});
let mut runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(server.select(client).map_err(|_| panic!())).unwrap();
runtime.spawn(server.map_err(|e| panic!(e)));
let rtt = runtime.block_on(client).expect("RTT");
assert!(rtt > Duration::from_secs(0));
}
}