muxers: Add test harness for StreamMuxer implementations (#2952)

This commit is contained in:
Thomas Eizinger
2022-10-17 11:23:13 +11:00
committed by GitHub
parent 981d586bfe
commit 4d4833f8c2
9 changed files with 426 additions and 305 deletions

View File

@ -152,6 +152,7 @@ members = [
"misc/quickcheck-ext",
"muxers/mplex",
"muxers/yamux",
"muxers/test-harness",
"protocols/dcutr",
"protocols/autonat",
"protocols/floodsub",

View File

@ -23,11 +23,12 @@ smallvec = "1.6.1"
unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] }
[dev-dependencies]
async-std = "1.7.0"
async-std = { version = "1.7.0", features = ["attributes"] }
criterion = "0.4"
env_logger = "0.9"
futures = "0.3"
libp2p = { path = "../..", features = ["full"] }
libp2p-muxer-test-harness = { path = "../test-harness" }
quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" }
[[bench]]

View File

@ -1,91 +0,0 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::future::poll_fn;
use futures::{channel::oneshot, prelude::*};
use libp2p::core::muxing::StreamMuxerExt;
use libp2p::core::{upgrade, Transport};
use libp2p::tcp::TcpTransport;
#[test]
fn async_write() {
// Tests that `AsyncWrite::close` implies flush.
let (tx, rx) = oneshot::channel();
let bg_thread = async_std::task::spawn(async move {
let mplex = libp2p::mplex::MplexConfig::new();
let mut transport = TcpTransport::default()
.and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1))
.boxed();
transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();
let addr = transport
.next()
.await
.expect("some event")
.into_new_address()
.expect("listen address");
tx.send(addr).unwrap();
let mut client = transport
.next()
.await
.expect("some event")
.into_incoming()
.unwrap()
.0
.await
.unwrap();
// Just calling `poll_outbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut outbound = poll_fn(|cx| client.poll_outbound_unpin(cx))
.await
.expect("unexpected error");
let mut buf = Vec::new();
outbound.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello world");
});
async_std::task::block_on(async {
let mplex = libp2p::mplex::MplexConfig::new();
let mut transport = TcpTransport::default()
.and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1));
let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap();
// Just calling `poll_inbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut inbound = poll_fn(|cx| client.poll_inbound_unpin(cx))
.await
.expect("unexpected error");
inbound.write_all(b"hello world").await.unwrap();
// The test consists in making sure that this flushes the substream.
inbound.close().await.unwrap();
bg_thread.await;
});
}

View File

@ -0,0 +1,28 @@
use libp2p_mplex::MplexConfig;
#[async_std::test]
async fn close_implies_flush() {
let (alice, bob) =
libp2p_muxer_test_harness::connected_muxers_on_memory_transport::<MplexConfig, _, _>()
.await;
libp2p_muxer_test_harness::close_implies_flush(alice, bob).await;
}
#[async_std::test]
async fn dialer_can_receive() {
let (alice, bob) =
libp2p_muxer_test_harness::connected_muxers_on_memory_transport::<MplexConfig, _, _>()
.await;
libp2p_muxer_test_harness::dialer_can_receive(alice, bob).await;
}
#[async_std::test]
async fn read_after_close() {
let (alice, bob) =
libp2p_muxer_test_harness::connected_muxers_on_memory_transport::<MplexConfig, _, _>()
.await;
libp2p_muxer_test_harness::read_after_close(alice, bob).await;
}

View File

@ -1,213 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::future::poll_fn;
use futures::{channel::oneshot, prelude::*};
use libp2p::core::muxing::StreamMuxerExt;
use libp2p::core::{upgrade, Transport};
use libp2p::tcp::TcpTransport;
#[test]
fn client_to_server_outbound() {
// Simulate a client sending a message to a server through a multiplex upgrade.
let (tx, rx) = oneshot::channel();
let bg_thread = async_std::task::spawn(async move {
let mplex = libp2p_mplex::MplexConfig::new();
let mut transport = TcpTransport::default()
.and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1))
.boxed();
transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();
let addr = transport
.next()
.await
.expect("some event")
.into_new_address()
.expect("listen address");
tx.send(addr).unwrap();
let mut client = transport
.next()
.await
.expect("some event")
.into_incoming()
.unwrap()
.0
.await
.unwrap();
// Just calling `poll_outbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut outbound = poll_fn(|cx| client.poll_outbound_unpin(cx))
.await
.expect("unexpected error");
let mut buf = Vec::new();
outbound.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello world");
});
async_std::task::block_on(async {
let mplex = libp2p_mplex::MplexConfig::new();
let mut transport = TcpTransport::default()
.and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1))
.boxed();
let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap();
// Just calling `poll_inbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut inbound = poll_fn(|cx| client.poll_inbound_unpin(cx))
.await
.expect("unexpected error");
inbound.write_all(b"hello world").await.unwrap();
inbound.close().await.unwrap();
bg_thread.await;
});
}
#[test]
fn client_to_server_inbound() {
// Simulate a client sending a message to a server through a multiplex upgrade.
let (tx, rx) = oneshot::channel();
let bg_thread = async_std::task::spawn(async move {
let mplex = libp2p_mplex::MplexConfig::new();
let mut transport = TcpTransport::default()
.and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1))
.boxed();
transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();
let addr = transport
.next()
.await
.expect("some event")
.into_new_address()
.expect("listen address");
tx.send(addr).unwrap();
let mut client = transport
.next()
.await
.expect("some event")
.into_incoming()
.unwrap()
.0
.await
.unwrap();
// Just calling `poll_inbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut inbound = poll_fn(|cx| client.poll_inbound_unpin(cx))
.await
.expect("unexpected error");
let mut buf = Vec::new();
inbound.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello world");
});
async_std::task::block_on(async {
let mplex = libp2p_mplex::MplexConfig::new();
let mut transport = TcpTransport::default()
.and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1))
.boxed();
let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap();
// Just calling `poll_outbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut outbound = poll_fn(|cx| client.poll_outbound_unpin(cx))
.await
.expect("unexpected error");
outbound.write_all(b"hello world").await.unwrap();
outbound.close().await.unwrap();
bg_thread.await;
});
}
#[test]
fn protocol_not_match() {
let (tx, rx) = oneshot::channel();
let _bg_thread = async_std::task::spawn(async move {
let mplex = libp2p_mplex::MplexConfig::new();
let mut transport = TcpTransport::default()
.and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1))
.boxed();
transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();
let addr = transport
.next()
.await
.expect("some event")
.into_new_address()
.expect("listen address");
tx.send(addr).unwrap();
let mut client = transport
.next()
.await
.expect("some event")
.into_incoming()
.unwrap()
.0
.await
.unwrap();
// Just calling `poll_outbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut outbound = poll_fn(|cx| client.poll_outbound_unpin(cx))
.await
.expect("unexpected error");
let mut buf = Vec::new();
outbound.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello world");
});
async_std::task::block_on(async {
// Make sure they do not connect when protocols do not match
let mut mplex = libp2p_mplex::MplexConfig::new();
mplex.set_protocol_name(b"/mplextest/1.0.0");
let mut transport = TcpTransport::default()
.and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1))
.boxed();
assert!(
transport.dial(rx.await.unwrap()).unwrap().await.is_err(),
"Dialing should fail here as protocols do not match"
);
});
}

View File

@ -0,0 +1,14 @@
[package]
name = "libp2p-muxer-test-harness"
version = "0.1.0"
edition = "2021"
publish = false
license = "MIT"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
libp2p-core = { path = "../../core" }
futures = "0.3.24"
log = "0.4"
futures-timer = "3.0.2"

View File

@ -0,0 +1,348 @@
use crate::future::{BoxFuture, Either, FutureExt};
use futures::{future, AsyncRead, AsyncWrite};
use futures::{AsyncReadExt, Stream};
use futures::{AsyncWriteExt, StreamExt};
use libp2p_core::multiaddr::Protocol;
use libp2p_core::muxing::StreamMuxerExt;
use libp2p_core::transport::memory::Channel;
use libp2p_core::transport::MemoryTransport;
use libp2p_core::{
upgrade, InboundUpgrade, Negotiated, OutboundUpgrade, StreamMuxer, Transport, UpgradeInfo,
};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::{fmt, mem};
pub async fn connected_muxers_on_memory_transport<MC, M, E>() -> (M, M)
where
MC: InboundUpgrade<Negotiated<Channel<Vec<u8>>>, Error = E, Output = M>
+ OutboundUpgrade<Negotiated<Channel<Vec<u8>>>, Error = E, Output = M>
+ Send
+ 'static
+ Default,
<MC as UpgradeInfo>::Info: Send,
<<MC as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<MC as InboundUpgrade<Negotiated<Channel<Vec<u8>>>>>::Future: Send,
<MC as OutboundUpgrade<Negotiated<Channel<Vec<u8>>>>>::Future: Send,
E: std::error::Error + Send + Sync + 'static,
{
let mut alice = MemoryTransport::default()
.and_then(move |c, e| upgrade::apply(c, MC::default(), e, upgrade::Version::V1))
.boxed();
let mut bob = MemoryTransport::default()
.and_then(move |c, e| upgrade::apply(c, MC::default(), e, upgrade::Version::V1))
.boxed();
alice.listen_on(Protocol::Memory(0).into()).unwrap();
let listen_address = alice.next().await.unwrap().into_new_address().unwrap();
futures::future::join(
async {
alice
.next()
.await
.unwrap()
.into_incoming()
.unwrap()
.0
.await
.unwrap()
},
async { bob.dial(listen_address).unwrap().await.unwrap() },
)
.await
}
/// Verifies that Alice can send a message and immediately close the stream afterwards and Bob can use `read_to_end` to read the entire message.
pub async fn close_implies_flush<A, B, S, E>(alice: A, bob: B)
where
A: StreamMuxer<Substream = S, Error = E> + Unpin,
B: StreamMuxer<Substream = S, Error = E> + Unpin,
S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
E: fmt::Debug,
{
run_commutative(
alice,
bob,
|mut stream| async move {
stream.write_all(b"PING").await.unwrap();
stream.close().await.unwrap();
},
|mut stream| async move {
let mut buf = Vec::new();
stream.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"PING");
},
)
.await;
}
/// Verifies that the dialer of a substream can receive a message.
pub async fn dialer_can_receive<A, B, S, E>(alice: A, bob: B)
where
A: StreamMuxer<Substream = S, Error = E> + Unpin,
B: StreamMuxer<Substream = S, Error = E> + Unpin,
S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
E: fmt::Debug,
{
run_commutative(
alice,
bob,
|mut stream| async move {
let mut buf = Vec::new();
stream.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"PING");
},
|mut stream| async move {
stream.write_all(b"PING").await.unwrap();
stream.close().await.unwrap();
},
)
.await;
}
/// Verifies that we can "half-close" a substream.
pub async fn read_after_close<A, B, S, E>(alice: A, bob: B)
where
A: StreamMuxer<Substream = S, Error = E> + Unpin,
B: StreamMuxer<Substream = S, Error = E> + Unpin,
S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
E: fmt::Debug,
{
run_commutative(
alice,
bob,
|mut stream| async move {
stream.write_all(b"PING").await.unwrap();
stream.close().await.unwrap();
let mut buf = Vec::new();
stream.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"PONG");
},
|mut stream| async move {
let mut buf = [0u8; 4];
stream.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"PING");
stream.write_all(b"PONG").await.unwrap();
stream.close().await.unwrap();
},
)
.await;
}
/// Runs the given protocol between the two parties, ensuring commutativity, i.e. either party can be the dialer and listener.
async fn run_commutative<A, B, S, E, F1, F2>(
mut alice: A,
mut bob: B,
alice_proto: impl Fn(S) -> F1 + Clone + 'static,
bob_proto: impl Fn(S) -> F2 + Clone + 'static,
) where
A: StreamMuxer<Substream = S, Error = E> + Unpin,
B: StreamMuxer<Substream = S, Error = E> + Unpin,
S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
E: fmt::Debug,
F1: Future<Output = ()> + Send + 'static,
F2: Future<Output = ()> + Send + 'static,
{
run(&mut alice, &mut bob, alice_proto.clone(), bob_proto.clone()).await;
run(&mut bob, &mut alice, alice_proto, bob_proto).await;
}
/// Runs a given protocol between the two parties.
///
/// The first party will open a new substream and the second party will wait for this.
/// The [`StreamMuxer`] is polled until both parties have completed the protocol to ensure that the underlying connection can make progress at all times.
async fn run<A, B, S, E, F1, F2>(
dialer: &mut A,
listener: &mut B,
alice_proto: impl Fn(S) -> F1 + 'static,
bob_proto: impl Fn(S) -> F2 + 'static,
) where
A: StreamMuxer<Substream = S, Error = E> + Unpin,
B: StreamMuxer<Substream = S, Error = E> + Unpin,
S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
E: fmt::Debug,
F1: Future<Output = ()> + Send + 'static,
F2: Future<Output = ()> + Send + 'static,
{
let mut dialer = Harness::OutboundSetup {
muxer: dialer,
proto_fn: Box::new(move |s| alice_proto(s).boxed()),
};
let mut listener = Harness::InboundSetup {
muxer: listener,
proto_fn: Box::new(move |s| bob_proto(s).boxed()),
};
let mut dialer_complete = false;
let mut listener_complete = false;
loop {
match futures::future::select(dialer.next(), listener.next()).await {
Either::Left((Some(Event::SetupComplete), _)) => {
log::info!("Dialer opened outbound stream");
}
Either::Left((Some(Event::ProtocolComplete), _)) => {
log::info!("Dialer completed protocol");
dialer_complete = true
}
Either::Left((Some(Event::Timeout), _)) => {
panic!("Dialer protocol timed out");
}
Either::Right((Some(Event::SetupComplete), _)) => {
log::info!("Listener received inbound stream");
}
Either::Right((Some(Event::ProtocolComplete), _)) => {
log::info!("Listener completed protocol");
listener_complete = true
}
Either::Right((Some(Event::Timeout), _)) => {
panic!("Listener protocol timed out");
}
_ => unreachable!(),
}
if dialer_complete && listener_complete {
break;
}
}
}
enum Harness<'m, M>
where
M: StreamMuxer,
{
InboundSetup {
muxer: &'m mut M,
proto_fn: Box<dyn FnOnce(M::Substream) -> BoxFuture<'static, ()>>,
},
OutboundSetup {
muxer: &'m mut M,
proto_fn: Box<dyn FnOnce(M::Substream) -> BoxFuture<'static, ()>>,
},
Running {
muxer: &'m mut M,
timeout: futures_timer::Delay,
proto: BoxFuture<'static, ()>,
},
Complete {
muxer: &'m mut M,
},
Poisoned,
}
enum Event {
SetupComplete,
Timeout,
ProtocolComplete,
}
impl<'m, M> Stream for Harness<'m, M>
where
M: StreamMuxer + Unpin,
{
type Item = Event;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
match mem::replace(this, Self::Poisoned) {
Harness::InboundSetup { muxer, proto_fn } => {
if let Poll::Ready(stream) = muxer.poll_inbound_unpin(cx) {
*this = Harness::Running {
muxer,
timeout: futures_timer::Delay::new(Duration::from_secs(10)),
proto: proto_fn(stream.unwrap()),
};
return Poll::Ready(Some(Event::SetupComplete));
}
if let Poll::Ready(event) = muxer.poll_unpin(cx) {
event.unwrap();
*this = Harness::InboundSetup { muxer, proto_fn };
continue;
}
*this = Harness::InboundSetup { muxer, proto_fn };
return Poll::Pending;
}
Harness::OutboundSetup { muxer, proto_fn } => {
if let Poll::Ready(stream) = muxer.poll_outbound_unpin(cx) {
*this = Harness::Running {
muxer,
timeout: futures_timer::Delay::new(Duration::from_secs(10)),
proto: proto_fn(stream.unwrap()),
};
return Poll::Ready(Some(Event::SetupComplete));
}
if let Poll::Ready(event) = muxer.poll_unpin(cx) {
event.unwrap();
*this = Harness::OutboundSetup { muxer, proto_fn };
continue;
}
*this = Harness::OutboundSetup { muxer, proto_fn };
return Poll::Pending;
}
Harness::Running {
muxer,
mut proto,
mut timeout,
} => {
if let Poll::Ready(event) = muxer.poll_unpin(cx) {
event.unwrap();
*this = Harness::Running {
muxer,
proto,
timeout,
};
continue;
}
if let Poll::Ready(()) = proto.poll_unpin(cx) {
*this = Harness::Complete { muxer };
return Poll::Ready(Some(Event::ProtocolComplete));
}
if let Poll::Ready(()) = timeout.poll_unpin(cx) {
return Poll::Ready(Some(Event::Timeout));
}
*this = Harness::Running {
muxer,
proto,
timeout,
};
return Poll::Pending;
}
Harness::Complete { muxer } => {
if let Poll::Ready(event) = muxer.poll_unpin(cx) {
event.unwrap();
*this = Harness::Complete { muxer };
continue;
}
*this = Harness::Complete { muxer };
return Poll::Pending;
}
Harness::Poisoned => {
unreachable!()
}
}
}
}
}

View File

@ -17,3 +17,7 @@ parking_lot = "0.12"
thiserror = "1.0"
yamux = "0.10.0"
log = "0.4"
[dev-dependencies]
async-std = { version = "1.7.0", features = ["attributes"] }
libp2p-muxer-test-harness = { path = "../test-harness" }

View File

@ -0,0 +1,29 @@
use libp2p_yamux::YamuxConfig;
#[async_std::test]
async fn close_implies_flush() {
let (alice, bob) =
libp2p_muxer_test_harness::connected_muxers_on_memory_transport::<YamuxConfig, _, _>()
.await;
libp2p_muxer_test_harness::close_implies_flush(alice, bob).await;
}
#[async_std::test]
#[ignore] // Hangs forever, is this a harness bug? It passes if we try to write to the stream.
async fn dialer_can_receive() {
let (alice, bob) =
libp2p_muxer_test_harness::connected_muxers_on_memory_transport::<YamuxConfig, _, _>()
.await;
libp2p_muxer_test_harness::dialer_can_receive(alice, bob).await;
}
#[async_std::test]
async fn read_after_close() {
let (alice, bob) =
libp2p_muxer_test_harness::connected_muxers_on_memory_transport::<YamuxConfig, _, _>()
.await;
libp2p_muxer_test_harness::read_after_close(alice, bob).await;
}