refactor(perf): use libp2p-request-response

Rewrite `libp2p-perf` protocol using `libp2p-request-response`. Additionally adjust to latest conventions, e.g. the final JSON output.

Pull-Request: #3646.
This commit is contained in:
Max Inden
2023-05-28 07:51:53 +02:00
committed by GitHub
parent d4c4078e6d
commit 92af0d1281
11 changed files with 1103 additions and 498 deletions

View File

@ -1,140 +0,0 @@
// Copyright 2023 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use anyhow::{bail, Result};
use clap::Parser;
use futures::{future::Either, StreamExt};
use libp2p_core::{muxing::StreamMuxerBox, transport::OrTransport, upgrade, Multiaddr, Transport};
use libp2p_dns::DnsConfig;
use libp2p_identity::PeerId;
use libp2p_perf::client::RunParams;
use libp2p_swarm::{SwarmBuilder, SwarmEvent};
use log::info;
#[derive(Debug, Parser)]
#[clap(name = "libp2p perf client")]
struct Opts {
#[arg(long)]
server_address: Multiaddr,
}
#[async_std::main]
async fn main() -> Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let opts = Opts::parse();
info!("Initiating performance tests with {}", opts.server_address);
// Create a random PeerId
let local_key = libp2p_identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
let transport = {
let tcp =
libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::default().port_reuse(true))
.upgrade(upgrade::Version::V1Lazy)
.authenticate(
libp2p_noise::Config::new(&local_key)
.expect("Signing libp2p-noise static DH keypair failed."),
)
.multiplex(libp2p_yamux::Config::default());
let quic = {
let mut config = libp2p_quic::Config::new(&local_key);
config.support_draft_29 = true;
libp2p_quic::async_std::Transport::new(config)
};
let dns = DnsConfig::system(OrTransport::new(quic, tcp))
.await
.unwrap();
dns.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.boxed()
};
let mut swarm = SwarmBuilder::with_async_std_executor(
transport,
libp2p_perf::client::Behaviour::default(),
local_peer_id,
)
.substream_upgrade_protocol_override(upgrade::Version::V1Lazy)
.build();
swarm.dial(opts.server_address.clone()).unwrap();
let server_peer_id = loop {
match swarm.next().await.unwrap() {
SwarmEvent::ConnectionEstablished { peer_id, .. } => break peer_id,
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
bail!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
e => panic!("{e:?}"),
}
};
info!(
"Connection to {} established. Launching benchmarks.",
opts.server_address
);
swarm.behaviour_mut().perf(
server_peer_id,
RunParams {
to_send: 10 * 1024 * 1024,
to_receive: 10 * 1024 * 1024,
},
)?;
let stats = loop {
match swarm.next().await.unwrap() {
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
} => {
info!("Established connection to {:?} via {:?}", peer_id, endpoint);
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
info!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
SwarmEvent::Behaviour(libp2p_perf::client::Event { id: _, result }) => break result?,
e => panic!("{e:?}"),
}
};
let sent_mebibytes = stats.params.to_send as f64 / 1024.0 / 1024.0;
let sent_time = (stats.timers.write_done - stats.timers.write_start).as_secs_f64();
let sent_bandwidth_mebibit_second = (sent_mebibytes * 8.0) / sent_time;
let received_mebibytes = stats.params.to_receive as f64 / 1024.0 / 1024.0;
let receive_time = (stats.timers.read_done - stats.timers.write_done).as_secs_f64();
let receive_bandwidth_mebibit_second = (received_mebibytes * 8.0) / receive_time;
info!(
"Finished run: Sent {sent_mebibytes:.2} MiB in {sent_time:.2} s with \
{sent_bandwidth_mebibit_second:.2} MiBit/s and received \
{received_mebibytes:.2} MiB in {receive_time:.2} s with \
{receive_bandwidth_mebibit_second:.2} MiBit/s",
);
Ok(())
}

View File

@ -1,128 +0,0 @@
// Copyright 2023 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use clap::Parser;
use futures::{future::Either, StreamExt};
use libp2p_core::{muxing::StreamMuxerBox, transport::OrTransport, upgrade, Transport};
use libp2p_dns::DnsConfig;
use libp2p_identity::PeerId;
use libp2p_swarm::{SwarmBuilder, SwarmEvent};
use log::{error, info};
#[derive(Debug, Parser)]
#[clap(name = "libp2p perf server")]
struct Opts {}
#[async_std::main]
async fn main() {
env_logger::init();
let _opts = Opts::parse();
// Create a random PeerId
let local_key = libp2p_identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
println!("Local peer id: {local_peer_id}");
let transport = {
let tcp =
libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::default().port_reuse(true))
.upgrade(upgrade::Version::V1Lazy)
.authenticate(
libp2p_noise::Config::new(&local_key)
.expect("Signing libp2p-noise static DH keypair failed."),
)
.multiplex(libp2p_yamux::Config::default());
let quic = {
let mut config = libp2p_quic::Config::new(&local_key);
config.support_draft_29 = true;
libp2p_quic::async_std::Transport::new(config)
};
let dns = DnsConfig::system(OrTransport::new(quic, tcp))
.await
.unwrap();
dns.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.boxed()
};
let mut swarm = SwarmBuilder::with_async_std_executor(
transport,
libp2p_perf::server::Behaviour::default(),
local_peer_id,
)
.substream_upgrade_protocol_override(upgrade::Version::V1Lazy)
.build();
swarm
.listen_on("/ip4/0.0.0.0/tcp/4001".parse().unwrap())
.unwrap();
swarm
.listen_on("/ip4/0.0.0.0/udp/4001/quic-v1".parse().unwrap())
.unwrap();
loop {
match swarm.next().await.unwrap() {
SwarmEvent::NewListenAddr { address, .. } => {
info!("Listening on {address}");
}
SwarmEvent::IncomingConnection { .. } => {}
e @ SwarmEvent::IncomingConnectionError { .. } => {
error!("{e:?}");
}
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
} => {
info!("Established connection to {:?} via {:?}", peer_id, endpoint);
}
SwarmEvent::ConnectionClosed { .. } => {}
SwarmEvent::Behaviour(libp2p_perf::server::Event {
remote_peer_id,
stats,
}) => {
let received_mebibytes = stats.params.received as f64 / 1024.0 / 1024.0;
let receive_time = (stats.timers.read_done - stats.timers.read_start).as_secs_f64();
let receive_bandwidth_mebibit_second = (received_mebibytes * 8.0) / receive_time;
let sent_mebibytes = stats.params.sent as f64 / 1024.0 / 1024.0;
let sent_time = (stats.timers.write_done - stats.timers.read_done).as_secs_f64();
let sent_bandwidth_mebibit_second = (sent_mebibytes * 8.0) / sent_time;
info!(
"Finished run with {}: Received {:.2} MiB in {:.2} s with {:.2} MiBit/s and sent {:.2} MiB in {:.2} s with {:.2} MiBit/s",
remote_peer_id,
received_mebibytes,
receive_time,
receive_bandwidth_mebibit_second,
sent_mebibytes,
sent_time,
sent_bandwidth_mebibit_second,
)
}
e => panic!("{e:?}"),
}
}
}

View File

@ -0,0 +1,486 @@
// Copyright 2023 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::{net::SocketAddr, str::FromStr};
use anyhow::{bail, Result};
use clap::Parser;
use futures::FutureExt;
use futures::{future::Either, StreamExt};
use instant::{Duration, Instant};
use libp2p_core::{
multiaddr::Protocol, muxing::StreamMuxerBox, transport::OrTransport, upgrade, Multiaddr,
Transport as _,
};
use libp2p_identity::PeerId;
use libp2p_perf::{Run, RunDuration, RunParams};
use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent};
use log::{error, info};
use serde::{Deserialize, Serialize};
#[derive(Debug, Parser)]
#[clap(name = "libp2p perf client")]
struct Opts {
#[arg(long)]
server_address: Option<SocketAddr>,
#[arg(long)]
transport: Option<Transport>,
#[arg(long)]
upload_bytes: Option<usize>,
#[arg(long)]
download_bytes: Option<usize>,
/// Run in server mode.
#[clap(long)]
run_server: bool,
/// Fixed value to generate deterministic peer id.
#[clap(long)]
secret_key_seed: Option<u8>,
}
/// Supported transports by rust-libp2p.
#[derive(Clone, Debug)]
pub enum Transport {
Tcp,
QuicV1,
}
impl FromStr for Transport {
type Err = anyhow::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
Ok(match s {
"tcp" => Self::Tcp,
"quic-v1" => Self::QuicV1,
other => bail!("unknown transport {other}"),
})
}
}
#[tokio::main]
async fn main() -> Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
.format_timestamp_millis()
.init();
let opts = Opts::parse();
match opts {
Opts {
server_address: Some(server_address),
transport: None,
upload_bytes: None,
download_bytes: None,
run_server: true,
secret_key_seed: Some(secret_key_seed),
} => server(server_address, secret_key_seed).await?,
Opts {
server_address: Some(server_address),
transport: Some(transport),
upload_bytes,
download_bytes,
run_server: false,
secret_key_seed: None,
} => {
client(server_address, transport, upload_bytes, download_bytes).await?;
}
_ => panic!("invalid command line arguments: {opts:?}"),
};
Ok(())
}
async fn server(server_address: SocketAddr, secret_key_seed: u8) -> Result<()> {
let mut swarm = swarm::<libp2p_perf::server::Behaviour>(Some(secret_key_seed)).await?;
swarm.listen_on(
Multiaddr::empty()
.with(server_address.ip().into())
.with(Protocol::Tcp(server_address.port())),
)?;
swarm
.listen_on(
Multiaddr::empty()
.with(server_address.ip().into())
.with(Protocol::Udp(server_address.port()))
.with(Protocol::QuicV1),
)
.unwrap();
tokio::spawn(async move {
loop {
match swarm.next().await.unwrap() {
SwarmEvent::NewListenAddr { address, .. } => {
info!("Listening on {address}");
}
SwarmEvent::IncomingConnection { .. } => {}
e @ SwarmEvent::IncomingConnectionError { .. } => {
error!("{e:?}");
}
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
} => {
info!("Established connection to {:?} via {:?}", peer_id, endpoint);
}
SwarmEvent::ConnectionClosed { .. } => {}
SwarmEvent::Behaviour(()) => {
info!("Finished run",)
}
e => panic!("{e:?}"),
}
}
})
.await
.unwrap();
Ok(())
}
async fn client(
server_address: SocketAddr,
transport: Transport,
upload_bytes: Option<usize>,
download_bytes: Option<usize>,
) -> Result<()> {
let server_address = match transport {
Transport::Tcp => Multiaddr::empty()
.with(server_address.ip().into())
.with(Protocol::Tcp(server_address.port())),
Transport::QuicV1 => Multiaddr::empty()
.with(server_address.ip().into())
.with(Protocol::Udp(server_address.port()))
.with(Protocol::QuicV1),
};
let benchmarks = if upload_bytes.is_some() {
vec![custom(
server_address,
RunParams {
to_send: upload_bytes.unwrap(),
to_receive: download_bytes.unwrap(),
},
)
.boxed()]
} else {
vec![
latency(server_address.clone()).boxed(),
throughput(server_address.clone()).boxed(),
requests_per_second(server_address.clone()).boxed(),
sequential_connections_per_second(server_address.clone()).boxed(),
]
};
tokio::spawn(async move {
for benchmark in benchmarks {
benchmark.await?;
}
anyhow::Ok(())
})
.await??;
Ok(())
}
async fn custom(server_address: Multiaddr, params: RunParams) -> Result<()> {
info!("start benchmark: custom");
let mut swarm = swarm(None).await?;
let (server_peer_id, connection_established) =
connect(&mut swarm, server_address.clone()).await?;
let RunDuration { upload, download } = perf(&mut swarm, server_peer_id, params).await?;
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct CustomResult {
connection_established_seconds: f64,
upload_seconds: f64,
download_seconds: f64,
}
println!(
"{}",
serde_json::to_string(&CustomResult {
connection_established_seconds: connection_established.as_secs_f64(),
upload_seconds: upload.as_secs_f64(),
download_seconds: download.as_secs_f64(),
})
.unwrap()
);
Ok(())
}
async fn latency(server_address: Multiaddr) -> Result<()> {
info!("start benchmark: round-trip-time latency");
let mut swarm = swarm(None).await?;
let (server_peer_id, _) = connect(&mut swarm, server_address.clone()).await?;
let mut rounds = 0;
let start = Instant::now();
let mut latencies = Vec::new();
loop {
if start.elapsed() > Duration::from_secs(30) {
break;
}
let start = Instant::now();
perf(
&mut swarm,
server_peer_id,
RunParams {
to_send: 1,
to_receive: 1,
},
)
.await?;
latencies.push(start.elapsed().as_secs_f64());
rounds += 1;
}
latencies.sort_by(|a, b| a.partial_cmp(b).unwrap());
info!(
"Finished: {rounds} pings in {:.4}s",
start.elapsed().as_secs_f64()
);
info!("- {:.4} s median", percentile(&latencies, 0.50),);
info!("- {:.4} s 95th percentile\n", percentile(&latencies, 0.95),);
Ok(())
}
fn percentile<V: PartialOrd + Copy>(values: &[V], percentile: f64) -> V {
let n: usize = (values.len() as f64 * percentile).ceil() as usize - 1;
values[n]
}
async fn throughput(server_address: Multiaddr) -> Result<()> {
info!("start benchmark: single connection single channel throughput");
let mut swarm = swarm(None).await?;
let (server_peer_id, _) = connect(&mut swarm, server_address.clone()).await?;
let params = RunParams {
to_send: 10 * 1024 * 1024,
to_receive: 10 * 1024 * 1024,
};
perf(&mut swarm, server_peer_id, params).await?;
Ok(())
}
async fn requests_per_second(server_address: Multiaddr) -> Result<()> {
info!("start benchmark: single connection parallel requests per second");
let mut swarm = swarm(None).await?;
let (server_peer_id, _) = connect(&mut swarm, server_address.clone()).await?;
let num = 1_000;
let to_send = 1;
let to_receive = 1;
for _ in 0..num {
swarm.behaviour_mut().perf(
server_peer_id,
RunParams {
to_send,
to_receive,
},
)?;
}
let mut finished = 0;
let start = Instant::now();
loop {
match swarm.next().await.unwrap() {
SwarmEvent::Behaviour(libp2p_perf::client::Event {
id: _,
result: Ok(_),
}) => {
finished += 1;
if finished == num {
break;
}
}
e => panic!("{e:?}"),
}
}
let duration = start.elapsed().as_secs_f64();
let requests_per_second = num as f64 / duration;
info!(
"Finished: sent {num} {to_send} bytes requests with {to_receive} bytes response each within {duration:.2} s",
);
info!("- {requests_per_second:.2} req/s\n");
Ok(())
}
async fn sequential_connections_per_second(server_address: Multiaddr) -> Result<()> {
info!("start benchmark: sequential connections with single request per second");
let mut rounds = 0;
let to_send = 1;
let to_receive = 1;
let start = Instant::now();
let mut latency_connection_establishment = Vec::new();
let mut latency_connection_establishment_plus_request = Vec::new();
loop {
if start.elapsed() > Duration::from_secs(30) {
break;
}
let mut swarm = swarm(None).await?;
let start = Instant::now();
let (server_peer_id, _) = connect(&mut swarm, server_address.clone()).await?;
latency_connection_establishment.push(start.elapsed().as_secs_f64());
perf(
&mut swarm,
server_peer_id,
RunParams {
to_send,
to_receive,
},
)
.await?;
latency_connection_establishment_plus_request.push(start.elapsed().as_secs_f64());
rounds += 1;
}
let duration = start.elapsed().as_secs_f64();
latency_connection_establishment.sort_by(|a, b| a.partial_cmp(b).unwrap());
latency_connection_establishment_plus_request.sort_by(|a, b| a.partial_cmp(b).unwrap());
let connection_establishment_95th = percentile(&latency_connection_establishment, 0.95);
let connection_establishment_plus_request_95th =
percentile(&latency_connection_establishment_plus_request, 0.95);
info!(
"Finished: established {rounds} connections with one {to_send} bytes request and one {to_receive} bytes response within {duration:.2} s",
);
info!("- {connection_establishment_95th:.4} s 95th percentile connection establishment");
info!("- {connection_establishment_plus_request_95th:.4} s 95th percentile connection establishment + one request");
Ok(())
}
async fn swarm<B: NetworkBehaviour + Default>(secret_key_seed: Option<u8>) -> Result<Swarm<B>> {
let local_key = if let Some(seed) = secret_key_seed {
generate_ed25519(seed)
} else {
libp2p_identity::Keypair::generate_ed25519()
};
let local_peer_id = PeerId::from(local_key.public());
let transport = {
let tcp = libp2p_tcp::tokio::Transport::new(libp2p_tcp::Config::default().nodelay(true))
.upgrade(upgrade::Version::V1Lazy)
.authenticate(libp2p_tls::Config::new(&local_key)?)
.multiplex(libp2p_yamux::Config::default());
let quic = {
let mut config = libp2p_quic::Config::new(&local_key);
config.support_draft_29 = true;
libp2p_quic::tokio::Transport::new(config)
};
let dns = libp2p_dns::TokioDnsConfig::system(OrTransport::new(quic, tcp))?;
dns.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.boxed()
};
Ok(
SwarmBuilder::with_tokio_executor(transport, Default::default(), local_peer_id)
.substream_upgrade_protocol_override(upgrade::Version::V1Lazy)
.build(),
)
}
async fn connect(
swarm: &mut Swarm<libp2p_perf::client::Behaviour>,
server_address: Multiaddr,
) -> Result<(PeerId, Duration)> {
let start = Instant::now();
swarm.dial(server_address.clone()).unwrap();
let server_peer_id = loop {
match swarm.next().await.unwrap() {
SwarmEvent::ConnectionEstablished { peer_id, .. } => break peer_id,
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
bail!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
e => panic!("{e:?}"),
}
};
let duration = start.elapsed();
let duration_seconds = duration.as_secs_f64();
info!("established connection in {duration_seconds:.4} s");
Ok((server_peer_id, duration))
}
async fn perf(
swarm: &mut Swarm<libp2p_perf::client::Behaviour>,
server_peer_id: PeerId,
params: RunParams,
) -> Result<RunDuration> {
swarm.behaviour_mut().perf(server_peer_id, params)?;
let duration = match swarm.next().await.unwrap() {
SwarmEvent::Behaviour(libp2p_perf::client::Event {
id: _,
result: Ok(duration),
}) => duration,
e => panic!("{e:?}"),
};
info!("{}", Run { params, duration });
Ok(duration)
}
fn generate_ed25519(secret_key_seed: u8) -> libp2p_identity::Keypair {
let mut bytes = [0u8; 32];
bytes[0] = secret_key_seed;
libp2p_identity::Keypair::ed25519_from_bytes(bytes).expect("only errors on wrong length")
}

View File

@ -18,45 +18,232 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
mod behaviour;
mod handler;
use instant::Duration;
use instant::Instant;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{
collections::HashSet,
task::{Context, Poll},
};
pub use behaviour::{Behaviour, Event};
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use libp2p_request_response as request_response;
use libp2p_swarm::{
derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionId, FromSwarm,
NetworkBehaviour, PollParameters, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
/// Parameters for a single run, i.e. one stream, sending and receiving data.
#[derive(Debug, Clone, Copy)]
pub struct RunParams {
pub to_send: usize,
pub to_receive: usize,
}
/// Timers for a single run, i.e. one stream, sending and receiving data.
#[derive(Debug, Clone, Copy)]
pub struct RunTimers {
pub write_start: Instant,
pub write_done: Instant,
pub read_done: Instant,
}
/// Statistics for a single run, i.e. one stream, sending and receiving data.
#[derive(Debug)]
pub struct RunStats {
pub params: RunParams,
pub timers: RunTimers,
}
static NEXT_RUN_ID: AtomicUsize = AtomicUsize::new(1);
use crate::{protocol::Response, RunDuration, RunParams};
/// Connection identifier.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct RunId(usize);
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct RunId(request_response::RequestId);
impl RunId {
/// Returns the next available [`RunId`].
pub(crate) fn next() -> Self {
Self(NEXT_RUN_ID.fetch_add(1, Ordering::SeqCst))
impl From<request_response::RequestId> for RunId {
fn from(value: request_response::RequestId) -> Self {
Self(value)
}
}
#[derive(Debug)]
pub struct Event {
pub id: RunId,
pub result: Result<RunDuration, request_response::OutboundFailure>,
}
pub struct Behaviour {
connected: HashSet<PeerId>,
request_response: request_response::Behaviour<crate::protocol::Codec>,
}
impl Default for Behaviour {
fn default() -> Self {
let mut req_resp_config = request_response::Config::default();
req_resp_config.set_connection_keep_alive(Duration::from_secs(60 * 5));
req_resp_config.set_request_timeout(Duration::from_secs(60 * 5));
Self {
connected: Default::default(),
request_response: request_response::Behaviour::new(
std::iter::once((
crate::PROTOCOL_NAME,
request_response::ProtocolSupport::Outbound,
)),
req_resp_config,
),
}
}
}
impl Behaviour {
pub fn new() -> Self {
Self::default()
}
pub fn perf(&mut self, server: PeerId, params: RunParams) -> Result<RunId, PerfError> {
if !self.connected.contains(&server) {
return Err(PerfError::NotConnected);
}
let id = self.request_response.send_request(&server, params).into();
Ok(id)
}
}
#[derive(thiserror::Error, Debug)]
pub enum PerfError {
#[error("Not connected to peer")]
NotConnected,
}
impl NetworkBehaviour for Behaviour {
type ConnectionHandler =
<request_response::Behaviour<crate::protocol::Codec> as NetworkBehaviour>::ConnectionHandler;
type ToSwarm = Event;
fn handle_pending_outbound_connection(
&mut self,
connection_id: ConnectionId,
maybe_peer: Option<PeerId>,
addresses: &[Multiaddr],
effective_role: libp2p_core::Endpoint,
) -> Result<Vec<Multiaddr>, libp2p_swarm::ConnectionDenied> {
self.request_response.handle_pending_outbound_connection(
connection_id,
maybe_peer,
addresses,
effective_role,
)
}
fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: libp2p_core::Endpoint,
) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
self.request_response
.handle_established_outbound_connection(connection_id, peer, addr, role_override)
}
fn handle_pending_inbound_connection(
&mut self,
connection_id: ConnectionId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<(), libp2p_swarm::ConnectionDenied> {
self.request_response.handle_pending_inbound_connection(
connection_id,
local_addr,
remote_addr,
)
}
fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
self.request_response.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)
}
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => {
self.connected.insert(peer_id);
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id: _,
endpoint: _,
handler: _,
remaining_established,
}) => {
if remaining_established == 0 {
assert!(self.connected.remove(&peer_id));
}
}
FromSwarm::AddressChange(_)
| FromSwarm::DialFailure(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddrCandidate(_)
| FromSwarm::ExternalAddrConfirmed(_)
| FromSwarm::ExternalAddrExpired(_) => {}
};
self.request_response.on_swarm_event(event);
}
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
self.request_response
.on_connection_handler_event(peer_id, connection_id, event);
}
fn poll(
&mut self,
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
self.request_response.poll(cx, params).map(|to_swarm| {
to_swarm.map_out(|m| match m {
request_response::Event::Message {
peer: _,
message:
request_response::Message::Response {
request_id,
response: Response::Receiver(run_duration),
},
} => Event {
id: request_id.into(),
result: Ok(run_duration),
},
request_response::Event::Message {
peer: _,
message:
request_response::Message::Response {
response: Response::Sender(_),
..
},
} => unreachable!(),
request_response::Event::Message {
peer: _,
message: request_response::Message::Request { .. },
} => {
unreachable!()
}
request_response::Event::OutboundFailure {
peer: _,
request_id,
error,
} => Event {
id: request_id.into(),
result: Err(error),
},
request_response::Event::InboundFailure {
peer: _,
request_id: _,
error: _,
} => unreachable!(),
request_response::Event::ResponseSent { .. } => unreachable!(),
})
})
}
}

View File

@ -24,6 +24,9 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use std::fmt::Display;
use instant::Duration;
use libp2p_swarm::StreamProtocol;
pub mod client;
@ -31,3 +34,85 @@ mod protocol;
pub mod server;
pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/perf/1.0.0");
/// Parameters for a single run, i.e. one stream, sending and receiving data.
///
/// Property names are from the perspective of the actor. E.g. `to_send` is the amount of data to
/// send, both as the client and the server.
#[derive(Debug, Clone, Copy)]
pub struct RunParams {
pub to_send: usize,
pub to_receive: usize,
}
/// Duration for a single run, i.e. one stream, sending and receiving data.
#[derive(Debug, Clone, Copy)]
pub struct RunDuration {
pub upload: Duration,
pub download: Duration,
}
pub struct Run {
pub params: RunParams,
pub duration: RunDuration,
}
impl Display for Run {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
const KILO: f64 = 1024.0;
const MEGA: f64 = KILO * 1024.0;
const GIGA: f64 = MEGA * 1024.0;
fn format_bytes(bytes: usize) -> String {
let bytes = bytes as f64;
if bytes >= GIGA {
format!("{:.2} GiB", bytes / GIGA)
} else if bytes >= MEGA {
format!("{:.2} MiB", bytes / MEGA)
} else if bytes >= KILO {
format!("{:.2} KiB", bytes / KILO)
} else {
format!("{} B", bytes)
}
}
fn format_bandwidth(duration: Duration, bytes: usize) -> String {
const KILO: f64 = 1024.0;
const MEGA: f64 = KILO * 1024.0;
const GIGA: f64 = MEGA * 1024.0;
let bandwidth = (bytes as f64 * 8.0) / duration.as_secs_f64();
if bandwidth >= GIGA {
format!("{:.2} Gbit/s", bandwidth / GIGA)
} else if bandwidth >= MEGA {
format!("{:.2} Mbit/s", bandwidth / MEGA)
} else if bandwidth >= KILO {
format!("{:.2} Kbit/s", bandwidth / KILO)
} else {
format!("{:.2} bit/s", bandwidth)
}
}
let Run {
params: RunParams {
to_send,
to_receive,
},
duration: RunDuration { upload, download },
} = self;
write!(
f,
"uploaded {} in {:.4} s ({}), downloaded {} in {:.4} s ({})",
format_bytes(*to_send),
upload.as_secs_f64(),
format_bandwidth(*upload, *to_send),
format_bytes(*to_receive),
download.as_secs_f64(),
format_bandwidth(*download, *to_receive),
)?;
Ok(())
}
}

View File

@ -18,189 +18,184 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use instant::Instant;
use async_trait::async_trait;
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use instant::Instant;
use libp2p_request_response as request_response;
use libp2p_swarm::StreamProtocol;
use std::io;
use crate::{client, server};
use crate::{RunDuration, RunParams};
const BUF: [u8; 1024] = [0; 1024];
const BUF: [u8; 65536] = [0; 64 << 10];
pub(crate) async fn send_receive<S: AsyncRead + AsyncWrite + Unpin>(
params: client::RunParams,
mut stream: S,
) -> Result<client::RunTimers, std::io::Error> {
let client::RunParams {
to_send,
to_receive,
} = params;
let mut receive_buf = vec![0; 1024];
stream.write_all(&(to_receive as u64).to_be_bytes()).await?;
let write_start = Instant::now();
let mut sent = 0;
while sent < to_send {
let n = std::cmp::min(to_send - sent, BUF.len());
let buf = &BUF[..n];
sent += stream.write(buf).await?;
}
stream.close().await?;
let write_done = Instant::now();
let mut received = 0;
while received < to_receive {
received += stream.read(&mut receive_buf).await?;
}
let read_done = Instant::now();
Ok(client::RunTimers {
write_start,
write_done,
read_done,
})
#[derive(Debug)]
pub enum Response {
Sender(usize),
Receiver(RunDuration),
}
pub(crate) async fn receive_send<S: AsyncRead + AsyncWrite + Unpin>(
mut stream: S,
) -> Result<server::RunStats, std::io::Error> {
let to_send = {
let mut buf = [0; 8];
stream.read_exact(&mut buf).await?;
#[derive(Default)]
pub struct Codec {
to_receive: Option<usize>,
u64::from_be_bytes(buf) as usize
};
let read_start = Instant::now();
let mut receive_buf = vec![0; 1024];
let mut received = 0;
loop {
let n = stream.read(&mut receive_buf).await?;
received += n;
if n == 0 {
break;
}
}
let read_done = Instant::now();
let mut sent = 0;
while sent < to_send {
let n = std::cmp::min(to_send - sent, BUF.len());
let buf = &BUF[..n];
sent += stream.write(buf).await?;
}
stream.close().await?;
let write_done = Instant::now();
Ok(server::RunStats {
params: server::RunParams { sent, received },
timers: server::RunTimers {
read_start,
read_done,
write_done,
},
})
write_start: Option<Instant>,
read_start: Option<Instant>,
read_done: Option<Instant>,
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{executor::block_on, AsyncRead, AsyncWrite};
use std::{
pin::Pin,
sync::{Arc, Mutex},
task::Poll,
};
#[derive(Clone)]
struct DummyStream {
inner: Arc<Mutex<DummyStreamInner>>,
impl Clone for Codec {
fn clone(&self) -> Self {
Default::default()
}
}
struct DummyStreamInner {
read: Vec<u8>,
write: Vec<u8>,
}
#[async_trait]
impl request_response::Codec for Codec {
/// The type of protocol(s) or protocol versions being negotiated.
type Protocol = StreamProtocol;
/// The type of inbound and outbound requests.
type Request = RunParams;
/// The type of inbound and outbound responses.
type Response = Response;
impl DummyStream {
fn new(read: Vec<u8>) -> Self {
Self {
inner: Arc::new(Mutex::new(DummyStreamInner {
read,
write: Vec::new(),
})),
/// Reads a request from the given I/O stream according to the
/// negotiated protocol.
async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send,
{
let mut receive_buf = vec![0; 64 << 10];
let to_send = {
let mut buf = [0; 8];
io.read_exact(&mut buf).await?;
u64::from_be_bytes(buf) as usize
};
let mut received = 0;
loop {
let n = io.read(&mut receive_buf).await?;
received += n;
if n == 0 {
break;
}
}
Ok(RunParams {
to_receive: received,
to_send,
})
}
impl Unpin for DummyStream {}
/// Reads a response from the given I/O stream according to the
/// negotiated protocol.
async fn read_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send,
{
assert!(self.write_start.is_some());
assert_eq!(self.read_start, None);
assert_eq!(self.read_done, None);
impl AsyncWrite for DummyStream {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
Pin::new(&mut self.inner.lock().unwrap().write).poll_write(cx, buf)
self.read_start = Some(Instant::now());
let mut receive_buf = vec![0; 64 << 10];
let mut received = 0;
loop {
let n = io.read(&mut receive_buf).await?;
received += n;
// Make sure to wait for the remote to close the stream. Otherwise with `to_receive` of `0`
// one does not measure the full round-trip of the previous write.
if n == 0 {
break;
}
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
Pin::new(&mut self.inner.lock().unwrap().write).poll_flush(cx)
}
self.read_done = Some(Instant::now());
fn poll_close(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
Pin::new(&mut self.inner.lock().unwrap().write).poll_close(cx)
}
assert_eq!(received, self.to_receive.unwrap());
Ok(Response::Receiver(RunDuration {
upload: self
.read_start
.unwrap()
.duration_since(self.write_start.unwrap()),
download: self
.read_done
.unwrap()
.duration_since(self.read_start.unwrap()),
}))
}
impl AsyncRead for DummyStream {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
let amt = std::cmp::min(buf.len(), self.inner.lock().unwrap().read.len());
let new = self.inner.lock().unwrap().read.split_off(amt);
/// Writes a request to the given I/O stream according to the
/// negotiated protocol.
async fn write_request<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
req: Self::Request,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
assert_eq!(self.to_receive, None);
assert_eq!(self.write_start, None);
assert_eq!(self.read_start, None);
assert_eq!(self.read_done, None);
buf[..amt].copy_from_slice(self.inner.lock().unwrap().read.as_slice());
self.write_start = Some(Instant::now());
self.inner.lock().unwrap().read = new;
Poll::Ready(Ok(amt))
let RunParams {
to_send,
to_receive,
} = req;
self.to_receive = Some(to_receive);
io.write_all(&(to_receive as u64).to_be_bytes()).await?;
let mut sent = 0;
while sent < to_send {
let n = std::cmp::min(to_send - sent, BUF.len());
let buf = &BUF[..n];
sent += io.write(buf).await?;
}
Ok(())
}
#[test]
fn test_client() {
let stream = DummyStream::new(vec![0]);
/// Writes a response to the given I/O stream according to the
/// negotiated protocol.
async fn write_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
response: Self::Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let to_send = match response {
Response::Sender(to_send) => to_send,
Response::Receiver(_) => unreachable!(),
};
block_on(send_receive(
client::RunParams {
to_send: 0,
to_receive: 0,
},
stream.clone(),
))
.unwrap();
let mut sent = 0;
while sent < to_send {
let n = std::cmp::min(to_send - sent, BUF.len());
let buf = &BUF[..n];
assert_eq!(
stream.inner.lock().unwrap().write,
0u64.to_be_bytes().to_vec()
);
sent += io.write(buf).await?;
}
Ok(())
}
}

View File

@ -18,31 +18,150 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
mod behaviour;
mod handler;
use std::task::{Context, Poll};
use instant::Instant;
use instant::Duration;
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use libp2p_request_response as request_response;
use libp2p_swarm::{
ConnectionId, FromSwarm, NetworkBehaviour, PollParameters, THandlerInEvent, THandlerOutEvent,
ToSwarm,
};
pub use behaviour::{Behaviour, Event};
use crate::protocol::Response;
/// Parameters for a single run, i.e. one stream, sending and receiving data.
#[derive(Debug, Clone, Copy)]
pub struct RunParams {
pub sent: usize,
pub received: usize,
pub struct Behaviour {
request_response: request_response::Behaviour<crate::protocol::Codec>,
}
/// Timers for a single run, i.e. one stream, sending and receiving data.
#[derive(Debug, Clone, Copy)]
pub struct RunTimers {
pub read_start: Instant,
pub read_done: Instant,
pub write_done: Instant,
impl Default for Behaviour {
fn default() -> Self {
let mut req_resp_config = request_response::Config::default();
req_resp_config.set_connection_keep_alive(Duration::from_secs(60 * 5));
req_resp_config.set_request_timeout(Duration::from_secs(60 * 5));
Self {
request_response: request_response::Behaviour::new(
std::iter::once((
crate::PROTOCOL_NAME,
request_response::ProtocolSupport::Inbound,
)),
req_resp_config,
),
}
}
}
/// Statistics for a single run, i.e. one stream, sending and receiving data.
#[derive(Debug)]
pub struct RunStats {
pub params: RunParams,
pub timers: RunTimers,
impl Behaviour {
pub fn new() -> Self {
Self::default()
}
}
impl NetworkBehaviour for Behaviour {
type ConnectionHandler =
<request_response::Behaviour<crate::protocol::Codec> as NetworkBehaviour>::ConnectionHandler;
type ToSwarm = ();
fn handle_pending_outbound_connection(
&mut self,
connection_id: ConnectionId,
maybe_peer: Option<PeerId>,
addresses: &[Multiaddr],
effective_role: libp2p_core::Endpoint,
) -> Result<Vec<Multiaddr>, libp2p_swarm::ConnectionDenied> {
self.request_response.handle_pending_outbound_connection(
connection_id,
maybe_peer,
addresses,
effective_role,
)
}
fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: libp2p_core::Endpoint,
) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
self.request_response
.handle_established_outbound_connection(connection_id, peer, addr, role_override)
}
fn handle_pending_inbound_connection(
&mut self,
connection_id: ConnectionId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<(), libp2p_swarm::ConnectionDenied> {
self.request_response.handle_pending_inbound_connection(
connection_id,
local_addr,
remote_addr,
)
}
fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
self.request_response.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)
}
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
self.request_response.on_swarm_event(event);
}
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
self.request_response
.on_connection_handler_event(peer_id, connection_id, event);
}
fn poll(
&mut self,
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
self.request_response.poll(cx, params).map(|to_swarm| {
to_swarm.map_out(|m| match m {
request_response::Event::Message {
peer: _,
message: request_response::Message::Response { .. },
} => {
unreachable!()
}
request_response::Event::Message {
peer: _,
message:
request_response::Message::Request {
request_id: _,
request,
channel,
},
} => {
let _ = self
.request_response
.send_response(channel, Response::Sender(request.to_send));
}
request_response::Event::OutboundFailure { .. } => unreachable!(),
request_response::Event::InboundFailure { .. } => {}
request_response::Event::ResponseSent { .. } => {}
})
})
}
}