diff --git a/Cargo.lock b/Cargo.lock index b1e414ea..014cc268 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2001,7 +2001,6 @@ name = "interop-tests" version = "0.1.0" dependencies = [ "anyhow", - "async-trait", "either", "env_logger 0.9.3", "futures", diff --git a/interop-tests/Cargo.toml b/interop-tests/Cargo.toml index ab16ff41..cae0bb3b 100644 --- a/interop-tests/Cargo.toml +++ b/interop-tests/Cargo.toml @@ -6,15 +6,13 @@ publish = false [dependencies] anyhow = "1" -async-trait = "0.1.58" either = "1.8.0" env_logger = "0.9.0" futures = "0.3.26" if-addrs = "0.8.0" -libp2p = { path = "../", default_features = false, features = ["websocket", "quic", "mplex", "yamux", "tcp", "tokio", "ping", "noise", "tls", "dns", "rsa", "macros", "webrtc"] } +libp2p = { path = "../", features = ["websocket", "quic", "mplex", "yamux", "tcp", "tokio", "ping", "noise", "tls", "dns", "rsa", "macros", "webrtc"] } log = "0.4" rand = "0.8.5" redis = { version = "0.22.1", features = ["tokio-native-tls-comp", "tokio-comp"] } strum = { version = "0.24.1", features = ["derive"] } tokio = { version = "1.24.1", features = ["full"] } - diff --git a/interop-tests/src/bin/ping.rs b/interop-tests/src/bin/ping.rs index 197bbe6e..60dc8a0b 100644 --- a/interop-tests/src/bin/ping.rs +++ b/interop-tests/src/bin/ping.rs @@ -5,96 +5,20 @@ use std::time::Duration; use anyhow::{Context, Result}; use either::Either; use env_logger::{Env, Target}; -use futures::{AsyncRead, AsyncWrite, StreamExt}; +use futures::{future, AsyncRead, AsyncWrite, StreamExt}; use libp2p::core::muxing::StreamMuxerBox; -use libp2p::core::transport::Boxed; +use libp2p::core::upgrade::{MapInboundUpgrade, MapOutboundUpgrade, Version}; +use libp2p::noise::{NoiseOutput, X25519Spec, XX}; use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent}; +use libp2p::tls::TlsStream; use libp2p::websocket::WsConfig; use libp2p::{ - core, identity, mplex, noise, ping, webrtc, yamux, Multiaddr, PeerId, Swarm, Transport as _, + identity, mplex, noise, ping, quic, tcp, tls, webrtc, yamux, InboundUpgradeExt, Multiaddr, + OutboundUpgradeExt, PeerId, Swarm, Transport as _, }; use redis::AsyncCommands; use strum::EnumString; -/// Supported transports by rust-libp2p. -#[derive(Clone, Debug, EnumString)] -#[strum(serialize_all = "kebab-case")] -pub enum Transport { - Tcp, - QuicV1, - Webrtc, - Ws, -} - -/// Supported stream multiplexers by rust-libp2p. -#[derive(Clone, Debug, EnumString)] -#[strum(serialize_all = "kebab-case")] -pub enum Muxer { - Mplex, - Yamux, -} - -/// Supported security protocols by rust-libp2p. -#[derive(Clone, Debug, EnumString)] -#[strum(serialize_all = "kebab-case")] -pub enum SecProtocol { - Noise, - Tls, -} - -#[derive(NetworkBehaviour)] -struct Behaviour { - ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, -} - -/// Helper function to get a ENV variable into an test parameter like `Transport`. -pub fn from_env(env_var: &str) -> Result -where - T: FromStr, - T::Err: std::error::Error + Send + Sync + 'static, -{ - env::var(env_var) - .with_context(|| format!("{env_var} environment variable is not set"))? - .parse() - .map_err(Into::into) -} - -/// Build the Tcp and Ws transports multiplexer and security protocol. -fn build_builder( - builder: core::transport::upgrade::Builder, - secure_channel_param: SecProtocol, - muxer_param: Muxer, - local_key: &identity::Keypair, -) -> Boxed<(libp2p::PeerId, StreamMuxerBox)> -where - T: libp2p::Transport + Send + Unpin + 'static, - ::Error: Sync + Send + 'static, - ::ListenerUpgrade: Send, - ::Dial: Send, - C: AsyncRead + AsyncWrite + Send + Unpin + 'static, -{ - let mux_upgrade = match muxer_param { - Muxer::Yamux => Either::Left(yamux::YamuxConfig::default()), - Muxer::Mplex => Either::Right(mplex::MplexConfig::default()), - }; - - let timeout = Duration::from_secs(5); - - match secure_channel_param { - SecProtocol::Noise => builder - .authenticate(noise::NoiseAuthenticated::xx(local_key).unwrap()) - .multiplex(mux_upgrade) - .timeout(timeout) - .boxed(), - SecProtocol::Tls => builder - .authenticate(libp2p::tls::Config::new(local_key).unwrap()) - .multiplex(mux_upgrade) - .timeout(timeout) - .boxed(), - } -} - #[tokio::main] async fn main() -> Result<()> { let local_key = identity::Keypair::generate_ed25519(); @@ -120,42 +44,30 @@ async fn main() -> Result<()> { // Build the transport from the passed ENV var. let (boxed_transport, local_addr) = match transport_param { - Transport::QuicV1 => { - let builder = - libp2p::quic::tokio::Transport::new(libp2p::quic::Config::new(&local_key)) - .map(|(p, c), _| (p, StreamMuxerBox::new(c))); - (builder.boxed(), format!("/ip4/{ip}/udp/0/quic-v1")) - } - Transport::Tcp => { - let builder = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::new()) - .upgrade(libp2p::core::upgrade::Version::V1Lazy); - - let secure_channel_param: SecProtocol = - from_env("security").context("unsupported secure channel")?; - - let muxer_param: Muxer = from_env("muxer").context("unsupported multiplexer")?; - - ( - build_builder(builder, secure_channel_param, muxer_param, &local_key), - format!("/ip4/{ip}/tcp/0"), - ) - } - Transport::Ws => { - let builder = WsConfig::new(libp2p::tcp::tokio::Transport::new( - libp2p::tcp::Config::new(), - )) - .upgrade(libp2p::core::upgrade::Version::V1Lazy); - - let secure_channel_param: SecProtocol = - from_env("security").context("unsupported secure channel")?; - - let muxer_param: Muxer = from_env("muxer").context("unsupported multiplexer")?; - - ( - build_builder(builder, secure_channel_param, muxer_param, &local_key), - format!("/ip4/{ip}/tcp/0/ws"), - ) - } + Transport::QuicV1 => ( + quic::tokio::Transport::new(quic::Config::new(&local_key)) + .map(|(p, c), _| (p, StreamMuxerBox::new(c))) + .boxed(), + format!("/ip4/{ip}/udp/0/quic-v1"), + ), + Transport::Tcp => ( + tcp::tokio::Transport::new(tcp::Config::new()) + .upgrade(Version::V1Lazy) + .authenticate(secure_channel_protocol_from_env(&local_key)?) + .multiplex(muxer_protocol_from_env()?) + .timeout(Duration::from_secs(5)) + .boxed(), + format!("/ip4/{ip}/tcp/0"), + ), + Transport::Ws => ( + WsConfig::new(tcp::tokio::Transport::new(tcp::Config::new())) + .upgrade(Version::V1Lazy) + .authenticate(secure_channel_protocol_from_env(&local_key)?) + .multiplex(muxer_protocol_from_env()?) + .timeout(Duration::from_secs(5)) + .boxed(), + format!("/ip4/{ip}/tcp/0/ws"), + ), Transport::Webrtc => ( webrtc::tokio::Transport::new( local_key, @@ -246,3 +158,93 @@ async fn main() -> Result<()> { Ok(()) } + +fn secure_channel_protocol_from_env( + identity: &identity::Keypair, +) -> Result< + MapOutboundUpgrade< + MapInboundUpgrade< + Either, tls::Config>, + MapSecOutputFn, + >, + MapSecOutputFn, + >, +> { + let either_sec_upgrade = match from_env("security").context("unsupported secure channel")? { + SecProtocol::Noise => Either::Left( + noise::NoiseAuthenticated::xx(identity).context("failed to intialise noise")?, + ), + SecProtocol::Tls => { + Either::Right(tls::Config::new(identity).context("failed to initialise tls")?) + } + }; + + Ok(either_sec_upgrade + .map_inbound(factor_peer_id as MapSecOutputFn) + .map_outbound(factor_peer_id as MapSecOutputFn)) +} + +type SecOutput = future::Either<(PeerId, NoiseOutput), (PeerId, TlsStream)>; +type MapSecOutputFn = fn(SecOutput) -> (PeerId, future::Either, TlsStream>); + +fn factor_peer_id( + output: SecOutput, +) -> (PeerId, future::Either, TlsStream>) { + match output { + future::Either::Left((peer, stream)) => (peer, future::Either::Left(stream)), + future::Either::Right((peer, stream)) => (peer, future::Either::Right(stream)), + } +} + +fn muxer_protocol_from_env() -> Result> { + Ok( + match from_env("muxer").context("unsupported multiplexer")? { + Muxer::Yamux => Either::Left(yamux::YamuxConfig::default()), + Muxer::Mplex => Either::Right(mplex::MplexConfig::new()), + }, + ) +} + +/// Supported transports by rust-libp2p. +#[derive(Clone, Debug, EnumString)] +#[strum(serialize_all = "kebab-case")] +pub enum Transport { + Tcp, + QuicV1, + Webrtc, + Ws, +} + +/// Supported stream multiplexers by rust-libp2p. +#[derive(Clone, Debug, EnumString)] +#[strum(serialize_all = "kebab-case")] +pub enum Muxer { + Mplex, + Yamux, +} + +/// Supported security protocols by rust-libp2p. +#[derive(Clone, Debug, EnumString)] +#[strum(serialize_all = "kebab-case")] +pub enum SecProtocol { + Noise, + Tls, +} + +#[derive(NetworkBehaviour)] +struct Behaviour { + ping: ping::Behaviour, + keep_alive: keep_alive::Behaviour, +} + +/// Helper function to get a ENV variable into an test parameter like `Transport`. +pub fn from_env(env_var: &str) -> Result +where + T: FromStr, + T::Err: std::error::Error + Send + Sync + 'static, +{ + env::var(env_var) + .with_context(|| format!("{env_var} environment variable is not set"))? + .parse() + .map_err(Into::into) +}