diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index e1776b21..76bba0ee 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -10,6 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] +async-std = "1.0" bytes = "0.4" futures_codec = "0.3.1" futures = "0.3.1" diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 72ddc8f8..da764bcd 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -92,7 +92,7 @@ impl IdentifyHandler { impl ProtocolsHandler for IdentifyHandler where - TSubstream: AsyncRead + AsyncWrite + Unpin + 'static, + TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, { type InEvent = (); type OutEvent = IdentifyHandlerEvent; diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index c764da9a..da371b7c 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -254,7 +254,7 @@ pub enum IdentifyEvent { #[cfg(test)] mod tests { use crate::{Identify, IdentifyEvent}; - use futures::{future, prelude::*}; + use futures::prelude::*; use libp2p_core::{ identity, PeerId, @@ -269,7 +269,6 @@ mod tests { use libp2p_mplex::MplexConfig; use rand::{Rng, thread_rng}; use std::{fmt, io}; - use tokio::runtime::current_thread; fn transport() -> (identity::PublicKey, impl Transport< Output = (PeerId, impl StreamMuxer>), @@ -316,40 +315,28 @@ mod tests { // it will permit the connection to be closed, as defined by // `IdentifyHandler::connection_keep_alive`. Hence the test succeeds if // either `Identified` event arrives correctly. - current_thread::Runtime::new().unwrap().block_on( - future::poll_fn(move || -> Result<_, io::Error> { - loop { - match swarm1.poll().unwrap() { - Async::Ready(Some(IdentifyEvent::Received { info, .. })) => { - assert_eq!(info.public_key, pubkey2); - assert_eq!(info.protocol_version, "c"); - assert_eq!(info.agent_version, "d"); - assert!(!info.protocols.is_empty()); - assert!(info.listen_addrs.is_empty()); - return Ok(Poll::Ready(())) - }, - Async::Ready(Some(IdentifyEvent::Sent { .. })) => (), - Async::Ready(e) => panic!("{:?}", e), - Async::NotReady => {} + futures::executor::block_on(async move { + loop { + match future::select(swarm1.next(), swarm2.next()).await.factor_second().0 { + future::Either::Left(Some(Ok(IdentifyEvent::Received { info, .. }))) => { + assert_eq!(info.public_key, pubkey2); + assert_eq!(info.protocol_version, "c"); + assert_eq!(info.agent_version, "d"); + assert!(!info.protocols.is_empty()); + assert!(info.listen_addrs.is_empty()); + return; } - - match swarm2.poll().unwrap() { - Async::Ready(Some(IdentifyEvent::Received { info, .. })) => { - assert_eq!(info.public_key, pubkey1); - assert_eq!(info.protocol_version, "a"); - assert_eq!(info.agent_version, "b"); - assert!(!info.protocols.is_empty()); - assert_eq!(info.listen_addrs.len(), 1); - return Ok(Poll::Ready(())) - }, - Async::Ready(Some(IdentifyEvent::Sent { .. })) => (), - Async::Ready(e) => panic!("{:?}", e), - Async::NotReady => break + future::Either::Right(Some(Ok(IdentifyEvent::Received { info, .. }))) => { + assert_eq!(info.public_key, pubkey1); + assert_eq!(info.protocol_version, "a"); + assert_eq!(info.agent_version, "b"); + assert!(!info.protocols.is_empty()); + assert_eq!(info.listen_addrs.len(), 1); + return; } + _ => {} } - - Ok(Poll::Pending) - })) - .unwrap(); + } + }) } } diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index c7e3cc91..f768d574 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -137,11 +137,11 @@ where impl OutboundUpgrade for IdentifyProtocolConfig where - C: AsyncRead + AsyncWrite + Unpin + 'static, + C: AsyncRead + AsyncWrite + Unpin + Send + 'static, { type Output = RemoteInfo; type Error = upgrade::ReadOneError; - type Future = Pin>>>; + type Future = Pin> + Send>>; fn upgrade_outbound(self, mut socket: Negotiated, _: Self::Info) -> Self::Future { Box::pin(async move { @@ -209,16 +209,13 @@ fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<(IdentifyInfo, Multiaddr), i #[cfg(test)] mod tests { use crate::protocol::{IdentifyInfo, RemoteInfo, IdentifyProtocolConfig}; - use tokio::runtime::current_thread::Runtime; use libp2p_tcp::TcpConfig; - use futures::{Future, Stream}; + use futures::{prelude::*, channel::oneshot}; use libp2p_core::{ identity, Transport, - transport::ListenerEvent, upgrade::{self, apply_outbound, apply_inbound} }; - use std::{io, sync::mpsc, thread}; #[test] fn correct_transfer() { @@ -227,75 +224,55 @@ mod tests { let send_pubkey = identity::Keypair::generate_ed25519().public(); let recv_pubkey = send_pubkey.clone(); - let (tx, rx) = mpsc::channel(); + let (tx, rx) = oneshot::channel(); - let bg_thread = thread::spawn(move || { + let bg_task = async_std::task::spawn(async move { let transport = TcpConfig::new(); let mut listener = transport .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) .unwrap(); - let addr = listener.by_ref().wait() - .next() + let addr = listener.next().await .expect("some event") .expect("no error") .into_new_address() .expect("listen address"); - - tx.send(addr).unwrap(); - let future = listener - .filter_map(ListenerEvent::into_upgrade) - .into_future() - .map_err(|(err, _)| err) - .and_then(|(client, _)| client.unwrap().0) - .and_then(|socket| { - apply_inbound(socket, IdentifyProtocolConfig) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - }) - .and_then(|sender| { - sender.send( - IdentifyInfo { - public_key: send_pubkey, - protocol_version: "proto_version".to_owned(), - agent_version: "agent_version".to_owned(), - listen_addrs: vec![ - "/ip4/80.81.82.83/tcp/500".parse().unwrap(), - "/ip6/::1/udp/1000".parse().unwrap(), - ], - protocols: vec!["proto1".to_string(), "proto2".to_string()], - }, - &"/ip4/100.101.102.103/tcp/5000".parse().unwrap(), - ) - }); - let mut rt = Runtime::new().unwrap(); - let _ = rt.block_on(future).unwrap(); + let socket = listener.next().await.unwrap().unwrap().into_upgrade().unwrap().0.await.unwrap(); + let sender = apply_inbound(socket, IdentifyProtocolConfig).await.unwrap(); + sender.send( + IdentifyInfo { + public_key: send_pubkey, + protocol_version: "proto_version".to_owned(), + agent_version: "agent_version".to_owned(), + listen_addrs: vec![ + "/ip4/80.81.82.83/tcp/500".parse().unwrap(), + "/ip6/::1/udp/1000".parse().unwrap(), + ], + protocols: vec!["proto1".to_string(), "proto2".to_string()], + }, + &"/ip4/100.101.102.103/tcp/5000".parse().unwrap(), + ).await.unwrap(); }); - let transport = TcpConfig::new(); + async_std::task::block_on(async move { + let transport = TcpConfig::new(); - let future = transport.dial(rx.recv().unwrap()) - .unwrap() - .and_then(|socket| { - apply_outbound(socket, IdentifyProtocolConfig, upgrade::Version::V1) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - }) - .and_then(|RemoteInfo { info, observed_addr, .. }| { - assert_eq!(observed_addr, "/ip4/100.101.102.103/tcp/5000".parse().unwrap()); - assert_eq!(info.public_key, recv_pubkey); - assert_eq!(info.protocol_version, "proto_version"); - assert_eq!(info.agent_version, "agent_version"); - assert_eq!(info.listen_addrs, - &["/ip4/80.81.82.83/tcp/500".parse().unwrap(), - "/ip6/::1/udp/1000".parse().unwrap()]); - assert_eq!(info.protocols, &["proto1".to_string(), "proto2".to_string()]); - Ok(()) - }); + let socket = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); + let RemoteInfo { info, observed_addr, .. } = + apply_outbound(socket, IdentifyProtocolConfig, upgrade::Version::V1).await.unwrap(); + assert_eq!(observed_addr, "/ip4/100.101.102.103/tcp/5000".parse().unwrap()); + assert_eq!(info.public_key, recv_pubkey); + assert_eq!(info.protocol_version, "proto_version"); + assert_eq!(info.agent_version, "agent_version"); + assert_eq!(info.listen_addrs, + &["/ip4/80.81.82.83/tcp/500".parse().unwrap(), + "/ip6/::1/udp/1000".parse().unwrap()]); + assert_eq!(info.protocols, &["proto1".to_string(), "proto2".to_string()]); - let mut rt = Runtime::new().unwrap(); - let _ = rt.block_on(future).unwrap(); - bg_thread.join().unwrap(); + bg_task.await; + }); } }