refactor(identify): bring tests up to workspace standard

This patch refactors the identify tests to use `libp2p-swarm-test`. This allows us to delete quite a bit of code and makes several dev-dependencies obsolete.

The `correct_transfer` test is made obsolete by more precise assertions in the `periodic_identify` test. This allows us to remove the dependency on the `upgrade::{apply_inbound,apply_outbound}` functions.

Finally, we also fix a bug where the reported listen addresses to the other node could contain duplicates.

Related: #3748.

Pull-Request: #3851.
This commit is contained in:
Thomas Eizinger
2023-05-08 05:39:34 +02:00
committed by GitHub
parent 14938043cf
commit fa4adc8c9d
8 changed files with 204 additions and 444 deletions

5
Cargo.lock generated
View File

@ -2606,11 +2606,8 @@ dependencies = [
"futures-timer",
"libp2p-core",
"libp2p-identity",
"libp2p-mplex",
"libp2p-noise",
"libp2p-swarm",
"libp2p-tcp",
"libp2p-yamux",
"libp2p-swarm-test",
"log",
"lru",
"quick-protobuf",

View File

@ -29,11 +29,8 @@ either = "1.8.0"
[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.10"
libp2p-mplex = { workspace = true }
libp2p-yamux = { workspace = true }
libp2p-noise = { workspace = true }
libp2p-swarm = { workspace = true, features = ["async-std"] }
libp2p-tcp = { workspace = true, features = ["async-io"] }
libp2p-swarm-test = { path = "../../swarm-test" }
libp2p-swarm = { workspace = true, features = ["macros"] }
# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling

View File

@ -561,279 +561,6 @@ impl PeerCache {
#[cfg(test)]
mod tests {
use super::*;
use futures::pin_mut;
use futures::prelude::*;
use libp2p_core::{muxing::StreamMuxerBox, transport, upgrade, Transport};
use libp2p_identity as identity;
use libp2p_identity::PeerId;
use libp2p_mplex::MplexConfig;
use libp2p_noise as noise;
use libp2p_swarm::{Swarm, SwarmBuilder, SwarmEvent};
use libp2p_tcp as tcp;
use std::time::Duration;
fn transport() -> (PublicKey, transport::Boxed<(PeerId, StreamMuxerBox)>) {
let id_keys = identity::Keypair::generate_ed25519();
let pubkey = id_keys.public();
let transport = tcp::async_io::Transport::new(tcp::Config::default().nodelay(true))
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&id_keys).unwrap())
.multiplex(MplexConfig::new())
.boxed();
(pubkey, transport)
}
#[test]
fn periodic_identify() {
let (mut swarm1, pubkey1) = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(
Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()),
);
let swarm =
SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id())
.build();
(swarm, pubkey)
};
let (mut swarm2, pubkey2) = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(
Config::new("c".to_string(), pubkey.clone()).with_agent_version("d".to_string()),
);
let swarm =
SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id())
.build();
(swarm, pubkey)
};
swarm1
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();
let listen_addr = async_std::task::block_on(async {
loop {
let swarm1_fut = swarm1.select_next_some();
pin_mut!(swarm1_fut);
if let SwarmEvent::NewListenAddr { address, .. } = swarm1_fut.await {
return address;
}
}
});
swarm2.dial(listen_addr).unwrap();
// nb. Either swarm may receive the `Identified` event first, upon which
// it will permit the connection to be closed, as defined by
// `Handler::connection_keep_alive`. Hence the test succeeds if
// either `Identified` event arrives correctly.
async_std::task::block_on(async move {
loop {
let swarm1_fut = swarm1.select_next_some();
pin_mut!(swarm1_fut);
let swarm2_fut = swarm2.select_next_some();
pin_mut!(swarm2_fut);
match future::select(swarm1_fut, swarm2_fut)
.await
.factor_second()
.0
{
future::Either::Left(SwarmEvent::Behaviour(Event::Received {
info, ..
})) => {
assert_eq!(info.public_key, pubkey2);
assert_eq!(info.protocol_version, "c");
assert_eq!(info.agent_version, "d");
assert!(!info.protocols.is_empty());
assert!(info.listen_addrs.is_empty());
return;
}
future::Either::Right(SwarmEvent::Behaviour(Event::Received {
info, ..
})) => {
assert_eq!(info.public_key, pubkey1);
assert_eq!(info.protocol_version, "a");
assert_eq!(info.agent_version, "b");
assert!(!info.protocols.is_empty());
assert_eq!(info.listen_addrs.len(), 1);
return;
}
_ => {}
}
}
})
}
#[test]
fn identify_push() {
let _ = env_logger::try_init();
let (mut swarm1, pubkey1) = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(Config::new("a".to_string(), pubkey.clone()));
let swarm =
SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id())
.build();
(swarm, pubkey)
};
let (mut swarm2, pubkey2) = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(
Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()),
);
let swarm =
SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id())
.build();
(swarm, pubkey)
};
Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
let listen_addr = async_std::task::block_on(async {
loop {
let swarm1_fut = swarm1.select_next_some();
pin_mut!(swarm1_fut);
if let SwarmEvent::NewListenAddr { address, .. } = swarm1_fut.await {
return address;
}
}
});
swarm2.dial(listen_addr).unwrap();
async_std::task::block_on(async move {
loop {
let swarm1_fut = swarm1.select_next_some();
let swarm2_fut = swarm2.select_next_some();
{
pin_mut!(swarm1_fut);
pin_mut!(swarm2_fut);
match future::select(swarm1_fut, swarm2_fut)
.await
.factor_second()
.0
{
future::Either::Left(SwarmEvent::Behaviour(Event::Received {
info,
..
})) => {
assert_eq!(info.public_key, pubkey2);
assert_eq!(info.protocol_version, "a");
assert_eq!(info.agent_version, "b");
assert!(!info.protocols.is_empty());
assert!(info.listen_addrs.is_empty());
return;
}
future::Either::Right(SwarmEvent::ConnectionEstablished { .. }) => {
// Once a connection is established, we can initiate an
// active push below.
}
_ => continue,
}
}
swarm2
.behaviour_mut()
.push(std::iter::once(pubkey1.to_peer_id()));
}
})
}
#[test]
fn discover_peer_after_disconnect() {
let _ = env_logger::try_init();
let mut swarm1 = {
let (pubkey, transport) = transport();
#[allow(deprecated)]
let protocol = Behaviour::new(
Config::new("a".to_string(), pubkey.clone())
// `swarm1` will set `KeepAlive::No` once it identified `swarm2` and thus
// closes the connection. At this point in time `swarm2` might not yet have
// identified `swarm1`. To give `swarm2` enough time, set an initial delay on
// `swarm1`.
.with_initial_delay(Duration::from_secs(10)),
);
SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build()
};
let mut swarm2 = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(
Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()),
);
SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build()
};
let swarm1_peer_id = *swarm1.local_peer_id();
let listener = swarm1
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();
let listen_addr = async_std::task::block_on(async {
loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr {
address,
listener_id,
} if listener_id == listener => return address,
_ => {}
}
}
});
async_std::task::spawn(async move {
loop {
swarm1.next().await;
}
});
swarm2.dial(listen_addr).unwrap();
// Wait until we identified.
async_std::task::block_on(async {
loop {
if let SwarmEvent::Behaviour(Event::Received { .. }) =
swarm2.select_next_some().await
{
break;
}
}
});
swarm2.disconnect_peer_id(swarm1_peer_id).unwrap();
// Wait for connection to close.
async_std::task::block_on(async {
loop {
if let SwarmEvent::ConnectionClosed { peer_id, .. } =
swarm2.select_next_some().await
{
break peer_id;
}
}
});
// We should still be able to dial now!
swarm2.dial(swarm1_peer_id).unwrap();
let connected_peer = async_std::task::block_on(async {
loop {
if let SwarmEvent::ConnectionEstablished { peer_id, .. } =
swarm2.select_next_some().await
{
break peer_id;
}
}
});
assert_eq!(connected_peer, swarm1_peer_id);
}
#[test]
fn check_multiaddr_matches_peer_id() {

View File

@ -39,7 +39,7 @@ use libp2p_swarm::{
};
use log::warn;
use smallvec::SmallVec;
use std::collections::VecDeque;
use std::collections::{HashSet, VecDeque};
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};
/// Protocol handler for sending and receiving identification requests.
@ -89,7 +89,7 @@ pub struct Handler {
#[derive(Debug)]
pub struct InEvent {
/// The addresses that the peer is listening on.
pub listen_addrs: Vec<Multiaddr>,
pub listen_addrs: HashSet<Multiaddr>,
/// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
pub supported_protocols: Vec<StreamProtocol>,
@ -246,7 +246,7 @@ impl ConnectionHandler for Handler {
public_key: self.public_key.clone(),
protocol_version: self.protocol_version.clone(),
agent_version: self.agent_version.clone(),
listen_addrs,
listen_addrs: Vec::from_iter(listen_addrs),
protocols: supported_protocols,
observed_addr: self.observed_addr.clone(),
};

View File

@ -1,2 +0,0 @@
// Automatically generated mod.rs
pub mod structs;

View File

@ -284,97 +284,7 @@ pub enum UpgradeError {
#[cfg(test)]
mod tests {
use super::*;
use futures::channel::oneshot;
use libp2p_core::{
upgrade::{self, apply_inbound, apply_outbound},
Transport,
};
use libp2p_identity as identity;
use libp2p_tcp as tcp;
#[test]
fn correct_transfer() {
// We open a server and a client, send info from the server to the client, and check that
// they were successfully received.
let send_pubkey = identity::Keypair::generate_ed25519().public();
let recv_pubkey = send_pubkey.clone();
let (tx, rx) = oneshot::channel();
let bg_task = async_std::task::spawn(async move {
let mut transport = tcp::async_io::Transport::default().boxed();
transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();
let addr = transport
.next()
.await
.expect("some event")
.into_new_address()
.expect("listen address");
tx.send(addr).unwrap();
let socket = transport
.next()
.await
.expect("some event")
.into_incoming()
.unwrap()
.0
.await
.unwrap();
let sender = apply_inbound(socket, Identify).await.unwrap();
send(
sender,
Info {
public_key: send_pubkey,
protocol_version: "proto_version".to_owned(),
agent_version: "agent_version".to_owned(),
listen_addrs: vec![
"/ip4/80.81.82.83/tcp/500".parse().unwrap(),
"/ip6/::1/udp/1000".parse().unwrap(),
],
protocols: vec![
StreamProtocol::new("/proto1"),
StreamProtocol::new("/proto2"),
],
observed_addr: "/ip4/100.101.102.103/tcp/5000".parse().unwrap(),
},
)
.await
.unwrap();
});
async_std::task::block_on(async move {
let mut transport = tcp::async_io::Transport::default();
let socket = transport.dial(rx.await.unwrap()).unwrap().await.unwrap();
let info = apply_outbound(socket, Identify, upgrade::Version::V1)
.await
.unwrap();
assert_eq!(
info.observed_addr,
"/ip4/100.101.102.103/tcp/5000".parse().unwrap()
);
assert_eq!(info.public_key, recv_pubkey);
assert_eq!(info.protocol_version, "proto_version");
assert_eq!(info.agent_version, "agent_version");
assert_eq!(
info.listen_addrs,
&[
"/ip4/80.81.82.83/tcp/500".parse().unwrap(),
"/ip6/::1/udp/1000".parse().unwrap()
]
);
assert_eq!(info.protocols, &["/proto1", "/proto2"]);
bg_task.await;
});
}
#[test]
fn skip_invalid_multiaddr() {

View File

@ -1,67 +0,0 @@
// Automatically generated rust module for 'structs.proto' file
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(unused_imports)]
#![allow(unknown_lints)]
#![allow(clippy::all)]
#![cfg_attr(rustfmt, rustfmt_skip)]
use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result};
use quick_protobuf::sizeofs::*;
use super::*;
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct Identify {
pub protocolVersion: Option<String>,
pub agentVersion: Option<String>,
pub publicKey: Option<Vec<u8>>,
pub listenAddrs: Vec<Vec<u8>>,
pub observedAddr: Option<Vec<u8>>,
pub protocols: Vec<String>,
}
impl<'a> MessageRead<'a> for Identify {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(42) => msg.protocolVersion = Some(r.read_string(bytes)?.to_owned()),
Ok(50) => msg.agentVersion = Some(r.read_string(bytes)?.to_owned()),
Ok(10) => msg.publicKey = Some(r.read_bytes(bytes)?.to_owned()),
Ok(18) => msg.listenAddrs.push(r.read_bytes(bytes)?.to_owned()),
Ok(34) => msg.observedAddr = Some(r.read_bytes(bytes)?.to_owned()),
Ok(26) => msg.protocols.push(r.read_string(bytes)?.to_owned()),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl MessageWrite for Identify {
fn get_size(&self) -> usize {
0
+ self.protocolVersion.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.agentVersion.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.publicKey.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.listenAddrs.iter().map(|s| 1 + sizeof_len((s).len())).sum::<usize>()
+ self.observedAddr.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.protocols.iter().map(|s| 1 + sizeof_len((s).len())).sum::<usize>()
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
if let Some(ref s) = self.protocolVersion { w.write_with_tag(42, |w| w.write_string(&**s))?; }
if let Some(ref s) = self.agentVersion { w.write_with_tag(50, |w| w.write_string(&**s))?; }
if let Some(ref s) = self.publicKey { w.write_with_tag(10, |w| w.write_bytes(&**s))?; }
for s in &self.listenAddrs { w.write_with_tag(18, |w| w.write_bytes(&**s))?; }
if let Some(ref s) = self.observedAddr { w.write_with_tag(34, |w| w.write_bytes(&**s))?; }
for s in &self.protocols { w.write_with_tag(26, |w| w.write_string(&**s))?; }
Ok(())
}
}

View File

@ -0,0 +1,198 @@
use futures::prelude::*;
use libp2p_core::multiaddr::Protocol;
use libp2p_identify as identify;
use libp2p_swarm::{keep_alive, Swarm, SwarmEvent};
use libp2p_swarm_test::SwarmExt;
use std::iter;
#[async_std::test]
async fn periodic_identify() {
let mut swarm1 = Swarm::new_ephemeral(|identity| {
identify::Behaviour::new(
identify::Config::new("a".to_string(), identity.public())
.with_agent_version("b".to_string()),
)
});
let swarm1_peer_id = *swarm1.local_peer_id();
let mut swarm2 = Swarm::new_ephemeral(|identity| {
identify::Behaviour::new(
identify::Config::new("c".to_string(), identity.public())
.with_agent_version("d".to_string()),
)
});
let swarm2_peer_id = *swarm2.local_peer_id();
let (swarm1_memory_listen, swarm1_tcp_listen_addr) = swarm1.listen().await;
let (swarm2_memory_listen, swarm2_tcp_listen_addr) = swarm2.listen().await;
swarm2.connect(&mut swarm1).await;
// nb. Either swarm may receive the `Identified` event first, upon which
// it will permit the connection to be closed, as defined by
// `Handler::connection_keep_alive`. Hence the test succeeds if
// either `Identified` event arrives correctly.
loop {
match future::select(swarm1.next_behaviour_event(), swarm2.next_behaviour_event())
.await
.factor_second()
.0
{
future::Either::Left(identify::Event::Received { info, .. }) => {
assert_eq!(info.public_key.to_peer_id(), swarm2_peer_id);
assert_eq!(info.protocol_version, "c");
assert_eq!(info.agent_version, "d");
assert!(!info.protocols.is_empty());
assert_eq!(
info.observed_addr,
swarm1_memory_listen.with(Protocol::P2p(swarm1_peer_id.into()))
);
assert!(info.listen_addrs.contains(&swarm2_tcp_listen_addr));
assert!(info.listen_addrs.contains(&swarm2_memory_listen));
return;
}
future::Either::Right(identify::Event::Received { info, .. }) => {
assert_eq!(info.public_key.to_peer_id(), swarm1_peer_id);
assert_eq!(info.protocol_version, "a");
assert_eq!(info.agent_version, "b");
assert!(!info.protocols.is_empty());
assert_eq!(
info.observed_addr,
swarm2_memory_listen.with(Protocol::P2p(swarm2_peer_id.into()))
);
assert!(info.listen_addrs.contains(&swarm1_tcp_listen_addr));
assert!(info.listen_addrs.contains(&swarm1_memory_listen));
return;
}
_ => {}
}
}
}
#[async_std::test]
async fn identify_push() {
let _ = env_logger::try_init();
let mut swarm1 = Swarm::new_ephemeral(|identity| {
Behaviour::new(identify::Config::new("a".to_string(), identity.public()))
});
let mut swarm2 = Swarm::new_ephemeral(|identity| {
Behaviour::new(
identify::Config::new("a".to_string(), identity.public())
.with_agent_version("b".to_string()),
)
});
swarm1.listen().await;
swarm2.connect(&mut swarm1).await;
// First, let the periodic identify do its thing.
match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await {
(
[BehaviourEvent::Identify(e1), BehaviourEvent::Identify(e2)],
[BehaviourEvent::Identify(e3), BehaviourEvent::Identify(e4)],
) => {
use identify::Event::{Received, Sent};
// These can be received in any order, hence assert them here instead of the pattern above.
assert!(matches!(e1, Received { .. } | Sent { .. }));
assert!(matches!(e2, Received { .. } | Sent { .. }));
assert!(matches!(e3, Received { .. } | Sent { .. }));
assert!(matches!(e4, Received { .. } | Sent { .. }));
}
other => panic!("Unexpected events: {other:?}"),
};
// Second, actively push.
swarm2
.behaviour_mut()
.identify
.push(iter::once(*swarm1.local_peer_id()));
let swarm1_received_info = match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await {
(
[BehaviourEvent::Identify(identify::Event::Received { info, .. })],
[BehaviourEvent::Identify(identify::Event::Pushed { .. })],
) => info,
other => panic!("Unexpected events: {other:?}"),
};
assert_eq!(
swarm1_received_info.public_key.to_peer_id(),
*swarm2.local_peer_id()
);
assert_eq!(swarm1_received_info.protocol_version, "a");
assert_eq!(swarm1_received_info.agent_version, "b");
assert!(!swarm1_received_info.protocols.is_empty());
assert!(swarm1_received_info.listen_addrs.is_empty());
}
#[async_std::test]
async fn discover_peer_after_disconnect() {
let _ = env_logger::try_init();
let mut swarm1 = Swarm::new_ephemeral(|identity| {
Behaviour::new(identify::Config::new("a".to_string(), identity.public()))
});
let mut swarm2 = Swarm::new_ephemeral(|identity| {
Behaviour::new(
identify::Config::new("a".to_string(), identity.public())
.with_agent_version("b".to_string()),
)
});
swarm1.listen().await;
swarm2.connect(&mut swarm1).await;
let swarm1_peer_id = *swarm1.local_peer_id();
async_std::task::spawn(swarm1.loop_on_next());
// Wait until we identified.
swarm2
.wait(|event| {
matches!(
event,
SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Received { .. }))
)
.then_some(())
})
.await;
swarm2.disconnect_peer_id(swarm1_peer_id).unwrap();
// Wait for connection to close.
swarm2
.wait(|event| matches!(event, SwarmEvent::ConnectionClosed { .. }).then_some(()))
.await;
// We should still be able to dial now!
swarm2.dial(swarm1_peer_id).unwrap();
let connected_peer = swarm2
.wait(|event| match event {
SwarmEvent::ConnectionEstablished { peer_id, .. } => Some(peer_id),
_ => None,
})
.await;
assert_eq!(connected_peer, swarm1_peer_id);
}
/// Combined behaviour to keep the connection alive after the periodic identify.
///
/// The identify implementation sets `keep_alive` to `No` once it has done its thing.
/// This can result in unexpected connection closures if one peer is faster than the other.
#[derive(libp2p_swarm::NetworkBehaviour)]
#[behaviour(prelude = "libp2p_swarm::derive_prelude")]
struct Behaviour {
identify: identify::Behaviour,
keep_alive: keep_alive::Behaviour,
}
impl Behaviour {
fn new(config: identify::Config) -> Self {
Self {
identify: identify::Behaviour::new(config),
keep_alive: keep_alive::Behaviour::default(),
}
}
}