mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-27 00:31:35 +00:00
Rewrite multiplex tests to use the memory transport (#370)
This commit is contained in:
committed by
Benjamin Kampmann
parent
9009a6ffa7
commit
1110907d78
@ -29,9 +29,8 @@ extern crate tokio_io;
|
|||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
use futures::{Sink, Stream};
|
use futures::{Sink, Stream};
|
||||||
use libp2p_core::{Multiaddr, MuxedTransport, StreamMuxer, Transport};
|
use libp2p_core::{Multiaddr, MuxedTransport, StreamMuxer, Transport, transport};
|
||||||
use libp2p_tcp_transport::TcpConfig;
|
use std::sync::atomic;
|
||||||
use std::sync::{atomic, mpsc};
|
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use tokio_io::codec::length_delimited::Framed;
|
use tokio_io::codec::length_delimited::Framed;
|
||||||
|
|
||||||
@ -74,19 +73,14 @@ fn client_to_server_outbound() {
|
|||||||
// A client opens a connection to a server, then an outgoing substream, then sends a message
|
// A client opens a connection to a server, then an outgoing substream, then sends a message
|
||||||
// on that substream.
|
// on that substream.
|
||||||
|
|
||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = transport::connector();
|
||||||
|
|
||||||
let bg_thread = thread::spawn(move || {
|
let bg_thread = thread::spawn(move || {
|
||||||
let transport = TcpConfig::new()
|
let future = rx
|
||||||
.with_upgrade(multiplex::MplexConfig::new())
|
.with_upgrade(multiplex::MplexConfig::new())
|
||||||
.into_connection_reuse();
|
.into_connection_reuse()
|
||||||
|
.listen_on("/memory".parse().unwrap())
|
||||||
let (listener, addr) = transport
|
.unwrap_or_else(|_| panic!()).0
|
||||||
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
|
|
||||||
.unwrap_or_else(|_| panic!());
|
|
||||||
tx.send(addr).unwrap();
|
|
||||||
|
|
||||||
let future = listener
|
|
||||||
.into_future()
|
.into_future()
|
||||||
.map_err(|(err, _)| err)
|
.map_err(|(err, _)| err)
|
||||||
.and_then(|(client, _)| client.unwrap())
|
.and_then(|(client, _)| client.unwrap())
|
||||||
@ -107,11 +101,10 @@ fn client_to_server_outbound() {
|
|||||||
tokio_current_thread::block_on_all(future).unwrap();
|
tokio_current_thread::block_on_all(future).unwrap();
|
||||||
});
|
});
|
||||||
|
|
||||||
let transport = TcpConfig::new().with_upgrade(multiplex::MplexConfig::new());
|
let future = tx
|
||||||
|
.with_upgrade(multiplex::MplexConfig::new())
|
||||||
let future = transport
|
.dial("/memory".parse().unwrap())
|
||||||
.dial(rx.recv().unwrap())
|
.unwrap_or_else(|_| panic!())
|
||||||
.unwrap()
|
|
||||||
.and_then(|client| client.0.outbound())
|
.and_then(|client| client.0.outbound())
|
||||||
.map(|server| Framed::<_, BytesMut>::new(server.unwrap()))
|
.map(|server| Framed::<_, BytesMut>::new(server.unwrap()))
|
||||||
.and_then(|server| server.send("hello world".into()))
|
.and_then(|server| server.send("hello world".into()))
|
||||||
@ -126,19 +119,14 @@ fn connection_reused_for_dialing() {
|
|||||||
// A client dials the same multiaddress twice in a row. We check that it uses two substreams
|
// A client dials the same multiaddress twice in a row. We check that it uses two substreams
|
||||||
// instead of opening two different connections.
|
// instead of opening two different connections.
|
||||||
|
|
||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = transport::connector();
|
||||||
|
|
||||||
let bg_thread = thread::spawn(move || {
|
let bg_thread = thread::spawn(move || {
|
||||||
let transport = OnlyOnce::from(TcpConfig::new())
|
let future = OnlyOnce::from(rx)
|
||||||
.with_upgrade(multiplex::MplexConfig::new())
|
.with_upgrade(multiplex::MplexConfig::new())
|
||||||
.into_connection_reuse();
|
.into_connection_reuse()
|
||||||
|
.listen_on("/memory".parse().unwrap())
|
||||||
let (listener, addr) = transport
|
.unwrap_or_else(|_| panic!()).0
|
||||||
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
|
|
||||||
.unwrap_or_else(|_| panic!());
|
|
||||||
tx.send(addr).unwrap();
|
|
||||||
|
|
||||||
let future = listener
|
|
||||||
.into_future()
|
.into_future()
|
||||||
.map_err(|(err, _)| err)
|
.map_err(|(err, _)| err)
|
||||||
.and_then(|(client, rest)| client.unwrap().map(move |c| (c.0, rest)))
|
.and_then(|(client, rest)| client.unwrap().map(move |c| (c.0, rest)))
|
||||||
@ -170,22 +158,20 @@ fn connection_reused_for_dialing() {
|
|||||||
tokio_current_thread::block_on_all(future).unwrap();
|
tokio_current_thread::block_on_all(future).unwrap();
|
||||||
});
|
});
|
||||||
|
|
||||||
let transport = OnlyOnce::from(TcpConfig::new())
|
let transport = OnlyOnce::from(tx)
|
||||||
.with_upgrade(multiplex::MplexConfig::new())
|
.with_upgrade(multiplex::MplexConfig::new())
|
||||||
.into_connection_reuse();
|
.into_connection_reuse();
|
||||||
|
|
||||||
let listen_addr = rx.recv().unwrap();
|
|
||||||
|
|
||||||
let future = transport
|
let future = transport
|
||||||
.clone()
|
.clone()
|
||||||
.dial(listen_addr.clone())
|
.dial("/memory".parse().unwrap())
|
||||||
.unwrap_or_else(|_| panic!())
|
.unwrap_or_else(|_| panic!())
|
||||||
.map(|server| Framed::<_, BytesMut>::new(server.0))
|
.map(|server| Framed::<_, BytesMut>::new(server.0))
|
||||||
.and_then(|server| server.send("hello world".into()))
|
.and_then(|server| server.send("hello world".into()))
|
||||||
.and_then(|first_connec| {
|
.and_then(|first_connec| {
|
||||||
transport
|
transport
|
||||||
.clone()
|
.clone()
|
||||||
.dial(listen_addr.clone())
|
.dial("/memory".parse().unwrap())
|
||||||
.unwrap_or_else(|_| panic!())
|
.unwrap_or_else(|_| panic!())
|
||||||
.map(|server| Framed::<_, BytesMut>::new(server.0))
|
.map(|server| Framed::<_, BytesMut>::new(server.0))
|
||||||
.map(|server| (first_connec, server))
|
.map(|server| (first_connec, server))
|
||||||
@ -203,19 +189,13 @@ fn use_opened_listen_to_dial() {
|
|||||||
// substream on that same connection, that the client has to accept. The client then sends a
|
// substream on that same connection, that the client has to accept. The client then sends a
|
||||||
// message on that new substream.
|
// message on that new substream.
|
||||||
|
|
||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = transport::connector();
|
||||||
|
|
||||||
let bg_thread = thread::spawn(move || {
|
let bg_thread = thread::spawn(move || {
|
||||||
let transport = OnlyOnce::from(TcpConfig::new())
|
let future = OnlyOnce::from(rx)
|
||||||
.with_upgrade(multiplex::MplexConfig::new());
|
.with_upgrade(multiplex::MplexConfig::new())
|
||||||
|
.listen_on("/memory".parse().unwrap())
|
||||||
let (listener, addr) = transport
|
.unwrap_or_else(|_| panic!()).0
|
||||||
.clone()
|
|
||||||
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
|
|
||||||
.unwrap_or_else(|_| panic!());
|
|
||||||
tx.send(addr).unwrap();
|
|
||||||
|
|
||||||
let future = listener
|
|
||||||
.into_future()
|
.into_future()
|
||||||
.map_err(|(err, _)| err)
|
.map_err(|(err, _)| err)
|
||||||
.and_then(|(client, _)| client.unwrap())
|
.and_then(|(client, _)| client.unwrap())
|
||||||
@ -247,15 +227,13 @@ fn use_opened_listen_to_dial() {
|
|||||||
tokio_current_thread::block_on_all(future).unwrap();
|
tokio_current_thread::block_on_all(future).unwrap();
|
||||||
});
|
});
|
||||||
|
|
||||||
let transport = OnlyOnce::from(TcpConfig::new())
|
let transport = OnlyOnce::from(tx)
|
||||||
.with_upgrade(multiplex::MplexConfig::new())
|
.with_upgrade(multiplex::MplexConfig::new())
|
||||||
.into_connection_reuse();
|
.into_connection_reuse();
|
||||||
|
|
||||||
let listen_addr = rx.recv().unwrap();
|
|
||||||
|
|
||||||
let future = transport
|
let future = transport
|
||||||
.clone()
|
.clone()
|
||||||
.dial(listen_addr.clone())
|
.dial("/memory".parse().unwrap())
|
||||||
.unwrap_or_else(|_| panic!())
|
.unwrap_or_else(|_| panic!())
|
||||||
.map(|server| Framed::<_, BytesMut>::new(server.0))
|
.map(|server| Framed::<_, BytesMut>::new(server.0))
|
||||||
.and_then(|server| server.send("hello world".into()))
|
.and_then(|server| server.send("hello world".into()))
|
||||||
|
Reference in New Issue
Block a user