test: introduce libp2p-swarm-test

This patch-set introduces `libp2p-swarm-test`. It provides utilities for quick and safe bootstrapping of tests for `NetworkBehaviour`s. The main design features are:

- Everything has timeouts
- APIs don't get in your way
- Minimal boilerplate

Closes #2884.

Pull-Request: #2888.
This commit is contained in:
Thomas Eizinger
2023-03-08 20:36:35 +11:00
committed by GitHub
parent 8a27375f96
commit 7069d78ee3
24 changed files with 1821 additions and 2134 deletions

31
Cargo.lock generated
View File

@ -2197,6 +2197,7 @@ dependencies = [
"libp2p-noise",
"libp2p-request-response",
"libp2p-swarm",
"libp2p-swarm-test",
"libp2p-tcp",
"libp2p-yamux",
"log",
@ -2251,6 +2252,7 @@ dependencies = [
name = "libp2p-dcutr"
version = "0.9.0"
dependencies = [
"async-std",
"asynchronous-codec",
"clap 4.1.8",
"either",
@ -2266,6 +2268,7 @@ dependencies = [
"libp2p-plaintext",
"libp2p-relay",
"libp2p-swarm",
"libp2p-swarm-test",
"libp2p-tcp",
"libp2p-yamux",
"log",
@ -2341,9 +2344,8 @@ dependencies = [
"libp2p-core",
"libp2p-mplex",
"libp2p-noise",
"libp2p-plaintext",
"libp2p-swarm",
"libp2p-yamux",
"libp2p-swarm-test",
"log",
"prometheus-client",
"quick-protobuf",
@ -2427,6 +2429,7 @@ dependencies = [
"libp2p-core",
"libp2p-noise",
"libp2p-swarm",
"libp2p-swarm-test",
"libp2p-tcp",
"libp2p-yamux",
"log",
@ -2524,15 +2527,13 @@ version = "0.42.0"
dependencies = [
"async-std",
"either",
"env_logger 0.10.0",
"futures",
"futures-timer",
"instant",
"libp2p-core",
"libp2p-mplex",
"libp2p-noise",
"libp2p-swarm",
"libp2p-tcp",
"libp2p-yamux",
"libp2p-swarm-test",
"log",
"quickcheck-ext",
"rand 0.8.5",
@ -2649,6 +2650,7 @@ dependencies = [
"libp2p-noise",
"libp2p-ping",
"libp2p-swarm",
"libp2p-swarm-test",
"libp2p-tcp",
"libp2p-yamux",
"log",
@ -2672,6 +2674,7 @@ dependencies = [
"libp2p-core",
"libp2p-noise",
"libp2p-swarm",
"libp2p-swarm-test",
"libp2p-tcp",
"libp2p-yamux",
"rand 0.8.5",
@ -2715,6 +2718,22 @@ dependencies = [
"syn",
]
[[package]]
name = "libp2p-swarm-test"
version = "0.1.0"
dependencies = [
"async-trait",
"futures",
"futures-timer",
"libp2p-core",
"libp2p-plaintext",
"libp2p-swarm",
"libp2p-tcp",
"libp2p-yamux",
"log",
"rand 0.8.5",
]
[[package]]
name = "libp2p-tcp"
version = "0.39.0"

View File

@ -174,6 +174,7 @@ members = [
"swarm",
"swarm-derive",
"interop-tests",
"swarm-test",
"transports/deflate",
"transports/dns",
"transports/noise",

View File

@ -31,6 +31,7 @@ libp2p-noise = { path = "../../transports/noise" }
libp2p-swarm = { path = "../../swarm", features = ["async-std", "macros"] }
libp2p-tcp = { path = "../../transports/tcp", features = ["async-io"] }
libp2p-yamux = { path = "../../muxers/yamux" }
libp2p-swarm-test = { path = "../../swarm-test" }
# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling

View File

@ -18,85 +18,25 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{channel::oneshot, Future, FutureExt, StreamExt};
use futures_timer::Delay;
use async_std::task::JoinHandle;
use libp2p_autonat::{
Behaviour, Config, Event, NatStatus, OutboundProbeError, OutboundProbeEvent, ResponseError,
};
use libp2p_core::{identity::Keypair, upgrade::Version, Multiaddr, PeerId, Transport};
use libp2p_noise as noise;
use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::{AddressScore, Swarm, SwarmEvent};
use libp2p_tcp as tcp;
use libp2p_yamux as yamux;
use libp2p_swarm_test::SwarmExt as _;
use std::time::Duration;
const MAX_CONFIDENCE: usize = 3;
const TEST_RETRY_INTERVAL: Duration = Duration::from_secs(1);
const TEST_REFRESH_INTERVAL: Duration = Duration::from_secs(2);
async fn init_swarm(config: Config) -> Swarm<Behaviour> {
let keypair = Keypair::generate_ed25519();
let local_id = PeerId::from_public_key(&keypair.public());
let transport = tcp::async_io::Transport::default()
.upgrade(Version::V1)
.authenticate(noise::NoiseAuthenticated::xx(&keypair).unwrap())
.multiplex(yamux::YamuxConfig::default())
.boxed();
let behaviour = Behaviour::new(local_id, config);
Swarm::with_async_std_executor(transport, behaviour, local_id)
}
async fn spawn_server(kill: oneshot::Receiver<()>) -> (PeerId, Multiaddr) {
let (tx, rx) = oneshot::channel();
async_std::task::spawn(async move {
let mut server = init_swarm(Config {
boot_delay: Duration::from_secs(60),
throttle_clients_peer_max: usize::MAX,
only_global_ips: false,
..Default::default()
})
.await;
let peer_id = *server.local_peer_id();
server
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
let addr = loop {
if let SwarmEvent::NewListenAddr { address, .. } = server.select_next_some().await {
break address;
};
};
tx.send((peer_id, addr)).unwrap();
let mut kill = kill.fuse();
loop {
futures::select! {
_ = server.select_next_some() => {},
_ = kill => return,
}
}
});
rx.await.unwrap()
}
async fn next_event(swarm: &mut Swarm<Behaviour>) -> Event {
loop {
if let SwarmEvent::Behaviour(event) = swarm.select_next_some().await {
break event;
}
}
}
async fn run_test_with_timeout(test: impl Future) {
futures::select! {
_ = test.fuse() => {},
_ = Delay::new(Duration::from_secs(60)).fuse() => panic!("test timed out")
}
}
#[async_std::test]
async fn test_auto_probe() {
let test = async {
let mut client = init_swarm(Config {
let mut client = Swarm::new_ephemeral(|key| {
Behaviour::new(
key.public().to_peer_id(),
Config {
retry_interval: TEST_RETRY_INTERVAL,
refresh_interval: TEST_REFRESH_INTERVAL,
confidence_max: MAX_CONFIDENCE,
@ -104,11 +44,11 @@ async fn test_auto_probe() {
throttle_server_period: Duration::ZERO,
boot_delay: Duration::ZERO,
..Default::default()
})
.await;
},
)
});
let (_handle, rx) = oneshot::channel();
let (server_id, addr) = spawn_server(rx).await;
let (server_id, addr, _) = new_server_swarm().await;
client.behaviour_mut().add_server(server_id, Some(addr));
// Initial status should be unknown.
@ -117,7 +57,7 @@ async fn test_auto_probe() {
assert_eq!(client.behaviour().confidence(), 0);
// Test no listening addresses
match next_event(&mut client).await {
match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => {
assert!(peer.is_none());
assert_eq!(error, OutboundProbeError::NoAddresses);
@ -135,7 +75,7 @@ async fn test_auto_probe() {
let unreachable_addr: Multiaddr = "/ip4/127.0.0.1/tcp/42".parse().unwrap();
client.add_external_address(unreachable_addr.clone(), AddressScore::Infinite);
let id = match next_event(&mut client).await {
let id = match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Request { probe_id, peer }) => {
assert_eq!(peer, server_id);
probe_id
@ -143,7 +83,7 @@ async fn test_auto_probe() {
other => panic!("Unexpected behaviour event: {other:?}."),
};
match next_event(&mut client).await {
match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Error {
probe_id,
peer,
@ -159,7 +99,7 @@ async fn test_auto_probe() {
other => panic!("Unexpected behaviour event: {other:?}."),
}
match next_event(&mut client).await {
match client.next_behaviour_event().await {
Event::StatusChanged { old, new } => {
assert_eq!(old, NatStatus::Unknown);
assert_eq!(new, NatStatus::Private);
@ -172,16 +112,9 @@ async fn test_auto_probe() {
assert!(client.behaviour().public_address().is_none());
// Test new public listening address
client
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
loop {
if let SwarmEvent::NewListenAddr { .. } = client.select_next_some().await {
break;
}
}
client.listen().await;
let id = match next_event(&mut client).await {
let id = match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Request { probe_id, peer }) => {
assert_eq!(peer, server_id);
probe_id
@ -191,7 +124,7 @@ async fn test_auto_probe() {
let mut had_connection_event = false;
loop {
match client.select_next_some().await {
match client.next_swarm_event().await {
SwarmEvent::ConnectionEstablished {
endpoint, peer_id, ..
} if endpoint.is_listener() => {
@ -224,7 +157,7 @@ async fn test_auto_probe() {
// returned a response before the inbound established connection was reported at the client.
// In this (rare) case the `ConnectionEstablished` event occurs after the `OutboundProbeEvent::Response`.
if !had_connection_event {
match client.select_next_some().await {
match client.next_swarm_event().await {
SwarmEvent::ConnectionEstablished {
endpoint, peer_id, ..
} if endpoint.is_listener() => {
@ -237,15 +170,14 @@ async fn test_auto_probe() {
assert_eq!(client.behaviour().confidence(), 0);
assert!(client.behaviour().nat_status().is_public());
assert!(client.behaviour().public_address().is_some());
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_confidence() {
let test = async {
let mut client = init_swarm(Config {
let mut client = Swarm::new_ephemeral(|key| {
Behaviour::new(
key.public().to_peer_id(),
Config {
retry_interval: TEST_RETRY_INTERVAL,
refresh_interval: TEST_REFRESH_INTERVAL,
confidence_max: MAX_CONFIDENCE,
@ -253,31 +185,23 @@ async fn test_confidence() {
throttle_server_period: Duration::ZERO,
boot_delay: Duration::from_millis(100),
..Default::default()
})
.await;
let (_handle, rx) = oneshot::channel();
let (server_id, addr) = spawn_server(rx).await;
},
)
});
let (server_id, addr, _) = new_server_swarm().await;
client.behaviour_mut().add_server(server_id, Some(addr));
// Randomly test either for public or for private status the confidence.
let test_public = rand::random::<bool>();
if test_public {
client
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
loop {
if let SwarmEvent::NewListenAddr { .. } = client.select_next_some().await {
break;
}
}
client.listen().await;
} else {
let unreachable_addr: Multiaddr = "/ip4/127.0.0.1/tcp/42".parse().unwrap();
let unreachable_addr = "/ip4/127.0.0.1/tcp/42".parse().unwrap();
client.add_external_address(unreachable_addr, AddressScore::Infinite);
}
for i in 0..MAX_CONFIDENCE + 1 {
let id = match next_event(&mut client).await {
let id = match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Request { probe_id, peer }) => {
assert_eq!(peer, server_id);
probe_id
@ -285,7 +209,7 @@ async fn test_confidence() {
other => panic!("Unexpected behaviour event: {other:?}."),
};
match next_event(&mut client).await {
match client.next_behaviour_event().await {
Event::OutboundProbe(event) => {
let (peer, probe_id) = match event {
OutboundProbeEvent::Response { probe_id, peer, .. } if test_public => {
@ -321,7 +245,7 @@ async fn test_confidence() {
// Expect status to flip after first probe
if i == 0 {
match next_event(&mut client).await {
match client.next_behaviour_event().await {
Event::StatusChanged { old, new } => {
assert_eq!(old, NatStatus::Unknown);
assert_eq!(new.is_public(), test_public);
@ -330,15 +254,14 @@ async fn test_confidence() {
}
}
}
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_throttle_server_period() {
let test = async {
let mut client = init_swarm(Config {
let mut client = Swarm::new_ephemeral(|key| {
Behaviour::new(
key.public().to_peer_id(),
Config {
retry_interval: TEST_RETRY_INTERVAL,
refresh_interval: TEST_REFRESH_INTERVAL,
confidence_max: MAX_CONFIDENCE,
@ -347,25 +270,18 @@ async fn test_throttle_server_period() {
throttle_server_period: Duration::from_secs(1000),
boot_delay: Duration::from_millis(100),
..Default::default()
})
.await;
},
)
});
client
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
loop {
if let SwarmEvent::NewListenAddr { .. } = client.select_next_some().await {
break;
}
}
client.listen().await;
let (_handle, rx) = oneshot::channel();
let (id, addr) = spawn_server(rx).await;
client.behaviour_mut().add_server(id, Some(addr));
let (server_id, addr, _) = new_server_swarm().await;
client.behaviour_mut().add_server(server_id, Some(addr));
// First probe should be successful and flip status to public.
loop {
match next_event(&mut client).await {
match client.next_behaviour_event().await {
Event::StatusChanged { old, new } => {
assert_eq!(old, NatStatus::Unknown);
assert!(new.is_public());
@ -381,7 +297,7 @@ async fn test_throttle_server_period() {
// Expect following probe to fail because server is throttled
match next_event(&mut client).await {
match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => {
assert!(peer.is_none());
assert_eq!(error, OutboundProbeError::NoServer);
@ -389,15 +305,14 @@ async fn test_throttle_server_period() {
other => panic!("Unexpected behaviour event: {other:?}."),
}
assert_eq!(client.behaviour().confidence(), 0);
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_use_connected_as_server() {
let test = async {
let mut client = init_swarm(Config {
let mut client = Swarm::new_ephemeral(|key| {
Behaviour::new(
key.public().to_peer_id(),
Config {
retry_interval: TEST_RETRY_INTERVAL,
refresh_interval: TEST_REFRESH_INTERVAL,
confidence_max: MAX_CONFIDENCE,
@ -405,52 +320,37 @@ async fn test_use_connected_as_server() {
throttle_server_period: Duration::ZERO,
boot_delay: Duration::from_millis(100),
..Default::default()
})
.await;
},
)
});
let (_handle, rx) = oneshot::channel();
let (server_id, addr) = spawn_server(rx).await;
let (server_id, addr, _) = new_server_swarm().await;
client
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
client.listen().await;
let connected = client.dial_and_wait(addr).await;
assert_eq!(connected, server_id);
client.dial(addr).unwrap();
// await connection
loop {
if let SwarmEvent::ConnectionEstablished { peer_id, .. } =
client.select_next_some().await
{
assert_eq!(peer_id, server_id);
break;
}
}
match next_event(&mut client).await {
match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Request { peer, .. }) => {
assert_eq!(peer, server_id);
}
other => panic!("Unexpected behaviour event: {other:?}."),
}
match next_event(&mut client).await {
match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Response { peer, .. }) => {
assert_eq!(peer, server_id);
}
other => panic!("Unexpected behaviour event: {other:?}."),
}
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_outbound_failure() {
let test = async {
let mut servers = Vec::new();
let mut client = init_swarm(Config {
let mut client = Swarm::new_ephemeral(|key| {
Behaviour::new(
key.public().to_peer_id(),
Config {
retry_interval: TEST_RETRY_INTERVAL,
refresh_interval: TEST_REFRESH_INTERVAL,
confidence_max: MAX_CONFIDENCE,
@ -458,28 +358,23 @@ async fn test_outbound_failure() {
throttle_server_period: Duration::ZERO,
boot_delay: Duration::from_millis(100),
..Default::default()
})
.await;
},
)
});
let mut servers = Vec::new();
for _ in 0..5 {
let (tx, rx) = oneshot::channel();
let (id, addr) = spawn_server(rx).await;
let (id, addr, handle) = new_server_swarm().await;
client.behaviour_mut().add_server(id, Some(addr));
servers.push((id, tx));
servers.push((id, handle));
}
client
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
client.listen().await;
loop {
if let SwarmEvent::NewListenAddr { .. } = client.select_next_some().await {
break;
}
}
// First probe should be successful and flip status to public.
loop {
match next_event(&mut client).await {
match client.next_behaviour_event().await {
Event::StatusChanged { old, new } => {
assert_eq!(old, NatStatus::Unknown);
assert!(new.is_public());
@ -491,14 +386,17 @@ async fn test_outbound_failure() {
}
}
let inactive = servers.split_off(1);
// kill a server
let mut inactive_servers = Vec::new();
#[allow(clippy::needless_collect)] // Drop the handles of the inactive servers to kill them.
let inactive_ids: Vec<_> = inactive.into_iter().map(|(id, _handle)| id).collect();
for (id, handle) in servers.split_off(1) {
handle.cancel().await;
inactive_servers.push(id);
}
// Expect to retry on outbound failure
loop {
match next_event(&mut client).await {
match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Request { .. }) => {}
Event::OutboundProbe(OutboundProbeEvent::Response { peer, .. }) => {
assert_eq!(peer, servers[0].0);
@ -509,20 +407,19 @@ async fn test_outbound_failure() {
error: OutboundProbeError::OutboundRequest(_),
..
}) => {
assert!(inactive_ids.contains(&peer));
assert!(inactive_servers.contains(&peer));
}
other => panic!("Unexpected behaviour event: {other:?}."),
}
}
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_global_ips_config() {
let test = async {
let mut client = init_swarm(Config {
let mut client = Swarm::new_ephemeral(|key| {
Behaviour::new(
key.public().to_peer_id(),
Config {
retry_interval: TEST_RETRY_INTERVAL,
refresh_interval: TEST_REFRESH_INTERVAL,
confidence_max: MAX_CONFIDENCE,
@ -530,42 +427,45 @@ async fn test_global_ips_config() {
only_global_ips: true,
boot_delay: Duration::from_millis(100),
..Default::default()
})
.await;
},
)
});
client.listen().await;
client
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
loop {
if let SwarmEvent::NewListenAddr { .. } = client.select_next_some().await {
break;
}
}
let (_handle, rx) = oneshot::channel();
let (server_id, addr) = spawn_server(rx).await;
let (server_id, addr, _) = new_server_swarm().await;
// Dial server instead of adding it via `Behaviour::add_server` because the
// `only_global_ips` restriction does not apply for manually added servers.
client.dial(addr).unwrap();
loop {
if let SwarmEvent::ConnectionEstablished { peer_id, .. } =
client.select_next_some().await
{
assert_eq!(peer_id, server_id);
break;
}
}
let connected = client.dial_and_wait(addr).await;
assert_eq!(connected, server_id);
// Expect that the server is not qualified for dial-back because it is observed
// at a local IP.
match next_event(&mut client).await {
match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Error { error, .. }) => {
assert!(matches!(error, OutboundProbeError::NoServer))
}
other => panic!("Unexpected behaviour event: {other:?}."),
}
};
run_test_with_timeout(test).await;
}
async fn new_server_swarm() -> (PeerId, Multiaddr, JoinHandle<()>) {
let mut swarm = Swarm::new_ephemeral(|key| {
Behaviour::new(
key.public().to_peer_id(),
Config {
boot_delay: Duration::from_secs(60),
throttle_clients_peer_max: usize::MAX,
only_global_ips: false,
..Default::default()
},
)
});
let (_, multiaddr) = swarm.listen().await;
let peer_id = *swarm.local_peer_id();
let task = async_std::task::spawn(swarm.loop_on_next());
(peer_id, multiaddr, task)
}

View File

@ -18,128 +18,23 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{channel::oneshot, Future, FutureExt, StreamExt};
use futures_timer::Delay;
use libp2p_autonat::{
Behaviour, Config, Event, InboundProbeError, InboundProbeEvent, ResponseError,
};
use libp2p_core::{
identity::Keypair, multiaddr::Protocol, upgrade::Version, ConnectedPoint, Endpoint, Multiaddr,
PeerId, Transport,
};
use libp2p_noise as noise;
use libp2p_core::{multiaddr::Protocol, ConnectedPoint, Endpoint, Multiaddr, PeerId};
use libp2p_swarm::DialError;
use libp2p_swarm::{AddressScore, Swarm, SwarmEvent};
use libp2p_tcp as tcp;
use libp2p_yamux as yamux;
use libp2p_swarm_test::SwarmExt as _;
use std::{num::NonZeroU32, time::Duration};
async fn init_swarm(config: Config) -> Swarm<Behaviour> {
let keypair = Keypair::generate_ed25519();
let local_id = PeerId::from_public_key(&keypair.public());
let transport = tcp::async_io::Transport::default()
.upgrade(Version::V1)
.authenticate(noise::NoiseAuthenticated::xx(&keypair).unwrap())
.multiplex(yamux::YamuxConfig::default())
.boxed();
let behaviour = Behaviour::new(local_id, config);
Swarm::with_async_std_executor(transport, behaviour, local_id)
}
async fn init_server(config: Option<Config>) -> (Swarm<Behaviour>, PeerId, Multiaddr) {
let mut config = config.unwrap_or_else(|| Config {
only_global_ips: false,
..Default::default()
});
// Don't do any outbound probes.
config.boot_delay = Duration::from_secs(60);
let mut server = init_swarm(config).await;
let peer_id = *server.local_peer_id();
server
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
let addr = loop {
if let SwarmEvent::NewListenAddr { address, .. } = server.select_next_some().await {
break address;
};
};
(server, peer_id, addr)
}
async fn spawn_client(
listen: bool,
add_dummy_external_addr: bool,
server_id: PeerId,
server_addr: Multiaddr,
kill: oneshot::Receiver<()>,
) -> (PeerId, Option<Multiaddr>) {
let (tx, rx) = oneshot::channel();
async_std::task::spawn(async move {
let mut client = init_swarm(Config {
boot_delay: Duration::from_secs(1),
retry_interval: Duration::from_secs(1),
throttle_server_period: Duration::ZERO,
only_global_ips: false,
..Default::default()
})
.await;
client
.behaviour_mut()
.add_server(server_id, Some(server_addr));
let peer_id = *client.local_peer_id();
let mut addr = None;
if listen {
client
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
loop {
if let SwarmEvent::NewListenAddr { address, .. } = client.select_next_some().await {
addr = Some(address);
break;
};
}
}
if add_dummy_external_addr {
let dummy_addr: Multiaddr = "/ip4/127.0.0.1/tcp/12345".parse().unwrap();
client.add_external_address(dummy_addr, AddressScore::Infinite);
}
tx.send((peer_id, addr)).unwrap();
let mut kill = kill.fuse();
loop {
futures::select! {
_ = client.select_next_some() => {},
_ = kill => return,
}
}
});
rx.await.unwrap()
}
async fn next_event(swarm: &mut Swarm<Behaviour>) -> Event {
loop {
if let SwarmEvent::Behaviour(event) = swarm.select_next_some().await {
break event;
}
}
}
async fn run_test_with_timeout(test: impl Future) {
futures::select! {
_ = test.fuse() => {},
_ = Delay::new(Duration::from_secs(60)).fuse() => panic!("test timed out")
}
}
#[async_std::test]
async fn test_dial_back() {
let test = async {
let (mut server, server_id, server_addr) = init_server(None).await;
let (_handle, rx) = oneshot::channel();
let (client_id, client_addr) = spawn_client(true, false, server_id, server_addr, rx).await;
let (mut server, server_id, server_addr) = new_server_swarm(None).await;
let (mut client, client_id) = new_client_swarm(server_id, server_addr).await;
let (_, client_addr) = client.listen().await;
async_std::task::spawn(client.loop_on_next());
let client_port = client_addr
.unwrap()
.into_iter()
.find_map(|p| match p {
Protocol::Tcp(port) => Some(port),
@ -147,7 +42,7 @@ async fn test_dial_back() {
})
.unwrap();
let observed_client_ip = loop {
match server.select_next_some().await {
match server.next_swarm_event().await {
SwarmEvent::ConnectionEstablished {
peer_id,
endpoint:
@ -174,7 +69,7 @@ async fn test_dial_back() {
.with(Protocol::Ip4(observed_client_ip))
.with(Protocol::Tcp(client_port))
.with(Protocol::P2p(client_id.into()));
let request_probe_id = match next_event(&mut server).await {
let request_probe_id = match server.next_behaviour_event().await {
Event::InboundProbe(InboundProbeEvent::Request {
peer,
addresses,
@ -189,7 +84,7 @@ async fn test_dial_back() {
};
loop {
match server.select_next_some().await {
match server.next_swarm_event().await {
SwarmEvent::ConnectionEstablished {
peer_id,
endpoint:
@ -213,7 +108,7 @@ async fn test_dial_back() {
}
}
match next_event(&mut server).await {
match server.next_behaviour_event().await {
Event::InboundProbe(InboundProbeEvent::Response {
probe_id,
peer,
@ -225,18 +120,19 @@ async fn test_dial_back() {
}
other => panic!("Unexpected behaviour event: {other:?}."),
}
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_dial_error() {
let test = async {
let (mut server, server_id, server_addr) = init_server(None).await;
let (_handle, rx) = oneshot::channel();
let (client_id, _) = spawn_client(false, true, server_id, server_addr, rx).await;
let request_probe_id = match next_event(&mut server).await {
let (mut server, server_id, server_addr) = new_server_swarm(None).await;
let (mut client, client_id) = new_client_swarm(server_id, server_addr).await;
client.add_external_address(
"/ip4/127.0.0.1/tcp/12345".parse().unwrap(),
AddressScore::Infinite,
);
async_std::task::spawn(client.loop_on_next());
let request_probe_id = match server.next_behaviour_event().await {
Event::InboundProbe(InboundProbeEvent::Request { peer, probe_id, .. }) => {
assert_eq!(peer, client_id);
probe_id
@ -245,7 +141,7 @@ async fn test_dial_error() {
};
loop {
match server.select_next_some().await {
match server.next_swarm_event().await {
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
assert_eq!(peer_id.unwrap(), client_id);
assert!(matches!(error, DialError::Transport(_)));
@ -257,7 +153,7 @@ async fn test_dial_error() {
}
}
match next_event(&mut server).await {
match server.next_behaviour_event().await {
Event::InboundProbe(InboundProbeEvent::Error {
probe_id,
peer,
@ -269,37 +165,30 @@ async fn test_dial_error() {
}
other => panic!("Unexpected behaviour event: {other:?}."),
}
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_throttle_global_max() {
let test = async {
let (mut server, server_id, server_addr) = init_server(Some(Config {
let (mut server, server_id, server_addr) = new_server_swarm(Some(Config {
throttle_clients_global_max: 1,
throttle_clients_period: Duration::from_secs(60),
only_global_ips: false,
..Default::default()
}))
.await;
let mut handles = Vec::new();
for _ in 0..2 {
let (handle, rx) = oneshot::channel();
spawn_client(true, false, server_id, server_addr.clone(), rx).await;
handles.push(handle);
let (mut client, _) = new_client_swarm(server_id, server_addr.clone()).await;
client.listen().await;
async_std::task::spawn(client.loop_on_next());
}
let (first_probe_id, first_peer_id) = match next_event(&mut server).await {
Event::InboundProbe(InboundProbeEvent::Request { peer, probe_id, .. }) => {
(probe_id, peer)
}
let (first_probe_id, first_peer_id) = match server.next_behaviour_event().await {
Event::InboundProbe(InboundProbeEvent::Request { peer, probe_id, .. }) => (probe_id, peer),
other => panic!("Unexpected behaviour event: {other:?}."),
};
loop {
match next_event(&mut server).await {
match server.next_behaviour_event().await {
Event::InboundProbe(InboundProbeEvent::Error {
peer,
probe_id,
@ -316,15 +205,11 @@ async fn test_throttle_global_max() {
other => panic!("Unexpected behaviour event: {other:?}."),
};
}
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_throttle_peer_max() {
let test = async {
let (mut server, server_id, server_addr) = init_server(Some(Config {
let (mut server, server_id, server_addr) = new_server_swarm(Some(Config {
throttle_clients_peer_max: 1,
throttle_clients_period: Duration::from_secs(60),
only_global_ips: false,
@ -332,10 +217,11 @@ async fn test_throttle_peer_max() {
}))
.await;
let (_handle, rx) = oneshot::channel();
let (client_id, _) = spawn_client(true, false, server_id, server_addr.clone(), rx).await;
let (mut client, client_id) = new_client_swarm(server_id, server_addr.clone()).await;
client.listen().await;
async_std::task::spawn(client.loop_on_next());
let first_probe_id = match next_event(&mut server).await {
let first_probe_id = match server.next_behaviour_event().await {
Event::InboundProbe(InboundProbeEvent::Request { peer, probe_id, .. }) => {
assert_eq!(client_id, peer);
probe_id
@ -343,7 +229,7 @@ async fn test_throttle_peer_max() {
other => panic!("Unexpected behaviour event: {other:?}."),
};
match next_event(&mut server).await {
match server.next_behaviour_event().await {
Event::InboundProbe(InboundProbeEvent::Response { peer, probe_id, .. }) => {
assert_eq!(peer, client_id);
assert_eq!(probe_id, first_probe_id);
@ -351,7 +237,7 @@ async fn test_throttle_peer_max() {
other => panic!("Unexpected behaviour event: {other:?}."),
}
match next_event(&mut server).await {
match server.next_behaviour_event().await {
Event::InboundProbe(InboundProbeEvent::Error {
peer,
probe_id,
@ -366,15 +252,11 @@ async fn test_throttle_peer_max() {
}
other => panic!("Unexpected behaviour event: {other:?}."),
};
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_dial_multiple_addr() {
let test = async {
let (mut server, server_id, server_addr) = init_server(Some(Config {
let (mut server, server_id, server_addr) = new_server_swarm(Some(Config {
throttle_clients_peer_max: 1,
throttle_clients_period: Duration::from_secs(60),
only_global_ips: false,
@ -382,10 +264,15 @@ async fn test_dial_multiple_addr() {
}))
.await;
let (_handle, rx) = oneshot::channel();
let (client_id, _) = spawn_client(true, true, server_id, server_addr.clone(), rx).await;
let (mut client, client_id) = new_client_swarm(server_id, server_addr.clone()).await;
client.listen().await;
client.add_external_address(
"/ip4/127.0.0.1/tcp/12345".parse().unwrap(),
AddressScore::Infinite,
);
async_std::task::spawn(client.loop_on_next());
let dial_addresses = match next_event(&mut server).await {
let dial_addresses = match server.next_behaviour_event().await {
Event::InboundProbe(InboundProbeEvent::Request {
peer, addresses, ..
}) => {
@ -397,7 +284,7 @@ async fn test_dial_multiple_addr() {
};
loop {
match server.select_next_some().await {
match server.next_swarm_event().await {
SwarmEvent::ConnectionEstablished {
peer_id,
endpoint:
@ -410,8 +297,12 @@ async fn test_dial_multiple_addr() {
} => {
assert_eq!(peer_id, client_id);
let dial_errors = concurrent_dial_errors.unwrap();
assert_eq!(dial_errors.len(), 1);
assert_eq!(dial_errors[0].0, dial_addresses[0]);
// The concurrent dial might not be fast enough to produce a dial error.
if let Some((addr, _)) = dial_errors.get(0) {
assert_eq!(addr, &dial_addresses[0]);
}
assert_eq!(address, dial_addresses[1]);
break;
}
@ -420,33 +311,63 @@ async fn test_dial_multiple_addr() {
other => panic!("Unexpected swarm event: {other:?}."),
}
}
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_global_ips_config() {
let test = async {
let (mut server, server_id, server_addr) = init_server(Some(Config {
let (mut server, server_id, server_addr) = new_server_swarm(Some(Config {
// Enforce that only clients outside of the local network are qualified for dial-backs.
only_global_ips: true,
..Default::default()
}))
.await;
let (_handle, rx) = oneshot::channel();
spawn_client(true, false, server_id, server_addr.clone(), rx).await;
let (mut client, _) = new_client_swarm(server_id, server_addr.clone()).await;
client.listen().await;
async_std::task::spawn(client.loop_on_next());
// Expect the probe to be refused as both peers run on the same machine and thus in the same local network.
match next_event(&mut server).await {
match server.next_behaviour_event().await {
Event::InboundProbe(InboundProbeEvent::Error { error, .. }) => assert!(matches!(
error,
InboundProbeError::Response(ResponseError::DialRefused)
)),
other => panic!("Unexpected behaviour event: {other:?}."),
};
};
run_test_with_timeout(test).await;
}
async fn new_server_swarm(config: Option<Config>) -> (Swarm<Behaviour>, PeerId, Multiaddr) {
let mut config = config.unwrap_or_else(|| Config {
only_global_ips: false,
..Default::default()
});
// Don't do any outbound probes.
config.boot_delay = Duration::from_secs(60);
let mut server = Swarm::new_ephemeral(|key| Behaviour::new(key.public().to_peer_id(), config));
let peer_id = *server.local_peer_id();
let (_, addr) = server.listen().await;
(server, peer_id, addr)
}
async fn new_client_swarm(server_id: PeerId, server_addr: Multiaddr) -> (Swarm<Behaviour>, PeerId) {
let mut client = Swarm::new_ephemeral(|key| {
Behaviour::new(
key.public().to_peer_id(),
Config {
boot_delay: Duration::from_secs(1),
retry_interval: Duration::from_secs(1),
throttle_server_period: Duration::ZERO,
only_global_ips: false,
..Default::default()
},
)
});
client
.behaviour_mut()
.add_server(server_id, Some(server_addr));
let peer_id = *client.local_peer_id();
(client, peer_id)
}

View File

@ -25,6 +25,7 @@ thiserror = "1.0"
void = "1"
[dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes"] }
clap = { version = "4.1.6", features = ["derive"] }
env_logger = "0.10.0"
libp2p-dns = { path = "../../transports/dns", features = ["async-std"] }
@ -34,6 +35,7 @@ libp2p-ping = { path = "../../protocols/ping" }
libp2p-plaintext = { path = "../../transports/plaintext" }
libp2p-relay = { path = "../relay" }
libp2p-swarm = { path = "../../swarm", features = ["macros"] }
libp2p-swarm-test = { path = "../../swarm-test"}
libp2p-tcp = { path = "../../transports/tcp", features = ["async-io"] }
libp2p-yamux = { path = "../../muxers/yamux" }
rand = "0.8"

View File

@ -18,96 +18,94 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::executor::LocalPool;
use futures::future::FutureExt;
use futures::io::{AsyncRead, AsyncWrite};
use futures::stream::StreamExt;
use futures::task::Spawn;
use libp2p_core::multiaddr::{Multiaddr, Protocol};
use libp2p_core::muxing::StreamMuxerBox;
use libp2p_core::transport::upgrade::Version;
use libp2p_core::transport::{Boxed, MemoryTransport, OrTransport, Transport};
use libp2p_core::PublicKey;
use libp2p_core::transport::{MemoryTransport, Transport};
use libp2p_core::{identity, PeerId};
use libp2p_dcutr as dcutr;
use libp2p_plaintext::PlainText2Config;
use libp2p_relay as relay;
use libp2p_swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent};
use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent};
use libp2p_swarm_test::SwarmExt as _;
use std::time::Duration;
#[test]
fn connect() {
#[async_std::test]
async fn connect() {
let _ = env_logger::try_init();
let mut pool = LocalPool::new();
let relay_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::<u64>()));
let mut relay = build_relay();
let relay_peer_id = *relay.local_peer_id();
relay.listen_on(relay_addr.clone()).unwrap();
relay.add_external_address(relay_addr.clone(), AddressScore::Infinite);
spawn_swarm_on_pool(&pool, relay);
let mut dst = build_client();
let mut src = build_client();
// Have all swarms listen on a local memory address.
let (relay_addr, _) = relay.listen().await;
let (dst_addr, _) = dst.listen().await;
src.listen().await;
let relay_peer_id = *relay.local_peer_id();
let dst_peer_id = *dst.local_peer_id();
async_std::task::spawn(relay.loop_on_next());
let dst_relayed_addr = relay_addr
.with(Protocol::P2p(relay_peer_id.into()))
.with(Protocol::P2pCircuit)
.with(Protocol::P2p(dst_peer_id.into()));
let dst_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::<u64>()));
dst.listen_on(dst_relayed_addr.clone()).unwrap();
dst.listen_on(dst_addr.clone()).unwrap();
dst.add_external_address(dst_addr.clone(), AddressScore::Infinite);
pool.run_until(wait_for_reservation(
wait_for_reservation(
&mut dst,
dst_relayed_addr.clone(),
relay_peer_id,
false, // No renewal.
));
spawn_swarm_on_pool(&pool, dst);
)
.await;
async_std::task::spawn(dst.loop_on_next());
let mut src = build_client();
let src_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::<u64>()));
src.listen_on(src_addr.clone()).unwrap();
pool.run_until(wait_for_new_listen_addr(&mut src, &src_addr));
src.add_external_address(src_addr.clone(), AddressScore::Infinite);
src.dial_and_wait(dst_relayed_addr.clone()).await;
src.dial(dst_relayed_addr.clone()).unwrap();
pool.run_until(wait_for_connection_established(&mut src, &dst_relayed_addr));
match pool.run_until(wait_for_dcutr_event(&mut src)) {
dcutr::Event::RemoteInitiatedDirectConnectionUpgrade {
loop {
match src
.next_swarm_event()
.await
.try_into_behaviour_event()
.unwrap()
{
ClientEvent::Dcutr(dcutr::Event::RemoteInitiatedDirectConnectionUpgrade {
remote_peer_id,
remote_relayed_addr,
} if remote_peer_id == dst_peer_id && remote_relayed_addr == dst_relayed_addr => {}
e => panic!("Unexpected event: {e:?}."),
}) => {
if remote_peer_id == dst_peer_id && remote_relayed_addr == dst_relayed_addr {
break;
}
pool.run_until(wait_for_connection_established(
&mut src,
&dst_addr.with(Protocol::P2p(dst_peer_id.into())),
));
}
other => panic!("Unexpected event: {other:?}."),
}
}
let dst_addr = dst_addr.with(Protocol::P2p(dst_peer_id.into()));
src.wait(move |e| match e {
SwarmEvent::ConnectionEstablished { endpoint, .. } => {
(*endpoint.get_remote_address() == dst_addr).then_some(())
}
_ => None,
})
.await;
}
fn build_relay() -> Swarm<relay::Behaviour> {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let local_peer_id = local_public_key.to_peer_id();
Swarm::new_ephemeral(|identity| {
let local_peer_id = identity.public().to_peer_id();
let transport = build_transport(MemoryTransport::default().boxed(), local_public_key);
Swarm::with_threadpool_executor(
transport,
relay::Behaviour::new(
local_peer_id,
relay::Config {
reservation_duration: Duration::from_secs(2),
..Default::default()
},
),
local_peer_id,
)
})
}
fn build_client() -> Swarm<Client> {
@ -116,12 +114,16 @@ fn build_client() -> Swarm<Client> {
let local_peer_id = local_public_key.to_peer_id();
let (relay_transport, behaviour) = relay::client::new(local_peer_id);
let transport = build_transport(
OrTransport::new(relay_transport, MemoryTransport::default()).boxed(),
local_public_key,
);
Swarm::with_threadpool_executor(
let transport = relay_transport
.or_transport(MemoryTransport::default())
.or_transport(libp2p_tcp::async_io::Transport::default())
.upgrade(Version::V1)
.authenticate(PlainText2Config { local_public_key })
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed();
Swarm::without_executor(
transport,
Client {
relay: behaviour,
@ -131,20 +133,6 @@ fn build_client() -> Swarm<Client> {
)
}
fn build_transport<StreamSink>(
transport: Boxed<StreamSink>,
local_public_key: PublicKey,
) -> Boxed<(PeerId, StreamMuxerBox)>
where
StreamSink: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
transport
.upgrade(Version::V1)
.authenticate(PlainText2Config { local_public_key })
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed()
}
#[derive(NetworkBehaviour)]
#[behaviour(
out_event = "ClientEvent",
@ -174,12 +162,6 @@ impl From<dcutr::Event> for ClientEvent {
}
}
fn spawn_swarm_on_pool<B: NetworkBehaviour + Send>(pool: &LocalPool, swarm: Swarm<B>) {
pool.spawner()
.spawn_obj(swarm.collect::<Vec<_>>().map(|_| ()).boxed().into())
.unwrap();
}
async fn wait_for_reservation(
client: &mut Swarm<Client>,
client_addr: Multiaddr,
@ -189,7 +171,7 @@ async fn wait_for_reservation(
let mut new_listen_addr_for_relayed_addr = false;
let mut reservation_req_accepted = false;
loop {
match client.select_next_some().await {
match client.next_swarm_event().await {
SwarmEvent::NewListenAddr { address, .. } if address != client_addr => {}
SwarmEvent::NewListenAddr { address, .. } if address == client_addr => {
new_listen_addr_for_relayed_addr = true;
@ -215,38 +197,3 @@ async fn wait_for_reservation(
}
}
}
async fn wait_for_connection_established(client: &mut Swarm<Client>, addr: &Multiaddr) {
loop {
match client.select_next_some().await {
SwarmEvent::IncomingConnection { .. } => {}
SwarmEvent::ConnectionEstablished { endpoint, .. }
if endpoint.get_remote_address() == addr =>
{
break
}
SwarmEvent::Dialing(_) => {}
SwarmEvent::Behaviour(ClientEvent::Relay(
relay::client::Event::OutboundCircuitEstablished { .. },
)) => {}
SwarmEvent::ConnectionEstablished { .. } => {}
e => panic!("{e:?}"),
}
}
}
async fn wait_for_new_listen_addr(client: &mut Swarm<Client>, new_addr: &Multiaddr) {
match client.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } if address == *new_addr => {}
e => panic!("{e:?}"),
}
}
async fn wait_for_dcutr_event(client: &mut Swarm<Client>) -> dcutr::Event {
loop {
match client.select_next_some().await {
SwarmEvent::Behaviour(ClientEvent::Dcutr(e)) => return e,
e => panic!("{e:?}"),
}
}
}

View File

@ -36,15 +36,14 @@ instant = "0.1.11"
prometheus-client = "0.19.0"
[dev-dependencies]
async-std = "1.6.3"
async-std = { version = "1.6.3", features = ["unstable"] }
env_logger = "0.10.0"
hex = "0.4.2"
libp2p-core = { path = "../../core"}
libp2p-mplex = { path = "../../muxers/mplex"}
libp2p-noise = { path = "../../transports/noise"}
libp2p-plaintext = { path = "../../transports/plaintext" }
libp2p-swarm = { path = "../../swarm" }
libp2p-yamux = { path = "../../muxers/yamux" }
quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" }
libp2p-swarm-test = { path = "../../swarm-test"}
quickcheck-ext = { path = "../../misc/quickcheck-ext" }
# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling

View File

@ -3689,7 +3689,7 @@ mod local_test {
use super::*;
use crate::IdentTopic;
use asynchronous_codec::Encoder;
use quickcheck::*;
use quickcheck_ext::*;
fn empty_rpc() -> Rpc {
Rpc {

View File

@ -563,7 +563,7 @@ mod tests {
use crate::Behaviour;
use crate::IdentTopic as Topic;
use libp2p_core::identity::Keypair;
use quickcheck::*;
use quickcheck_ext::*;
#[derive(Clone, Debug)]
struct Message(RawMessage);

View File

@ -18,90 +18,50 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use log::debug;
use quickcheck::{QuickCheck, TestResult};
use rand::{random, seq::SliceRandom, SeedableRng};
use std::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use async_std::prelude::FutureExt;
use futures::stream::{FuturesUnordered, SelectAll};
use futures::StreamExt;
use libp2p_core::{
identity, multiaddr::Protocol, transport::MemoryTransport, upgrade, Multiaddr, Transport,
};
use libp2p_gossipsub as gossipsub;
use libp2p_plaintext::PlainText2Config;
use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_yamux as yamux;
use libp2p_gossipsub::{MessageAuthenticity, ValidationMode};
use libp2p_swarm::Swarm;
use libp2p_swarm_test::SwarmExt as _;
use log::debug;
use quickcheck_ext::{QuickCheck, TestResult};
use rand::{seq::SliceRandom, SeedableRng};
use std::{task::Poll, time::Duration};
struct Graph {
pub nodes: Vec<(Multiaddr, Swarm<gossipsub::Behaviour>)>,
}
impl Future for Graph {
type Output = (Multiaddr, gossipsub::Event);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
for (addr, node) in &mut self.nodes {
loop {
match node.poll_next_unpin(cx) {
Poll::Ready(Some(SwarmEvent::Behaviour(event))) => {
return Poll::Ready((addr.clone(), event))
}
Poll::Ready(Some(_)) => {}
Poll::Ready(None) => panic!("unexpected None when polling nodes"),
Poll::Pending => break,
}
}
}
Poll::Pending
}
nodes: SelectAll<Swarm<gossipsub::Behaviour>>,
}
impl Graph {
fn new_connected(num_nodes: usize, seed: u64) -> Graph {
async fn new_connected(num_nodes: usize, seed: u64) -> Graph {
if num_nodes == 0 {
panic!("expecting at least one node");
}
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
let mut not_connected_nodes = std::iter::once(())
.cycle()
.take(num_nodes)
let mut not_connected_nodes = (0..num_nodes)
.map(|_| build_node())
.collect::<Vec<(Multiaddr, Swarm<gossipsub::Behaviour>)>>();
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
.await;
let mut connected_nodes = vec![not_connected_nodes.pop().unwrap()];
while !not_connected_nodes.is_empty() {
connected_nodes.shuffle(&mut rng);
not_connected_nodes.shuffle(&mut rng);
for mut next in not_connected_nodes {
let connected = connected_nodes
.choose_mut(&mut rng)
.expect("at least one connected node");
let mut next = not_connected_nodes.pop().unwrap();
let connected_addr = &connected_nodes[0].0;
// Memory transport can not handle addresses with `/p2p` suffix.
let mut connected_addr_no_p2p = connected_addr.clone();
let p2p_suffix_connected = connected_addr_no_p2p.pop();
debug!(
"Connect: {} -> {}",
next.0.clone().pop().unwrap(),
p2p_suffix_connected.unwrap()
);
next.1.dial(connected_addr_no_p2p).unwrap();
next.connect(connected).await;
connected_nodes.push(next);
}
Graph {
nodes: connected_nodes,
nodes: SelectAll::from_iter(connected_nodes),
}
}
@ -109,75 +69,62 @@ impl Graph {
/// `true`.
///
/// Returns [`true`] on success and [`false`] on timeout.
fn wait_for<F: FnMut(&gossipsub::Event) -> bool>(&mut self, mut f: F) -> bool {
let fut = futures::future::poll_fn(move |cx| match self.poll_unpin(cx) {
Poll::Ready((_addr, ev)) if f(&ev) => Poll::Ready(()),
_ => Poll::Pending,
});
async fn wait_for<F: FnMut(&gossipsub::Event) -> bool>(&mut self, mut f: F) -> bool {
let condition = async {
loop {
if let Ok(ev) = self
.nodes
.select_next_some()
.await
.try_into_behaviour_event()
{
if f(&ev) {
break;
}
}
}
};
let fut = async_std::future::timeout(Duration::from_secs(10), fut);
futures::executor::block_on(fut).is_ok()
match condition.timeout(Duration::from_secs(10)).await {
Ok(()) => true,
Err(_) => false,
}
}
/// Polls the graph until Poll::Pending is obtained, completing the underlying polls.
fn drain_poll(self) -> Self {
// The future below should return self. Given that it is a FnMut and not a FnOnce, one needs
// to wrap `self` in an Option, leaving a `None` behind after the final `Poll::Ready`.
let mut this = Some(self);
let fut = futures::future::poll_fn(move |cx| match &mut this {
Some(graph) => loop {
match graph.poll_unpin(cx) {
async fn drain_events(&mut self) {
let fut = futures::future::poll_fn(|cx| loop {
match self.nodes.poll_next_unpin(cx) {
Poll::Ready(_) => {}
Poll::Pending => return Poll::Ready(this.take().unwrap()),
Poll::Pending => return Poll::Ready(()),
}
},
None => panic!("future called after final return"),
});
let fut = async_std::future::timeout(Duration::from_secs(10), fut);
futures::executor::block_on(fut).unwrap()
fut.timeout(Duration::from_secs(10)).await.unwrap();
}
}
fn build_node() -> (Multiaddr, Swarm<gossipsub::Behaviour>) {
let key = identity::Keypair::generate_ed25519();
let public_key = key.public();
let transport = MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(PlainText2Config {
local_public_key: public_key.clone(),
})
.multiplex(yamux::YamuxConfig::default())
.boxed();
let peer_id = public_key.to_peer_id();
async fn build_node() -> Swarm<gossipsub::Behaviour> {
// NOTE: The graph of created nodes can be disconnected from the mesh point of view as nodes
// can reach their d_lo value and not add other nodes to their mesh. To speed up this test, we
// reduce the default values of the heartbeat, so that all nodes will receive gossip in a
// timely fashion.
let mut swarm = Swarm::new_ephemeral(|identity| {
let peer_id = identity.public().to_peer_id();
let config = gossipsub::ConfigBuilder::default()
.heartbeat_initial_delay(Duration::from_millis(100))
.heartbeat_interval(Duration::from_millis(200))
.history_length(10)
.history_gossip(10)
.validation_mode(gossipsub::ValidationMode::Permissive)
.validation_mode(ValidationMode::Permissive)
.build()
.unwrap();
let behaviour =
gossipsub::Behaviour::new(gossipsub::MessageAuthenticity::Author(peer_id), config).unwrap();
let mut swarm = Swarm::without_executor(transport, behaviour, peer_id);
gossipsub::Behaviour::new(MessageAuthenticity::Author(peer_id), config).unwrap()
});
swarm.listen().await;
let port = 1 + random::<u64>();
let mut addr: Multiaddr = Protocol::Memory(port).into();
swarm.listen_on(addr.clone()).unwrap();
addr = addr.with(Protocol::P2p(public_key.to_peer_id().into()));
(addr, swarm)
swarm
}
#[test]
@ -191,18 +138,21 @@ fn multi_hop_propagation() {
debug!("number nodes: {:?}, seed: {:?}", num_nodes, seed);
let mut graph = Graph::new_connected(num_nodes as usize, seed);
async_std::task::block_on(async move {
let mut graph = Graph::new_connected(num_nodes as usize, seed).await;
let number_nodes = graph.nodes.len();
// Subscribe each node to the same topic.
let topic = gossipsub::IdentTopic::new("test-net");
for (_addr, node) in &mut graph.nodes {
for node in &mut graph.nodes {
node.behaviour_mut().subscribe(&topic).unwrap();
}
// Wait for all nodes to be subscribed.
let mut subscribed = 0;
let all_subscribed = graph.wait_for(move |ev| {
let all_subscribed = graph
.wait_for(move |ev| {
if let gossipsub::Event::Subscribed { .. } = ev {
subscribed += 1;
if subscribed == (number_nodes - 1) * 2 {
@ -211,7 +161,9 @@ fn multi_hop_propagation() {
}
false
});
})
.await;
if !all_subscribed {
return TestResult::error(format!(
"Timed out waiting for all nodes to subscribe but only have {subscribed:?}/{num_nodes:?}.",
@ -220,18 +172,22 @@ fn multi_hop_propagation() {
// It can happen that the publish occurs before all grafts have completed causing this test
// to fail. We drain all the poll messages before publishing.
graph = graph.drain_poll();
graph.drain_events().await;
// Publish a single message.
graph.nodes[0]
.1
graph
.nodes
.iter_mut()
.next()
.unwrap()
.behaviour_mut()
.publish(topic, vec![1, 2, 3])
.unwrap();
// Wait for all nodes to receive the published message.
let mut received_msgs = 0;
let all_received = graph.wait_for(move |ev| {
let all_received = graph
.wait_for(move |ev| {
if let gossipsub::Event::Message { .. } = ev {
received_msgs += 1;
if received_msgs == number_nodes - 1 {
@ -240,7 +196,9 @@ fn multi_hop_propagation() {
}
false
});
})
.await;
if !all_received {
return TestResult::error(format!(
"Timed out waiting for all nodes to receive the msg but only have {received_msgs:?}/{num_nodes:?}.",
@ -248,6 +206,7 @@ fn multi_hop_propagation() {
}
TestResult::passed()
})
}
QuickCheck::new()

View File

@ -37,6 +37,7 @@ libp2p-swarm = { path = "../../swarm", features = ["tokio", "async-std"] }
libp2p-tcp = { path = "../../transports/tcp", features = ["tokio", "async-io"] }
libp2p-yamux = { path = "../../muxers/yamux" }
tokio = { version = "1.19", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] }
libp2p-swarm-test = { path = "../../swarm-test" }
[[test]]
name = "use-async-std"

View File

@ -18,21 +18,24 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.use futures::StreamExt;
use futures::StreamExt;
use libp2p_core::{identity, upgrade::Version, PeerId, Transport};
use futures::future::Either;
use libp2p_mdns::Event;
use libp2p_mdns::{async_io::Behaviour, Config};
use libp2p_swarm::{Swarm, SwarmEvent};
use std::error::Error;
use libp2p_swarm_test::SwarmExt as _;
use std::time::Duration;
#[async_std::test]
async fn test_discovery_async_std_ipv4() -> Result<(), Box<dyn Error>> {
async fn test_discovery_async_std_ipv4() {
env_logger::try_init().ok();
run_discovery_test(Config::default()).await
}
#[async_std::test]
async fn test_discovery_async_std_ipv6() -> Result<(), Box<dyn Error>> {
async fn test_discovery_async_std_ipv6() {
env_logger::try_init().ok();
let config = Config {
enable_ipv6: true,
..Default::default()
@ -41,22 +44,40 @@ async fn test_discovery_async_std_ipv6() -> Result<(), Box<dyn Error>> {
}
#[async_std::test]
async fn test_expired_async_std() -> Result<(), Box<dyn Error>> {
async fn test_expired_async_std() {
env_logger::try_init().ok();
let config = Config {
ttl: Duration::from_secs(1),
query_interval: Duration::from_secs(10),
..Default::default()
};
async_std::future::timeout(Duration::from_secs(6), run_peer_expiration_test(config))
.await
.map(|_| ())
.map_err(|e| Box::new(e) as Box<dyn Error>)
let mut a = create_swarm(config.clone()).await;
let a_peer_id = *a.local_peer_id();
let mut b = create_swarm(config).await;
let b_peer_id = *b.local_peer_id();
loop {
match futures::future::select(a.next_behaviour_event(), b.next_behaviour_event()).await {
Either::Left((Event::Expired(mut peers), _)) => {
if peers.any(|(p, _)| p == b_peer_id) {
return;
}
}
Either::Right((Event::Expired(mut peers), _)) => {
if peers.any(|(p, _)| p == a_peer_id) {
return;
}
}
_ => {}
}
}
}
#[async_std::test]
async fn test_no_expiration_on_close_async_std() -> Result<(), Box<dyn Error>> {
async fn test_no_expiration_on_close_async_std() {
env_logger::try_init().ok();
let config = Config {
ttl: Duration::from_secs(120),
@ -64,146 +85,70 @@ async fn test_no_expiration_on_close_async_std() -> Result<(), Box<dyn Error>> {
..Default::default()
};
async_std::future::timeout(
Duration::from_secs(6),
run_no_expiration_on_close_test(config),
)
.await
.map(|_| ())
.map_err(|e| Box::new(e) as Box<dyn Error>)
}
let mut a = create_swarm(config.clone()).await;
async fn create_swarm(config: Config) -> Result<Swarm<Behaviour>, Box<dyn Error>> {
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(id_keys.public());
let transport = libp2p_tcp::async_io::Transport::default()
.upgrade(Version::V1)
.authenticate(libp2p_noise::NoiseAuthenticated::xx(&id_keys).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed();
let behaviour = Behaviour::new(config, peer_id)?;
let mut swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id);
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
Ok(swarm)
}
let b = create_swarm(config).await;
let b_peer_id = *b.local_peer_id();
async_std::task::spawn(b.loop_on_next());
async fn run_discovery_test(config: Config) -> Result<(), Box<dyn Error>> {
env_logger::try_init().ok();
let mut a = create_swarm(config.clone()).await?;
let mut b = create_swarm(config).await?;
let mut discovered_a = false;
let mut discovered_b = false;
// 1. Connect via address from mDNS event
loop {
futures::select! {
ev = a.select_next_some() => if let SwarmEvent::Behaviour(Event::Discovered(peers)) = ev {
for (peer, _addr) in peers {
if peer == *b.local_peer_id() {
if discovered_a {
return Ok(());
} else {
discovered_b = true;
}
}
}
},
ev = b.select_next_some() => if let SwarmEvent::Behaviour(Event::Discovered(peers)) = ev {
for (peer, _addr) in peers {
if peer == *a.local_peer_id() {
if discovered_b {
return Ok(());
} else {
discovered_a = true;
}
}
}
}
}
}
}
async fn run_peer_expiration_test(config: Config) -> Result<(), Box<dyn Error>> {
let mut a = create_swarm(config.clone()).await?;
let mut b = create_swarm(config).await?;
loop {
futures::select! {
ev = a.select_next_some() => if let SwarmEvent::Behaviour(Event::Expired(peers)) = ev {
for (peer, _addr) in peers {
if peer == *b.local_peer_id() {
return Ok(());
}
}
},
ev = b.select_next_some() => if let SwarmEvent::Behaviour(Event::Expired(peers)) = ev {
for (peer, _addr) in peers {
if peer == *a.local_peer_id() {
return Ok(());
}
}
}
}
}
}
async fn run_no_expiration_on_close_test(config: Config) -> Result<(), Box<dyn Error>> {
let mut a = create_swarm(config.clone()).await?;
let mut b = create_swarm(config).await?;
#[derive(PartialEq)]
enum State {
Initial,
Dialed,
Closed,
}
let mut state = State::Initial;
loop {
futures::select! {
ev = a.select_next_some() => match ev {
SwarmEvent::Behaviour(Event::Discovered(peers)) => {
if state == State::Initial {
for (peer, addr) in peers {
if peer == *b.local_peer_id() {
// Connect to all addresses of b to 'expire' all of them
a.dial(addr)?;
state = State::Dialed;
if let Event::Discovered(mut peers) = a.next_behaviour_event().await {
if let Some((_, addr)) = peers.find(|(p, _)| p == &b_peer_id) {
a.dial_and_wait(addr).await;
break;
}
}
}
}
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
if peer_id == *b.local_peer_id() {
if state == State::Dialed {
// We disconnect the connection that was initiated
// in the discovered event
a.disconnect_peer_id(peer_id).unwrap();
} else if state == State::Closed {
// If the connection attempt after connection close
// succeeded the mDNS record wasn't expired by
// connection close
return Ok(())
}
}
}
SwarmEvent::ConnectionClosed { peer_id, num_established, .. } => {
if peer_id == *b.local_peer_id() && num_established == 0 {
// Dial a second time to make sure connection is still
// possible only via the peer id
state = State::Closed;
// Either wait for the expiration event to give mDNS enough time to expire
// or timeout after 1 second of not receiving the expiration event
let _ = async_std::future::timeout(Duration::from_secs(1), a.select_next_some()).await;
// 2. Close connection
let _ = a.disconnect_peer_id(b_peer_id);
a.wait(|event| {
matches!(event, SwarmEvent::ConnectionClosed { peer_id, .. } if peer_id == b_peer_id)
.then_some(())
})
.await;
// If the record expired this will fail because the peer has no addresses
a.dial(peer_id).expect("Expected peer addresses to not expire after connection close");
// 3. Ensure we can still dial via `PeerId`.
a.dial(b_peer_id).unwrap();
a.wait(|event| {
matches!(event, SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == b_peer_id)
.then_some(())
})
.await;
}
async fn run_discovery_test(config: Config) {
let mut a = create_swarm(config.clone()).await;
let a_peer_id = *a.local_peer_id();
let mut b = create_swarm(config).await;
let b_peer_id = *b.local_peer_id();
let mut discovered_a = false;
let mut discovered_b = false;
while !discovered_a && !discovered_b {
match futures::future::select(a.next_behaviour_event(), b.next_behaviour_event()).await {
Either::Left((Event::Discovered(mut peers), _)) => {
if peers.any(|(p, _)| p == b_peer_id) {
discovered_b = true;
}
}
Either::Right((Event::Discovered(mut peers), _)) => {
if peers.any(|(p, _)| p == a_peer_id) {
discovered_a = true;
}
}
_ => {}
},
_ = b.select_next_some() => {}
}
}
}
async fn create_swarm(config: Config) -> Swarm<Behaviour> {
let mut swarm =
Swarm::new_ephemeral(|key| Behaviour::new(config, key.public().to_peer_id()).unwrap());
swarm.listen().await;
swarm
}

View File

@ -17,21 +17,23 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.use futures::StreamExt;
use futures::StreamExt;
use libp2p_core::{identity, upgrade::Version, PeerId, Transport};
use futures::future::Either;
use libp2p_mdns::{tokio::Behaviour, Config, Event};
use libp2p_swarm::Swarm;
use libp2p_swarm::SwarmEvent;
use std::error::Error;
use libp2p_swarm_test::SwarmExt as _;
use std::time::Duration;
#[tokio::test]
async fn test_discovery_tokio_ipv4() -> Result<(), Box<dyn Error>> {
async fn test_discovery_tokio_ipv4() {
env_logger::try_init().ok();
run_discovery_test(Config::default()).await
}
#[tokio::test]
async fn test_discovery_tokio_ipv6() -> Result<(), Box<dyn Error>> {
async fn test_discovery_tokio_ipv6() {
env_logger::try_init().ok();
let config = Config {
enable_ipv6: true,
..Default::default()
@ -40,110 +42,69 @@ async fn test_discovery_tokio_ipv6() -> Result<(), Box<dyn Error>> {
}
#[tokio::test]
async fn test_expired_tokio() -> Result<(), Box<dyn Error>> {
async fn test_expired_tokio() {
env_logger::try_init().ok();
let config = Config {
ttl: Duration::from_secs(1),
query_interval: Duration::from_secs(10),
..Default::default()
};
run_peer_expiration_test(config).await
let mut a = create_swarm(config.clone()).await;
let a_peer_id = *a.local_peer_id();
let mut b = create_swarm(config).await;
let b_peer_id = *b.local_peer_id();
loop {
match futures::future::select(a.next_behaviour_event(), b.next_behaviour_event()).await {
Either::Left((Event::Expired(mut peers), _)) => {
if peers.any(|(p, _)| p == b_peer_id) {
return;
}
}
Either::Right((Event::Expired(mut peers), _)) => {
if peers.any(|(p, _)| p == a_peer_id) {
return;
}
}
_ => {}
}
}
}
async fn create_swarm(config: Config) -> Result<Swarm<Behaviour>, Box<dyn Error>> {
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(id_keys.public());
let transport = libp2p_tcp::tokio::Transport::default()
.upgrade(Version::V1)
.authenticate(libp2p_noise::NoiseAuthenticated::xx(&id_keys).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed();
let behaviour = Behaviour::new(config, peer_id)?;
let mut swarm = Swarm::with_tokio_executor(transport, behaviour, peer_id);
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
Ok(swarm)
}
async fn run_discovery_test(config: Config) {
let mut a = create_swarm(config.clone()).await;
let a_peer_id = *a.local_peer_id();
let mut b = create_swarm(config).await;
let b_peer_id = *b.local_peer_id();
async fn run_discovery_test(config: Config) -> Result<(), Box<dyn Error>> {
env_logger::try_init().ok();
let mut a = create_swarm(config.clone()).await?;
let mut b = create_swarm(config).await?;
let mut discovered_a = false;
let mut discovered_b = false;
loop {
futures::select! {
ev = a.select_next_some() => if let SwarmEvent::Behaviour(Event::Discovered(peers)) = ev {
for (peer, _addr) in peers {
if peer == *b.local_peer_id() {
if discovered_a {
return Ok(());
} else {
while !discovered_a && !discovered_b {
match futures::future::select(a.next_behaviour_event(), b.next_behaviour_event()).await {
Either::Left((Event::Discovered(mut peers), _)) => {
if peers.any(|(p, _)| p == b_peer_id) {
discovered_b = true;
}
}
}
},
ev = b.select_next_some() => if let SwarmEvent::Behaviour(Event::Discovered(peers)) = ev {
for (peer, _addr) in peers {
if peer == *a.local_peer_id() {
if discovered_b {
return Ok(());
} else {
Either::Right((Event::Discovered(mut peers), _)) => {
if peers.any(|(p, _)| p == a_peer_id) {
discovered_a = true;
}
}
}
}
}
}
}
async fn run_peer_expiration_test(config: Config) -> Result<(), Box<dyn Error>> {
let mut a = create_swarm(config.clone()).await?;
let mut b = create_swarm(config).await?;
let expired_at = tokio::time::sleep(Duration::from_secs(15));
tokio::pin!(expired_at);
loop {
tokio::select! {
_ev = &mut expired_at => {
panic!();
},
ev = a.select_next_some() => match ev {
SwarmEvent::Behaviour(Event::Expired(peers)) => {
for (peer, _addr) in peers {
if peer == *b.local_peer_id() {
return Ok(());
}
}
}
SwarmEvent::Behaviour(Event::Discovered(peers)) => {
for (peer, _addr) in peers {
if peer == *b.local_peer_id() {
expired_at.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(2));
}
}
}
_ => {}
},
ev = b.select_next_some() => match ev {
SwarmEvent::Behaviour(Event::Expired(peers)) => {
for (peer, _addr) in peers {
if peer == *a.local_peer_id() {
return Ok(());
}
}
}
SwarmEvent::Behaviour(Event::Discovered(peers)) => {
for (peer, _addr) in peers {
if peer == *a.local_peer_id() {
expired_at.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(2));
}
}
}
_ => {}
}
}
}
async fn create_swarm(config: Config) -> Swarm<Behaviour> {
let mut swarm =
Swarm::new_ephemeral(|key| Behaviour::new(config, key.public().to_peer_id()).unwrap());
swarm.listen().await;
swarm
}

View File

@ -23,11 +23,9 @@ void = "1.0"
[dev-dependencies]
async-std = "1.6.2"
libp2p-mplex = { path = "../../muxers/mplex" }
libp2p-noise = { path = "../../transports/noise" }
libp2p-swarm = { path = "../../swarm", features = ["async-std", "macros"] }
libp2p-tcp = { path = "../../transports/tcp", features = ["async-io"] }
libp2p-yamux = { path = "../../muxers/yamux" }
env_logger = "0.10.0"
libp2p-swarm = { path = "../../swarm", features = ["macros"] }
libp2p-swarm-test = { path = "../../swarm-test" }
quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" }
# Passing arguments to the docsrs builder in order to properly document cfg's.

View File

@ -20,214 +20,122 @@
//! Integration tests for the `Ping` network behaviour.
use either::Either;
use futures::{channel::mpsc, prelude::*};
use libp2p_core::{
identity,
muxing::StreamMuxerBox,
transport::{self, Transport},
upgrade, Multiaddr, PeerId,
};
use libp2p_mplex as mplex;
use libp2p_noise as noise;
use futures::prelude::*;
use libp2p_ping as ping;
use libp2p_swarm::keep_alive;
use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent};
use libp2p_tcp as tcp;
use libp2p_yamux as yamux;
use libp2p_swarm_test::SwarmExt;
use quickcheck::*;
use std::{num::NonZeroU8, time::Duration};
#[test]
fn ping_pong() {
fn prop(count: NonZeroU8, muxer: MuxerChoice) {
fn prop(count: NonZeroU8) {
let cfg = ping::Config::new().with_interval(Duration::from_millis(10));
let (peer1_id, trans) = mk_transport(muxer);
let mut swarm1 =
Swarm::with_async_std_executor(trans, Behaviour::new(cfg.clone()), peer1_id);
let mut swarm1 = Swarm::new_ephemeral(|_| Behaviour::new(cfg.clone()));
let mut swarm2 = Swarm::new_ephemeral(|_| Behaviour::new(cfg.clone()));
let (peer2_id, trans) = mk_transport(muxer);
let mut swarm2 = Swarm::with_async_std_executor(trans, Behaviour::new(cfg), peer2_id);
async_std::task::block_on(async {
swarm1.listen().await;
swarm2.connect(&mut swarm1).await;
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
let pid1 = peer1_id;
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
swarm1.listen_on(addr).unwrap();
let mut count1 = count.get();
let mut count2 = count.get();
let peer1 = async move {
loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => tx.send(address).await.unwrap(),
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
peer,
result: Ok(ping::Success::Ping { rtt }),
})) => {
count1 -= 1;
if count1 == 0 {
return (pid1, peer, rtt);
}
}
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
result: Err(e),
..
})) => {
panic!("Ping failure: {e:?}")
}
_ => {}
}
}
for _ in 0..count.get() {
let (e1, e2) = match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await {
([BehaviourEvent::Ping(e1)], [BehaviourEvent::Ping(e2)]) => (e1, e2),
events => panic!("Unexpected events: {events:?}"),
};
let pid2 = peer2_id;
let peer2 = async move {
swarm2.dial(rx.next().await.unwrap()).unwrap();
assert_eq!(&e1.peer, swarm2.local_peer_id());
assert_eq!(&e2.peer, swarm1.local_peer_id());
loop {
match swarm2.select_next_some().await {
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
peer,
result: Ok(ping::Success::Ping { rtt }),
})) => {
count2 -= 1;
if count2 == 0 {
return (pid2, peer, rtt);
assert_ping_rtt_less_than_50ms(e1);
assert_ping_rtt_less_than_50ms(e2);
}
}
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
result: Err(e),
..
})) => {
panic!("Ping failure: {e:?}")
}
_ => {}
}
}
};
let result = future::select(Box::pin(peer1), Box::pin(peer2));
let ((p1, p2, rtt), _) = async_std::task::block_on(result).factor_first();
assert!(p1 == peer1_id && p2 == peer2_id || p1 == peer2_id && p2 == peer1_id);
assert!(rtt < Duration::from_millis(50));
});
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_, _))
QuickCheck::new().tests(10).quickcheck(prop as fn(_))
}
fn assert_ping_rtt_less_than_50ms(e: ping::Event) {
let success = e.result.expect("a ping success");
if let ping::Success::Ping { rtt } = success {
assert!(rtt < Duration::from_millis(50))
}
}
/// Tests that the connection is closed upon a configurable
/// number of consecutive ping failures.
#[test]
fn max_failures() {
fn prop(max_failures: NonZeroU8, muxer: MuxerChoice) {
fn prop(max_failures: NonZeroU8) {
let cfg = ping::Config::new()
.with_interval(Duration::from_millis(10))
.with_timeout(Duration::from_millis(0))
.with_max_failures(max_failures.into());
let (peer1_id, trans) = mk_transport(muxer);
let mut swarm1 =
Swarm::with_async_std_executor(trans, Behaviour::new(cfg.clone()), peer1_id);
let mut swarm1 = Swarm::new_ephemeral(|_| Behaviour::new(cfg.clone()));
let mut swarm2 = Swarm::new_ephemeral(|_| Behaviour::new(cfg.clone()));
let (peer2_id, trans) = mk_transport(muxer);
let mut swarm2 = Swarm::with_async_std_executor(trans, Behaviour::new(cfg), peer2_id);
let (count1, count2) = async_std::task::block_on(async {
swarm1.listen().await;
swarm2.connect(&mut swarm1).await;
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
future::join(
count_ping_failures_until_connection_closed(swarm1),
count_ping_failures_until_connection_closed(swarm2),
)
.await
});
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
swarm1.listen_on(addr).unwrap();
let peer1 = async move {
let mut count1: u8 = 0;
loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => tx.send(address).await.unwrap(),
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
result: Ok(ping::Success::Ping { .. }),
..
})) => {
count1 = 0; // there may be an occasional success
}
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
result: Err(_),
..
})) => {
count1 += 1;
}
SwarmEvent::ConnectionClosed { .. } => return count1,
_ => {}
}
}
};
let peer2 = async move {
swarm2.dial(rx.next().await.unwrap()).unwrap();
let mut count2: u8 = 0;
loop {
match swarm2.select_next_some().await {
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
result: Ok(ping::Success::Ping { .. }),
..
})) => {
count2 = 0; // there may be an occasional success
}
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
result: Err(_),
..
})) => {
count2 += 1;
}
SwarmEvent::ConnectionClosed { .. } => return count2,
_ => {}
}
}
};
let future = future::join(peer1, peer2);
let (count1, count2) = async_std::task::block_on(future);
assert_eq!(u8::max(count1, count2), max_failures.get() - 1);
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_, _))
QuickCheck::new().tests(10).quickcheck(prop as fn(_))
}
async fn count_ping_failures_until_connection_closed(mut swarm: Swarm<Behaviour>) -> u8 {
let mut failure_count = 0;
loop {
match swarm.next_swarm_event().await {
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
result: Ok(ping::Success::Ping { .. }),
..
})) => {
failure_count = 0; // there may be an occasional success
}
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event { result: Err(_), .. })) => {
failure_count += 1;
}
SwarmEvent::ConnectionClosed { .. } => {
return failure_count;
}
_ => {}
}
}
}
#[test]
fn unsupported_doesnt_fail() {
let (peer1_id, trans) = mk_transport(MuxerChoice::Mplex);
let mut swarm1 = Swarm::with_async_std_executor(trans, keep_alive::Behaviour, peer1_id);
let mut swarm1 = Swarm::new_ephemeral(|_| keep_alive::Behaviour);
let mut swarm2 = Swarm::new_ephemeral(|_| Behaviour::new(ping::Config::new()));
let (peer2_id, trans) = mk_transport(MuxerChoice::Mplex);
let mut swarm2 = Swarm::with_async_std_executor(trans, Behaviour::default(), peer2_id);
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
swarm1.listen_on(addr).unwrap();
async_std::task::spawn(async move {
loop {
if let SwarmEvent::NewListenAddr { address, .. } = swarm1.select_next_some().await {
tx.send(address).await.unwrap()
}
}
});
let result = async_std::task::block_on(async move {
swarm2.dial(rx.next().await.unwrap()).unwrap();
let result = async_std::task::block_on(async {
swarm1.listen().await;
swarm2.connect(&mut swarm1).await;
let swarm1_peer_id = *swarm1.local_peer_id();
async_std::task::spawn(swarm1.loop_on_next());
loop {
match swarm2.select_next_some().await {
match swarm2.next_swarm_event().await {
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
result: Err(ping::Failure::Unsupported),
..
})) => {
swarm2.disconnect_peer_id(peer1_id).unwrap();
swarm2.disconnect_peer_id(swarm1_peer_id).unwrap();
}
SwarmEvent::ConnectionClosed { cause: Some(e), .. } => {
break Err(e);
@ -243,34 +151,6 @@ fn unsupported_doesnt_fail() {
result.expect("node with ping should not fail connection due to unsupported protocol");
}
fn mk_transport(muxer: MuxerChoice) -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) {
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = id_keys.public().to_peer_id();
(
peer_id,
tcp::async_io::Transport::new(tcp::Config::default().nodelay(true))
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseAuthenticated::xx(&id_keys).unwrap())
.multiplex(match muxer {
MuxerChoice::Yamux => Either::Left(yamux::YamuxConfig::default()),
MuxerChoice::Mplex => Either::Right(mplex::MplexConfig::default()),
})
.boxed(),
)
}
#[derive(Debug, Copy, Clone)]
enum MuxerChoice {
Mplex,
Yamux,
}
impl Arbitrary for MuxerChoice {
fn arbitrary(g: &mut Gen) -> MuxerChoice {
*g.choose(&[MuxerChoice::Mplex, MuxerChoice::Yamux]).unwrap()
}
}
#[derive(NetworkBehaviour, Default)]
#[behaviour(prelude = "libp2p_swarm::derive_prelude")]
struct Behaviour {

View File

@ -37,6 +37,7 @@ libp2p-yamux = { path = "../../muxers/yamux" }
libp2p-tcp = { path = "../../transports/tcp", features = ["tokio"] }
rand = "0.8"
tokio = { version = "1.15", features = [ "rt-multi-thread", "time", "macros", "sync", "process", "fs", "net" ] }
libp2p-swarm-test = { path = "../../swarm-test" }
# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling

View File

@ -1,228 +0,0 @@
// Copyright 2021 COMIT Network.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use async_trait::async_trait;
use futures::stream::FusedStream;
use futures::StreamExt;
use futures::{future, Stream};
use libp2p_core::transport::upgrade::Version;
use libp2p_core::transport::MemoryTransport;
use libp2p_core::upgrade::SelectUpgrade;
use libp2p_core::{identity, Multiaddr, PeerId, Transport};
use libp2p_mplex::MplexConfig;
use libp2p_noise::NoiseAuthenticated;
use libp2p_swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent};
use libp2p_yamux::YamuxConfig;
use std::fmt::Debug;
use std::time::Duration;
pub fn new_swarm<B, F>(behaviour_fn: F) -> Swarm<B>
where
B: NetworkBehaviour,
<B as NetworkBehaviour>::OutEvent: Debug,
B: NetworkBehaviour,
F: FnOnce(PeerId, identity::Keypair) -> B,
{
let identity = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(identity.public());
let transport = MemoryTransport::default()
.upgrade(Version::V1)
.authenticate(NoiseAuthenticated::xx(&identity).unwrap())
.multiplex(SelectUpgrade::new(
YamuxConfig::default(),
MplexConfig::new(),
))
.timeout(Duration::from_secs(5))
.boxed();
Swarm::with_tokio_executor(transport, behaviour_fn(peer_id, identity), peer_id)
}
fn get_rand_memory_address() -> Multiaddr {
let address_port = rand::random::<u64>();
format!("/memory/{address_port}")
.parse::<Multiaddr>()
.unwrap()
}
pub async fn await_event_or_timeout<Event, Error>(
swarm: &mut (impl Stream<Item = SwarmEvent<Event, Error>> + FusedStream + Unpin),
) -> SwarmEvent<Event, Error>
where
SwarmEvent<Event, Error>: Debug,
{
tokio::time::timeout(
Duration::from_secs(30),
swarm
.inspect(|event| log::debug!("Swarm emitted {:?}", event))
.select_next_some(),
)
.await
.expect("network behaviour to emit an event within 30 seconds")
}
pub async fn await_events_or_timeout<Event1, Event2, Error1, Error2>(
swarm_1: &mut (impl Stream<Item = SwarmEvent<Event1, Error1>> + FusedStream + Unpin),
swarm_2: &mut (impl Stream<Item = SwarmEvent<Event2, Error2>> + FusedStream + Unpin),
) -> (SwarmEvent<Event1, Error1>, SwarmEvent<Event2, Error2>)
where
SwarmEvent<Event1, Error1>: Debug,
SwarmEvent<Event2, Error2>: Debug,
{
tokio::time::timeout(
Duration::from_secs(30),
future::join(
swarm_1
.inspect(|event| log::debug!("Swarm1 emitted {:?}", event))
.select_next_some(),
swarm_2
.inspect(|event| log::debug!("Swarm2 emitted {:?}", event))
.select_next_some(),
),
)
.await
.expect("network behaviours to emit an event within 30 seconds")
}
#[macro_export]
macro_rules! assert_behaviour_events {
($swarm: ident: $pat: pat, || $body: block) => {
match await_event_or_timeout(&mut $swarm).await {
libp2p_swarm::SwarmEvent::Behaviour($pat) => $body,
_ => panic!("Unexpected combination of events emitted, check logs for details"),
}
};
($swarm1: ident: $pat1: pat, $swarm2: ident: $pat2: pat, || $body: block) => {
match await_events_or_timeout(&mut $swarm1, &mut $swarm2).await {
(
libp2p_swarm::SwarmEvent::Behaviour($pat1),
libp2p_swarm::SwarmEvent::Behaviour($pat2),
) => $body,
_ => panic!("Unexpected combination of events emitted, check logs for details"),
}
};
}
/// An extension trait for [`Swarm`] that makes it easier to set up a network of [`Swarm`]s for tests.
#[async_trait]
pub trait SwarmExt {
/// Establishes a connection to the given [`Swarm`], polling both of them until the connection is established.
async fn block_on_connection<T>(&mut self, other: &mut Swarm<T>)
where
T: NetworkBehaviour + Send,
<T as NetworkBehaviour>::OutEvent: Debug;
/// Listens on a random memory address, polling the [`Swarm`] until the transport is ready to accept connections.
async fn listen_on_random_memory_address(&mut self) -> Multiaddr;
/// Spawns the given [`Swarm`] into a runtime, polling it endlessly.
fn spawn_into_runtime(self);
}
#[async_trait]
impl<B> SwarmExt for Swarm<B>
where
B: NetworkBehaviour + Send,
<B as NetworkBehaviour>::OutEvent: Debug,
{
async fn block_on_connection<T>(&mut self, other: &mut Swarm<T>)
where
T: NetworkBehaviour + Send,
<T as NetworkBehaviour>::OutEvent: Debug,
{
let addr_to_dial = other.external_addresses().next().unwrap().addr.clone();
self.dial(addr_to_dial.clone()).unwrap();
let mut dialer_done = false;
let mut listener_done = false;
loop {
let dialer_event_fut = self.select_next_some();
tokio::select! {
dialer_event = dialer_event_fut => {
match dialer_event {
SwarmEvent::ConnectionEstablished { .. } => {
dialer_done = true;
}
other => {
log::debug!("Ignoring {:?}", other);
}
}
},
listener_event = other.select_next_some() => {
match listener_event {
SwarmEvent::ConnectionEstablished { .. } => {
listener_done = true;
}
SwarmEvent::IncomingConnectionError { error, .. } => {
panic!("Failure in incoming connection {error}");
}
other => {
log::debug!("Ignoring {:?}", other);
}
}
}
}
if dialer_done && listener_done {
return;
}
}
}
async fn listen_on_random_memory_address(&mut self) -> Multiaddr {
let memory_addr_listener_id = self.listen_on(get_rand_memory_address()).unwrap();
// block until we are actually listening
let multiaddr = loop {
match self.select_next_some().await {
SwarmEvent::NewListenAddr {
address,
listener_id,
} if listener_id == memory_addr_listener_id => {
break address;
}
other => {
log::debug!(
"Ignoring {:?} while waiting for listening to succeed",
other
);
}
}
};
// Memory addresses are externally reachable because they all share the same memory-space.
self.add_external_address(multiaddr.clone(), AddressScore::Infinite);
multiaddr
}
fn spawn_into_runtime(mut self) {
tokio::spawn(async move {
loop {
self.next().await;
}
});
}
}

View File

@ -18,15 +18,12 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
#[macro_use]
pub mod harness;
use crate::harness::{await_event_or_timeout, await_events_or_timeout, new_swarm, SwarmExt};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use libp2p_core::identity;
use libp2p_rendezvous as rendezvous;
use libp2p_swarm::{DialError, Swarm, SwarmEvent};
use libp2p_swarm_test::SwarmExt;
use std::convert::TryInto;
use std::time::Duration;
@ -41,26 +38,32 @@ async fn given_successful_registration_then_successful_discovery() {
.behaviour_mut()
.register(namespace.clone(), *robert.local_peer_id(), None);
assert_behaviour_events! {
alice: rendezvous::client::Event::Registered { rendezvous_node, ttl, namespace: register_node_namespace },
robert: rendezvous::server::Event::PeerRegistered { peer, registration },
|| {
match libp2p_swarm_test::drive(&mut alice, &mut robert).await {
(
[rendezvous::client::Event::Registered {
rendezvous_node,
ttl,
namespace: register_node_namespace,
}],
[rendezvous::server::Event::PeerRegistered { peer, registration }],
) => {
assert_eq!(&peer, alice.local_peer_id());
assert_eq!(&rendezvous_node, robert.local_peer_id());
assert_eq!(registration.namespace, namespace);
assert_eq!(register_node_namespace, namespace);
assert_eq!(ttl, rendezvous::DEFAULT_TTL);
}
};
events => panic!("Unexpected events: {events:?}"),
}
bob.behaviour_mut()
.discover(Some(namespace.clone()), None, None, *robert.local_peer_id());
assert_behaviour_events! {
bob: rendezvous::client::Event::Discovered { registrations, .. },
robert: rendezvous::server::Event::DiscoverServed { .. },
|| {
match registrations.as_slice() {
match libp2p_swarm_test::drive(&mut bob, &mut robert).await {
(
[rendezvous::client::Event::Discovered { registrations, .. }],
[rendezvous::server::Event::DiscoverServed { .. }],
) => match registrations.as_slice() {
[rendezvous::Registration {
namespace: registered_namespace,
record,
@ -71,10 +74,10 @@ async fn given_successful_registration_then_successful_discovery() {
assert_eq!(*registered_namespace, namespace);
}
_ => panic!("Expected exactly one registration to be returned from discover"),
},
events => panic!("Unexpected events: {events:?}"),
}
}
};
}
#[tokio::test]
async fn given_successful_registration_then_refresh_ttl() {
@ -90,49 +93,55 @@ async fn given_successful_registration_then_refresh_ttl() {
.behaviour_mut()
.register(namespace.clone(), roberts_peer_id, None);
assert_behaviour_events! {
alice: rendezvous::client::Event::Registered { .. },
robert: rendezvous::server::Event::PeerRegistered { .. },
|| { }
};
match libp2p_swarm_test::drive(&mut alice, &mut robert).await {
(
[rendezvous::client::Event::Registered { .. }],
[rendezvous::server::Event::PeerRegistered { .. }],
) => {}
events => panic!("Unexpected events: {events:?}"),
}
bob.behaviour_mut()
.discover(Some(namespace.clone()), None, None, roberts_peer_id);
assert_behaviour_events! {
bob: rendezvous::client::Event::Discovered { .. },
robert: rendezvous::server::Event::DiscoverServed { .. },
|| { }
};
match libp2p_swarm_test::drive(&mut bob, &mut robert).await {
(
[rendezvous::client::Event::Discovered { .. }],
[rendezvous::server::Event::DiscoverServed { .. }],
) => {}
events => panic!("Unexpected events: {events:?}"),
}
alice
.behaviour_mut()
.register(namespace.clone(), roberts_peer_id, Some(refresh_ttl));
assert_behaviour_events! {
alice: rendezvous::client::Event::Registered { ttl, .. },
robert: rendezvous::server::Event::PeerRegistered { .. },
|| {
match libp2p_swarm_test::drive(&mut alice, &mut robert).await {
(
[rendezvous::client::Event::Registered { ttl, .. }],
[rendezvous::server::Event::PeerRegistered { .. }],
) => {
assert_eq!(ttl, refresh_ttl);
}
};
events => panic!("Unexpected events: {events:?}"),
}
bob.behaviour_mut()
.discover(Some(namespace.clone()), None, None, *robert.local_peer_id());
assert_behaviour_events! {
bob: rendezvous::client::Event::Discovered { registrations, .. },
robert: rendezvous::server::Event::DiscoverServed { .. },
|| {
match registrations.as_slice() {
match libp2p_swarm_test::drive(&mut bob, &mut robert).await {
(
[rendezvous::client::Event::Discovered { registrations, .. }],
[rendezvous::server::Event::DiscoverServed { .. }],
) => match registrations.as_slice() {
[rendezvous::Registration { ttl, .. }] => {
assert_eq!(*ttl, refresh_ttl);
}
_ => panic!("Expected exactly one registration to be returned from discover"),
},
events => panic!("Unexpected events: {events:?}"),
}
}
};
}
#[tokio::test]
async fn given_invalid_ttl_then_unsuccessful_registration() {
@ -147,13 +156,18 @@ async fn given_invalid_ttl_then_unsuccessful_registration() {
Some(100_000_000),
);
assert_behaviour_events! {
alice: rendezvous::client::Event::RegisterFailed(rendezvous::client::RegisterError::Remote {error , ..}),
robert: rendezvous::server::Event::PeerNotRegistered { .. },
|| {
match libp2p_swarm_test::drive(&mut alice, &mut robert).await {
(
[rendezvous::client::Event::RegisterFailed(rendezvous::client::RegisterError::Remote {
error,
..
})],
[rendezvous::server::Event::PeerNotRegistered { .. }],
) => {
assert_eq!(error, rendezvous::ErrorCode::InvalidTtl);
}
};
events => panic!("Unexpected events: {events:?}"),
}
}
#[tokio::test]
@ -164,24 +178,24 @@ async fn discover_allows_for_dial_by_peer_id() {
new_server_with_connected_clients(rendezvous::server::Config::default()).await;
let roberts_peer_id = *robert.local_peer_id();
robert.spawn_into_runtime();
tokio::spawn(robert.loop_on_next());
alice
.behaviour_mut()
.register(namespace.clone(), roberts_peer_id, None);
assert_behaviour_events! {
alice: rendezvous::client::Event::Registered { .. },
|| { }
};
match alice.next_behaviour_event().await {
rendezvous::client::Event::Registered { .. } => {}
event => panic!("Unexpected event: {event:?}"),
}
bob.behaviour_mut()
.discover(Some(namespace.clone()), None, None, roberts_peer_id);
assert_behaviour_events! {
bob: rendezvous::client::Event::Discovered { registrations,.. },
|| {
match bob.next_behaviour_event().await {
rendezvous::client::Event::Discovered { registrations, .. } => {
assert!(!registrations.is_empty());
}
};
event => panic!("Unexpected event: {event:?}"),
}
let alices_peer_id = *alice.local_peer_id();
let bobs_peer_id = *bob.local_peer_id();
@ -216,18 +230,23 @@ async fn eve_cannot_register() {
let namespace = rendezvous::Namespace::from_static("some-namespace");
let mut robert = new_server(rendezvous::server::Config::default()).await;
let mut eve = new_impersonating_client().await;
eve.block_on_connection(&mut robert).await;
eve.connect(&mut robert).await;
eve.behaviour_mut()
.register(namespace.clone(), *robert.local_peer_id(), None);
assert_behaviour_events! {
eve: rendezvous::client::Event::RegisterFailed(rendezvous::client::RegisterError::Remote { error: err_code , ..}),
robert: rendezvous::server::Event::PeerNotRegistered { .. },
|| {
match libp2p_swarm_test::drive(&mut eve, &mut robert).await {
(
[rendezvous::client::Event::RegisterFailed(rendezvous::client::RegisterError::Remote {
error: err_code,
..
})],
[rendezvous::server::Event::PeerNotRegistered { .. }],
) => {
assert_eq!(err_code, rendezvous::ErrorCode::NotAuthorized);
}
};
events => panic!("Unexpected events: {events:?}"),
}
}
// test if charlie can operate as client and server simultaneously
@ -238,29 +257,31 @@ async fn can_combine_client_and_server() {
let ([mut alice], mut robert) =
new_server_with_connected_clients(rendezvous::server::Config::default()).await;
let mut charlie = new_combined_node().await;
charlie.block_on_connection(&mut robert).await;
alice.block_on_connection(&mut charlie).await;
charlie.connect(&mut robert).await;
alice.connect(&mut charlie).await;
charlie
.behaviour_mut()
.client
.register(namespace.clone(), *robert.local_peer_id(), None);
assert_behaviour_events! {
charlie: CombinedEvent::Client(rendezvous::client::Event::Registered { .. }),
robert: rendezvous::server::Event::PeerRegistered { .. },
|| { }
};
match libp2p_swarm_test::drive(&mut charlie, &mut robert).await {
(
[CombinedEvent::Client(rendezvous::client::Event::Registered { .. })],
[rendezvous::server::Event::PeerRegistered { .. }],
) => {}
events => panic!("Unexpected events: {events:?}"),
}
alice
.behaviour_mut()
.register(namespace, *charlie.local_peer_id(), None);
assert_behaviour_events! {
charlie: CombinedEvent::Server(rendezvous::server::Event::PeerRegistered { .. }),
alice: rendezvous::client::Event::Registered { .. },
|| { }
};
match libp2p_swarm_test::drive(&mut charlie, &mut alice).await {
(
[CombinedEvent::Server(rendezvous::server::Event::PeerRegistered { .. })],
[rendezvous::client::Event::Registered { .. }],
) => {}
events => panic!("Unexpected events: {events:?}"),
}
}
#[tokio::test]
@ -272,25 +293,25 @@ async fn registration_on_clients_expire() {
.await;
let roberts_peer_id = *robert.local_peer_id();
robert.spawn_into_runtime();
tokio::spawn(robert.loop_on_next());
let registration_ttl = 3;
alice
.behaviour_mut()
.register(namespace.clone(), roberts_peer_id, Some(registration_ttl));
assert_behaviour_events! {
alice: rendezvous::client::Event::Registered { .. },
|| { }
};
match alice.next_behaviour_event().await {
rendezvous::client::Event::Registered { .. } => {}
event => panic!("Unexpected event: {event:?}"),
}
bob.behaviour_mut()
.discover(Some(namespace), None, None, roberts_peer_id);
assert_behaviour_events! {
bob: rendezvous::client::Event::Discovered { registrations,.. },
|| {
match bob.next_behaviour_event().await {
rendezvous::client::Event::Discovered { registrations, .. } => {
assert!(!registrations.is_empty());
}
};
event => panic!("Unexpected event: {event:?}"),
}
tokio::time::sleep(Duration::from_secs(registration_ttl + 5)).await;
@ -324,33 +345,33 @@ async fn new_server_with_connected_clients<const N: usize>(
};
for client in &mut clients {
client.block_on_connection(&mut server).await;
client.connect(&mut server).await;
}
(clients, server)
}
async fn new_client() -> Swarm<rendezvous::client::Behaviour> {
let mut client = new_swarm(|_, identity| rendezvous::client::Behaviour::new(identity));
client.listen_on_random_memory_address().await; // we need to listen otherwise we don't have addresses to register
let mut client = Swarm::new_ephemeral(rendezvous::client::Behaviour::new);
client.listen().await; // we need to listen otherwise we don't have addresses to register
client
}
async fn new_server(config: rendezvous::server::Config) -> Swarm<rendezvous::server::Behaviour> {
let mut server = new_swarm(|_, _| rendezvous::server::Behaviour::new(config));
let mut server = Swarm::new_ephemeral(|_| rendezvous::server::Behaviour::new(config));
server.listen_on_random_memory_address().await;
server.listen().await;
server
}
async fn new_combined_node() -> Swarm<CombinedBehaviour> {
let mut node = new_swarm(|_, identity| CombinedBehaviour {
async fn new_combined_node() -> Swarm<Combined> {
let mut node = Swarm::new_ephemeral(|identity| Combined {
client: rendezvous::client::Behaviour::new(identity),
server: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()),
});
node.listen_on_random_memory_address().await;
node.listen().await;
node
}
@ -360,38 +381,15 @@ async fn new_impersonating_client() -> Swarm<rendezvous::client::Behaviour> {
// Due to the type-safe API of the `Rendezvous` behaviour and `PeerRecord`, we actually cannot construct a bad `PeerRecord` (i.e. one that is claims to be someone else).
// As such, the best we can do is hand eve a completely different keypair from what she is using to authenticate her connection.
let someone_else = identity::Keypair::generate_ed25519();
let mut eve = new_swarm(move |_, _| rendezvous::client::Behaviour::new(someone_else));
eve.listen_on_random_memory_address().await;
let mut eve = Swarm::new_ephemeral(move |_| rendezvous::client::Behaviour::new(someone_else));
eve.listen().await;
eve
}
#[derive(libp2p_swarm::NetworkBehaviour)]
#[behaviour(
event_process = false,
out_event = "CombinedEvent",
prelude = "libp2p_swarm::derive_prelude"
)]
struct CombinedBehaviour {
#[behaviour(prelude = "libp2p_swarm::derive_prelude")]
struct Combined {
client: rendezvous::client::Behaviour,
server: rendezvous::server::Behaviour,
}
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum CombinedEvent {
Client(rendezvous::client::Event),
Server(rendezvous::server::Event),
}
impl From<rendezvous::server::Event> for CombinedEvent {
fn from(v: rendezvous::server::Event) -> Self {
Self::Server(v)
}
}
impl From<rendezvous::client::Event> for CombinedEvent {
fn from(v: rendezvous::client::Event) -> Self {
Self::Client(v)
}
}

View File

@ -20,12 +20,13 @@ rand = "0.8"
smallvec = "1.6.1"
[dev-dependencies]
async-std = "1.6.2"
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.10.0"
libp2p-noise = { path = "../../transports/noise" }
libp2p-tcp = { path = "../../transports/tcp", features = ["async-io"] }
libp2p-yamux = { path = "../../muxers/yamux" }
rand = "0.8"
libp2p-swarm-test = { path = "../../swarm-test" }
# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling

View File

@ -21,44 +21,45 @@
//! Integration tests for the `Behaviour`.
use async_trait::async_trait;
use futures::{channel::mpsc, prelude::*, AsyncWriteExt};
use futures::{prelude::*, AsyncWriteExt};
use libp2p_core::{
identity,
muxing::StreamMuxerBox,
transport,
upgrade::{self, read_length_prefixed, write_length_prefixed},
Multiaddr, PeerId, Transport,
upgrade::{read_length_prefixed, write_length_prefixed},
PeerId, ProtocolName,
};
use libp2p_noise::NoiseAuthenticated;
use libp2p_request_response::*;
use libp2p_request_response as request_response;
use libp2p_request_response::ProtocolSupport;
use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_tcp as tcp;
use libp2p_swarm_test::SwarmExt;
use rand::{self, Rng};
use std::{io, iter};
#[test]
fn is_response_outbound() {
#[async_std::test]
async fn is_response_outbound() {
let _ = env_logger::try_init();
let ping = Ping("ping".to_string().into_bytes());
let offline_peer = PeerId::random();
let protocols = iter::once((PingProtocol(), ProtocolSupport::Full));
let cfg = Config::default();
let protocols = iter::once((PingProtocol(), request_response::ProtocolSupport::Full));
let cfg = request_response::Config::default();
let (peer1_id, trans) = mk_transport();
let ping_proto1 = Behaviour::new(PingCodec(), protocols, cfg);
let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id);
let mut swarm1 =
Swarm::new_ephemeral(|_| request_response::Behaviour::new(PingCodec(), protocols, cfg));
let request_id1 = swarm1
.behaviour_mut()
.send_request(&offline_peer, ping.clone());
match futures::executor::block_on(swarm1.select_next_some()) {
SwarmEvent::Behaviour(Event::OutboundFailure {
match swarm1
.next_swarm_event()
.await
.try_into_behaviour_event()
.unwrap()
{
request_response::Event::OutboundFailure {
peer,
request_id: req_id,
error: _error,
}) => {
} => {
assert_eq!(&offline_peer, &peer);
assert_eq!(req_id, request_id1);
}
@ -76,38 +77,35 @@ fn is_response_outbound() {
}
/// Exercises a simple ping protocol.
#[test]
fn ping_protocol() {
#[async_std::test]
async fn ping_protocol() {
let ping = Ping("ping".to_string().into_bytes());
let pong = Pong("pong".to_string().into_bytes());
let protocols = iter::once((PingProtocol(), ProtocolSupport::Full));
let cfg = Config::default();
let cfg = request_response::Config::default();
let (peer1_id, trans) = mk_transport();
let ping_proto1 = Behaviour::new(PingCodec(), protocols.clone(), cfg.clone());
let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id);
let mut swarm1 = Swarm::new_ephemeral(|_| {
request_response::Behaviour::new(PingCodec(), protocols.clone(), cfg.clone())
});
let peer1_id = *swarm1.local_peer_id();
let mut swarm2 =
Swarm::new_ephemeral(|_| request_response::Behaviour::new(PingCodec(), protocols, cfg));
let peer2_id = *swarm2.local_peer_id();
let (peer2_id, trans) = mk_transport();
let ping_proto2 = Behaviour::new(PingCodec(), protocols, cfg);
let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id);
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
swarm1.listen_on(addr).unwrap();
swarm1.listen().await;
swarm2.connect(&mut swarm1).await;
let expected_ping = ping.clone();
let expected_pong = pong.clone();
let peer1 = async move {
loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => tx.send(address).await.unwrap(),
SwarmEvent::Behaviour(Event::Message {
match swarm1.next_swarm_event().await.try_into_behaviour_event() {
Ok(request_response::Event::Message {
peer,
message:
Message::Request {
request_response::Message::Request {
request, channel, ..
},
}) => {
@ -118,34 +116,40 @@ fn ping_protocol() {
.send_response(channel, pong.clone())
.unwrap();
}
SwarmEvent::Behaviour(Event::ResponseSent { peer, .. }) => {
Ok(request_response::Event::ResponseSent { peer, .. }) => {
assert_eq!(&peer, &peer2_id);
}
SwarmEvent::Behaviour(e) => panic!("Peer1: Unexpected event: {e:?}"),
_ => {}
Ok(e) => {
panic!("Peer1: Unexpected event: {e:?}")
}
Err(..) => {}
}
}
};
let num_pings: u8 = rand::thread_rng().gen_range(1..100);
let peer2 = async move {
let peer2 = async {
let mut count = 0;
let addr = rx.next().await.unwrap();
swarm2.behaviour_mut().add_address(&peer1_id, addr.clone());
let mut req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone());
assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id));
loop {
match swarm2.select_next_some().await {
SwarmEvent::Behaviour(Event::Message {
match swarm2
.next_swarm_event()
.await
.try_into_behaviour_event()
.unwrap()
{
request_response::Event::Message {
peer,
message:
Message::Response {
request_response::Message::Response {
request_id,
response,
},
}) => {
} => {
count += 1;
assert_eq!(&response, &expected_pong);
assert_eq!(&peer, &peer1_id);
@ -156,48 +160,42 @@ fn ping_protocol() {
req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone());
}
}
SwarmEvent::Behaviour(e) => panic!("Peer2: Unexpected event: {e:?}"),
_ => {}
e => panic!("Peer2: Unexpected event: {e:?}"),
}
}
};
async_std::task::spawn(Box::pin(peer1));
let () = async_std::task::block_on(peer2);
peer2.await;
}
#[test]
fn emits_inbound_connection_closed_failure() {
#[async_std::test]
async fn emits_inbound_connection_closed_failure() {
let ping = Ping("ping".to_string().into_bytes());
let protocols = iter::once((PingProtocol(), ProtocolSupport::Full));
let cfg = Config::default();
let cfg = request_response::Config::default();
let (peer1_id, trans) = mk_transport();
let ping_proto1 = Behaviour::new(PingCodec(), protocols.clone(), cfg.clone());
let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id);
let mut swarm1 = Swarm::new_ephemeral(|_| {
request_response::Behaviour::new(PingCodec(), protocols.clone(), cfg.clone())
});
let peer1_id = *swarm1.local_peer_id();
let mut swarm2 =
Swarm::new_ephemeral(|_| request_response::Behaviour::new(PingCodec(), protocols, cfg));
let peer2_id = *swarm2.local_peer_id();
let (peer2_id, trans) = mk_transport();
let ping_proto2 = Behaviour::new(PingCodec(), protocols, cfg);
let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id);
swarm1.listen().await;
swarm2.connect(&mut swarm1).await;
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
swarm1.listen_on(addr).unwrap();
futures::executor::block_on(async move {
while swarm1.next().now_or_never().is_some() {}
let addr1 = Swarm::listeners(&swarm1).next().unwrap();
swarm2.behaviour_mut().add_address(&peer1_id, addr1.clone());
swarm2.behaviour_mut().send_request(&peer1_id, ping.clone());
// Wait for swarm 1 to receive request by swarm 2.
let _channel = loop {
futures::select!(
event = swarm1.select_next_some() => match event {
SwarmEvent::Behaviour(Event::Message {
SwarmEvent::Behaviour(request_response::Event::Message {
peer,
message: Message::Request { request, channel, .. }
message: request_response::Message::Request { request, channel, .. }
}) => {
assert_eq!(&request, &ping);
assert_eq!(&peer, &peer2_id);
@ -219,15 +217,14 @@ fn emits_inbound_connection_closed_failure() {
loop {
match swarm1.select_next_some().await {
SwarmEvent::Behaviour(Event::InboundFailure {
error: InboundFailure::ConnectionClosed,
SwarmEvent::Behaviour(request_response::Event::InboundFailure {
error: request_response::InboundFailure::ConnectionClosed,
..
}) => break,
SwarmEvent::Behaviour(e) => panic!("Peer1: Unexpected event: {e:?}"),
_ => {}
}
}
});
}
/// We expect the substream to be properly closed when response channel is dropped.
@ -235,38 +232,33 @@ fn emits_inbound_connection_closed_failure() {
/// early close as a protocol violation which results in the connection being closed.
/// If the substream were not properly closed when dropped, the sender would instead
/// run into a timeout waiting for the response.
#[test]
fn emits_inbound_connection_closed_if_channel_is_dropped() {
#[async_std::test]
async fn emits_inbound_connection_closed_if_channel_is_dropped() {
let ping = Ping("ping".to_string().into_bytes());
let protocols = iter::once((PingProtocol(), ProtocolSupport::Full));
let cfg = Config::default();
let cfg = request_response::Config::default();
let (peer1_id, trans) = mk_transport();
let ping_proto1 = Behaviour::new(PingCodec(), protocols.clone(), cfg.clone());
let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id);
let mut swarm1 = Swarm::new_ephemeral(|_| {
request_response::Behaviour::new(PingCodec(), protocols.clone(), cfg.clone())
});
let peer1_id = *swarm1.local_peer_id();
let mut swarm2 =
Swarm::new_ephemeral(|_| request_response::Behaviour::new(PingCodec(), protocols, cfg));
let peer2_id = *swarm2.local_peer_id();
let (peer2_id, trans) = mk_transport();
let ping_proto2 = Behaviour::new(PingCodec(), protocols, cfg);
let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id);
swarm1.listen().await;
swarm2.connect(&mut swarm1).await;
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
swarm1.listen_on(addr).unwrap();
futures::executor::block_on(async move {
while swarm1.next().now_or_never().is_some() {}
let addr1 = Swarm::listeners(&swarm1).next().unwrap();
swarm2.behaviour_mut().add_address(&peer1_id, addr1.clone());
swarm2.behaviour_mut().send_request(&peer1_id, ping.clone());
// Wait for swarm 1 to receive request by swarm 2.
let event = loop {
futures::select!(
event = swarm1.select_next_some() => {
if let SwarmEvent::Behaviour(Event::Message {
if let SwarmEvent::Behaviour(request_response::Event::Message {
peer,
message: Message::Request { request, channel, .. }
message: request_response::Message::Request { request, channel, .. }
}) = event {
assert_eq!(&request, &ping);
assert_eq!(&peer, &peer2_id);
@ -284,26 +276,11 @@ fn emits_inbound_connection_closed_if_channel_is_dropped() {
};
let error = match event {
Event::OutboundFailure { error, .. } => error,
request_response::Event::OutboundFailure { error, .. } => error,
e => panic!("unexpected event from peer 2: {e:?}"),
};
assert_eq!(error, OutboundFailure::ConnectionClosed);
});
}
fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) {
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = id_keys.public().to_peer_id();
(
peer_id,
tcp::async_io::Transport::new(tcp::Config::default().nodelay(true))
.upgrade(upgrade::Version::V1)
.authenticate(NoiseAuthenticated::xx(&id_keys).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed(),
)
assert_eq!(error, request_response::OutboundFailure::ConnectionClosed);
}
// Simple Ping-Pong Protocol

20
swarm-test/Cargo.toml Normal file
View File

@ -0,0 +1,20 @@
[package]
name = "libp2p-swarm-test"
version = "0.1.0"
edition = "2021"
license = "MIT"
publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1.57"
libp2p-core = { path = "../core" }
libp2p-swarm = { path = "../swarm" }
libp2p-yamux = { path = "../muxers/yamux" }
libp2p-plaintext = { path = "../transports/plaintext" }
libp2p-tcp = { path = "../transports/tcp", features = ["async-io"] }
futures = "0.3.24"
log = "0.4.17"
rand = "0.8.5"
futures-timer = "3.0.2"

373
swarm-test/src/lib.rs Normal file
View File

@ -0,0 +1,373 @@
// Copyright 2023 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use async_trait::async_trait;
use futures::future::Either;
use futures::StreamExt;
use libp2p_core::{
identity::Keypair, multiaddr::Protocol, transport::MemoryTransport, upgrade::Version,
Multiaddr, PeerId, Transport,
};
use libp2p_plaintext::PlainText2Config;
use libp2p_swarm::{
dial_opts::DialOpts, AddressScore, NetworkBehaviour, Swarm, SwarmEvent, THandlerErr,
};
use libp2p_yamux::YamuxConfig;
use std::fmt::Debug;
use std::time::Duration;
/// An extension trait for [`Swarm`] that makes it easier to set up a network of [`Swarm`]s for tests.
#[async_trait]
pub trait SwarmExt {
type NB: NetworkBehaviour;
/// Create a new [`Swarm`] with an ephemeral identity.
///
/// The swarm will use a [`MemoryTransport`] together with [`PlainText2Config`] authentication layer and
/// yamux as the multiplexer. However, these details should not be relied upon by the test
/// and may change at any time.
fn new_ephemeral(behaviour_fn: impl FnOnce(Keypair) -> Self::NB) -> Self
where
Self: Sized;
/// Establishes a connection to the given [`Swarm`], polling both of them until the connection is established.
async fn connect<T>(&mut self, other: &mut Swarm<T>)
where
T: NetworkBehaviour + Send,
<T as NetworkBehaviour>::OutEvent: Debug;
/// Dial the provided address and wait until a connection has been established.
///
/// In a normal test scenario, you should prefer [`SwarmExt::connect`] but that is not always possible.
/// This function only abstracts away the "dial and wait for `ConnectionEstablished` event" part.
///
/// Because we don't have access to the other [`Swarm`], we can't guarantee that it makes progress.
async fn dial_and_wait(&mut self, addr: Multiaddr) -> PeerId;
/// Wait for specified condition to return `Some`.
async fn wait<E, P>(&mut self, predicate: P) -> E
where
P: Fn(
SwarmEvent<<Self::NB as NetworkBehaviour>::OutEvent, THandlerErr<Self::NB>>,
) -> Option<E>,
P: Send;
/// Listens for incoming connections, polling the [`Swarm`] until the transport is ready to accept connections.
///
/// The first address is for the memory transport, the second one for the TCP transport.
async fn listen(&mut self) -> (Multiaddr, Multiaddr);
/// Returns the next [`SwarmEvent`] or times out after 10 seconds.
///
/// If the 10s timeout does not fit your usecase, please fall back to `StreamExt::next`.
async fn next_swarm_event(
&mut self,
) -> SwarmEvent<<Self::NB as NetworkBehaviour>::OutEvent, THandlerErr<Self::NB>>;
/// Returns the next behaviour event or times out after 10 seconds.
///
/// If the 10s timeout does not fit your usecase, please fall back to `StreamExt::next`.
async fn next_behaviour_event(&mut self) -> <Self::NB as NetworkBehaviour>::OutEvent;
async fn loop_on_next(self);
}
/// Drives two [`Swarm`]s until a certain number of events are emitted.
///
/// # Usage
///
/// ## Number of events
///
/// The number of events is configured via const generics based on the array size of the return type.
/// This allows the compiler to infer how many events you are expecting based on how you use this function.
/// For example, if you expect the first [`Swarm`] to emit 2 events, you should assign the first variable of the returned tuple value to an array of size 2.
/// This works especially well if you directly pattern-match on the return value.
///
/// ## Type of event
///
/// This function utilizes the [`TryIntoOutput`] trait.
/// Similar as to the number of expected events, the type of event is inferred based on your usage.
/// If you match against a [`SwarmEvent`], the first [`SwarmEvent`] will be returned.
/// If you match against your [`NetworkBehaviour::OutEvent`] type, [`SwarmEvent`]s which are not [`SwarmEvent::Behaviour`] will be skipped until the [`Swarm`] returns a behaviour event.
///
/// You can implement the [`TryIntoOutput`] for any other type to further customize this behaviour.
///
/// # Difference to [`futures::future::join`]
///
/// This function is similar to joining two futures with two crucial differences:
/// 1. As described above, it allows you to obtain more than a single event.
/// 2. More importantly, it will continue to poll the [`Swarm`]s **even if they already has emitted all expected events**.
///
/// Especially (2) is crucial for our usage of this function.
/// If a [`Swarm`] is not polled, nothing within it makes progress.
/// This can "starve" the other swarm which for example may wait for another message to be sent on a connection.
///
/// Using [`drive`] instead of [`futures::future::join`] ensures that a [`Swarm`] continues to be polled, even after it emitted its events.
pub async fn drive<
TBehaviour1,
const NUM_EVENTS_SWARM_1: usize,
Out1,
TBehaviour2,
const NUM_EVENTS_SWARM_2: usize,
Out2,
>(
swarm1: &mut Swarm<TBehaviour2>,
swarm2: &mut Swarm<TBehaviour1>,
) -> ([Out1; NUM_EVENTS_SWARM_1], [Out2; NUM_EVENTS_SWARM_2])
where
TBehaviour2: NetworkBehaviour + Send,
TBehaviour2::OutEvent: Debug,
TBehaviour1: NetworkBehaviour + Send,
TBehaviour1::OutEvent: Debug,
SwarmEvent<TBehaviour2::OutEvent, THandlerErr<TBehaviour2>>: TryIntoOutput<Out1>,
SwarmEvent<TBehaviour1::OutEvent, THandlerErr<TBehaviour1>>: TryIntoOutput<Out2>,
Out1: Debug,
Out2: Debug,
{
let mut res1 = Vec::<Out1>::with_capacity(NUM_EVENTS_SWARM_1);
let mut res2 = Vec::<Out2>::with_capacity(NUM_EVENTS_SWARM_2);
while res1.len() < NUM_EVENTS_SWARM_1 || res2.len() < NUM_EVENTS_SWARM_2 {
match futures::future::select(swarm1.next_swarm_event(), swarm2.next_swarm_event()).await {
Either::Left((o1, _)) => {
if let Ok(o1) = o1.try_into_output() {
res1.push(o1);
}
}
Either::Right((o2, _)) => {
if let Ok(o2) = o2.try_into_output() {
res2.push(o2);
}
}
}
}
(
res1.try_into().unwrap_or_else(|res1: Vec<_>| {
panic!(
"expected {NUM_EVENTS_SWARM_1} items from first swarm but got {}",
res1.len()
)
}),
res2.try_into().unwrap_or_else(|res2: Vec<_>| {
panic!(
"expected {NUM_EVENTS_SWARM_2} items from second swarm but got {}",
res2.len()
)
}),
)
}
pub trait TryIntoOutput<O>: Sized {
fn try_into_output(self) -> Result<O, Self>;
}
impl<O, THandlerErr> TryIntoOutput<O> for SwarmEvent<O, THandlerErr> {
fn try_into_output(self) -> Result<O, Self> {
self.try_into_behaviour_event()
}
}
impl<TBehaviourOutEvent, THandlerErr> TryIntoOutput<SwarmEvent<TBehaviourOutEvent, THandlerErr>>
for SwarmEvent<TBehaviourOutEvent, THandlerErr>
{
fn try_into_output(self) -> Result<SwarmEvent<TBehaviourOutEvent, THandlerErr>, Self> {
Ok(self)
}
}
#[async_trait]
impl<B> SwarmExt for Swarm<B>
where
B: NetworkBehaviour + Send,
<B as NetworkBehaviour>::OutEvent: Debug,
{
type NB = B;
fn new_ephemeral(behaviour_fn: impl FnOnce(Keypair) -> Self::NB) -> Self
where
Self: Sized,
{
let identity = Keypair::generate_ed25519();
let peer_id = PeerId::from(identity.public());
let transport = MemoryTransport::default()
.or_transport(libp2p_tcp::async_io::Transport::default())
.upgrade(Version::V1)
.authenticate(PlainText2Config {
local_public_key: identity.public(),
})
.multiplex(YamuxConfig::default())
.timeout(Duration::from_secs(20))
.boxed();
Swarm::without_executor(transport, behaviour_fn(identity), peer_id)
}
async fn connect<T>(&mut self, other: &mut Swarm<T>)
where
T: NetworkBehaviour + Send,
<T as NetworkBehaviour>::OutEvent: Debug,
{
let external_addresses = other
.external_addresses()
.cloned()
.map(|r| r.addr)
.collect();
let dial_opts = DialOpts::peer_id(*other.local_peer_id())
.addresses(external_addresses)
.build();
self.dial(dial_opts).unwrap();
let mut dialer_done = false;
let mut listener_done = false;
loop {
match futures::future::select(self.next_swarm_event(), other.next_swarm_event()).await {
Either::Left((SwarmEvent::ConnectionEstablished { .. }, _)) => {
dialer_done = true;
}
Either::Right((SwarmEvent::ConnectionEstablished { .. }, _)) => {
listener_done = true;
}
Either::Left((other, _)) => {
log::debug!("Ignoring event from dialer {:?}", other);
}
Either::Right((other, _)) => {
log::debug!("Ignoring event from listener {:?}", other);
}
}
if dialer_done && listener_done {
return;
}
}
}
async fn dial_and_wait(&mut self, addr: Multiaddr) -> PeerId {
self.dial(addr.clone()).unwrap();
self.wait(|e| match e {
SwarmEvent::ConnectionEstablished {
endpoint, peer_id, ..
} => (endpoint.get_remote_address() == &addr).then_some(peer_id),
other => {
log::debug!("Ignoring event from dialer {:?}", other);
None
}
})
.await
}
async fn wait<E, P>(&mut self, predicate: P) -> E
where
P: Fn(SwarmEvent<<B as NetworkBehaviour>::OutEvent, THandlerErr<B>>) -> Option<E>,
P: Send,
{
loop {
let event = self.next_swarm_event().await;
if let Some(e) = predicate(event) {
break e;
}
}
}
async fn listen(&mut self) -> (Multiaddr, Multiaddr) {
let memory_addr_listener_id = self.listen_on(Protocol::Memory(0).into()).unwrap();
// block until we are actually listening
let memory_multiaddr = self
.wait(|e| match e {
SwarmEvent::NewListenAddr {
address,
listener_id,
} => (listener_id == memory_addr_listener_id).then_some(address),
other => {
log::debug!(
"Ignoring {:?} while waiting for listening to succeed",
other
);
None
}
})
.await;
// Memory addresses are externally reachable because they all share the same memory-space.
self.add_external_address(memory_multiaddr.clone(), AddressScore::Infinite);
let tcp_addr_listener_id = self
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
let tcp_multiaddr = self
.wait(|e| match e {
SwarmEvent::NewListenAddr {
address,
listener_id,
} => (listener_id == tcp_addr_listener_id).then_some(address),
other => {
log::debug!(
"Ignoring {:?} while waiting for listening to succeed",
other
);
None
}
})
.await;
// We purposely don't add the TCP addr as an external one because we want to only use the memory transport for making connections in here.
// The TCP transport is only supported for protocols that manage their own connections.
(memory_multiaddr, tcp_multiaddr)
}
async fn next_swarm_event(
&mut self,
) -> SwarmEvent<<Self::NB as NetworkBehaviour>::OutEvent, THandlerErr<Self::NB>> {
match futures::future::select(
futures_timer::Delay::new(Duration::from_secs(10)),
self.select_next_some(),
)
.await
{
Either::Left(((), _)) => panic!("Swarm did not emit an event within 10s"),
Either::Right((event, _)) => {
log::trace!("Swarm produced: {:?}", event);
event
}
}
}
async fn next_behaviour_event(&mut self) -> <Self::NB as NetworkBehaviour>::OutEvent {
loop {
if let Ok(event) = self.next_swarm_event().await.try_into_behaviour_event() {
return event;
}
}
}
async fn loop_on_next(mut self) {
while let Some(event) = self.next().await {
log::trace!("Swarm produced: {:?}", event);
}
}
}

View File

@ -296,6 +296,17 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
Dialing(PeerId),
}
impl<TBehaviourOutEvent, THandlerErr> SwarmEvent<TBehaviourOutEvent, THandlerErr> {
/// Extract the `TBehaviourOutEvent` from this [`SwarmEvent`] in case it is the `Behaviour` variant, otherwise fail.
#[allow(clippy::result_large_err)]
pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
match self {
SwarmEvent::Behaviour(inner) => Ok(inner),
other => Err(other),
}
}
}
/// Contains the state of the network, plus the way it should behave.
///
/// Note: Needs to be polled via `<Swarm as Stream>` in order to make