From 1071bfd324d4f79a82c763d87065da2503e19d5f Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 26 Jan 2018 11:40:43 +0100 Subject: [PATCH] Some integration tests for multiplex-rs (#108) * Some integration tests for multiplex-rs * Correctly check that the background thread is ok --- multiplex-rs/Cargo.toml | 6 +- multiplex-rs/tests/two_peers.rs | 134 ++++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+), 1 deletion(-) create mode 100644 multiplex-rs/tests/two_peers.rs diff --git a/multiplex-rs/Cargo.toml b/multiplex-rs/Cargo.toml index 8cedadc6..13741fb8 100644 --- a/multiplex-rs/Cargo.toml +++ b/multiplex-rs/Cargo.toml @@ -15,4 +15,8 @@ rand = "0.3.17" libp2p-swarm = { path = "../libp2p-swarm" } varint = { path = "../varint-rs" } error-chain = "0.11.0" -futures-mutex = { path = "../futures-mutex" } \ No newline at end of file +futures-mutex = { path = "../futures-mutex" } + +[dev-dependencies] +libp2p-tcp-transport = { path = "../libp2p-tcp-transport" } +tokio-core = "0.1" diff --git a/multiplex-rs/tests/two_peers.rs b/multiplex-rs/tests/two_peers.rs new file mode 100644 index 00000000..7a8c42d4 --- /dev/null +++ b/multiplex-rs/tests/two_peers.rs @@ -0,0 +1,134 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +extern crate bytes; +extern crate futures; +extern crate libp2p_swarm as swarm; +extern crate libp2p_tcp_transport as tcp; +extern crate multiplex; +extern crate tokio_core; +extern crate tokio_io; + +use futures::future::Future; +use futures::{Stream, Sink}; +use std::sync::mpsc; +use std::thread; +use swarm::{Transport, StreamMuxer}; +use tcp::TcpConfig; +use tokio_core::reactor::Core; +use tokio_io::codec::length_delimited::Framed; + +#[test] +fn client_to_server_outbound() { + // Simulate a client sending a message to a server through a multiplex upgrade. + + let (tx, rx) = mpsc::channel(); + + let bg_thread = thread::spawn(move || { + let mut core = Core::new().unwrap(); + let transport = TcpConfig::new(core.handle()) + .with_upgrade(multiplex::MultiplexConfig); + + let (listener, addr) = transport + .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); + tx.send(addr).unwrap(); + + let future = listener + .into_future() + .map_err(|(err, _)| err) + .and_then(|(client, _)| client.unwrap().0) + .and_then(|client| client.outbound()) + .map(|client| Framed::<_, bytes::BytesMut>::new(client)) + .and_then(|client| { + client.into_future() + .map_err(|(err, _)| err) + .map(|(msg, _)| msg) + }) + .and_then(|msg| { + let msg = msg.unwrap(); + assert_eq!(msg, "hello world"); + Ok(()) + }); + + core.run(future).unwrap(); + }); + + let mut core = Core::new().unwrap(); + let transport = TcpConfig::new(core.handle()) + .with_upgrade(multiplex::MultiplexConfig); + + let future = transport.dial(rx.recv().unwrap()).unwrap() + .and_then(|client| client.inbound()) + .map(|server| Framed::<_, bytes::BytesMut>::new(server)) + .and_then(|server| server.send("hello world".into())) + .map(|_| ()); + + core.run(future).unwrap(); + bg_thread.join().unwrap(); +} + +#[test] +fn client_to_server_inbound() { + // Simulate a client sending a message to a server through a multiplex upgrade. + + let (tx, rx) = mpsc::channel(); + + let bg_thread = thread::spawn(move || { + let mut core = Core::new().unwrap(); + let transport = TcpConfig::new(core.handle()) + .with_upgrade(multiplex::MultiplexConfig); + + let (listener, addr) = transport + .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); + tx.send(addr).unwrap(); + + let future = listener + .into_future() + .map_err(|(err, _)| err) + .and_then(|(client, _)| client.unwrap().0) + .and_then(|client| client.inbound()) + .map(|client| Framed::<_, bytes::BytesMut>::new(client)) + .and_then(|client| { + client.into_future() + .map_err(|(err, _)| err) + .map(|(msg, _)| msg) + }) + .and_then(|msg| { + let msg = msg.unwrap(); + assert_eq!(msg, "hello world"); + Ok(()) + }); + + core.run(future).unwrap(); + }); + + let mut core = Core::new().unwrap(); + let transport = TcpConfig::new(core.handle()) + .with_upgrade(multiplex::MultiplexConfig); + + let future = transport.dial(rx.recv().unwrap()).unwrap() + .and_then(|client| client.outbound()) + .map(|server| Framed::<_, bytes::BytesMut>::new(server)) + .and_then(|server| server.send("hello world".into())) + .map(|_| ()); + + core.run(future).unwrap(); + bg_thread.join().unwrap(); +}