Fix the identify tests (#1324)

* Fix identify tests

* Some clean-up
This commit is contained in:
Pierre Krieger
2019-11-26 14:47:49 +01:00
committed by Toralf Wittner
parent e5b087d01f
commit 8be45f5318
4 changed files with 59 additions and 94 deletions

View File

@ -10,6 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
async-std = "1.0"
bytes = "0.4" bytes = "0.4"
futures_codec = "0.3.1" futures_codec = "0.3.1"
futures = "0.3.1" futures = "0.3.1"

View File

@ -92,7 +92,7 @@ impl<TSubstream> IdentifyHandler<TSubstream> {
impl<TSubstream> ProtocolsHandler for IdentifyHandler<TSubstream> impl<TSubstream> ProtocolsHandler for IdentifyHandler<TSubstream>
where where
TSubstream: AsyncRead + AsyncWrite + Unpin + 'static, TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{ {
type InEvent = (); type InEvent = ();
type OutEvent = IdentifyHandlerEvent<TSubstream>; type OutEvent = IdentifyHandlerEvent<TSubstream>;

View File

@ -254,7 +254,7 @@ pub enum IdentifyEvent {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::{Identify, IdentifyEvent}; use crate::{Identify, IdentifyEvent};
use futures::{future, prelude::*}; use futures::prelude::*;
use libp2p_core::{ use libp2p_core::{
identity, identity,
PeerId, PeerId,
@ -269,7 +269,6 @@ mod tests {
use libp2p_mplex::MplexConfig; use libp2p_mplex::MplexConfig;
use rand::{Rng, thread_rng}; use rand::{Rng, thread_rng};
use std::{fmt, io}; use std::{fmt, io};
use tokio::runtime::current_thread;
fn transport() -> (identity::PublicKey, impl Transport< fn transport() -> (identity::PublicKey, impl Transport<
Output = (PeerId, impl StreamMuxer<Substream = impl Send, OutboundSubstream = impl Send, Error = impl Into<io::Error>>), Output = (PeerId, impl StreamMuxer<Substream = impl Send, OutboundSubstream = impl Send, Error = impl Into<io::Error>>),
@ -316,40 +315,28 @@ mod tests {
// it will permit the connection to be closed, as defined by // it will permit the connection to be closed, as defined by
// `IdentifyHandler::connection_keep_alive`. Hence the test succeeds if // `IdentifyHandler::connection_keep_alive`. Hence the test succeeds if
// either `Identified` event arrives correctly. // either `Identified` event arrives correctly.
current_thread::Runtime::new().unwrap().block_on( futures::executor::block_on(async move {
future::poll_fn(move || -> Result<_, io::Error> {
loop { loop {
match swarm1.poll().unwrap() { match future::select(swarm1.next(), swarm2.next()).await.factor_second().0 {
Async::Ready(Some(IdentifyEvent::Received { info, .. })) => { future::Either::Left(Some(Ok(IdentifyEvent::Received { info, .. }))) => {
assert_eq!(info.public_key, pubkey2); assert_eq!(info.public_key, pubkey2);
assert_eq!(info.protocol_version, "c"); assert_eq!(info.protocol_version, "c");
assert_eq!(info.agent_version, "d"); assert_eq!(info.agent_version, "d");
assert!(!info.protocols.is_empty()); assert!(!info.protocols.is_empty());
assert!(info.listen_addrs.is_empty()); assert!(info.listen_addrs.is_empty());
return Ok(Poll::Ready(())) return;
},
Async::Ready(Some(IdentifyEvent::Sent { .. })) => (),
Async::Ready(e) => panic!("{:?}", e),
Async::NotReady => {}
} }
future::Either::Right(Some(Ok(IdentifyEvent::Received { info, .. }))) => {
match swarm2.poll().unwrap() {
Async::Ready(Some(IdentifyEvent::Received { info, .. })) => {
assert_eq!(info.public_key, pubkey1); assert_eq!(info.public_key, pubkey1);
assert_eq!(info.protocol_version, "a"); assert_eq!(info.protocol_version, "a");
assert_eq!(info.agent_version, "b"); assert_eq!(info.agent_version, "b");
assert!(!info.protocols.is_empty()); assert!(!info.protocols.is_empty());
assert_eq!(info.listen_addrs.len(), 1); assert_eq!(info.listen_addrs.len(), 1);
return Ok(Poll::Ready(())) return;
}, }
Async::Ready(Some(IdentifyEvent::Sent { .. })) => (), _ => {}
Async::Ready(e) => panic!("{:?}", e),
Async::NotReady => break
} }
} }
})
Ok(Poll::Pending)
}))
.unwrap();
} }
} }

View File

@ -137,11 +137,11 @@ where
impl<C> OutboundUpgrade<C> for IdentifyProtocolConfig impl<C> OutboundUpgrade<C> for IdentifyProtocolConfig
where where
C: AsyncRead + AsyncWrite + Unpin + 'static, C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{ {
type Output = RemoteInfo; type Output = RemoteInfo;
type Error = upgrade::ReadOneError; type Error = upgrade::ReadOneError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
fn upgrade_outbound(self, mut socket: Negotiated<C>, _: Self::Info) -> Self::Future { fn upgrade_outbound(self, mut socket: Negotiated<C>, _: Self::Info) -> Self::Future {
Box::pin(async move { Box::pin(async move {
@ -209,16 +209,13 @@ fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<(IdentifyInfo, Multiaddr), i
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::protocol::{IdentifyInfo, RemoteInfo, IdentifyProtocolConfig}; use crate::protocol::{IdentifyInfo, RemoteInfo, IdentifyProtocolConfig};
use tokio::runtime::current_thread::Runtime;
use libp2p_tcp::TcpConfig; use libp2p_tcp::TcpConfig;
use futures::{Future, Stream}; use futures::{prelude::*, channel::oneshot};
use libp2p_core::{ use libp2p_core::{
identity, identity,
Transport, Transport,
transport::ListenerEvent,
upgrade::{self, apply_outbound, apply_inbound} upgrade::{self, apply_outbound, apply_inbound}
}; };
use std::{io, sync::mpsc, thread};
#[test] #[test]
fn correct_transfer() { fn correct_transfer() {
@ -227,35 +224,24 @@ mod tests {
let send_pubkey = identity::Keypair::generate_ed25519().public(); let send_pubkey = identity::Keypair::generate_ed25519().public();
let recv_pubkey = send_pubkey.clone(); let recv_pubkey = send_pubkey.clone();
let (tx, rx) = mpsc::channel(); let (tx, rx) = oneshot::channel();
let bg_thread = thread::spawn(move || { let bg_task = async_std::task::spawn(async move {
let transport = TcpConfig::new(); let transport = TcpConfig::new();
let mut listener = transport let mut listener = transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap(); .unwrap();
let addr = listener.by_ref().wait() let addr = listener.next().await
.next()
.expect("some event") .expect("some event")
.expect("no error") .expect("no error")
.into_new_address() .into_new_address()
.expect("listen address"); .expect("listen address");
tx.send(addr).unwrap(); tx.send(addr).unwrap();
let future = listener let socket = listener.next().await.unwrap().unwrap().into_upgrade().unwrap().0.await.unwrap();
.filter_map(ListenerEvent::into_upgrade) let sender = apply_inbound(socket, IdentifyProtocolConfig).await.unwrap();
.into_future()
.map_err(|(err, _)| err)
.and_then(|(client, _)| client.unwrap().0)
.and_then(|socket| {
apply_inbound(socket, IdentifyProtocolConfig)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
})
.and_then(|sender| {
sender.send( sender.send(
IdentifyInfo { IdentifyInfo {
public_key: send_pubkey, public_key: send_pubkey,
@ -268,21 +254,15 @@ mod tests {
protocols: vec!["proto1".to_string(), "proto2".to_string()], protocols: vec!["proto1".to_string(), "proto2".to_string()],
}, },
&"/ip4/100.101.102.103/tcp/5000".parse().unwrap(), &"/ip4/100.101.102.103/tcp/5000".parse().unwrap(),
) ).await.unwrap();
});
let mut rt = Runtime::new().unwrap();
let _ = rt.block_on(future).unwrap();
}); });
async_std::task::block_on(async move {
let transport = TcpConfig::new(); let transport = TcpConfig::new();
let future = transport.dial(rx.recv().unwrap()) let socket = transport.dial(rx.await.unwrap()).unwrap().await.unwrap();
.unwrap() let RemoteInfo { info, observed_addr, .. } =
.and_then(|socket| { apply_outbound(socket, IdentifyProtocolConfig, upgrade::Version::V1).await.unwrap();
apply_outbound(socket, IdentifyProtocolConfig, upgrade::Version::V1)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
})
.and_then(|RemoteInfo { info, observed_addr, .. }| {
assert_eq!(observed_addr, "/ip4/100.101.102.103/tcp/5000".parse().unwrap()); assert_eq!(observed_addr, "/ip4/100.101.102.103/tcp/5000".parse().unwrap());
assert_eq!(info.public_key, recv_pubkey); assert_eq!(info.public_key, recv_pubkey);
assert_eq!(info.protocol_version, "proto_version"); assert_eq!(info.protocol_version, "proto_version");
@ -291,11 +271,8 @@ mod tests {
&["/ip4/80.81.82.83/tcp/500".parse().unwrap(), &["/ip4/80.81.82.83/tcp/500".parse().unwrap(),
"/ip6/::1/udp/1000".parse().unwrap()]); "/ip6/::1/udp/1000".parse().unwrap()]);
assert_eq!(info.protocols, &["proto1".to_string(), "proto2".to_string()]); assert_eq!(info.protocols, &["proto1".to_string(), "proto2".to_string()]);
Ok(())
});
let mut rt = Runtime::new().unwrap(); bg_task.await;
let _ = rt.block_on(future).unwrap(); });
bg_thread.join().unwrap();
} }
} }