mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-26 08:11:39 +00:00
Make the TCP tests compile again (#1251)
This commit is contained in:
@ -452,31 +452,10 @@ impl Drop for TcpTransStream {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use futures::{prelude::*, future::{self, Loop}, stream};
|
use futures::prelude::*;
|
||||||
use libp2p_core::{Transport, multiaddr::{Multiaddr, Protocol}, transport::ListenerEvent};
|
use libp2p_core::{Transport, multiaddr::{Multiaddr, Protocol}, transport::ListenerEvent};
|
||||||
use std::{net::{IpAddr, Ipv4Addr, SocketAddr}, time::Duration};
|
use std::{net::{IpAddr, Ipv4Addr, SocketAddr}, time::Duration};
|
||||||
use super::{multiaddr_to_socketaddr, TcpConfig, Listener};
|
use super::{multiaddr_to_socketaddr, TcpConfig};
|
||||||
use tokio::runtime::current_thread::{self, Runtime};
|
|
||||||
use tokio_io;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn pause_on_error() {
|
|
||||||
// We create a stream of values and errors and continue polling even after errors
|
|
||||||
// have been encountered. We count the number of items (including errors) and assert
|
|
||||||
// that no item has been missed.
|
|
||||||
let rs = stream::iter_result(vec![Ok(1), Err(1), Ok(1), Err(1)]);
|
|
||||||
let ls = Listener::new(rs, Duration::from_secs(1));
|
|
||||||
let sum = future::loop_fn((0, ls), |(acc, ls)| {
|
|
||||||
ls.into_future().then(move |item| {
|
|
||||||
match item {
|
|
||||||
Ok((None, _)) => Ok::<_, std::convert::Infallible>(Loop::Break(acc)),
|
|
||||||
Ok((Some(n), rest)) => Ok(Loop::Continue((acc + n, rest))),
|
|
||||||
Err((n, rest)) => Ok(Loop::Continue((acc + n, rest)))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
});
|
|
||||||
assert_eq!(4, current_thread::block_on_all(sum).unwrap())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn wildcard_expansion() {
|
fn wildcard_expansion() {
|
||||||
@ -569,42 +548,43 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn communicating_between_dialer_and_listener() {
|
fn communicating_between_dialer_and_listener() {
|
||||||
use std::io::Write;
|
let (ready_tx, ready_rx) = futures::channel::oneshot::channel();
|
||||||
|
let mut ready_tx = Some(ready_tx);
|
||||||
|
|
||||||
std::thread::spawn(move || {
|
async_std::task::spawn(async move {
|
||||||
let addr = "/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap();
|
let addr = "/ip4/127.0.0.1/tcp/0".parse::<Multiaddr>().unwrap();
|
||||||
let tcp = TcpConfig::new();
|
let tcp = TcpConfig::new();
|
||||||
let listener = tcp.listen_on(addr).unwrap()
|
let mut listener = tcp.listen_on(addr).unwrap();
|
||||||
.filter_map(ListenerEvent::into_upgrade)
|
|
||||||
.for_each(|(sock, _)| {
|
|
||||||
sock.and_then(|sock| {
|
|
||||||
// Define what to do with the socket that just connected to us
|
|
||||||
// Which in this case is read 3 bytes
|
|
||||||
let handle_conn = tokio_io::io::read_exact(sock, [0; 3])
|
|
||||||
.map(|(_, buf)| assert_eq!(buf, [1, 2, 3]))
|
|
||||||
.map_err(|err| panic!("IO error {:?}", err));
|
|
||||||
|
|
||||||
// Spawn the future as a concurrent task
|
loop {
|
||||||
handle.spawn(handle_conn).unwrap();
|
match listener.next().await.unwrap().unwrap() {
|
||||||
|
ListenerEvent::NewAddress(listen_addr) => {
|
||||||
futures::future::ready(())
|
ready_tx.take().unwrap().send(listen_addr).unwrap();
|
||||||
})
|
},
|
||||||
|
ListenerEvent::Upgrade { upgrade, .. } => {
|
||||||
|
let mut upgrade = upgrade.await.unwrap();
|
||||||
|
let mut buf = [0u8; 3];
|
||||||
|
upgrade.read_exact(&mut buf).await.unwrap();
|
||||||
|
assert_eq!(buf, [1, 2, 3]);
|
||||||
|
upgrade.write_all(&[4, 5, 6]).await.unwrap();
|
||||||
|
},
|
||||||
|
_ => unreachable!()
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
futures::executor::block_on(listener);
|
async_std::task::block_on(async move {
|
||||||
});
|
let addr = ready_rx.await.unwrap();
|
||||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
|
||||||
let addr = "/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap();
|
|
||||||
let tcp = TcpConfig::new();
|
let tcp = TcpConfig::new();
|
||||||
|
|
||||||
// Obtain a future socket through dialing
|
// Obtain a future socket through dialing
|
||||||
let socket = tcp.dial(addr.clone()).unwrap();
|
let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap();
|
||||||
// Define what to do with the socket once it's obtained
|
socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
|
||||||
let action = socket.then(|sock| {
|
|
||||||
sock.unwrap().write(&[0x1, 0x2, 0x3]).unwrap();
|
let mut buf = [0u8; 3];
|
||||||
futures::future::ready(())
|
socket.read_exact(&mut buf).await.unwrap();
|
||||||
|
assert_eq!(buf, [4, 5, 6]);
|
||||||
});
|
});
|
||||||
// Execute the future in our event loop
|
|
||||||
futures::executor::block_on(action);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
Reference in New Issue
Block a user