Some integration tests for multiplex-rs (#108)

* Some integration tests for multiplex-rs

* Correctly check that the background thread is ok
This commit is contained in:
Pierre Krieger
2018-01-26 11:40:43 +01:00
committed by GitHub
parent cace5bf577
commit 1071bfd324
2 changed files with 139 additions and 1 deletions

View File

@ -15,4 +15,8 @@ rand = "0.3.17"
libp2p-swarm = { path = "../libp2p-swarm" } libp2p-swarm = { path = "../libp2p-swarm" }
varint = { path = "../varint-rs" } varint = { path = "../varint-rs" }
error-chain = "0.11.0" error-chain = "0.11.0"
futures-mutex = { path = "../futures-mutex" } futures-mutex = { path = "../futures-mutex" }
[dev-dependencies]
libp2p-tcp-transport = { path = "../libp2p-tcp-transport" }
tokio-core = "0.1"

View File

@ -0,0 +1,134 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
extern crate bytes;
extern crate futures;
extern crate libp2p_swarm as swarm;
extern crate libp2p_tcp_transport as tcp;
extern crate multiplex;
extern crate tokio_core;
extern crate tokio_io;
use futures::future::Future;
use futures::{Stream, Sink};
use std::sync::mpsc;
use std::thread;
use swarm::{Transport, StreamMuxer};
use tcp::TcpConfig;
use tokio_core::reactor::Core;
use tokio_io::codec::length_delimited::Framed;
#[test]
fn client_to_server_outbound() {
// Simulate a client sending a message to a server through a multiplex upgrade.
let (tx, rx) = mpsc::channel();
let bg_thread = thread::spawn(move || {
let mut core = Core::new().unwrap();
let transport = TcpConfig::new(core.handle())
.with_upgrade(multiplex::MultiplexConfig);
let (listener, addr) = transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
tx.send(addr).unwrap();
let future = listener
.into_future()
.map_err(|(err, _)| err)
.and_then(|(client, _)| client.unwrap().0)
.and_then(|client| client.outbound())
.map(|client| Framed::<_, bytes::BytesMut>::new(client))
.and_then(|client| {
client.into_future()
.map_err(|(err, _)| err)
.map(|(msg, _)| msg)
})
.and_then(|msg| {
let msg = msg.unwrap();
assert_eq!(msg, "hello world");
Ok(())
});
core.run(future).unwrap();
});
let mut core = Core::new().unwrap();
let transport = TcpConfig::new(core.handle())
.with_upgrade(multiplex::MultiplexConfig);
let future = transport.dial(rx.recv().unwrap()).unwrap()
.and_then(|client| client.inbound())
.map(|server| Framed::<_, bytes::BytesMut>::new(server))
.and_then(|server| server.send("hello world".into()))
.map(|_| ());
core.run(future).unwrap();
bg_thread.join().unwrap();
}
#[test]
fn client_to_server_inbound() {
// Simulate a client sending a message to a server through a multiplex upgrade.
let (tx, rx) = mpsc::channel();
let bg_thread = thread::spawn(move || {
let mut core = Core::new().unwrap();
let transport = TcpConfig::new(core.handle())
.with_upgrade(multiplex::MultiplexConfig);
let (listener, addr) = transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
tx.send(addr).unwrap();
let future = listener
.into_future()
.map_err(|(err, _)| err)
.and_then(|(client, _)| client.unwrap().0)
.and_then(|client| client.inbound())
.map(|client| Framed::<_, bytes::BytesMut>::new(client))
.and_then(|client| {
client.into_future()
.map_err(|(err, _)| err)
.map(|(msg, _)| msg)
})
.and_then(|msg| {
let msg = msg.unwrap();
assert_eq!(msg, "hello world");
Ok(())
});
core.run(future).unwrap();
});
let mut core = Core::new().unwrap();
let transport = TcpConfig::new(core.handle())
.with_upgrade(multiplex::MultiplexConfig);
let future = transport.dial(rx.recv().unwrap()).unwrap()
.and_then(|client| client.outbound())
.map(|server| Framed::<_, bytes::BytesMut>::new(server))
.and_then(|server| server.send("hello world".into()))
.map(|_| ());
core.run(future).unwrap();
bg_thread.join().unwrap();
}