diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 481f0ae9..ea90a5f8 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -452,31 +452,10 @@ impl Drop for TcpTransStream { #[cfg(test)] mod tests { - use futures::{prelude::*, future::{self, Loop}, stream}; + use futures::prelude::*; use libp2p_core::{Transport, multiaddr::{Multiaddr, Protocol}, transport::ListenerEvent}; use std::{net::{IpAddr, Ipv4Addr, SocketAddr}, time::Duration}; - use super::{multiaddr_to_socketaddr, TcpConfig, Listener}; - use tokio::runtime::current_thread::{self, Runtime}; - use tokio_io; - - #[test] - fn pause_on_error() { - // We create a stream of values and errors and continue polling even after errors - // have been encountered. We count the number of items (including errors) and assert - // that no item has been missed. - let rs = stream::iter_result(vec![Ok(1), Err(1), Ok(1), Err(1)]); - let ls = Listener::new(rs, Duration::from_secs(1)); - let sum = future::loop_fn((0, ls), |(acc, ls)| { - ls.into_future().then(move |item| { - match item { - Ok((None, _)) => Ok::<_, std::convert::Infallible>(Loop::Break(acc)), - Ok((Some(n), rest)) => Ok(Loop::Continue((acc + n, rest))), - Err((n, rest)) => Ok(Loop::Continue((acc + n, rest))) - } - }) - }); - assert_eq!(4, current_thread::block_on_all(sum).unwrap()) - } + use super::{multiaddr_to_socketaddr, TcpConfig}; #[test] fn wildcard_expansion() { @@ -569,42 +548,43 @@ mod tests { #[test] fn communicating_between_dialer_and_listener() { - use std::io::Write; + let (ready_tx, ready_rx) = futures::channel::oneshot::channel(); + let mut ready_tx = Some(ready_tx); - std::thread::spawn(move || { - let addr = "/ip4/127.0.0.1/tcp/12345".parse::().unwrap(); + async_std::task::spawn(async move { + let addr = "/ip4/127.0.0.1/tcp/0".parse::().unwrap(); let tcp = TcpConfig::new(); - let listener = tcp.listen_on(addr).unwrap() - .filter_map(ListenerEvent::into_upgrade) - .for_each(|(sock, _)| { - sock.and_then(|sock| { - // Define what to do with the socket that just connected to us - // Which in this case is read 3 bytes - let handle_conn = tokio_io::io::read_exact(sock, [0; 3]) - .map(|(_, buf)| assert_eq!(buf, [1, 2, 3])) - .map_err(|err| panic!("IO error {:?}", err)); + let mut listener = tcp.listen_on(addr).unwrap(); - // Spawn the future as a concurrent task - handle.spawn(handle_conn).unwrap(); - - futures::future::ready(()) - }) - }); - - futures::executor::block_on(listener); + loop { + match listener.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(listen_addr) => { + ready_tx.take().unwrap().send(listen_addr).unwrap(); + }, + ListenerEvent::Upgrade { upgrade, .. } => { + let mut upgrade = upgrade.await.unwrap(); + let mut buf = [0u8; 3]; + upgrade.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [1, 2, 3]); + upgrade.write_all(&[4, 5, 6]).await.unwrap(); + }, + _ => unreachable!() + } + } }); - std::thread::sleep(std::time::Duration::from_millis(100)); - let addr = "/ip4/127.0.0.1/tcp/12345".parse::().unwrap(); - let tcp = TcpConfig::new(); - // Obtain a future socket through dialing - let socket = tcp.dial(addr.clone()).unwrap(); - // Define what to do with the socket once it's obtained - let action = socket.then(|sock| { - sock.unwrap().write(&[0x1, 0x2, 0x3]).unwrap(); - futures::future::ready(()) + + async_std::task::block_on(async move { + let addr = ready_rx.await.unwrap(); + let tcp = TcpConfig::new(); + + // Obtain a future socket through dialing + let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap(); + socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap(); + + let mut buf = [0u8; 3]; + socket.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [4, 5, 6]); }); - // Execute the future in our event loop - futures::executor::block_on(action); } #[test]