Fix tests of libp2p-ping (#1321)

Authored by Pierre Krieger on 2019-11-25 17:33:59 +01:00; committed by Toralf Wittner
parent b7644722ee
commit e083e82212
4 changed files with 57 additions and 90 deletions

View File: protocols/ping/Cargo.toml

@@ -21,9 +21,8 @@ wasm-timer = "0.2"
 void = "1.0"
 
 [dev-dependencies]
+async-std = "1.0"
 libp2p-tcp = { version = "0.13.0", path = "../../transports/tcp" }
 libp2p-secio = { version = "0.13.0", path = "../../protocols/secio" }
 libp2p-yamux = { version = "0.13.0", path = "../../muxers/yamux" }
 quickcheck = "0.9.0"
-tokio = "0.1"
-tokio-tcp = "0.1"
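
The tokio 0.1 test dependencies are dropped in favour of the new async-std dev-dependency, which supplies the executor the rewritten tests block on. A minimal, self-contained sketch of that pattern (assuming async-std 1.x; not taken from this commit):

    fn main() {
        // Drive an async test body to completion without a tokio runtime.
        let value = async_std::task::block_on(async { 1 + 1 });
        assert_eq!(value, 2);
    }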

View File: protocols/ping/src/handler.rs

@@ -265,11 +265,10 @@ where
 mod tests {
     use super::*;
+    use async_std::net::TcpStream;
     use futures::future;
     use quickcheck::*;
     use rand::Rng;
-    use tokio_tcp::TcpStream;
-    use tokio::runtime::current_thread::Runtime;
 
     impl Arbitrary for PingConfig {
         fn arbitrary<G: Gen>(g: &mut G) -> PingConfig {
@@ -280,11 +279,10 @@ mod tests {
         }
     }
 
-    fn tick(h: &mut PingHandler<TcpStream>) -> Result<
-        ProtocolsHandlerEvent<protocol::Ping, (), PingResult>,
-        PingFailure
-    > {
-        futures::executor::block_on(future::poll_fn(|| h.poll() ))
+    fn tick(h: &mut PingHandler<TcpStream>)
+        -> ProtocolsHandlerEvent<protocol::Ping, (), PingResult, PingFailure>
+    {
+        futures::executor::block_on(future::poll_fn(|cx| h.poll(cx) ))
     }
 
     #[test]
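
With futures 0.3, the closure given to `future::poll_fn` receives the task `Context` and returns `std::task::Poll`, which is why `tick` now calls `h.poll(cx)`. A standalone sketch of that calling convention (assuming futures 0.3; illustrative only, not part of the commit):

    use futures::future;
    use std::task::Poll;

    fn main() {
        // poll_fn hands the closure a &mut Context; returning Poll::Ready completes the future.
        let out = futures::executor::block_on(future::poll_fn(|_cx| Poll::Ready(42)));
        assert_eq!(out, 42);
    }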
@@ -292,34 +290,25 @@ mod tests {
         fn prop(cfg: PingConfig, ping_rtt: Duration) -> bool {
             let mut h = PingHandler::<TcpStream>::new(cfg);
 
-            // The first ping is scheduled "immediately".
-            let start = h.next_ping.deadline();
-            assert!(start <= Instant::now());
-
             // Send ping
             match tick(&mut h) {
-                Ok(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: _ }) => {
+                ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: _ } => {
                     // The handler must use the configured timeout.
                     assert_eq!(protocol.timeout(), &h.config.timeout);
-                    // The next ping must be scheduled no earlier than the ping timeout.
-                    assert!(h.next_ping.deadline() >= start + h.config.timeout);
                 }
                 e => panic!("Unexpected event: {:?}", e)
             }
 
-            let now = Instant::now();
-
             // Receive pong
             h.inject_fully_negotiated_outbound(ping_rtt, ());
             match tick(&mut h) {
-                Ok(ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { rtt }))) => {
+                ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { rtt })) => {
                     // The handler must report the given RTT.
                     assert_eq!(rtt, ping_rtt);
-                    // The next ping must be scheduled no earlier than the ping interval.
-                    assert!(now + h.config.interval <= h.next_ping.deadline());
                 }
                 e => panic!("Unexpected event: {:?}", e)
            }
 
             true
         }
@@ -333,20 +322,20 @@ mod tests {
             for _ in 0 .. h.config.max_failures.get() - 1 {
                 h.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout);
                 match tick(&mut h) {
-                    Ok(ProtocolsHandlerEvent::Custom(Err(PingFailure::Timeout))) => {}
+                    ProtocolsHandlerEvent::Custom(Err(PingFailure::Timeout)) => {}
                     e => panic!("Unexpected event: {:?}", e)
                 }
             }
             h.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout);
             match tick(&mut h) {
-                Err(PingFailure::Timeout) => {
+                ProtocolsHandlerEvent::Close(PingFailure::Timeout) => {
                     assert_eq!(h.failures, h.config.max_failures.get());
                 }
                 e => panic!("Unexpected event: {:?}", e)
             }
             h.inject_fully_negotiated_outbound(Duration::from_secs(1), ());
             match tick(&mut h) {
-                Ok(ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { .. }))) => {
+                ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { .. })) => {
                     // A success resets the counter for consecutive failures.
                     assert_eq!(h.failures, 0);
                 }
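
The properties above follow the usual quickcheck shape: a nested `fn prop(..) -> bool` over `Arbitrary` inputs, which is what the `impl Arbitrary for PingConfig` earlier in this file feeds. A self-contained sketch of the pattern (assuming quickcheck 0.9; the property shown is illustrative, not from the crate):

    use quickcheck::quickcheck;

    fn main() {
        // A property that must hold for every generated input.
        fn prop(x: u32) -> bool {
            x.wrapping_add(0) == x
        }
        quickcheck(prop as fn(u32) -> bool);
    }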

View File: protocols/ping/src/protocol.rs

@@ -121,31 +121,23 @@ mod tests {
         let mut listener = MemoryTransport.listen_on(mem_addr).unwrap();
 
         let listener_addr =
-            if let Ok(Poll::Ready(Some(ListenerEvent::NewAddress(a)))) = listener.poll() {
+            if let Some(Some(Ok(ListenerEvent::NewAddress(a)))) = listener.next().now_or_never() {
                 a
             } else {
                 panic!("MemoryTransport not listening on an address!");
             };
 
+        async_std::task::spawn(async move {
+            let listener_event = listener.next().await.unwrap();
+            let (listener_upgrade, _) = listener_event.unwrap().into_upgrade().unwrap();
+            let conn = listener_upgrade.await.unwrap();
+            upgrade::apply_inbound(conn, Ping::default()).await.unwrap();
+        });
 
-        let server = listener
-            .into_future()
-            .map_err(|(e, _)| e)
-            .and_then(|(listener_event, _)| {
-                let (listener_upgrade, _) = listener_event.unwrap().into_upgrade().unwrap();
-                let conn = listener_upgrade.wait().unwrap();
-                upgrade::apply_inbound(conn, Ping::default())
-                    .map_err(|e| panic!(e))
-            });
-
-        let client = MemoryTransport.dial(listener_addr).unwrap()
-            .and_then(|c| {
-                upgrade::apply_outbound(c, Ping::default(), upgrade::Version::V1)
-                    .map_err(|e| panic!(e))
-            });
-
-        let mut runtime = tokio::runtime::Runtime::new().unwrap();
-        runtime.spawn(server.map_err(|e| panic!(e)));
-        let rtt = runtime.block_on(client).expect("RTT");
-        assert!(rtt > Duration::from_secs(0));
+        async_std::task::block_on(async move {
+            let c = MemoryTransport.dial(listener_addr).unwrap().await.unwrap();
+            let rtt = upgrade::apply_outbound(c, Ping::default(), upgrade::Version::V1).await.unwrap();
+            assert!(rtt > Duration::from_secs(0));
+        });
     }
 }
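
The listener address is now grabbed with `now_or_never()`, which polls a future exactly once and yields `Some(output)` only if it is already ready, so no executor is needed for that step. A standalone sketch of its behaviour (assuming futures 0.3; not part of the commit):

    use futures::{future, FutureExt};

    fn main() {
        // An already-completed future resolves immediately; a pending one yields None.
        assert_eq!(future::ready(5).now_or_never(), Some(5));
        assert_eq!(future::pending::<u8>().now_or_never(), None);
    }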

View File: protocols/ping/tests/ping.rs

@@ -23,20 +23,18 @@
 use libp2p_core::{
     Multiaddr,
     PeerId,
-    Negotiated,
     identity,
+    muxing::StreamMuxerBox,
     transport::{Transport, boxed::Boxed},
     either::EitherError,
     upgrade::{self, UpgradeError}
 };
 use libp2p_ping::*;
-use libp2p_yamux::{self as yamux, Yamux};
-use libp2p_secio::{SecioConfig, SecioOutput, SecioError};
+use libp2p_secio::{SecioConfig, SecioError};
 use libp2p_swarm::Swarm;
-use libp2p_tcp::{TcpConfig, TcpTransStream};
-use futures::{future, prelude::*};
-use std::{io, time::Duration, sync::mpsc::sync_channel};
-use tokio::runtime::Runtime;
+use libp2p_tcp::TcpConfig;
+use futures::{prelude::*, channel::mpsc};
+use std::{io, time::Duration};
 
 #[test]
 fn ping() {
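
In the next hunk, std's `sync_channel` is replaced by the futures channel so the listening address can be handed over without blocking the async tasks: sending awaits buffer space and the receiver is a `Stream`. A standalone sketch of that handoff (assuming futures 0.3; illustrative only):

    use futures::{channel::mpsc, SinkExt, StreamExt};

    fn main() {
        let (mut tx, mut rx) = mpsc::channel::<u32>(1);
        futures::executor::block_on(async move {
            // send() is async; next() drives the receiving end as a Stream.
            tx.send(7).await.unwrap();
            assert_eq!(rx.next().await, Some(7));
        });
    }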
@@ -48,56 +46,45 @@ fn ping() {
     let (peer2_id, trans) = mk_transport();
     let mut swarm2 = Swarm::new(trans, Ping::new(cfg), peer2_id.clone());
 
-    let (tx, rx) = sync_channel::<Multiaddr>(1);
+    let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
 
     let pid1 = peer1_id.clone();
     let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
-    let mut listening = false;
     Swarm::listen_on(&mut swarm1, addr).unwrap();
-    let peer1 = future::poll_fn(move || -> Result<_, ()> {
+    let peer1 = async move {
+        while let Some(_) = swarm1.next().now_or_never() {}
+        for l in Swarm::listeners(&swarm1) {
+            tx.send(l.clone()).await.unwrap();
+        }
         loop {
-            match swarm1.poll().expect("Error while polling swarm") {
-                Async::Ready(Some(PingEvent { peer, result })) => match result {
-                    Ok(PingSuccess::Ping { rtt }) =>
-                        return Ok(Async::Ready((pid1.clone(), peer, rtt))),
-                    _ => {}
-                },
-                _ => {
-                    if !listening {
-                        for l in Swarm::listeners(&swarm1) {
-                            tx.send(l.clone()).unwrap();
-                            listening = true;
-                        }
-                    }
-                    return Ok(Async::NotReady)
-                }
+            match swarm1.next().await.unwrap().unwrap() {
+                PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => {
+                    return (pid1.clone(), peer, rtt)
+                },
+                _ => {}
             }
         }
-    });
+    };
 
     let pid2 = peer2_id.clone();
-    let mut dialing = false;
-    let peer2 = future::poll_fn(move || -> Result<_, ()> {
+    let peer2 = async move {
+        Swarm::dial_addr(&mut swarm2, rx.next().await.unwrap()).unwrap();
         loop {
-            match swarm2.poll().expect("Error while polling swarm") {
-                Async::Ready(Some(PingEvent { peer, result })) => match result {
-                    Ok(PingSuccess::Ping { rtt }) =>
-                        return Ok(Async::Ready((pid2.clone(), peer, rtt))),
-                    _ => {}
-                },
-                _ => {
-                    if !dialing {
-                        Swarm::dial_addr(&mut swarm2, rx.recv().unwrap()).unwrap();
-                        dialing = true;
-                    }
-                    return Ok(Async::NotReady)
-                }
+            match swarm2.next().await.unwrap().unwrap() {
+                PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => {
+                    return (pid2.clone(), peer, rtt)
+                },
+                _ => {}
            }
        }
-    });
+    };
 
-    let result = peer1.select(peer2).map_err(|e| panic!(e));
-    let ((p1, p2, rtt), _) = futures::executor::block_on(result).unwrap();
+    let result = future::select(Box::pin(peer1), Box::pin(peer2));
+    let ((p1, p2, rtt), _) = futures::executor::block_on(result).factor_first();
     assert!(p1 == peer1_id && p2 == peer2_id || p1 == peer2_id && p2 == peer1_id);
     assert!(rtt < Duration::from_millis(50));
 }
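
Both peers are now plain `async` blocks; `future::select` races them and `factor_first()` pulls out the winning (peer id, peer id, rtt) tuple regardless of which side finished first. A standalone sketch of that combinator pair (assuming futures 0.3; not part of the commit):

    use futures::future;

    fn main() {
        let fast = Box::pin(future::ready("fast"));
        let slow = Box::pin(future::pending::<&str>());
        // select() returns an Either; factor_first() extracts the common output type.
        let (winner, _rest) = futures::executor::block_on(future::select(fast, slow)).factor_first();
        assert_eq!(winner, "fast");
    }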
@@ -105,7 +92,7 @@ fn ping()
 fn mk_transport() -> (
     PeerId,
     Boxed<
-        (PeerId, Yamux<Negotiated<SecioOutput<Negotiated<TcpTransStream>>>>),
+        (PeerId, StreamMuxerBox),
         EitherError<EitherError<io::Error, UpgradeError<SecioError>>, UpgradeError<io::Error>>
     >
 ) {
@@ -115,8 +102,8 @@ fn mk_transport() -> (
         .nodelay(true)
         .upgrade(upgrade::Version::V1)
         .authenticate(SecioConfig::new(id_keys))
-        .multiplex(yamux::Config::default())
+        .multiplex(libp2p_yamux::Config::default())
+        .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
         .boxed();
     (peer_id, transport)
 }