feat(perf): implement libp2p perf protocol

Implementation of the libp2p perf protocol according to https://github.com/libp2p/specs/pull/478/.

//CC @MarcoPolo  as the author of the specification.

**Don't (yet) expect this to produce reliable performance metrics.**

Pull-Request: #3508.
This commit is contained in:
Max Inden
2023-03-19 19:20:42 +01:00
committed by GitHub
parent 8f271caec3
commit 9d1116fd5d
18 changed files with 1411 additions and 1 deletions

View File

@ -0,0 +1,140 @@
// Copyright 2023 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use anyhow::{bail, Result};
use clap::Parser;
use futures::{future::Either, StreamExt};
use libp2p_core::{muxing::StreamMuxerBox, transport::OrTransport, upgrade, Multiaddr, Transport};
use libp2p_dns::DnsConfig;
use libp2p_identity::PeerId;
use libp2p_perf::client::RunParams;
use libp2p_swarm::{SwarmBuilder, SwarmEvent};
use log::info;
#[derive(Debug, Parser)]
#[clap(name = "libp2p perf client")]
struct Opts {
#[arg(long)]
server_address: Multiaddr,
}
#[async_std::main]
async fn main() -> Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let opts = Opts::parse();
info!("Initiating performance tests with {}", opts.server_address);
// Create a random PeerId
let local_key = libp2p_identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
let transport = {
let tcp =
libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::default().port_reuse(true))
.upgrade(upgrade::Version::V1Lazy)
.authenticate(
libp2p_noise::NoiseAuthenticated::xx(&local_key)
.expect("Signing libp2p-noise static DH keypair failed."),
)
.multiplex(libp2p_yamux::YamuxConfig::default());
let quic = {
let mut config = libp2p_quic::Config::new(&local_key);
config.support_draft_29 = true;
libp2p_quic::async_std::Transport::new(config)
};
let dns = DnsConfig::system(OrTransport::new(quic, tcp))
.await
.unwrap();
dns.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.boxed()
};
let mut swarm = SwarmBuilder::with_async_std_executor(
transport,
libp2p_perf::client::Behaviour::default(),
local_peer_id,
)
.substream_upgrade_protocol_override(upgrade::Version::V1Lazy)
.build();
swarm.dial(opts.server_address.clone()).unwrap();
let server_peer_id = loop {
match swarm.next().await.unwrap() {
SwarmEvent::ConnectionEstablished { peer_id, .. } => break peer_id,
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
bail!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
e => panic!("{e:?}"),
}
};
info!(
"Connection to {} established. Launching benchmarks.",
opts.server_address
);
swarm.behaviour_mut().perf(
server_peer_id,
RunParams {
to_send: 10 * 1024 * 1024,
to_receive: 10 * 1024 * 1024,
},
)?;
let stats = loop {
match swarm.next().await.unwrap() {
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
} => {
info!("Established connection to {:?} via {:?}", peer_id, endpoint);
}
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
info!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
SwarmEvent::Behaviour(libp2p_perf::client::Event { id: _, result }) => break result?,
e => panic!("{e:?}"),
}
};
let sent_mebibytes = stats.params.to_send as f64 / 1024.0 / 1024.0;
let sent_time = (stats.timers.write_done - stats.timers.write_start).as_secs_f64();
let sent_bandwidth_mebibit_second = (sent_mebibytes * 8.0) / sent_time;
let received_mebibytes = stats.params.to_receive as f64 / 1024.0 / 1024.0;
let receive_time = (stats.timers.read_done - stats.timers.write_done).as_secs_f64();
let receive_bandwidth_mebibit_second = (received_mebibytes * 8.0) / receive_time;
info!(
"Finished run: Sent {sent_mebibytes:.2} MiB in {sent_time:.2} s with \
{sent_bandwidth_mebibit_second:.2} MiBit/s and received \
{received_mebibytes:.2} MiB in {receive_time:.2} s with \
{receive_bandwidth_mebibit_second:.2} MiBit/s",
);
Ok(())
}

View File

@ -0,0 +1,128 @@
// Copyright 2023 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use clap::Parser;
use futures::{future::Either, StreamExt};
use libp2p_core::{muxing::StreamMuxerBox, transport::OrTransport, upgrade, Transport};
use libp2p_dns::DnsConfig;
use libp2p_identity::PeerId;
use libp2p_swarm::{SwarmBuilder, SwarmEvent};
use log::{error, info};
#[derive(Debug, Parser)]
#[clap(name = "libp2p perf server")]
struct Opts {}
#[async_std::main]
async fn main() {
env_logger::init();
let _opts = Opts::parse();
// Create a random PeerId
let local_key = libp2p_identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
println!("Local peer id: {local_peer_id}");
let transport = {
let tcp =
libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::default().port_reuse(true))
.upgrade(upgrade::Version::V1Lazy)
.authenticate(
libp2p_noise::NoiseAuthenticated::xx(&local_key)
.expect("Signing libp2p-noise static DH keypair failed."),
)
.multiplex(libp2p_yamux::YamuxConfig::default());
let quic = {
let mut config = libp2p_quic::Config::new(&local_key);
config.support_draft_29 = true;
libp2p_quic::async_std::Transport::new(config)
};
let dns = DnsConfig::system(OrTransport::new(quic, tcp))
.await
.unwrap();
dns.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.boxed()
};
let mut swarm = SwarmBuilder::with_async_std_executor(
transport,
libp2p_perf::server::Behaviour::default(),
local_peer_id,
)
.substream_upgrade_protocol_override(upgrade::Version::V1Lazy)
.build();
swarm
.listen_on("/ip4/0.0.0.0/tcp/4001".parse().unwrap())
.unwrap();
swarm
.listen_on("/ip4/0.0.0.0/udp/4001/quic-v1".parse().unwrap())
.unwrap();
loop {
match swarm.next().await.unwrap() {
SwarmEvent::NewListenAddr { address, .. } => {
info!("Listening on {address}");
}
SwarmEvent::IncomingConnection { .. } => {}
e @ SwarmEvent::IncomingConnectionError { .. } => {
error!("{e:?}");
}
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
} => {
info!("Established connection to {:?} via {:?}", peer_id, endpoint);
}
SwarmEvent::ConnectionClosed { .. } => {}
SwarmEvent::Behaviour(libp2p_perf::server::Event {
remote_peer_id,
stats,
}) => {
let received_mebibytes = stats.params.received as f64 / 1024.0 / 1024.0;
let receive_time = (stats.timers.read_done - stats.timers.read_start).as_secs_f64();
let receive_bandwidth_mebibit_second = (received_mebibytes * 8.0) / receive_time;
let sent_mebibytes = stats.params.sent as f64 / 1024.0 / 1024.0;
let sent_time = (stats.timers.write_done - stats.timers.read_done).as_secs_f64();
let sent_bandwidth_mebibit_second = (sent_mebibytes * 8.0) / sent_time;
info!(
"Finished run with {}: Received {:.2} MiB in {:.2} s with {:.2} MiBit/s and sent {:.2} MiB in {:.2} s with {:.2} MiBit/s",
remote_peer_id,
received_mebibytes,
receive_time,
receive_bandwidth_mebibit_second,
sent_mebibytes,
sent_time,
sent_bandwidth_mebibit_second,
)
}
e => panic!("{e:?}"),
}
}
}

View File

@ -0,0 +1,62 @@
// Copyright 2023 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
mod behaviour;
mod handler;
use instant::Instant;
use std::sync::atomic::{AtomicUsize, Ordering};
pub use behaviour::{Behaviour, Event};
/// Parameters for a single run, i.e. one stream, sending and receiving data.
#[derive(Debug, Clone, Copy)]
pub struct RunParams {
pub to_send: usize,
pub to_receive: usize,
}
/// Timers for a single run, i.e. one stream, sending and receiving data.
#[derive(Debug, Clone, Copy)]
pub struct RunTimers {
pub write_start: Instant,
pub write_done: Instant,
pub read_done: Instant,
}
/// Statistics for a single run, i.e. one stream, sending and receiving data.
#[derive(Debug)]
pub struct RunStats {
pub params: RunParams,
pub timers: RunTimers,
}
static NEXT_RUN_ID: AtomicUsize = AtomicUsize::new(1);
/// Connection identifier.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct RunId(usize);
impl RunId {
/// Returns the next available [`RunId`].
pub(crate) fn next() -> Self {
Self(NEXT_RUN_ID.fetch_add(1, Ordering::SeqCst))
}
}

View File

@ -0,0 +1,158 @@
// Copyright 2023 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! [`NetworkBehaviour`] of the libp2p perf client protocol.
use std::{
collections::{HashSet, VecDeque},
task::{Context, Poll},
};
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use libp2p_swarm::{
derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionHandlerUpgrErr,
ConnectionId, FromSwarm, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
PollParameters, THandlerInEvent, THandlerOutEvent,
};
use void::Void;
use crate::client::handler::Handler;
use super::{RunId, RunParams, RunStats};
#[derive(Debug)]
pub struct Event {
pub id: RunId,
pub result: Result<RunStats, ConnectionHandlerUpgrErr<Void>>,
}
#[derive(Default)]
pub struct Behaviour {
/// Queue of actions to return when polled.
queued_events: VecDeque<NetworkBehaviourAction<Event, THandlerInEvent<Self>>>,
/// Set of connected peers.
connected: HashSet<PeerId>,
}
impl Behaviour {
pub fn new() -> Self {
Self::default()
}
pub fn perf(&mut self, server: PeerId, params: RunParams) -> Result<RunId, PerfError> {
if !self.connected.contains(&server) {
return Err(PerfError::NotConnected);
}
let id = RunId::next();
self.queued_events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: server,
handler: NotifyHandler::Any,
event: crate::client::handler::Command { id, params },
});
Ok(id)
}
}
#[derive(thiserror::Error, Debug)]
pub enum PerfError {
#[error("Not connected to peer")]
NotConnected,
}
impl NetworkBehaviour for Behaviour {
type ConnectionHandler = Handler;
type OutEvent = Event;
fn handle_established_outbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
_addr: &Multiaddr,
_role_override: libp2p_core::Endpoint,
) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
Ok(Handler::default())
}
fn handle_established_inbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
Ok(Handler::default())
}
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => {
self.connected.insert(peer_id);
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id: _,
endpoint: _,
handler: _,
remaining_established,
}) => {
if remaining_established == 0 {
assert!(self.connected.remove(&peer_id));
}
}
FromSwarm::AddressChange(_)
| FromSwarm::DialFailure(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddr(_)
| FromSwarm::ExpiredExternalAddr(_) => {}
}
}
fn on_connection_handler_event(
&mut self,
_event_source: PeerId,
_connection_id: ConnectionId,
super::handler::Event { id, result }: THandlerOutEvent<Self>,
) {
self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(Event { id, result }));
}
fn poll(
&mut self,
_cx: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(event);
}
Poll::Pending
}
}

View File

@ -0,0 +1,196 @@
// Copyright 2023 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::{
collections::VecDeque,
task::{Context, Poll},
time::{Duration, Instant},
};
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt};
use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade};
use libp2p_swarm::{
handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ListenUpgradeError,
},
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
SubstreamProtocol,
};
use void::Void;
use super::{RunId, RunParams, RunStats};
#[derive(Debug)]
pub struct Command {
pub(crate) id: RunId,
pub(crate) params: RunParams,
}
#[derive(Debug)]
pub struct Event {
pub(crate) id: RunId,
pub(crate) result: Result<RunStats, ConnectionHandlerUpgrErr<Void>>,
}
pub struct Handler {
/// Queue of events to return when polled.
queued_events: VecDeque<
ConnectionHandlerEvent<
<Self as ConnectionHandler>::OutboundProtocol,
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::OutEvent,
<Self as ConnectionHandler>::Error,
>,
>,
outbound: FuturesUnordered<BoxFuture<'static, Result<Event, std::io::Error>>>,
keep_alive: KeepAlive,
}
impl Handler {
pub fn new() -> Self {
Self {
queued_events: Default::default(),
outbound: Default::default(),
keep_alive: KeepAlive::Yes,
}
}
}
impl Default for Handler {
fn default() -> Self {
Self::new()
}
}
impl ConnectionHandler for Handler {
type InEvent = Command;
type OutEvent = Event;
type Error = Void;
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = ReadyUpgrade<&'static [u8]>;
type OutboundOpenInfo = Command;
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(DeniedUpgrade, ())
}
fn on_behaviour_event(&mut self, command: Self::InEvent) {
self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), command),
})
}
fn on_connection_event(
&mut self,
event: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol, ..
}) => void::unreachable(protocol),
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol,
info: Command { params, id },
}) => self.outbound.push(
crate::protocol::send_receive(params, protocol)
.map_ok(move |timers| Event {
id,
result: Ok(RunStats { params, timers }),
})
.boxed(),
),
ConnectionEvent::AddressChange(_) => {}
ConnectionEvent::DialUpgradeError(DialUpgradeError {
info: Command { id, .. },
error,
}) => self
.queued_events
.push_back(ConnectionHandlerEvent::Custom(Event {
id,
result: Err(error),
})),
ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => {
match error {
ConnectionHandlerUpgrErr::Timeout => {}
ConnectionHandlerUpgrErr::Timer => {}
ConnectionHandlerUpgrErr::Upgrade(error) => match error {
libp2p_core::UpgradeError::Select(_) => {}
libp2p_core::UpgradeError::Apply(v) => void::unreachable(v),
},
}
}
}
}
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
// Return queued events.
if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(event);
}
while let Poll::Ready(Some(result)) = self.outbound.poll_next_unpin(cx) {
match result {
Ok(event) => return Poll::Ready(ConnectionHandlerEvent::Custom(event)),
Err(e) => {
panic!("{e:?}")
}
}
}
if self.outbound.is_empty() {
match self.keep_alive {
KeepAlive::Yes => {
self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(10));
}
KeepAlive::Until(_) => {}
KeepAlive::No => panic!("Handler never sets KeepAlive::No."),
}
} else {
self.keep_alive = KeepAlive::Yes
}
Poll::Pending
}
}

31
protocols/perf/src/lib.rs Normal file
View File

@ -0,0 +1,31 @@
// Copyright 2023 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Implementation of the [libp2p perf protocol](https://github.com/libp2p/specs/pull/478/).
//!
//! Do not use in untrusted environments.
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
pub mod client;
mod protocol;
pub mod server;
pub const PROTOCOL_NAME: &[u8; 11] = b"/perf/1.0.0";

View File

@ -0,0 +1,206 @@
// Copyright 2023 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use instant::Instant;
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use crate::{client, server};
const BUF: [u8; 1024] = [0; 1024];
pub async fn send_receive<S: AsyncRead + AsyncWrite + Unpin>(
params: client::RunParams,
mut stream: S,
) -> Result<client::RunTimers, std::io::Error> {
let client::RunParams {
to_send,
to_receive,
} = params;
let mut receive_buf = vec![0; 1024];
stream.write_all(&(to_receive as u64).to_be_bytes()).await?;
let write_start = Instant::now();
let mut sent = 0;
while sent < to_send {
let n = std::cmp::min(to_send - sent, BUF.len());
let buf = &BUF[..n];
sent += stream.write(buf).await?;
}
stream.close().await?;
let write_done = Instant::now();
let mut received = 0;
while received < to_receive {
received += stream.read(&mut receive_buf).await?;
}
let read_done = Instant::now();
Ok(client::RunTimers {
write_start,
write_done,
read_done,
})
}
pub async fn receive_send<S: AsyncRead + AsyncWrite + Unpin>(
mut stream: S,
) -> Result<server::RunStats, std::io::Error> {
let to_send = {
let mut buf = [0; 8];
stream.read_exact(&mut buf).await?;
u64::from_be_bytes(buf) as usize
};
let read_start = Instant::now();
let mut receive_buf = vec![0; 1024];
let mut received = 0;
loop {
let n = stream.read(&mut receive_buf).await?;
received += n;
if n == 0 {
break;
}
}
let read_done = Instant::now();
let mut sent = 0;
while sent < to_send {
let n = std::cmp::min(to_send - sent, BUF.len());
let buf = &BUF[..n];
sent += stream.write(buf).await?;
}
stream.close().await?;
let write_done = Instant::now();
Ok(server::RunStats {
params: server::RunParams { sent, received },
timers: server::RunTimers {
read_start,
read_done,
write_done,
},
})
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{executor::block_on, AsyncRead, AsyncWrite};
use std::{
pin::Pin,
sync::{Arc, Mutex},
task::Poll,
};
#[derive(Clone)]
struct DummyStream {
inner: Arc<Mutex<DummyStreamInner>>,
}
struct DummyStreamInner {
read: Vec<u8>,
write: Vec<u8>,
}
impl DummyStream {
fn new(read: Vec<u8>) -> Self {
Self {
inner: Arc::new(Mutex::new(DummyStreamInner {
read,
write: Vec::new(),
})),
}
}
}
impl Unpin for DummyStream {}
impl AsyncWrite for DummyStream {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
Pin::new(&mut self.inner.lock().unwrap().write).poll_write(cx, buf)
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
Pin::new(&mut self.inner.lock().unwrap().write).poll_flush(cx)
}
fn poll_close(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
Pin::new(&mut self.inner.lock().unwrap().write).poll_close(cx)
}
}
impl AsyncRead for DummyStream {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
let amt = std::cmp::min(buf.len(), self.inner.lock().unwrap().read.len());
let new = self.inner.lock().unwrap().read.split_off(amt);
buf[..amt].copy_from_slice(self.inner.lock().unwrap().read.as_slice());
self.inner.lock().unwrap().read = new;
Poll::Ready(Ok(amt))
}
}
#[test]
fn test_client() {
let stream = DummyStream::new(vec![0]);
block_on(send_receive(
client::RunParams {
to_send: 0,
to_receive: 0,
},
stream.clone(),
))
.unwrap();
assert_eq!(
stream.inner.lock().unwrap().write,
0u64.to_be_bytes().to_vec()
);
}
}

View File

@ -0,0 +1,48 @@
// Copyright 2023 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
mod behaviour;
mod handler;
use instant::Instant;
pub use behaviour::{Behaviour, Event};
/// Parameters for a single run, i.e. one stream, sending and receiving data.
#[derive(Debug, Clone, Copy)]
pub struct RunParams {
pub sent: usize,
pub received: usize,
}
/// Timers for a single run, i.e. one stream, sending and receiving data.
#[derive(Debug, Clone, Copy)]
pub struct RunTimers {
pub read_start: Instant,
pub read_done: Instant,
pub write_done: Instant,
}
/// Statistics for a single run, i.e. one stream, sending and receiving data.
#[derive(Debug)]
pub struct RunStats {
pub params: RunParams,
pub timers: RunTimers,
}

View File

@ -0,0 +1,121 @@
// Copyright 2023 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! [`NetworkBehaviour`] of the libp2p perf server protocol.
use std::{
collections::VecDeque,
task::{Context, Poll},
};
use libp2p_identity::PeerId;
use libp2p_swarm::{
ConnectionId, FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
THandlerInEvent, THandlerOutEvent,
};
use crate::server::handler::Handler;
use super::RunStats;
#[derive(Debug)]
pub struct Event {
pub remote_peer_id: PeerId,
pub stats: RunStats,
}
#[derive(Default)]
pub struct Behaviour {
/// Queue of actions to return when polled.
queued_events: VecDeque<NetworkBehaviourAction<Event, THandlerInEvent<Self>>>,
}
impl Behaviour {
pub fn new() -> Self {
Self::default()
}
}
impl NetworkBehaviour for Behaviour {
type ConnectionHandler = Handler;
type OutEvent = Event;
fn handle_established_inbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
_local_addr: &libp2p_core::Multiaddr,
_remote_addr: &libp2p_core::Multiaddr,
) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
Ok(Handler::default())
}
fn handle_established_outbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
_addr: &libp2p_core::Multiaddr,
_role_override: libp2p_core::Endpoint,
) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
Ok(Handler::default())
}
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(_) => {}
FromSwarm::ConnectionClosed(_) => {}
FromSwarm::AddressChange(_) => {}
FromSwarm::DialFailure(_) => {}
FromSwarm::ListenFailure(_) => {}
FromSwarm::NewListener(_) => {}
FromSwarm::NewListenAddr(_) => {}
FromSwarm::ExpiredListenAddr(_) => {}
FromSwarm::ListenerError(_) => {}
FromSwarm::ListenerClosed(_) => {}
FromSwarm::NewExternalAddr(_) => {}
FromSwarm::ExpiredExternalAddr(_) => {}
}
}
fn on_connection_handler_event(
&mut self,
event_source: PeerId,
_connection_id: ConnectionId,
super::handler::Event { stats }: THandlerOutEvent<Self>,
) {
self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(Event {
remote_peer_id: event_source,
stats,
}))
}
fn poll(
&mut self,
_cx: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(event);
}
Poll::Pending
}
}

View File

@ -0,0 +1,159 @@
// Copyright 2023 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::{
task::{Context, Poll},
time::{Duration, Instant},
};
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt};
use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade};
use libp2p_swarm::{
handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ListenUpgradeError,
},
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
SubstreamProtocol,
};
use log::error;
use void::Void;
use super::RunStats;
#[derive(Debug)]
pub struct Event {
pub stats: RunStats,
}
pub struct Handler {
inbound: FuturesUnordered<BoxFuture<'static, Result<RunStats, std::io::Error>>>,
keep_alive: KeepAlive,
}
impl Handler {
pub fn new() -> Self {
Self {
inbound: Default::default(),
keep_alive: KeepAlive::Yes,
}
}
}
impl Default for Handler {
fn default() -> Self {
Self::new()
}
}
impl ConnectionHandler for Handler {
type InEvent = Void;
type OutEvent = Event;
type Error = Void;
type InboundProtocol = ReadyUpgrade<&'static [u8]>;
type OutboundProtocol = DeniedUpgrade;
type OutboundOpenInfo = Void;
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), ())
}
fn on_behaviour_event(&mut self, v: Self::InEvent) {
void::unreachable(v)
}
fn on_connection_event(
&mut self,
event: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol,
info: _,
}) => {
self.inbound
.push(crate::protocol::receive_send(protocol).boxed());
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { info, .. }) => {
void::unreachable(info)
}
ConnectionEvent::DialUpgradeError(DialUpgradeError { info, .. }) => {
void::unreachable(info)
}
ConnectionEvent::AddressChange(_) => {}
ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => {
match error {
ConnectionHandlerUpgrErr::Timeout => {}
ConnectionHandlerUpgrErr::Timer => {}
ConnectionHandlerUpgrErr::Upgrade(error) => match error {
libp2p_core::UpgradeError::Select(_) => {}
libp2p_core::UpgradeError::Apply(v) => void::unreachable(v),
},
}
}
}
}
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) {
match result {
Ok(stats) => return Poll::Ready(ConnectionHandlerEvent::Custom(Event { stats })),
Err(e) => {
error!("{e:?}")
}
}
}
if self.inbound.is_empty() {
match self.keep_alive {
KeepAlive::Yes => {
self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(10));
}
KeepAlive::Until(_) => {}
KeepAlive::No => panic!("Handler never sets KeepAlive::No."),
}
} else {
self.keep_alive = KeepAlive::Yes
}
Poll::Pending
}
}