From e083e82212b7f5f02a0884d39a9b0e3d0ae7684e Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 25 Nov 2019 17:33:59 +0100 Subject: [PATCH] Fix tests of libp2p-ping (#1321) --- protocols/ping/Cargo.toml | 3 +- protocols/ping/src/handler.rs | 33 +++++---------- protocols/ping/src/protocol.rs | 34 ++++++--------- protocols/ping/tests/ping.rs | 77 ++++++++++++++-------------------- 4 files changed, 57 insertions(+), 90 deletions(-) diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index d2f561ca..e8b0de35 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -21,9 +21,8 @@ wasm-timer = "0.2" void = "1.0" [dev-dependencies] +async-std = "1.0" libp2p-tcp = { version = "0.13.0", path = "../../transports/tcp" } libp2p-secio = { version = "0.13.0", path = "../../protocols/secio" } libp2p-yamux = { version = "0.13.0", path = "../../muxers/yamux" } quickcheck = "0.9.0" -tokio = "0.1" -tokio-tcp = "0.1" diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 578f0b7b..e7584419 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -265,11 +265,10 @@ where mod tests { use super::*; + use async_std::net::TcpStream; use futures::future; use quickcheck::*; use rand::Rng; - use tokio_tcp::TcpStream; - use tokio::runtime::current_thread::Runtime; impl Arbitrary for PingConfig { fn arbitrary(g: &mut G) -> PingConfig { @@ -280,11 +279,10 @@ mod tests { } } - fn tick(h: &mut PingHandler) -> Result< - ProtocolsHandlerEvent, - PingFailure - > { - futures::executor::block_on(future::poll_fn(|| h.poll() )) + fn tick(h: &mut PingHandler) + -> ProtocolsHandlerEvent + { + futures::executor::block_on(future::poll_fn(|cx| h.poll(cx) )) } #[test] @@ -292,34 +290,25 @@ mod tests { fn prop(cfg: PingConfig, ping_rtt: Duration) -> bool { let mut h = PingHandler::::new(cfg); - // The first ping is scheduled "immediately". - let start = h.next_ping.deadline(); - assert!(start <= Instant::now()); - // Send ping match tick(&mut h) { - Ok(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: _ }) => { + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: _ } => { // The handler must use the configured timeout. assert_eq!(protocol.timeout(), &h.config.timeout); - // The next ping must be scheduled no earlier than the ping timeout. - assert!(h.next_ping.deadline() >= start + h.config.timeout); } e => panic!("Unexpected event: {:?}", e) } - let now = Instant::now(); - // Receive pong h.inject_fully_negotiated_outbound(ping_rtt, ()); match tick(&mut h) { - Ok(ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { rtt }))) => { + ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { rtt })) => { // The handler must report the given RTT. assert_eq!(rtt, ping_rtt); - // The next ping must be scheduled no earlier than the ping interval. - assert!(now + h.config.interval <= h.next_ping.deadline()); } e => panic!("Unexpected event: {:?}", e) } + true } @@ -333,20 +322,20 @@ mod tests { for _ in 0 .. h.config.max_failures.get() - 1 { h.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout); match tick(&mut h) { - Ok(ProtocolsHandlerEvent::Custom(Err(PingFailure::Timeout))) => {} + ProtocolsHandlerEvent::Custom(Err(PingFailure::Timeout)) => {} e => panic!("Unexpected event: {:?}", e) } } h.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout); match tick(&mut h) { - Err(PingFailure::Timeout) => { + ProtocolsHandlerEvent::Close(PingFailure::Timeout) => { assert_eq!(h.failures, h.config.max_failures.get()); } e => panic!("Unexpected event: {:?}", e) } h.inject_fully_negotiated_outbound(Duration::from_secs(1), ()); match tick(&mut h) { - Ok(ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { .. }))) => { + ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { .. })) => { // A success resets the counter for consecutive failures. assert_eq!(h.failures, 0); } diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index a5f105c7..df729722 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -121,31 +121,23 @@ mod tests { let mut listener = MemoryTransport.listen_on(mem_addr).unwrap(); let listener_addr = - if let Ok(Poll::Ready(Some(ListenerEvent::NewAddress(a)))) = listener.poll() { + if let Some(Some(Ok(ListenerEvent::NewAddress(a)))) = listener.next().now_or_never() { a } else { panic!("MemoryTransport not listening on an address!"); }; + + async_std::task::spawn(async move { + let listener_event = listener.next().await.unwrap(); + let (listener_upgrade, _) = listener_event.unwrap().into_upgrade().unwrap(); + let conn = listener_upgrade.await.unwrap(); + upgrade::apply_inbound(conn, Ping::default()).await.unwrap(); + }); - let server = listener - .into_future() - .map_err(|(e, _)| e) - .and_then(|(listener_event, _)| { - let (listener_upgrade, _) = listener_event.unwrap().into_upgrade().unwrap(); - let conn = listener_upgrade.wait().unwrap(); - upgrade::apply_inbound(conn, Ping::default()) - .map_err(|e| panic!(e)) - }); - - let client = MemoryTransport.dial(listener_addr).unwrap() - .and_then(|c| { - upgrade::apply_outbound(c, Ping::default(), upgrade::Version::V1) - .map_err(|e| panic!(e)) - }); - - let mut runtime = tokio::runtime::Runtime::new().unwrap(); - runtime.spawn(server.map_err(|e| panic!(e))); - let rtt = runtime.block_on(client).expect("RTT"); - assert!(rtt > Duration::from_secs(0)); + async_std::task::block_on(async move { + let c = MemoryTransport.dial(listener_addr).unwrap().await.unwrap(); + let rtt = upgrade::apply_outbound(c, Ping::default(), upgrade::Version::V1).await.unwrap(); + assert!(rtt > Duration::from_secs(0)); + }); } } diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 1b9fbc77..2c214319 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -23,20 +23,18 @@ use libp2p_core::{ Multiaddr, PeerId, - Negotiated, identity, + muxing::StreamMuxerBox, transport::{Transport, boxed::Boxed}, either::EitherError, upgrade::{self, UpgradeError} }; use libp2p_ping::*; -use libp2p_yamux::{self as yamux, Yamux}; -use libp2p_secio::{SecioConfig, SecioOutput, SecioError}; +use libp2p_secio::{SecioConfig, SecioError}; use libp2p_swarm::Swarm; -use libp2p_tcp::{TcpConfig, TcpTransStream}; -use futures::{future, prelude::*}; -use std::{io, time::Duration, sync::mpsc::sync_channel}; -use tokio::runtime::Runtime; +use libp2p_tcp::TcpConfig; +use futures::{prelude::*, channel::mpsc}; +use std::{io, time::Duration}; #[test] fn ping() { @@ -48,56 +46,45 @@ fn ping() { let (peer2_id, trans) = mk_transport(); let mut swarm2 = Swarm::new(trans, Ping::new(cfg), peer2_id.clone()); - let (tx, rx) = sync_channel::(1); + let (mut tx, mut rx) = mpsc::channel::(1); let pid1 = peer1_id.clone(); let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); - let mut listening = false; Swarm::listen_on(&mut swarm1, addr).unwrap(); - let peer1 = future::poll_fn(move || -> Result<_, ()> { + + let peer1 = async move { + while let Some(_) = swarm1.next().now_or_never() {} + + for l in Swarm::listeners(&swarm1) { + tx.send(l.clone()).await.unwrap(); + } + loop { - match swarm1.poll().expect("Error while polling swarm") { - Async::Ready(Some(PingEvent { peer, result })) => match result { - Ok(PingSuccess::Ping { rtt }) => - return Ok(Async::Ready((pid1.clone(), peer, rtt))), - _ => {} + match swarm1.next().await.unwrap().unwrap() { + PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => { + return (pid1.clone(), peer, rtt) }, - _ => { - if !listening { - for l in Swarm::listeners(&swarm1) { - tx.send(l.clone()).unwrap(); - listening = true; - } - } - return Ok(Async::NotReady) - } + _ => {} } } - }); + }; let pid2 = peer2_id.clone(); - let mut dialing = false; - let peer2 = future::poll_fn(move || -> Result<_, ()> { + let peer2 = async move { + Swarm::dial_addr(&mut swarm2, rx.next().await.unwrap()).unwrap(); + loop { - match swarm2.poll().expect("Error while polling swarm") { - Async::Ready(Some(PingEvent { peer, result })) => match result { - Ok(PingSuccess::Ping { rtt }) => - return Ok(Async::Ready((pid2.clone(), peer, rtt))), - _ => {} + match swarm2.next().await.unwrap().unwrap() { + PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => { + return (pid2.clone(), peer, rtt) }, - _ => { - if !dialing { - Swarm::dial_addr(&mut swarm2, rx.recv().unwrap()).unwrap(); - dialing = true; - } - return Ok(Async::NotReady) - } + _ => {} } } - }); + }; - let result = peer1.select(peer2).map_err(|e| panic!(e)); - let ((p1, p2, rtt), _) = futures::executor::block_on(result).unwrap(); + let result = future::select(Box::pin(peer1), Box::pin(peer2)); + let ((p1, p2, rtt), _) = futures::executor::block_on(result).factor_first(); assert!(p1 == peer1_id && p2 == peer2_id || p1 == peer2_id && p2 == peer1_id); assert!(rtt < Duration::from_millis(50)); } @@ -105,7 +92,7 @@ fn ping() { fn mk_transport() -> ( PeerId, Boxed< - (PeerId, Yamux>>>), + (PeerId, StreamMuxerBox), EitherError>, UpgradeError> > ) { @@ -115,8 +102,8 @@ fn mk_transport() -> ( .nodelay(true) .upgrade(upgrade::Version::V1) .authenticate(SecioConfig::new(id_keys)) - .multiplex(yamux::Config::default()) + .multiplex(libp2p_yamux::Config::default()) + .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) .boxed(); (peer_id, transport) } -