Fix deflate test.

Skip over empty messages or else the socket may not be connected by the
time `close` is called on it.
This commit is contained in:
Toralf Wittner
2019-12-19 12:28:46 +01:00
parent 9349d6ce25
commit 34a631d560
2 changed files with 52 additions and 33 deletions

View File

@ -71,6 +71,7 @@ where
}
/// Decodes and encodes traffic using DEFLATE.
#[derive(Debug)]
pub struct DeflateOutput<S> {
/// Inner stream where we read compressed data from and write compressed data to.
inner: S,

View File

@ -18,59 +18,77 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{prelude::*, channel::oneshot};
use futures::{future, prelude::*};
use libp2p_core::{transport::Transport, upgrade};
use libp2p_deflate::DeflateConfig;
use libp2p_tcp::TcpConfig;
use quickcheck::QuickCheck;
use quickcheck::{QuickCheck, RngCore, TestResult};
#[test]
fn deflate() {
fn prop(message: Vec<u8>) -> bool {
run(message);
true
fn prop(message: Vec<u8>) -> TestResult {
if message.is_empty() {
return TestResult::discard()
}
async_std::task::block_on(run(message));
TestResult::passed()
}
QuickCheck::new()
.max_tests(30)
.quickcheck(prop as fn(Vec<u8>) -> bool)
QuickCheck::new().quickcheck(prop as fn(Vec<u8>) -> TestResult)
}
#[test]
fn lot_of_data() {
run((0..16*1024*1024).map(|_| rand::random::<u8>()).collect());
let mut v = vec![0; 2 * 1024 * 1024];
rand::thread_rng().fill_bytes(&mut v);
async_std::task::block_on(run(v))
}
fn run(message1: Vec<u8>) {
let transport1 = TcpConfig::new()
.and_then(|c, e| upgrade::apply(c, DeflateConfig::default(), e, upgrade::Version::V1));
let transport2 = transport1.clone();
async fn run(message1: Vec<u8>) {
let transport = TcpConfig::new()
.and_then(|conn, endpoint| {
upgrade::apply(conn, DeflateConfig::default(), endpoint, upgrade::Version::V1)
});
let mut listener = transport.clone()
.listen_on("/ip4/0.0.0.0/tcp/0".parse().expect("multiaddr"))
.expect("listener");
let listen_addr = listener.by_ref().next().await
.expect("some event")
.expect("no error")
.into_new_address()
.expect("new address");
let message2 = message1.clone();
let (l_a_tx, l_a_rx) = oneshot::channel();
async_std::task::spawn(async move {
let mut server = transport1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
let server_address = server.next().await.unwrap().unwrap().into_new_address().unwrap();
l_a_tx.send(server_address).unwrap();
let mut connec = server.next().await.unwrap().unwrap().into_upgrade().unwrap().0.await.unwrap();
let listener_task = async_std::task::spawn(async move {
let mut conn = listener
.filter(|e| future::ready(e.as_ref().map(|e| e.is_upgrade()).unwrap_or(false)))
.next()
.await
.expect("some event")
.expect("no error")
.into_upgrade()
.expect("upgrade")
.0
.await
.expect("connection");
let mut buf = vec![0; message2.len()];
connec.read_exact(&mut buf).await.unwrap();
conn.read_exact(&mut buf).await.expect("read_exact");
assert_eq!(&buf[..], &message2[..]);
connec.write_all(&message2).await.unwrap();
connec.close().await.unwrap();
conn.write_all(&message2).await.expect("write_all");
conn.close().await.expect("close")
});
futures::executor::block_on(async move {
let listen_addr = l_a_rx.await.unwrap();
let mut connec = transport2.dial(listen_addr).unwrap().await.unwrap();
connec.write_all(&message1).await.unwrap();
connec.close().await.unwrap();
let mut conn = transport.dial(listen_addr).expect("dialer").await.expect("connection");
conn.write_all(&message1).await.expect("write_all");
conn.close().await.expect("close");
let mut buf = Vec::new();
connec.read_to_end(&mut buf).await.unwrap();
assert_eq!(&buf[..], &message1[..]);
});
let mut buf = Vec::new();
conn.read_to_end(&mut buf).await.expect("read_to_end");
assert_eq!(&buf[..], &message1[..]);
listener_task.await
}