From 173fc04b306392f7b6059e75b1345c3ea748cda3 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Fri, 6 Dec 2019 11:03:19 +0100 Subject: [PATCH] Fix tests. --- core/src/nodes/listeners.rs | 8 +- core/tests/network_simult.rs | 185 +++++++++--------- core/tests/transport_upgrade.rs | 35 ++-- .../src/length_delimited.rs | 20 +- transports/uds/src/lib.rs | 71 ++++--- 5 files changed, 156 insertions(+), 163 deletions(-) diff --git a/core/src/nodes/listeners.rs b/core/src/nodes/listeners.rs index 861f3e75..5663b81a 100644 --- a/core/src/nodes/listeners.rs +++ b/core/src/nodes/listeners.rs @@ -354,11 +354,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::transport::{self, ListenerEvent}; - use assert_matches::assert_matches; - use std::{io, iter::FromIterator}; - use futures::{future::{self}, stream}; - use crate::PeerId; + use crate::transport; #[test] fn incoming_event() { @@ -383,7 +379,7 @@ mod tests { }); match listeners.next().await.unwrap() { - ListenersEvent::Incoming { local_addr, upgrade, send_back_addr, .. } => { + ListenersEvent::Incoming { local_addr, send_back_addr, .. } => { assert_eq!(local_addr, address); assert_eq!(send_back_addr, address); }, diff --git a/core/tests/network_simult.rs b/core/tests/network_simult.rs index 7d7a247a..35c18315 100644 --- a/core/tests/network_simult.rs +++ b/core/tests/network_simult.rs @@ -31,10 +31,9 @@ use libp2p_swarm::{ ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, }; -use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; +use std::{io, task::Context, task::Poll, time::Duration}; use wasm_timer::Delay; -// TODO: replace with DummyProtocolsHandler after https://github.com/servo/rust-smallvec/issues/139 ? struct TestHandler(std::marker::PhantomData); impl Default for TestHandler { @@ -114,8 +113,6 @@ fn raw_swarm_simultaneous_connect() { .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()) .and_then(|(peer, mplex), _| { - // Gracefully close the connection to allow protocol - // negotiation to complete. util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) }); Network::new(transport, local_public_key.into_peer_id()) @@ -129,8 +126,6 @@ fn raw_swarm_simultaneous_connect() { .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()) .and_then(|(peer, mplex), _| { - // Gracefully close the connection to allow protocol - // negotiation to complete. util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) }); Network::new(transport, local_public_key.into_peer_id()) @@ -139,29 +134,38 @@ fn raw_swarm_simultaneous_connect() { swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); swarm2.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); - let (swarm1_listen_addr, swarm2_listen_addr, mut swarm1, mut swarm2) = futures::executor::block_on( - future::lazy(move |cx| { - let swarm1_listen_addr = - if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) { - listen_addr - } else { - panic!("Was expecting the listen address to be reported") - }; + let swarm1_listen_addr = future::poll_fn(|cx| { + if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) { + Poll::Ready(listen_addr) + } else { + panic!("Was expecting the listen address to be reported") + } + }) + .now_or_never() + .expect("listen address of swarm1"); - let swarm2_listen_addr = - if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm2.poll(cx) { - listen_addr - } else { - panic!("Was expecting the listen address to be reported") - }; + let swarm2_listen_addr = future::poll_fn(|cx| { + if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm2.poll(cx) { + Poll::Ready(listen_addr) + } else { + panic!("Was expecting the listen address to be reported") + } + }) + .now_or_never() + .expect("listen address of swarm2"); - Ok::<_, void::Void>((swarm1_listen_addr, swarm2_listen_addr, swarm1, swarm2)) - }) - ).unwrap(); + #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] + enum Step { + Start, + Dialing, + Connected, + Replaced, + Errored + } loop { - let mut swarm1_step = 0; - let mut swarm2_step = 0; + let mut swarm1_step = Step::Start; + let mut swarm2_step = Step::Start; let mut swarm1_dial_start = Delay::new(Duration::new(0, rand::random::() % 50_000_000)); let mut swarm2_dial_start = Delay::new(Duration::new(0, rand::random::() % 50_000_000)); @@ -174,31 +178,29 @@ fn raw_swarm_simultaneous_connect() { // We add a lot of randomness. In a real-life situation the swarm also has to // handle other nodes, which may delay the processing. - if swarm1_step == 0 { - match Future::poll(Pin::new(&mut swarm1_dial_start), cx) { - Poll::Ready(_) => { - let handler = TestHandler::default().into_node_handler_builder(); - swarm1.peer(swarm2.local_peer_id().clone()) - .into_not_connected() - .unwrap() - .connect(swarm2_listen_addr.clone(), handler); - swarm1_step = 1; - }, - Poll::Pending => swarm1_not_ready = true, + if swarm1_step == Step::Start { + if swarm1_dial_start.poll_unpin(cx).is_ready() { + let handler = TestHandler::default().into_node_handler_builder(); + swarm1.peer(swarm2.local_peer_id().clone()) + .into_not_connected() + .unwrap() + .connect(swarm2_listen_addr.clone(), handler); + swarm1_step = Step::Dialing; + } else { + swarm1_not_ready = true } } - if swarm2_step == 0 { - match Future::poll(Pin::new(&mut swarm2_dial_start), cx) { - Poll::Ready(_) => { - let handler = TestHandler::default().into_node_handler_builder(); - swarm2.peer(swarm1.local_peer_id().clone()) - .into_not_connected() - .unwrap() - .connect(swarm1_listen_addr.clone(), handler); - swarm2_step = 1; - }, - Poll::Pending => swarm2_not_ready = true, + if swarm2_step == Step::Start { + if swarm2_dial_start.poll_unpin(cx).is_ready() { + let handler = TestHandler::default().into_node_handler_builder(); + swarm2.peer(swarm1.local_peer_id().clone()) + .into_not_connected() + .unwrap() + .connect(swarm1_listen_addr.clone(), handler); + swarm2_step = Step::Dialing; + } else { + swarm2_not_ready = true } } @@ -207,29 +209,29 @@ fn raw_swarm_simultaneous_connect() { Poll::Ready(NetworkEvent::IncomingConnectionError { error: IncomingError::DeniedLowerPriority, .. }) => { - assert_eq!(swarm1_step, 2); - swarm1_step = 3; - }, + assert_eq!(swarm1_step, Step::Connected); + swarm1_step = Step::Errored + } Poll::Ready(NetworkEvent::Connected { conn_info, .. }) => { assert_eq!(conn_info, *swarm2.local_peer_id()); - if swarm1_step == 0 { + if swarm1_step == Step::Start { // The connection was established before // swarm1 started dialing; discard the test run. return Poll::Ready(false) } - assert_eq!(swarm1_step, 1); - swarm1_step = 2; - }, + assert_eq!(swarm1_step, Step::Dialing); + swarm1_step = Step::Connected + } Poll::Ready(NetworkEvent::Replaced { new_info, .. }) => { assert_eq!(new_info, *swarm2.local_peer_id()); - assert_eq!(swarm1_step, 2); - swarm1_step = 3; - }, + assert_eq!(swarm1_step, Step::Connected); + swarm1_step = Step::Replaced + } Poll::Ready(NetworkEvent::IncomingConnection(inc)) => { - inc.accept(TestHandler::default().into_node_handler_builder()); - }, + inc.accept(TestHandler::default().into_node_handler_builder()) + } Poll::Ready(ev) => panic!("swarm1: unexpected event: {:?}", ev), - Poll::Pending => swarm1_not_ready = true, + Poll::Pending => swarm1_not_ready = true } } @@ -238,39 +240,42 @@ fn raw_swarm_simultaneous_connect() { Poll::Ready(NetworkEvent::IncomingConnectionError { error: IncomingError::DeniedLowerPriority, .. }) => { - assert_eq!(swarm2_step, 2); - swarm2_step = 3; - }, + assert_eq!(swarm2_step, Step::Connected); + swarm2_step = Step::Errored + } Poll::Ready(NetworkEvent::Connected { conn_info, .. }) => { assert_eq!(conn_info, *swarm1.local_peer_id()); - if swarm2_step == 0 { + if swarm2_step == Step::Start { // The connection was established before // swarm2 started dialing; discard the test run. return Poll::Ready(false) } - assert_eq!(swarm2_step, 1); - swarm2_step = 2; - }, + assert_eq!(swarm2_step, Step::Dialing); + swarm2_step = Step::Connected + } Poll::Ready(NetworkEvent::Replaced { new_info, .. }) => { assert_eq!(new_info, *swarm1.local_peer_id()); - assert_eq!(swarm2_step, 2); - swarm2_step = 3; - }, + assert_eq!(swarm2_step, Step::Connected); + swarm2_step = Step::Replaced + } Poll::Ready(NetworkEvent::IncomingConnection(inc)) => { - inc.accept(TestHandler::default().into_node_handler_builder()); - }, + inc.accept(TestHandler::default().into_node_handler_builder()) + } Poll::Ready(ev) => panic!("swarm2: unexpected event: {:?}", ev), - Poll::Pending => swarm2_not_ready = true, + Poll::Pending => swarm2_not_ready = true } } - // TODO: make sure that >= 5 is correct - if swarm1_step + swarm2_step >= 5 { - return Poll::Ready(true); + match (swarm1_step, swarm2_step) { + | (Step::Connected, Step::Replaced) + | (Step::Connected, Step::Errored) + | (Step::Replaced, Step::Connected) + | (Step::Errored, Step::Connected) => return Poll::Ready(true), + _else => () } if swarm1_not_ready && swarm2_not_ready { - return Poll::Pending; + return Poll::Pending } } }); @@ -278,19 +283,19 @@ fn raw_swarm_simultaneous_connect() { if futures::executor::block_on(future) { // The test exercised what we wanted to exercise: a simultaneous connect. break - } else { - // The test did not trigger a simultaneous connect; ensure the nodes - // are disconnected and re-run the test. - match swarm1.peer(swarm2.local_peer_id().clone()) { - Peer::Connected(p) => p.close(), - Peer::PendingConnect(p) => p.interrupt(), - x => panic!("Unexpected state for swarm1: {:?}", x) - } - match swarm2.peer(swarm1.local_peer_id().clone()) { - Peer::Connected(p) => p.close(), - Peer::PendingConnect(p) => p.interrupt(), - x => panic!("Unexpected state for swarm2: {:?}", x) - } + } + + // The test did not trigger a simultaneous connect; ensure the nodes + // are disconnected and re-run the test. + match swarm1.peer(swarm2.local_peer_id().clone()) { + Peer::Connected(p) => p.close(), + Peer::PendingConnect(p) => p.interrupt(), + x => panic!("Unexpected state for swarm1: {:?}", x) + } + match swarm2.peer(swarm1.local_peer_id().clone()) { + Peer::Connected(p) => p.close(), + Peer::PendingConnect(p) => p.interrupt(), + x => panic!("Unexpected state for swarm2: {:?}", x) } } } diff --git a/core/tests/transport_upgrade.rs b/core/tests/transport_upgrade.rs index f5347ca4..12e3e503 100644 --- a/core/tests/transport_upgrade.rs +++ b/core/tests/transport_upgrade.rs @@ -26,7 +26,7 @@ use libp2p_core::transport::{Transport, MemoryTransport}; use libp2p_core::upgrade::{self, UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade}; use libp2p_mplex::MplexConfig; use libp2p_secio::SecioConfig; -use multiaddr::Multiaddr; +use multiaddr::{Multiaddr, Protocol}; use rand::random; use std::{io, pin::Pin}; @@ -109,28 +109,29 @@ fn upgrade_pipeline() { util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) }); - let listen_addr: Multiaddr = format!("/memory/{}", random::()).parse().unwrap(); + let listen_addr1 = Multiaddr::from(Protocol::Memory(random::())); + let listen_addr2 = listen_addr1.clone(); - async_std::task::spawn({ - let listen_addr = listen_addr.clone(); - let dialer_id = dialer_id.clone(); - async move { - let mut listener = listener_transport.listen_on(listen_addr).unwrap(); - loop { - let (upgrade, _remote_addr) = match listener.next().await.unwrap().unwrap().into_upgrade() { + let mut listener = listener_transport.listen_on(listen_addr1).unwrap(); + + let server = async move { + loop { + let (upgrade, _remote_addr) = + match listener.next().await.unwrap().unwrap().into_upgrade() { Some(u) => u, None => continue }; - - let (peer, _mplex) = upgrade.await.unwrap(); - assert_eq!(peer, dialer_id); - } + let (peer, _mplex) = upgrade.await.unwrap(); + assert_eq!(peer, dialer_id); } - }); + }; - async_std::task::block_on(async move { - let (peer, _mplex) = dialer_transport.dial(listen_addr).unwrap().await.unwrap(); + let client = async move { + let (peer, _mplex) = dialer_transport.dial(listen_addr2).unwrap().await.unwrap(); assert_eq!(peer, listener_id); - }); + }; + + async_std::task::spawn(server); + async_std::task::block_on(client); } diff --git a/misc/multistream-select/src/length_delimited.rs b/misc/multistream-select/src/length_delimited.rs index 5d22fb10..91e3fe88 100644 --- a/misc/multistream-select/src/length_delimited.rs +++ b/misc/multistream-select/src/length_delimited.rs @@ -323,27 +323,11 @@ where R: AsyncWrite { fn write(&mut self, buf: &[u8]) -> io::Result { - // Try to drain the write buffer together with writing `buf`. - if !self.inner.write_buffer.is_empty() { - let n = self.inner.write_buffer.len(); - self.inner.write_buffer.extend_from_slice(buf); - let result = self.inner.poll_write_buffer(); - let written = n - self.inner.write_buffer.len(); - if written == 0 { - if let Err(e) = result { - return Err(e) - } + while !self.inner.write_buffer.is_empty() { + if self.inner.poll_write_buffer()?.is_not_ready() { return Err(io::ErrorKind::WouldBlock.into()) } - if written < buf.len() { - if self.inner.write_buffer.len() > n { - self.inner.write_buffer.split_off(n); // Never grow the buffer. - } - return Ok(written) - } - return Ok(buf.len()) } - self.inner_mut().write(buf) } diff --git a/transports/uds/src/lib.rs b/transports/uds/src/lib.rs index dc4192fe..6f4fd95d 100644 --- a/transports/uds/src/lib.rs +++ b/transports/uds/src/lib.rs @@ -45,15 +45,15 @@ #![cfg(all(unix, not(any(target_os = "emscripten", target_os = "unknown"))))] use async_std::os::unix::net::{UnixListener, UnixStream}; -use futures::{prelude::*, future::Ready}; -use futures::stream::Stream; +use futures::{prelude::*, future::{BoxFuture, Ready}}; +use futures::stream::BoxStream; use libp2p_core::{ Transport, multiaddr::{Protocol, Multiaddr}, transport::{ListenerEvent, TransportError} }; use log::debug; -use std::{io, path::PathBuf, pin::Pin}; +use std::{io, path::PathBuf}; /// Represents the configuration for a Unix domain sockets transport capability for libp2p. /// @@ -65,7 +65,6 @@ pub struct UdsConfig { impl UdsConfig { /// Creates a new configuration object for Unix domain sockets. - #[inline] pub fn new() -> UdsConfig { UdsConfig {} } @@ -74,13 +73,13 @@ impl UdsConfig { impl Transport for UdsConfig { type Output = UnixStream; type Error = io::Error; - type Listener = Pin, Self::Error>> + Send>>; - type ListenerUpgrade = Ready>; - type Dial = Pin> + Send>>; + type Listener = BoxStream<'static, Result, Self::Error>>; + type ListenerUpgrade = Ready>; + type Dial = BoxFuture<'static, Result>; fn listen_on(self, addr: Multiaddr) -> Result> { if let Ok(path) = multiaddr_to_path(&addr) { - Ok(Box::pin(async move { UnixListener::bind(&path).await } + Ok(async move { UnixListener::bind(&path).await } .map_ok(move |listener| { stream::once({ let addr = addr.clone(); @@ -105,7 +104,8 @@ impl Transport for UdsConfig { } })) }) - .try_flatten_stream())) + .try_flatten_stream() + .boxed()) } else { Err(TransportError::MultiaddrNotSupported(addr)) } @@ -114,7 +114,7 @@ impl Transport for UdsConfig { fn dial(self, addr: Multiaddr) -> Result> { if let Ok(path) = multiaddr_to_path(&addr) { debug!("Dialing {}", addr); - Ok(Box::pin(async move { UnixStream::connect(&path).await })) + Ok(async move { UnixStream::connect(&path).await }.boxed()) } else { Err(TransportError::MultiaddrNotSupported(addr)) } @@ -149,12 +149,9 @@ fn multiaddr_to_path(addr: &Multiaddr) -> Result { #[cfg(test)] mod tests { use super::{multiaddr_to_path, UdsConfig}; - use futures::prelude::*; + use futures::{channel::oneshot, prelude::*}; use std::{self, borrow::Cow, path::Path}; - use libp2p_core::{ - Transport, - multiaddr::{Protocol, Multiaddr} - }; + use libp2p_core::{Transport, multiaddr::{Protocol, Multiaddr}}; use tempfile; #[test] @@ -179,26 +176,36 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let socket = temp_dir.path().join("socket"); let addr = Multiaddr::from(Protocol::Unix(Cow::Owned(socket.to_string_lossy().into_owned()))); - let addr2 = addr.clone(); - async_std::task::spawn( - UdsConfig::new().listen_on(addr2).unwrap() - .try_filter_map(|ev| future::ok(ev.into_upgrade())) - .try_for_each(|(sock, _)| { - async { - let mut sock = sock.await.unwrap(); - let mut buf = [0u8; 3]; - sock.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, [1, 2, 3]); - Ok(()) - } - }) - ); + let (tx, rx) = oneshot::channel(); - futures::executor::block_on(async { + async_std::task::spawn(async move { + let mut listener = UdsConfig::new().listen_on(addr).unwrap(); + + let listen_addr = listener.try_next().await.unwrap() + .expect("some event") + .into_new_address() + .expect("listen address"); + + tx.send(listen_addr).unwrap(); + + let (sock, _addr) = listener.try_filter_map(|e| future::ok(e.into_upgrade())) + .try_next() + .await + .unwrap() + .expect("some event"); + + let mut sock = sock.await.unwrap(); + let mut buf = [0u8; 3]; + sock.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [1, 2, 3]); + }); + + async_std::task::block_on(async move { let uds = UdsConfig::new(); - let mut socket = uds.dial(addr.clone()).unwrap().await.unwrap(); - socket.write(&[0x1, 0x2, 0x3]).await.unwrap(); + let addr = rx.await.unwrap(); + let mut socket = uds.dial(addr).unwrap().await.unwrap(); + socket.write(&[1, 2, 3]).await.unwrap(); }); }