Fix tests.

This commit is contained in:
Toralf Wittner
2019-12-06 11:03:19 +01:00
parent 9cefb52b1f
commit 173fc04b30
5 changed files with 156 additions and 163 deletions

View File

@ -354,11 +354,7 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::transport::{self, ListenerEvent}; use crate::transport;
use assert_matches::assert_matches;
use std::{io, iter::FromIterator};
use futures::{future::{self}, stream};
use crate::PeerId;
#[test] #[test]
fn incoming_event() { fn incoming_event() {
@ -383,7 +379,7 @@ mod tests {
}); });
match listeners.next().await.unwrap() { match listeners.next().await.unwrap() {
ListenersEvent::Incoming { local_addr, upgrade, send_back_addr, .. } => { ListenersEvent::Incoming { local_addr, send_back_addr, .. } => {
assert_eq!(local_addr, address); assert_eq!(local_addr, address);
assert_eq!(send_back_addr, address); assert_eq!(send_back_addr, address);
}, },

View File

@ -31,10 +31,9 @@ use libp2p_swarm::{
ProtocolsHandlerEvent, ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr, ProtocolsHandlerUpgrErr,
}; };
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; use std::{io, task::Context, task::Poll, time::Duration};
use wasm_timer::Delay; use wasm_timer::Delay;
// TODO: replace with DummyProtocolsHandler after https://github.com/servo/rust-smallvec/issues/139 ?
struct TestHandler<TSubstream>(std::marker::PhantomData<TSubstream>); struct TestHandler<TSubstream>(std::marker::PhantomData<TSubstream>);
impl<TSubstream> Default for TestHandler<TSubstream> { impl<TSubstream> Default for TestHandler<TSubstream> {
@ -114,8 +113,6 @@ fn raw_swarm_simultaneous_connect() {
.authenticate(libp2p_secio::SecioConfig::new(local_key)) .authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new()) .multiplex(libp2p_mplex::MplexConfig::new())
.and_then(|(peer, mplex), _| { .and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
}); });
Network::new(transport, local_public_key.into_peer_id()) Network::new(transport, local_public_key.into_peer_id())
@ -129,8 +126,6 @@ fn raw_swarm_simultaneous_connect() {
.authenticate(libp2p_secio::SecioConfig::new(local_key)) .authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new()) .multiplex(libp2p_mplex::MplexConfig::new())
.and_then(|(peer, mplex), _| { .and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
}); });
Network::new(transport, local_public_key.into_peer_id()) Network::new(transport, local_public_key.into_peer_id())
@ -139,29 +134,38 @@ fn raw_swarm_simultaneous_connect() {
swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
swarm2.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); swarm2.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
let (swarm1_listen_addr, swarm2_listen_addr, mut swarm1, mut swarm2) = futures::executor::block_on( let swarm1_listen_addr = future::poll_fn(|cx| {
future::lazy(move |cx| {
let swarm1_listen_addr =
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) { if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) {
listen_addr Poll::Ready(listen_addr)
} else { } else {
panic!("Was expecting the listen address to be reported") panic!("Was expecting the listen address to be reported")
}; }
let swarm2_listen_addr =
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm2.poll(cx) {
listen_addr
} else {
panic!("Was expecting the listen address to be reported")
};
Ok::<_, void::Void>((swarm1_listen_addr, swarm2_listen_addr, swarm1, swarm2))
}) })
).unwrap(); .now_or_never()
.expect("listen address of swarm1");
let swarm2_listen_addr = future::poll_fn(|cx| {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm2.poll(cx) {
Poll::Ready(listen_addr)
} else {
panic!("Was expecting the listen address to be reported")
}
})
.now_or_never()
.expect("listen address of swarm2");
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum Step {
Start,
Dialing,
Connected,
Replaced,
Errored
}
loop { loop {
let mut swarm1_step = 0; let mut swarm1_step = Step::Start;
let mut swarm2_step = 0; let mut swarm2_step = Step::Start;
let mut swarm1_dial_start = Delay::new(Duration::new(0, rand::random::<u32>() % 50_000_000)); let mut swarm1_dial_start = Delay::new(Duration::new(0, rand::random::<u32>() % 50_000_000));
let mut swarm2_dial_start = Delay::new(Duration::new(0, rand::random::<u32>() % 50_000_000)); let mut swarm2_dial_start = Delay::new(Duration::new(0, rand::random::<u32>() % 50_000_000));
@ -174,31 +178,29 @@ fn raw_swarm_simultaneous_connect() {
// We add a lot of randomness. In a real-life situation the swarm also has to // We add a lot of randomness. In a real-life situation the swarm also has to
// handle other nodes, which may delay the processing. // handle other nodes, which may delay the processing.
if swarm1_step == 0 { if swarm1_step == Step::Start {
match Future::poll(Pin::new(&mut swarm1_dial_start), cx) { if swarm1_dial_start.poll_unpin(cx).is_ready() {
Poll::Ready(_) => {
let handler = TestHandler::default().into_node_handler_builder(); let handler = TestHandler::default().into_node_handler_builder();
swarm1.peer(swarm2.local_peer_id().clone()) swarm1.peer(swarm2.local_peer_id().clone())
.into_not_connected() .into_not_connected()
.unwrap() .unwrap()
.connect(swarm2_listen_addr.clone(), handler); .connect(swarm2_listen_addr.clone(), handler);
swarm1_step = 1; swarm1_step = Step::Dialing;
}, } else {
Poll::Pending => swarm1_not_ready = true, swarm1_not_ready = true
} }
} }
if swarm2_step == 0 { if swarm2_step == Step::Start {
match Future::poll(Pin::new(&mut swarm2_dial_start), cx) { if swarm2_dial_start.poll_unpin(cx).is_ready() {
Poll::Ready(_) => {
let handler = TestHandler::default().into_node_handler_builder(); let handler = TestHandler::default().into_node_handler_builder();
swarm2.peer(swarm1.local_peer_id().clone()) swarm2.peer(swarm1.local_peer_id().clone())
.into_not_connected() .into_not_connected()
.unwrap() .unwrap()
.connect(swarm1_listen_addr.clone(), handler); .connect(swarm1_listen_addr.clone(), handler);
swarm2_step = 1; swarm2_step = Step::Dialing;
}, } else {
Poll::Pending => swarm2_not_ready = true, swarm2_not_ready = true
} }
} }
@ -207,29 +209,29 @@ fn raw_swarm_simultaneous_connect() {
Poll::Ready(NetworkEvent::IncomingConnectionError { Poll::Ready(NetworkEvent::IncomingConnectionError {
error: IncomingError::DeniedLowerPriority, .. error: IncomingError::DeniedLowerPriority, ..
}) => { }) => {
assert_eq!(swarm1_step, 2); assert_eq!(swarm1_step, Step::Connected);
swarm1_step = 3; swarm1_step = Step::Errored
}, }
Poll::Ready(NetworkEvent::Connected { conn_info, .. }) => { Poll::Ready(NetworkEvent::Connected { conn_info, .. }) => {
assert_eq!(conn_info, *swarm2.local_peer_id()); assert_eq!(conn_info, *swarm2.local_peer_id());
if swarm1_step == 0 { if swarm1_step == Step::Start {
// The connection was established before // The connection was established before
// swarm1 started dialing; discard the test run. // swarm1 started dialing; discard the test run.
return Poll::Ready(false) return Poll::Ready(false)
} }
assert_eq!(swarm1_step, 1); assert_eq!(swarm1_step, Step::Dialing);
swarm1_step = 2; swarm1_step = Step::Connected
}, }
Poll::Ready(NetworkEvent::Replaced { new_info, .. }) => { Poll::Ready(NetworkEvent::Replaced { new_info, .. }) => {
assert_eq!(new_info, *swarm2.local_peer_id()); assert_eq!(new_info, *swarm2.local_peer_id());
assert_eq!(swarm1_step, 2); assert_eq!(swarm1_step, Step::Connected);
swarm1_step = 3; swarm1_step = Step::Replaced
}, }
Poll::Ready(NetworkEvent::IncomingConnection(inc)) => { Poll::Ready(NetworkEvent::IncomingConnection(inc)) => {
inc.accept(TestHandler::default().into_node_handler_builder()); inc.accept(TestHandler::default().into_node_handler_builder())
}, }
Poll::Ready(ev) => panic!("swarm1: unexpected event: {:?}", ev), Poll::Ready(ev) => panic!("swarm1: unexpected event: {:?}", ev),
Poll::Pending => swarm1_not_ready = true, Poll::Pending => swarm1_not_ready = true
} }
} }
@ -238,39 +240,42 @@ fn raw_swarm_simultaneous_connect() {
Poll::Ready(NetworkEvent::IncomingConnectionError { Poll::Ready(NetworkEvent::IncomingConnectionError {
error: IncomingError::DeniedLowerPriority, .. error: IncomingError::DeniedLowerPriority, ..
}) => { }) => {
assert_eq!(swarm2_step, 2); assert_eq!(swarm2_step, Step::Connected);
swarm2_step = 3; swarm2_step = Step::Errored
}, }
Poll::Ready(NetworkEvent::Connected { conn_info, .. }) => { Poll::Ready(NetworkEvent::Connected { conn_info, .. }) => {
assert_eq!(conn_info, *swarm1.local_peer_id()); assert_eq!(conn_info, *swarm1.local_peer_id());
if swarm2_step == 0 { if swarm2_step == Step::Start {
// The connection was established before // The connection was established before
// swarm2 started dialing; discard the test run. // swarm2 started dialing; discard the test run.
return Poll::Ready(false) return Poll::Ready(false)
} }
assert_eq!(swarm2_step, 1); assert_eq!(swarm2_step, Step::Dialing);
swarm2_step = 2; swarm2_step = Step::Connected
}, }
Poll::Ready(NetworkEvent::Replaced { new_info, .. }) => { Poll::Ready(NetworkEvent::Replaced { new_info, .. }) => {
assert_eq!(new_info, *swarm1.local_peer_id()); assert_eq!(new_info, *swarm1.local_peer_id());
assert_eq!(swarm2_step, 2); assert_eq!(swarm2_step, Step::Connected);
swarm2_step = 3; swarm2_step = Step::Replaced
}, }
Poll::Ready(NetworkEvent::IncomingConnection(inc)) => { Poll::Ready(NetworkEvent::IncomingConnection(inc)) => {
inc.accept(TestHandler::default().into_node_handler_builder()); inc.accept(TestHandler::default().into_node_handler_builder())
}, }
Poll::Ready(ev) => panic!("swarm2: unexpected event: {:?}", ev), Poll::Ready(ev) => panic!("swarm2: unexpected event: {:?}", ev),
Poll::Pending => swarm2_not_ready = true, Poll::Pending => swarm2_not_ready = true
} }
} }
// TODO: make sure that >= 5 is correct match (swarm1_step, swarm2_step) {
if swarm1_step + swarm2_step >= 5 { | (Step::Connected, Step::Replaced)
return Poll::Ready(true); | (Step::Connected, Step::Errored)
| (Step::Replaced, Step::Connected)
| (Step::Errored, Step::Connected) => return Poll::Ready(true),
_else => ()
} }
if swarm1_not_ready && swarm2_not_ready { if swarm1_not_ready && swarm2_not_ready {
return Poll::Pending; return Poll::Pending
} }
} }
}); });
@ -278,7 +283,8 @@ fn raw_swarm_simultaneous_connect() {
if futures::executor::block_on(future) { if futures::executor::block_on(future) {
// The test exercised what we wanted to exercise: a simultaneous connect. // The test exercised what we wanted to exercise: a simultaneous connect.
break break
} else { }
// The test did not trigger a simultaneous connect; ensure the nodes // The test did not trigger a simultaneous connect; ensure the nodes
// are disconnected and re-run the test. // are disconnected and re-run the test.
match swarm1.peer(swarm2.local_peer_id().clone()) { match swarm1.peer(swarm2.local_peer_id().clone()) {
@ -294,5 +300,4 @@ fn raw_swarm_simultaneous_connect() {
} }
} }
} }
}

View File

@ -26,7 +26,7 @@ use libp2p_core::transport::{Transport, MemoryTransport};
use libp2p_core::upgrade::{self, UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade}; use libp2p_core::upgrade::{self, UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade};
use libp2p_mplex::MplexConfig; use libp2p_mplex::MplexConfig;
use libp2p_secio::SecioConfig; use libp2p_secio::SecioConfig;
use multiaddr::Multiaddr; use multiaddr::{Multiaddr, Protocol};
use rand::random; use rand::random;
use std::{io, pin::Pin}; use std::{io, pin::Pin};
@ -109,28 +109,29 @@ fn upgrade_pipeline() {
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
}); });
let listen_addr: Multiaddr = format!("/memory/{}", random::<u64>()).parse().unwrap(); let listen_addr1 = Multiaddr::from(Protocol::Memory(random::<u64>()));
let listen_addr2 = listen_addr1.clone();
async_std::task::spawn({ let mut listener = listener_transport.listen_on(listen_addr1).unwrap();
let listen_addr = listen_addr.clone();
let dialer_id = dialer_id.clone(); let server = async move {
async move {
let mut listener = listener_transport.listen_on(listen_addr).unwrap();
loop { loop {
let (upgrade, _remote_addr) = match listener.next().await.unwrap().unwrap().into_upgrade() { let (upgrade, _remote_addr) =
match listener.next().await.unwrap().unwrap().into_upgrade() {
Some(u) => u, Some(u) => u,
None => continue None => continue
}; };
let (peer, _mplex) = upgrade.await.unwrap(); let (peer, _mplex) = upgrade.await.unwrap();
assert_eq!(peer, dialer_id); assert_eq!(peer, dialer_id);
} }
} };
});
async_std::task::block_on(async move { let client = async move {
let (peer, _mplex) = dialer_transport.dial(listen_addr).unwrap().await.unwrap(); let (peer, _mplex) = dialer_transport.dial(listen_addr2).unwrap().await.unwrap();
assert_eq!(peer, listener_id); assert_eq!(peer, listener_id);
}); };
async_std::task::spawn(server);
async_std::task::block_on(client);
} }

View File

@ -323,27 +323,11 @@ where
R: AsyncWrite R: AsyncWrite
{ {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// Try to drain the write buffer together with writing `buf`. while !self.inner.write_buffer.is_empty() {
if !self.inner.write_buffer.is_empty() { if self.inner.poll_write_buffer()?.is_not_ready() {
let n = self.inner.write_buffer.len();
self.inner.write_buffer.extend_from_slice(buf);
let result = self.inner.poll_write_buffer();
let written = n - self.inner.write_buffer.len();
if written == 0 {
if let Err(e) = result {
return Err(e)
}
return Err(io::ErrorKind::WouldBlock.into()) return Err(io::ErrorKind::WouldBlock.into())
} }
if written < buf.len() {
if self.inner.write_buffer.len() > n {
self.inner.write_buffer.split_off(n); // Never grow the buffer.
} }
return Ok(written)
}
return Ok(buf.len())
}
self.inner_mut().write(buf) self.inner_mut().write(buf)
} }

View File

@ -45,15 +45,15 @@
#![cfg(all(unix, not(any(target_os = "emscripten", target_os = "unknown"))))] #![cfg(all(unix, not(any(target_os = "emscripten", target_os = "unknown"))))]
use async_std::os::unix::net::{UnixListener, UnixStream}; use async_std::os::unix::net::{UnixListener, UnixStream};
use futures::{prelude::*, future::Ready}; use futures::{prelude::*, future::{BoxFuture, Ready}};
use futures::stream::Stream; use futures::stream::BoxStream;
use libp2p_core::{ use libp2p_core::{
Transport, Transport,
multiaddr::{Protocol, Multiaddr}, multiaddr::{Protocol, Multiaddr},
transport::{ListenerEvent, TransportError} transport::{ListenerEvent, TransportError}
}; };
use log::debug; use log::debug;
use std::{io, path::PathBuf, pin::Pin}; use std::{io, path::PathBuf};
/// Represents the configuration for a Unix domain sockets transport capability for libp2p. /// Represents the configuration for a Unix domain sockets transport capability for libp2p.
/// ///
@ -65,7 +65,6 @@ pub struct UdsConfig {
impl UdsConfig { impl UdsConfig {
/// Creates a new configuration object for Unix domain sockets. /// Creates a new configuration object for Unix domain sockets.
#[inline]
pub fn new() -> UdsConfig { pub fn new() -> UdsConfig {
UdsConfig {} UdsConfig {}
} }
@ -74,13 +73,13 @@ impl UdsConfig {
impl Transport for UdsConfig { impl Transport for UdsConfig {
type Output = UnixStream; type Output = UnixStream;
type Error = io::Error; type Error = io::Error;
type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade>, Self::Error>> + Send>>; type Listener = BoxStream<'static, Result<ListenerEvent<Self::ListenerUpgrade>, Self::Error>>;
type ListenerUpgrade = Ready<Result<Self::Output, io::Error>>; type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>; type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> { fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
if let Ok(path) = multiaddr_to_path(&addr) { if let Ok(path) = multiaddr_to_path(&addr) {
Ok(Box::pin(async move { UnixListener::bind(&path).await } Ok(async move { UnixListener::bind(&path).await }
.map_ok(move |listener| { .map_ok(move |listener| {
stream::once({ stream::once({
let addr = addr.clone(); let addr = addr.clone();
@ -105,7 +104,8 @@ impl Transport for UdsConfig {
} }
})) }))
}) })
.try_flatten_stream())) .try_flatten_stream()
.boxed())
} else { } else {
Err(TransportError::MultiaddrNotSupported(addr)) Err(TransportError::MultiaddrNotSupported(addr))
} }
@ -114,7 +114,7 @@ impl Transport for UdsConfig {
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> { fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
if let Ok(path) = multiaddr_to_path(&addr) { if let Ok(path) = multiaddr_to_path(&addr) {
debug!("Dialing {}", addr); debug!("Dialing {}", addr);
Ok(Box::pin(async move { UnixStream::connect(&path).await })) Ok(async move { UnixStream::connect(&path).await }.boxed())
} else { } else {
Err(TransportError::MultiaddrNotSupported(addr)) Err(TransportError::MultiaddrNotSupported(addr))
} }
@ -149,12 +149,9 @@ fn multiaddr_to_path(addr: &Multiaddr) -> Result<PathBuf, ()> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{multiaddr_to_path, UdsConfig}; use super::{multiaddr_to_path, UdsConfig};
use futures::prelude::*; use futures::{channel::oneshot, prelude::*};
use std::{self, borrow::Cow, path::Path}; use std::{self, borrow::Cow, path::Path};
use libp2p_core::{ use libp2p_core::{Transport, multiaddr::{Protocol, Multiaddr}};
Transport,
multiaddr::{Protocol, Multiaddr}
};
use tempfile; use tempfile;
#[test] #[test]
@ -179,26 +176,36 @@ mod tests {
let temp_dir = tempfile::tempdir().unwrap(); let temp_dir = tempfile::tempdir().unwrap();
let socket = temp_dir.path().join("socket"); let socket = temp_dir.path().join("socket");
let addr = Multiaddr::from(Protocol::Unix(Cow::Owned(socket.to_string_lossy().into_owned()))); let addr = Multiaddr::from(Protocol::Unix(Cow::Owned(socket.to_string_lossy().into_owned())));
let addr2 = addr.clone();
async_std::task::spawn( let (tx, rx) = oneshot::channel();
UdsConfig::new().listen_on(addr2).unwrap()
.try_filter_map(|ev| future::ok(ev.into_upgrade())) async_std::task::spawn(async move {
.try_for_each(|(sock, _)| { let mut listener = UdsConfig::new().listen_on(addr).unwrap();
async {
let listen_addr = listener.try_next().await.unwrap()
.expect("some event")
.into_new_address()
.expect("listen address");
tx.send(listen_addr).unwrap();
let (sock, _addr) = listener.try_filter_map(|e| future::ok(e.into_upgrade()))
.try_next()
.await
.unwrap()
.expect("some event");
let mut sock = sock.await.unwrap(); let mut sock = sock.await.unwrap();
let mut buf = [0u8; 3]; let mut buf = [0u8; 3];
sock.read_exact(&mut buf).await.unwrap(); sock.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [1, 2, 3]); assert_eq!(buf, [1, 2, 3]);
Ok(()) });
}
})
);
futures::executor::block_on(async { async_std::task::block_on(async move {
let uds = UdsConfig::new(); let uds = UdsConfig::new();
let mut socket = uds.dial(addr.clone()).unwrap().await.unwrap(); let addr = rx.await.unwrap();
socket.write(&[0x1, 0x2, 0x3]).await.unwrap(); let mut socket = uds.dial(addr).unwrap().await.unwrap();
socket.write(&[1, 2, 3]).await.unwrap();
}); });
} }