diff --git a/Cargo.lock b/Cargo.lock index c39664be..19d7b99d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2820,7 +2820,7 @@ name = "libp2p-perf" version = "0.2.0" dependencies = [ "anyhow", - "async-std", + "async-trait", "clap 4.3.0", "env_logger 0.10.0", "futures", @@ -2828,15 +2828,19 @@ dependencies = [ "libp2p-core", "libp2p-dns", "libp2p-identity", - "libp2p-noise", "libp2p-quic", + "libp2p-request-response", "libp2p-swarm", "libp2p-swarm-test", "libp2p-tcp", + "libp2p-tls", "libp2p-yamux", "log", "rand 0.8.5", + "serde", + "serde_json", "thiserror", + "tokio", "void", ] diff --git a/protocols/perf/Cargo.toml b/protocols/perf/Cargo.toml index 05a35883..5dd8979c 100644 --- a/protocols/perf/Cargo.toml +++ b/protocols/perf/Cargo.toml @@ -12,21 +12,25 @@ categories = ["network-programming", "asynchronous"] [dependencies] anyhow = "1" -async-std = { version = "1.9.0", features = ["attributes"] } +async-trait = "0.1" clap = { version = "4.3.0", features = ["derive"] } env_logger = "0.10.0" futures = "0.3.28" instant = "0.1.12" libp2p-core = { workspace = true } -libp2p-dns = { workspace = true, features = ["async-std"] } +libp2p-dns = { workspace = true, features = ["tokio"] } libp2p-identity = { workspace = true } -libp2p-noise = { workspace = true } -libp2p-quic = { workspace = true, features = ["async-std"] } -libp2p-swarm = { workspace = true, features = ["macros", "async-std"] } -libp2p-tcp = { workspace = true, features = ["async-io"] } +libp2p-tls = { workspace = true } +libp2p-quic = { workspace = true, features = ["tokio"] } +libp2p-request-response = { workspace = true } +libp2p-swarm = { workspace = true, features = ["macros", "tokio"] } +libp2p-tcp = { workspace = true, features = ["tokio"] } libp2p-yamux = { workspace = true } log = "0.4" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" thiserror = "1.0" +tokio = { version = "1.27.0", features = ["full"] } void = "1" [dev-dependencies] diff --git a/protocols/perf/Dockerfile b/protocols/perf/Dockerfile index aef8eed1..6523e3be 100644 --- a/protocols/perf/Dockerfile +++ b/protocols/perf/Dockerfile @@ -9,14 +9,10 @@ RUN --mount=type=cache,target=./target \ cargo build --release --package libp2p-perf RUN --mount=type=cache,target=./target \ - mv ./target/release/perf-server /usr/local/bin/perf-server - -RUN --mount=type=cache,target=./target \ - mv ./target/release/perf-client /usr/local/bin/perf-client + mv ./target/release/perf /usr/local/bin/perf FROM debian:bullseye-slim -COPY --from=builder /usr/local/bin/perf-server /usr/local/bin/perf-server -COPY --from=builder /usr/local/bin/perf-client /usr/local/bin/perf-client +COPY --from=builder /usr/local/bin/perf /app/perf -ENTRYPOINT [ "perf-server"] +ENTRYPOINT [ "/app/perf" ] diff --git a/protocols/perf/src/bin/perf-client.rs b/protocols/perf/src/bin/perf-client.rs deleted file mode 100644 index 0b9a3d96..00000000 --- a/protocols/perf/src/bin/perf-client.rs +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright 2023 Protocol Labs. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use anyhow::{bail, Result}; -use clap::Parser; -use futures::{future::Either, StreamExt}; -use libp2p_core::{muxing::StreamMuxerBox, transport::OrTransport, upgrade, Multiaddr, Transport}; -use libp2p_dns::DnsConfig; -use libp2p_identity::PeerId; -use libp2p_perf::client::RunParams; -use libp2p_swarm::{SwarmBuilder, SwarmEvent}; -use log::info; - -#[derive(Debug, Parser)] -#[clap(name = "libp2p perf client")] -struct Opts { - #[arg(long)] - server_address: Multiaddr, -} - -#[async_std::main] -async fn main() -> Result<()> { - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); - - let opts = Opts::parse(); - - info!("Initiating performance tests with {}", opts.server_address); - - // Create a random PeerId - let local_key = libp2p_identity::Keypair::generate_ed25519(); - let local_peer_id = PeerId::from(local_key.public()); - - let transport = { - let tcp = - libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::default().port_reuse(true)) - .upgrade(upgrade::Version::V1Lazy) - .authenticate( - libp2p_noise::Config::new(&local_key) - .expect("Signing libp2p-noise static DH keypair failed."), - ) - .multiplex(libp2p_yamux::Config::default()); - - let quic = { - let mut config = libp2p_quic::Config::new(&local_key); - config.support_draft_29 = true; - libp2p_quic::async_std::Transport::new(config) - }; - - let dns = DnsConfig::system(OrTransport::new(quic, tcp)) - .await - .unwrap(); - - dns.map(|either_output, _| match either_output { - Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - }) - .boxed() - }; - - let mut swarm = SwarmBuilder::with_async_std_executor( - transport, - libp2p_perf::client::Behaviour::default(), - local_peer_id, - ) - .substream_upgrade_protocol_override(upgrade::Version::V1Lazy) - .build(); - - swarm.dial(opts.server_address.clone()).unwrap(); - let server_peer_id = loop { - match swarm.next().await.unwrap() { - SwarmEvent::ConnectionEstablished { peer_id, .. } => break peer_id, - SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { - bail!("Outgoing connection error to {:?}: {:?}", peer_id, error); - } - e => panic!("{e:?}"), - } - }; - - info!( - "Connection to {} established. Launching benchmarks.", - opts.server_address - ); - - swarm.behaviour_mut().perf( - server_peer_id, - RunParams { - to_send: 10 * 1024 * 1024, - to_receive: 10 * 1024 * 1024, - }, - )?; - - let stats = loop { - match swarm.next().await.unwrap() { - SwarmEvent::ConnectionEstablished { - peer_id, endpoint, .. - } => { - info!("Established connection to {:?} via {:?}", peer_id, endpoint); - } - SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { - info!("Outgoing connection error to {:?}: {:?}", peer_id, error); - } - SwarmEvent::Behaviour(libp2p_perf::client::Event { id: _, result }) => break result?, - e => panic!("{e:?}"), - } - }; - - let sent_mebibytes = stats.params.to_send as f64 / 1024.0 / 1024.0; - let sent_time = (stats.timers.write_done - stats.timers.write_start).as_secs_f64(); - let sent_bandwidth_mebibit_second = (sent_mebibytes * 8.0) / sent_time; - - let received_mebibytes = stats.params.to_receive as f64 / 1024.0 / 1024.0; - let receive_time = (stats.timers.read_done - stats.timers.write_done).as_secs_f64(); - let receive_bandwidth_mebibit_second = (received_mebibytes * 8.0) / receive_time; - - info!( - "Finished run: Sent {sent_mebibytes:.2} MiB in {sent_time:.2} s with \ - {sent_bandwidth_mebibit_second:.2} MiBit/s and received \ - {received_mebibytes:.2} MiB in {receive_time:.2} s with \ - {receive_bandwidth_mebibit_second:.2} MiBit/s", - ); - - Ok(()) -} diff --git a/protocols/perf/src/bin/perf-server.rs b/protocols/perf/src/bin/perf-server.rs deleted file mode 100644 index 9219ed85..00000000 --- a/protocols/perf/src/bin/perf-server.rs +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright 2023 Protocol Labs. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use clap::Parser; -use futures::{future::Either, StreamExt}; -use libp2p_core::{muxing::StreamMuxerBox, transport::OrTransport, upgrade, Transport}; -use libp2p_dns::DnsConfig; -use libp2p_identity::PeerId; -use libp2p_swarm::{SwarmBuilder, SwarmEvent}; -use log::{error, info}; - -#[derive(Debug, Parser)] -#[clap(name = "libp2p perf server")] -struct Opts {} - -#[async_std::main] -async fn main() { - env_logger::init(); - - let _opts = Opts::parse(); - - // Create a random PeerId - let local_key = libp2p_identity::Keypair::generate_ed25519(); - let local_peer_id = PeerId::from(local_key.public()); - println!("Local peer id: {local_peer_id}"); - - let transport = { - let tcp = - libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::default().port_reuse(true)) - .upgrade(upgrade::Version::V1Lazy) - .authenticate( - libp2p_noise::Config::new(&local_key) - .expect("Signing libp2p-noise static DH keypair failed."), - ) - .multiplex(libp2p_yamux::Config::default()); - - let quic = { - let mut config = libp2p_quic::Config::new(&local_key); - config.support_draft_29 = true; - libp2p_quic::async_std::Transport::new(config) - }; - - let dns = DnsConfig::system(OrTransport::new(quic, tcp)) - .await - .unwrap(); - - dns.map(|either_output, _| match either_output { - Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - }) - .boxed() - }; - - let mut swarm = SwarmBuilder::with_async_std_executor( - transport, - libp2p_perf::server::Behaviour::default(), - local_peer_id, - ) - .substream_upgrade_protocol_override(upgrade::Version::V1Lazy) - .build(); - - swarm - .listen_on("/ip4/0.0.0.0/tcp/4001".parse().unwrap()) - .unwrap(); - - swarm - .listen_on("/ip4/0.0.0.0/udp/4001/quic-v1".parse().unwrap()) - .unwrap(); - - loop { - match swarm.next().await.unwrap() { - SwarmEvent::NewListenAddr { address, .. } => { - info!("Listening on {address}"); - } - SwarmEvent::IncomingConnection { .. } => {} - e @ SwarmEvent::IncomingConnectionError { .. } => { - error!("{e:?}"); - } - SwarmEvent::ConnectionEstablished { - peer_id, endpoint, .. - } => { - info!("Established connection to {:?} via {:?}", peer_id, endpoint); - } - SwarmEvent::ConnectionClosed { .. } => {} - SwarmEvent::Behaviour(libp2p_perf::server::Event { - remote_peer_id, - stats, - }) => { - let received_mebibytes = stats.params.received as f64 / 1024.0 / 1024.0; - let receive_time = (stats.timers.read_done - stats.timers.read_start).as_secs_f64(); - let receive_bandwidth_mebibit_second = (received_mebibytes * 8.0) / receive_time; - - let sent_mebibytes = stats.params.sent as f64 / 1024.0 / 1024.0; - let sent_time = (stats.timers.write_done - stats.timers.read_done).as_secs_f64(); - let sent_bandwidth_mebibit_second = (sent_mebibytes * 8.0) / sent_time; - - info!( - "Finished run with {}: Received {:.2} MiB in {:.2} s with {:.2} MiBit/s and sent {:.2} MiB in {:.2} s with {:.2} MiBit/s", - remote_peer_id, - received_mebibytes, - receive_time, - receive_bandwidth_mebibit_second, - sent_mebibytes, - sent_time, - sent_bandwidth_mebibit_second, - ) - } - e => panic!("{e:?}"), - } - } -} diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs new file mode 100644 index 00000000..5404f405 --- /dev/null +++ b/protocols/perf/src/bin/perf.rs @@ -0,0 +1,486 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::{net::SocketAddr, str::FromStr}; + +use anyhow::{bail, Result}; +use clap::Parser; +use futures::FutureExt; +use futures::{future::Either, StreamExt}; +use instant::{Duration, Instant}; +use libp2p_core::{ + multiaddr::Protocol, muxing::StreamMuxerBox, transport::OrTransport, upgrade, Multiaddr, + Transport as _, +}; +use libp2p_identity::PeerId; +use libp2p_perf::{Run, RunDuration, RunParams}; +use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; +use log::{error, info}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Parser)] +#[clap(name = "libp2p perf client")] +struct Opts { + #[arg(long)] + server_address: Option, + #[arg(long)] + transport: Option, + #[arg(long)] + upload_bytes: Option, + #[arg(long)] + download_bytes: Option, + + /// Run in server mode. + #[clap(long)] + run_server: bool, + /// Fixed value to generate deterministic peer id. + #[clap(long)] + secret_key_seed: Option, +} + +/// Supported transports by rust-libp2p. +#[derive(Clone, Debug)] +pub enum Transport { + Tcp, + QuicV1, +} + +impl FromStr for Transport { + type Err = anyhow::Error; + + fn from_str(s: &str) -> std::result::Result { + Ok(match s { + "tcp" => Self::Tcp, + "quic-v1" => Self::QuicV1, + other => bail!("unknown transport {other}"), + }) + } +} + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")) + .format_timestamp_millis() + .init(); + + let opts = Opts::parse(); + match opts { + Opts { + server_address: Some(server_address), + transport: None, + upload_bytes: None, + download_bytes: None, + run_server: true, + secret_key_seed: Some(secret_key_seed), + } => server(server_address, secret_key_seed).await?, + Opts { + server_address: Some(server_address), + transport: Some(transport), + upload_bytes, + download_bytes, + run_server: false, + secret_key_seed: None, + } => { + client(server_address, transport, upload_bytes, download_bytes).await?; + } + _ => panic!("invalid command line arguments: {opts:?}"), + }; + + Ok(()) +} + +async fn server(server_address: SocketAddr, secret_key_seed: u8) -> Result<()> { + let mut swarm = swarm::(Some(secret_key_seed)).await?; + + swarm.listen_on( + Multiaddr::empty() + .with(server_address.ip().into()) + .with(Protocol::Tcp(server_address.port())), + )?; + + swarm + .listen_on( + Multiaddr::empty() + .with(server_address.ip().into()) + .with(Protocol::Udp(server_address.port())) + .with(Protocol::QuicV1), + ) + .unwrap(); + + tokio::spawn(async move { + loop { + match swarm.next().await.unwrap() { + SwarmEvent::NewListenAddr { address, .. } => { + info!("Listening on {address}"); + } + SwarmEvent::IncomingConnection { .. } => {} + e @ SwarmEvent::IncomingConnectionError { .. } => { + error!("{e:?}"); + } + SwarmEvent::ConnectionEstablished { + peer_id, endpoint, .. + } => { + info!("Established connection to {:?} via {:?}", peer_id, endpoint); + } + SwarmEvent::ConnectionClosed { .. } => {} + SwarmEvent::Behaviour(()) => { + info!("Finished run",) + } + e => panic!("{e:?}"), + } + } + }) + .await + .unwrap(); + + Ok(()) +} + +async fn client( + server_address: SocketAddr, + transport: Transport, + upload_bytes: Option, + download_bytes: Option, +) -> Result<()> { + let server_address = match transport { + Transport::Tcp => Multiaddr::empty() + .with(server_address.ip().into()) + .with(Protocol::Tcp(server_address.port())), + Transport::QuicV1 => Multiaddr::empty() + .with(server_address.ip().into()) + .with(Protocol::Udp(server_address.port())) + .with(Protocol::QuicV1), + }; + + let benchmarks = if upload_bytes.is_some() { + vec![custom( + server_address, + RunParams { + to_send: upload_bytes.unwrap(), + to_receive: download_bytes.unwrap(), + }, + ) + .boxed()] + } else { + vec![ + latency(server_address.clone()).boxed(), + throughput(server_address.clone()).boxed(), + requests_per_second(server_address.clone()).boxed(), + sequential_connections_per_second(server_address.clone()).boxed(), + ] + }; + + tokio::spawn(async move { + for benchmark in benchmarks { + benchmark.await?; + } + + anyhow::Ok(()) + }) + .await??; + + Ok(()) +} + +async fn custom(server_address: Multiaddr, params: RunParams) -> Result<()> { + info!("start benchmark: custom"); + let mut swarm = swarm(None).await?; + + let (server_peer_id, connection_established) = + connect(&mut swarm, server_address.clone()).await?; + + let RunDuration { upload, download } = perf(&mut swarm, server_peer_id, params).await?; + + #[derive(Serialize, Deserialize)] + #[serde(rename_all = "camelCase")] + struct CustomResult { + connection_established_seconds: f64, + upload_seconds: f64, + download_seconds: f64, + } + + println!( + "{}", + serde_json::to_string(&CustomResult { + connection_established_seconds: connection_established.as_secs_f64(), + upload_seconds: upload.as_secs_f64(), + download_seconds: download.as_secs_f64(), + }) + .unwrap() + ); + + Ok(()) +} + +async fn latency(server_address: Multiaddr) -> Result<()> { + info!("start benchmark: round-trip-time latency"); + let mut swarm = swarm(None).await?; + + let (server_peer_id, _) = connect(&mut swarm, server_address.clone()).await?; + + let mut rounds = 0; + let start = Instant::now(); + let mut latencies = Vec::new(); + + loop { + if start.elapsed() > Duration::from_secs(30) { + break; + } + + let start = Instant::now(); + + perf( + &mut swarm, + server_peer_id, + RunParams { + to_send: 1, + to_receive: 1, + }, + ) + .await?; + + latencies.push(start.elapsed().as_secs_f64()); + rounds += 1; + } + + latencies.sort_by(|a, b| a.partial_cmp(b).unwrap()); + + info!( + "Finished: {rounds} pings in {:.4}s", + start.elapsed().as_secs_f64() + ); + info!("- {:.4} s median", percentile(&latencies, 0.50),); + info!("- {:.4} s 95th percentile\n", percentile(&latencies, 0.95),); + Ok(()) +} + +fn percentile(values: &[V], percentile: f64) -> V { + let n: usize = (values.len() as f64 * percentile).ceil() as usize - 1; + values[n] +} + +async fn throughput(server_address: Multiaddr) -> Result<()> { + info!("start benchmark: single connection single channel throughput"); + let mut swarm = swarm(None).await?; + + let (server_peer_id, _) = connect(&mut swarm, server_address.clone()).await?; + + let params = RunParams { + to_send: 10 * 1024 * 1024, + to_receive: 10 * 1024 * 1024, + }; + + perf(&mut swarm, server_peer_id, params).await?; + + Ok(()) +} + +async fn requests_per_second(server_address: Multiaddr) -> Result<()> { + info!("start benchmark: single connection parallel requests per second"); + let mut swarm = swarm(None).await?; + + let (server_peer_id, _) = connect(&mut swarm, server_address.clone()).await?; + + let num = 1_000; + let to_send = 1; + let to_receive = 1; + + for _ in 0..num { + swarm.behaviour_mut().perf( + server_peer_id, + RunParams { + to_send, + to_receive, + }, + )?; + } + + let mut finished = 0; + let start = Instant::now(); + + loop { + match swarm.next().await.unwrap() { + SwarmEvent::Behaviour(libp2p_perf::client::Event { + id: _, + result: Ok(_), + }) => { + finished += 1; + + if finished == num { + break; + } + } + e => panic!("{e:?}"), + } + } + + let duration = start.elapsed().as_secs_f64(); + let requests_per_second = num as f64 / duration; + + info!( + "Finished: sent {num} {to_send} bytes requests with {to_receive} bytes response each within {duration:.2} s", + ); + info!("- {requests_per_second:.2} req/s\n"); + + Ok(()) +} + +async fn sequential_connections_per_second(server_address: Multiaddr) -> Result<()> { + info!("start benchmark: sequential connections with single request per second"); + let mut rounds = 0; + let to_send = 1; + let to_receive = 1; + let start = Instant::now(); + + let mut latency_connection_establishment = Vec::new(); + let mut latency_connection_establishment_plus_request = Vec::new(); + + loop { + if start.elapsed() > Duration::from_secs(30) { + break; + } + + let mut swarm = swarm(None).await?; + + let start = Instant::now(); + + let (server_peer_id, _) = connect(&mut swarm, server_address.clone()).await?; + + latency_connection_establishment.push(start.elapsed().as_secs_f64()); + + perf( + &mut swarm, + server_peer_id, + RunParams { + to_send, + to_receive, + }, + ) + .await?; + + latency_connection_establishment_plus_request.push(start.elapsed().as_secs_f64()); + rounds += 1; + } + + let duration = start.elapsed().as_secs_f64(); + + latency_connection_establishment.sort_by(|a, b| a.partial_cmp(b).unwrap()); + latency_connection_establishment_plus_request.sort_by(|a, b| a.partial_cmp(b).unwrap()); + + let connection_establishment_95th = percentile(&latency_connection_establishment, 0.95); + let connection_establishment_plus_request_95th = + percentile(&latency_connection_establishment_plus_request, 0.95); + + info!( + "Finished: established {rounds} connections with one {to_send} bytes request and one {to_receive} bytes response within {duration:.2} s", + ); + info!("- {connection_establishment_95th:.4} s 95th percentile connection establishment"); + info!("- {connection_establishment_plus_request_95th:.4} s 95th percentile connection establishment + one request"); + + Ok(()) +} + +async fn swarm(secret_key_seed: Option) -> Result> { + let local_key = if let Some(seed) = secret_key_seed { + generate_ed25519(seed) + } else { + libp2p_identity::Keypair::generate_ed25519() + }; + let local_peer_id = PeerId::from(local_key.public()); + + let transport = { + let tcp = libp2p_tcp::tokio::Transport::new(libp2p_tcp::Config::default().nodelay(true)) + .upgrade(upgrade::Version::V1Lazy) + .authenticate(libp2p_tls::Config::new(&local_key)?) + .multiplex(libp2p_yamux::Config::default()); + + let quic = { + let mut config = libp2p_quic::Config::new(&local_key); + config.support_draft_29 = true; + libp2p_quic::tokio::Transport::new(config) + }; + + let dns = libp2p_dns::TokioDnsConfig::system(OrTransport::new(quic, tcp))?; + + dns.map(|either_output, _| match either_output { + Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + }) + .boxed() + }; + + Ok( + SwarmBuilder::with_tokio_executor(transport, Default::default(), local_peer_id) + .substream_upgrade_protocol_override(upgrade::Version::V1Lazy) + .build(), + ) +} + +async fn connect( + swarm: &mut Swarm, + server_address: Multiaddr, +) -> Result<(PeerId, Duration)> { + let start = Instant::now(); + swarm.dial(server_address.clone()).unwrap(); + + let server_peer_id = loop { + match swarm.next().await.unwrap() { + SwarmEvent::ConnectionEstablished { peer_id, .. } => break peer_id, + SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { + bail!("Outgoing connection error to {:?}: {:?}", peer_id, error); + } + e => panic!("{e:?}"), + } + }; + + let duration = start.elapsed(); + let duration_seconds = duration.as_secs_f64(); + + info!("established connection in {duration_seconds:.4} s"); + + Ok((server_peer_id, duration)) +} + +async fn perf( + swarm: &mut Swarm, + server_peer_id: PeerId, + params: RunParams, +) -> Result { + swarm.behaviour_mut().perf(server_peer_id, params)?; + + let duration = match swarm.next().await.unwrap() { + SwarmEvent::Behaviour(libp2p_perf::client::Event { + id: _, + result: Ok(duration), + }) => duration, + e => panic!("{e:?}"), + }; + + info!("{}", Run { params, duration }); + + Ok(duration) +} + +fn generate_ed25519(secret_key_seed: u8) -> libp2p_identity::Keypair { + let mut bytes = [0u8; 32]; + bytes[0] = secret_key_seed; + + libp2p_identity::Keypair::ed25519_from_bytes(bytes).expect("only errors on wrong length") +} diff --git a/protocols/perf/src/client.rs b/protocols/perf/src/client.rs index c320b18e..93c2086a 100644 --- a/protocols/perf/src/client.rs +++ b/protocols/perf/src/client.rs @@ -18,45 +18,232 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -mod behaviour; -mod handler; +use instant::Duration; -use instant::Instant; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::{ + collections::HashSet, + task::{Context, Poll}, +}; -pub use behaviour::{Behaviour, Event}; +use libp2p_core::Multiaddr; +use libp2p_identity::PeerId; +use libp2p_request_response as request_response; +use libp2p_swarm::{ + derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionId, FromSwarm, + NetworkBehaviour, PollParameters, THandlerInEvent, THandlerOutEvent, ToSwarm, +}; -/// Parameters for a single run, i.e. one stream, sending and receiving data. -#[derive(Debug, Clone, Copy)] -pub struct RunParams { - pub to_send: usize, - pub to_receive: usize, -} - -/// Timers for a single run, i.e. one stream, sending and receiving data. -#[derive(Debug, Clone, Copy)] -pub struct RunTimers { - pub write_start: Instant, - pub write_done: Instant, - pub read_done: Instant, -} - -/// Statistics for a single run, i.e. one stream, sending and receiving data. -#[derive(Debug)] -pub struct RunStats { - pub params: RunParams, - pub timers: RunTimers, -} - -static NEXT_RUN_ID: AtomicUsize = AtomicUsize::new(1); +use crate::{protocol::Response, RunDuration, RunParams}; /// Connection identifier. -#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct RunId(usize); +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] +pub struct RunId(request_response::RequestId); -impl RunId { - /// Returns the next available [`RunId`]. - pub(crate) fn next() -> Self { - Self(NEXT_RUN_ID.fetch_add(1, Ordering::SeqCst)) +impl From for RunId { + fn from(value: request_response::RequestId) -> Self { + Self(value) + } +} + +#[derive(Debug)] +pub struct Event { + pub id: RunId, + pub result: Result, +} + +pub struct Behaviour { + connected: HashSet, + request_response: request_response::Behaviour, +} + +impl Default for Behaviour { + fn default() -> Self { + let mut req_resp_config = request_response::Config::default(); + req_resp_config.set_connection_keep_alive(Duration::from_secs(60 * 5)); + req_resp_config.set_request_timeout(Duration::from_secs(60 * 5)); + Self { + connected: Default::default(), + request_response: request_response::Behaviour::new( + std::iter::once(( + crate::PROTOCOL_NAME, + request_response::ProtocolSupport::Outbound, + )), + req_resp_config, + ), + } + } +} + +impl Behaviour { + pub fn new() -> Self { + Self::default() + } + + pub fn perf(&mut self, server: PeerId, params: RunParams) -> Result { + if !self.connected.contains(&server) { + return Err(PerfError::NotConnected); + } + + let id = self.request_response.send_request(&server, params).into(); + + Ok(id) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum PerfError { + #[error("Not connected to peer")] + NotConnected, +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = + as NetworkBehaviour>::ConnectionHandler; + type ToSwarm = Event; + + fn handle_pending_outbound_connection( + &mut self, + connection_id: ConnectionId, + maybe_peer: Option, + addresses: &[Multiaddr], + effective_role: libp2p_core::Endpoint, + ) -> Result, libp2p_swarm::ConnectionDenied> { + self.request_response.handle_pending_outbound_connection( + connection_id, + maybe_peer, + addresses, + effective_role, + ) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: libp2p_core::Endpoint, + ) -> Result, libp2p_swarm::ConnectionDenied> { + self.request_response + .handle_established_outbound_connection(connection_id, peer, addr, role_override) + } + + fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), libp2p_swarm::ConnectionDenied> { + self.request_response.handle_pending_inbound_connection( + connection_id, + local_addr, + remote_addr, + ) + } + + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, libp2p_swarm::ConnectionDenied> { + self.request_response.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => { + self.connected.insert(peer_id); + } + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id: _, + endpoint: _, + handler: _, + remaining_established, + }) => { + if remaining_established == 0 { + assert!(self.connected.remove(&peer_id)); + } + } + FromSwarm::AddressChange(_) + | FromSwarm::DialFailure(_) + | FromSwarm::ListenFailure(_) + | FromSwarm::NewListener(_) + | FromSwarm::NewListenAddr(_) + | FromSwarm::ExpiredListenAddr(_) + | FromSwarm::ListenerError(_) + | FromSwarm::ListenerClosed(_) + | FromSwarm::NewExternalAddrCandidate(_) + | FromSwarm::ExternalAddrConfirmed(_) + | FromSwarm::ExternalAddrExpired(_) => {} + }; + + self.request_response.on_swarm_event(event); + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + self.request_response + .on_connection_handler_event(peer_id, connection_id, event); + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + params: &mut impl PollParameters, + ) -> Poll>> { + self.request_response.poll(cx, params).map(|to_swarm| { + to_swarm.map_out(|m| match m { + request_response::Event::Message { + peer: _, + message: + request_response::Message::Response { + request_id, + response: Response::Receiver(run_duration), + }, + } => Event { + id: request_id.into(), + result: Ok(run_duration), + }, + request_response::Event::Message { + peer: _, + message: + request_response::Message::Response { + response: Response::Sender(_), + .. + }, + } => unreachable!(), + request_response::Event::Message { + peer: _, + message: request_response::Message::Request { .. }, + } => { + unreachable!() + } + request_response::Event::OutboundFailure { + peer: _, + request_id, + error, + } => Event { + id: request_id.into(), + result: Err(error), + }, + request_response::Event::InboundFailure { + peer: _, + request_id: _, + error: _, + } => unreachable!(), + request_response::Event::ResponseSent { .. } => unreachable!(), + }) + }) } } diff --git a/protocols/perf/src/lib.rs b/protocols/perf/src/lib.rs index aeb91ff2..b2b12244 100644 --- a/protocols/perf/src/lib.rs +++ b/protocols/perf/src/lib.rs @@ -24,6 +24,9 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +use std::fmt::Display; + +use instant::Duration; use libp2p_swarm::StreamProtocol; pub mod client; @@ -31,3 +34,85 @@ mod protocol; pub mod server; pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/perf/1.0.0"); + +/// Parameters for a single run, i.e. one stream, sending and receiving data. +/// +/// Property names are from the perspective of the actor. E.g. `to_send` is the amount of data to +/// send, both as the client and the server. +#[derive(Debug, Clone, Copy)] +pub struct RunParams { + pub to_send: usize, + pub to_receive: usize, +} + +/// Duration for a single run, i.e. one stream, sending and receiving data. +#[derive(Debug, Clone, Copy)] +pub struct RunDuration { + pub upload: Duration, + pub download: Duration, +} + +pub struct Run { + pub params: RunParams, + pub duration: RunDuration, +} + +impl Display for Run { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + const KILO: f64 = 1024.0; + const MEGA: f64 = KILO * 1024.0; + const GIGA: f64 = MEGA * 1024.0; + + fn format_bytes(bytes: usize) -> String { + let bytes = bytes as f64; + if bytes >= GIGA { + format!("{:.2} GiB", bytes / GIGA) + } else if bytes >= MEGA { + format!("{:.2} MiB", bytes / MEGA) + } else if bytes >= KILO { + format!("{:.2} KiB", bytes / KILO) + } else { + format!("{} B", bytes) + } + } + + fn format_bandwidth(duration: Duration, bytes: usize) -> String { + const KILO: f64 = 1024.0; + const MEGA: f64 = KILO * 1024.0; + const GIGA: f64 = MEGA * 1024.0; + + let bandwidth = (bytes as f64 * 8.0) / duration.as_secs_f64(); + + if bandwidth >= GIGA { + format!("{:.2} Gbit/s", bandwidth / GIGA) + } else if bandwidth >= MEGA { + format!("{:.2} Mbit/s", bandwidth / MEGA) + } else if bandwidth >= KILO { + format!("{:.2} Kbit/s", bandwidth / KILO) + } else { + format!("{:.2} bit/s", bandwidth) + } + } + + let Run { + params: RunParams { + to_send, + to_receive, + }, + duration: RunDuration { upload, download }, + } = self; + + write!( + f, + "uploaded {} in {:.4} s ({}), downloaded {} in {:.4} s ({})", + format_bytes(*to_send), + upload.as_secs_f64(), + format_bandwidth(*upload, *to_send), + format_bytes(*to_receive), + download.as_secs_f64(), + format_bandwidth(*download, *to_receive), + )?; + + Ok(()) + } +} diff --git a/protocols/perf/src/protocol.rs b/protocols/perf/src/protocol.rs index 7f9c5137..4b454fb8 100644 --- a/protocols/perf/src/protocol.rs +++ b/protocols/perf/src/protocol.rs @@ -18,189 +18,184 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use instant::Instant; - +use async_trait::async_trait; use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use instant::Instant; +use libp2p_request_response as request_response; +use libp2p_swarm::StreamProtocol; +use std::io; -use crate::{client, server}; +use crate::{RunDuration, RunParams}; -const BUF: [u8; 1024] = [0; 1024]; +const BUF: [u8; 65536] = [0; 64 << 10]; -pub(crate) async fn send_receive( - params: client::RunParams, - mut stream: S, -) -> Result { - let client::RunParams { - to_send, - to_receive, - } = params; - - let mut receive_buf = vec![0; 1024]; - - stream.write_all(&(to_receive as u64).to_be_bytes()).await?; - - let write_start = Instant::now(); - - let mut sent = 0; - while sent < to_send { - let n = std::cmp::min(to_send - sent, BUF.len()); - let buf = &BUF[..n]; - - sent += stream.write(buf).await?; - } - - stream.close().await?; - - let write_done = Instant::now(); - - let mut received = 0; - while received < to_receive { - received += stream.read(&mut receive_buf).await?; - } - - let read_done = Instant::now(); - - Ok(client::RunTimers { - write_start, - write_done, - read_done, - }) +#[derive(Debug)] +pub enum Response { + Sender(usize), + Receiver(RunDuration), } -pub(crate) async fn receive_send( - mut stream: S, -) -> Result { - let to_send = { - let mut buf = [0; 8]; - stream.read_exact(&mut buf).await?; +#[derive(Default)] +pub struct Codec { + to_receive: Option, - u64::from_be_bytes(buf) as usize - }; - - let read_start = Instant::now(); - - let mut receive_buf = vec![0; 1024]; - let mut received = 0; - loop { - let n = stream.read(&mut receive_buf).await?; - received += n; - if n == 0 { - break; - } - } - - let read_done = Instant::now(); - - let mut sent = 0; - while sent < to_send { - let n = std::cmp::min(to_send - sent, BUF.len()); - let buf = &BUF[..n]; - - sent += stream.write(buf).await?; - } - - stream.close().await?; - let write_done = Instant::now(); - - Ok(server::RunStats { - params: server::RunParams { sent, received }, - timers: server::RunTimers { - read_start, - read_done, - write_done, - }, - }) + write_start: Option, + read_start: Option, + read_done: Option, } -#[cfg(test)] -mod tests { - use super::*; - use futures::{executor::block_on, AsyncRead, AsyncWrite}; - use std::{ - pin::Pin, - sync::{Arc, Mutex}, - task::Poll, - }; - - #[derive(Clone)] - struct DummyStream { - inner: Arc>, +impl Clone for Codec { + fn clone(&self) -> Self { + Default::default() } +} - struct DummyStreamInner { - read: Vec, - write: Vec, - } +#[async_trait] +impl request_response::Codec for Codec { + /// The type of protocol(s) or protocol versions being negotiated. + type Protocol = StreamProtocol; + /// The type of inbound and outbound requests. + type Request = RunParams; + /// The type of inbound and outbound responses. + type Response = Response; - impl DummyStream { - fn new(read: Vec) -> Self { - Self { - inner: Arc::new(Mutex::new(DummyStreamInner { - read, - write: Vec::new(), - })), + /// Reads a request from the given I/O stream according to the + /// negotiated protocol. + async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut receive_buf = vec![0; 64 << 10]; + + let to_send = { + let mut buf = [0; 8]; + io.read_exact(&mut buf).await?; + + u64::from_be_bytes(buf) as usize + }; + + let mut received = 0; + loop { + let n = io.read(&mut receive_buf).await?; + received += n; + if n == 0 { + break; } } + + Ok(RunParams { + to_receive: received, + to_send, + }) } - impl Unpin for DummyStream {} + /// Reads a response from the given I/O stream according to the + /// negotiated protocol. + async fn read_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + assert!(self.write_start.is_some()); + assert_eq!(self.read_start, None); + assert_eq!(self.read_done, None); - impl AsyncWrite for DummyStream { - fn poll_write( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - Pin::new(&mut self.inner.lock().unwrap().write).poll_write(cx, buf) + self.read_start = Some(Instant::now()); + + let mut receive_buf = vec![0; 64 << 10]; + + let mut received = 0; + loop { + let n = io.read(&mut receive_buf).await?; + received += n; + // Make sure to wait for the remote to close the stream. Otherwise with `to_receive` of `0` + // one does not measure the full round-trip of the previous write. + if n == 0 { + break; + } } - fn poll_flush( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.inner.lock().unwrap().write).poll_flush(cx) - } + self.read_done = Some(Instant::now()); - fn poll_close( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.inner.lock().unwrap().write).poll_close(cx) - } + assert_eq!(received, self.to_receive.unwrap()); + + Ok(Response::Receiver(RunDuration { + upload: self + .read_start + .unwrap() + .duration_since(self.write_start.unwrap()), + download: self + .read_done + .unwrap() + .duration_since(self.read_start.unwrap()), + })) } - impl AsyncRead for DummyStream { - fn poll_read( - self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - buf: &mut [u8], - ) -> std::task::Poll> { - let amt = std::cmp::min(buf.len(), self.inner.lock().unwrap().read.len()); - let new = self.inner.lock().unwrap().read.split_off(amt); + /// Writes a request to the given I/O stream according to the + /// negotiated protocol. + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + assert_eq!(self.to_receive, None); + assert_eq!(self.write_start, None); + assert_eq!(self.read_start, None); + assert_eq!(self.read_done, None); - buf[..amt].copy_from_slice(self.inner.lock().unwrap().read.as_slice()); + self.write_start = Some(Instant::now()); - self.inner.lock().unwrap().read = new; - Poll::Ready(Ok(amt)) + let RunParams { + to_send, + to_receive, + } = req; + + self.to_receive = Some(to_receive); + + io.write_all(&(to_receive as u64).to_be_bytes()).await?; + + let mut sent = 0; + while sent < to_send { + let n = std::cmp::min(to_send - sent, BUF.len()); + let buf = &BUF[..n]; + + sent += io.write(buf).await?; } + + Ok(()) } - #[test] - fn test_client() { - let stream = DummyStream::new(vec![0]); + /// Writes a response to the given I/O stream according to the + /// negotiated protocol. + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + response: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + let to_send = match response { + Response::Sender(to_send) => to_send, + Response::Receiver(_) => unreachable!(), + }; - block_on(send_receive( - client::RunParams { - to_send: 0, - to_receive: 0, - }, - stream.clone(), - )) - .unwrap(); + let mut sent = 0; + while sent < to_send { + let n = std::cmp::min(to_send - sent, BUF.len()); + let buf = &BUF[..n]; - assert_eq!( - stream.inner.lock().unwrap().write, - 0u64.to_be_bytes().to_vec() - ); + sent += io.write(buf).await?; + } + + Ok(()) } } diff --git a/protocols/perf/src/server.rs b/protocols/perf/src/server.rs index fd0643a0..79f77c74 100644 --- a/protocols/perf/src/server.rs +++ b/protocols/perf/src/server.rs @@ -18,31 +18,150 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -mod behaviour; -mod handler; +use std::task::{Context, Poll}; -use instant::Instant; +use instant::Duration; +use libp2p_core::Multiaddr; +use libp2p_identity::PeerId; +use libp2p_request_response as request_response; +use libp2p_swarm::{ + ConnectionId, FromSwarm, NetworkBehaviour, PollParameters, THandlerInEvent, THandlerOutEvent, + ToSwarm, +}; -pub use behaviour::{Behaviour, Event}; +use crate::protocol::Response; -/// Parameters for a single run, i.e. one stream, sending and receiving data. -#[derive(Debug, Clone, Copy)] -pub struct RunParams { - pub sent: usize, - pub received: usize, +pub struct Behaviour { + request_response: request_response::Behaviour, } -/// Timers for a single run, i.e. one stream, sending and receiving data. -#[derive(Debug, Clone, Copy)] -pub struct RunTimers { - pub read_start: Instant, - pub read_done: Instant, - pub write_done: Instant, +impl Default for Behaviour { + fn default() -> Self { + let mut req_resp_config = request_response::Config::default(); + req_resp_config.set_connection_keep_alive(Duration::from_secs(60 * 5)); + req_resp_config.set_request_timeout(Duration::from_secs(60 * 5)); + + Self { + request_response: request_response::Behaviour::new( + std::iter::once(( + crate::PROTOCOL_NAME, + request_response::ProtocolSupport::Inbound, + )), + req_resp_config, + ), + } + } } -/// Statistics for a single run, i.e. one stream, sending and receiving data. -#[derive(Debug)] -pub struct RunStats { - pub params: RunParams, - pub timers: RunTimers, +impl Behaviour { + pub fn new() -> Self { + Self::default() + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = + as NetworkBehaviour>::ConnectionHandler; + type ToSwarm = (); + + fn handle_pending_outbound_connection( + &mut self, + connection_id: ConnectionId, + maybe_peer: Option, + addresses: &[Multiaddr], + effective_role: libp2p_core::Endpoint, + ) -> Result, libp2p_swarm::ConnectionDenied> { + self.request_response.handle_pending_outbound_connection( + connection_id, + maybe_peer, + addresses, + effective_role, + ) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: libp2p_core::Endpoint, + ) -> Result, libp2p_swarm::ConnectionDenied> { + self.request_response + .handle_established_outbound_connection(connection_id, peer, addr, role_override) + } + + fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), libp2p_swarm::ConnectionDenied> { + self.request_response.handle_pending_inbound_connection( + connection_id, + local_addr, + remote_addr, + ) + } + + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, libp2p_swarm::ConnectionDenied> { + self.request_response.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + self.request_response.on_swarm_event(event); + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + self.request_response + .on_connection_handler_event(peer_id, connection_id, event); + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + params: &mut impl PollParameters, + ) -> Poll>> { + self.request_response.poll(cx, params).map(|to_swarm| { + to_swarm.map_out(|m| match m { + request_response::Event::Message { + peer: _, + message: request_response::Message::Response { .. }, + } => { + unreachable!() + } + request_response::Event::Message { + peer: _, + message: + request_response::Message::Request { + request_id: _, + request, + channel, + }, + } => { + let _ = self + .request_response + .send_response(channel, Response::Sender(request.to_send)); + } + request_response::Event::OutboundFailure { .. } => unreachable!(), + request_response::Event::InboundFailure { .. } => {} + request_response::Event::ResponseSent { .. } => {} + }) + }) + } } diff --git a/protocols/perf/tests/lib.rs b/protocols/perf/tests/lib.rs index facde7d8..af5bc2c3 100644 --- a/protocols/perf/tests/lib.rs +++ b/protocols/perf/tests/lib.rs @@ -18,14 +18,11 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use libp2p_perf::{ - client::{self, RunParams}, - server, -}; +use libp2p_perf::{client, server, RunParams}; use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; -#[async_std::test] +#[tokio::test] async fn perf() { let _ = env_logger::try_init(); @@ -36,7 +33,7 @@ async fn perf() { server.listen().await; client.connect(&mut server).await; - async_std::task::spawn(server.loop_on_next()); + tokio::spawn(server.loop_on_next()); client .behaviour_mut()