From 1110907d78aa43500e10bc03cf247b94d98fe8f0 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 6 Aug 2018 10:41:11 +0200 Subject: [PATCH] Rewrite multiplex tests to use the memory transport (#370) --- core/tests/multiplex.rs | 74 +++++++++++++++-------------------------- 1 file changed, 26 insertions(+), 48 deletions(-) diff --git a/core/tests/multiplex.rs b/core/tests/multiplex.rs index dbb4f123..4c0c9210 100644 --- a/core/tests/multiplex.rs +++ b/core/tests/multiplex.rs @@ -29,9 +29,8 @@ extern crate tokio_io; use bytes::BytesMut; use futures::future::Future; use futures::{Sink, Stream}; -use libp2p_core::{Multiaddr, MuxedTransport, StreamMuxer, Transport}; -use libp2p_tcp_transport::TcpConfig; -use std::sync::{atomic, mpsc}; +use libp2p_core::{Multiaddr, MuxedTransport, StreamMuxer, Transport, transport}; +use std::sync::atomic; use std::thread; use tokio_io::codec::length_delimited::Framed; @@ -74,19 +73,14 @@ fn client_to_server_outbound() { // A client opens a connection to a server, then an outgoing substream, then sends a message // on that substream. - let (tx, rx) = mpsc::channel(); + let (tx, rx) = transport::connector(); let bg_thread = thread::spawn(move || { - let transport = TcpConfig::new() + let future = rx .with_upgrade(multiplex::MplexConfig::new()) - .into_connection_reuse(); - - let (listener, addr) = transport - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) - .unwrap_or_else(|_| panic!()); - tx.send(addr).unwrap(); - - let future = listener + .into_connection_reuse() + .listen_on("/memory".parse().unwrap()) + .unwrap_or_else(|_| panic!()).0 .into_future() .map_err(|(err, _)| err) .and_then(|(client, _)| client.unwrap()) @@ -107,11 +101,10 @@ fn client_to_server_outbound() { tokio_current_thread::block_on_all(future).unwrap(); }); - let transport = TcpConfig::new().with_upgrade(multiplex::MplexConfig::new()); - - let future = transport - .dial(rx.recv().unwrap()) - .unwrap() + let future = tx + .with_upgrade(multiplex::MplexConfig::new()) + .dial("/memory".parse().unwrap()) + .unwrap_or_else(|_| panic!()) .and_then(|client| client.0.outbound()) .map(|server| Framed::<_, BytesMut>::new(server.unwrap())) .and_then(|server| server.send("hello world".into())) @@ -126,19 +119,14 @@ fn connection_reused_for_dialing() { // A client dials the same multiaddress twice in a row. We check that it uses two substreams // instead of opening two different connections. - let (tx, rx) = mpsc::channel(); + let (tx, rx) = transport::connector(); let bg_thread = thread::spawn(move || { - let transport = OnlyOnce::from(TcpConfig::new()) + let future = OnlyOnce::from(rx) .with_upgrade(multiplex::MplexConfig::new()) - .into_connection_reuse(); - - let (listener, addr) = transport - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) - .unwrap_or_else(|_| panic!()); - tx.send(addr).unwrap(); - - let future = listener + .into_connection_reuse() + .listen_on("/memory".parse().unwrap()) + .unwrap_or_else(|_| panic!()).0 .into_future() .map_err(|(err, _)| err) .and_then(|(client, rest)| client.unwrap().map(move |c| (c.0, rest))) @@ -170,22 +158,20 @@ fn connection_reused_for_dialing() { tokio_current_thread::block_on_all(future).unwrap(); }); - let transport = OnlyOnce::from(TcpConfig::new()) + let transport = OnlyOnce::from(tx) .with_upgrade(multiplex::MplexConfig::new()) .into_connection_reuse(); - let listen_addr = rx.recv().unwrap(); - let future = transport .clone() - .dial(listen_addr.clone()) + .dial("/memory".parse().unwrap()) .unwrap_or_else(|_| panic!()) .map(|server| Framed::<_, BytesMut>::new(server.0)) .and_then(|server| server.send("hello world".into())) .and_then(|first_connec| { transport .clone() - .dial(listen_addr.clone()) + .dial("/memory".parse().unwrap()) .unwrap_or_else(|_| panic!()) .map(|server| Framed::<_, BytesMut>::new(server.0)) .map(|server| (first_connec, server)) @@ -203,19 +189,13 @@ fn use_opened_listen_to_dial() { // substream on that same connection, that the client has to accept. The client then sends a // message on that new substream. - let (tx, rx) = mpsc::channel(); + let (tx, rx) = transport::connector(); let bg_thread = thread::spawn(move || { - let transport = OnlyOnce::from(TcpConfig::new()) - .with_upgrade(multiplex::MplexConfig::new()); - - let (listener, addr) = transport - .clone() - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) - .unwrap_or_else(|_| panic!()); - tx.send(addr).unwrap(); - - let future = listener + let future = OnlyOnce::from(rx) + .with_upgrade(multiplex::MplexConfig::new()) + .listen_on("/memory".parse().unwrap()) + .unwrap_or_else(|_| panic!()).0 .into_future() .map_err(|(err, _)| err) .and_then(|(client, _)| client.unwrap()) @@ -247,15 +227,13 @@ fn use_opened_listen_to_dial() { tokio_current_thread::block_on_all(future).unwrap(); }); - let transport = OnlyOnce::from(TcpConfig::new()) + let transport = OnlyOnce::from(tx) .with_upgrade(multiplex::MplexConfig::new()) .into_connection_reuse(); - let listen_addr = rx.recv().unwrap(); - let future = transport .clone() - .dial(listen_addr.clone()) + .dial("/memory".parse().unwrap()) .unwrap_or_else(|_| panic!()) .map(|server| Framed::<_, BytesMut>::new(server.0)) .and_then(|server| server.send("hello world".into()))