refactor(inter-op): misc clean-up (#3409)

This patch contains misc cleanup that I did as part of reading through the code to understand how it works.
This commit is contained in:
Thomas Eizinger
2023-02-02 09:20:25 +11:00
committed by GitHub
parent b98b03eb7e
commit b7fa2bccdf
3 changed files with 121 additions and 122 deletions

1
Cargo.lock generated
View File

@ -2001,7 +2001,6 @@ name = "interop-tests"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait",
"either", "either",
"env_logger 0.9.3", "env_logger 0.9.3",
"futures", "futures",

View File

@ -6,15 +6,13 @@ publish = false
[dependencies] [dependencies]
anyhow = "1" anyhow = "1"
async-trait = "0.1.58"
either = "1.8.0" either = "1.8.0"
env_logger = "0.9.0" env_logger = "0.9.0"
futures = "0.3.26" futures = "0.3.26"
if-addrs = "0.8.0" if-addrs = "0.8.0"
libp2p = { path = "../", default_features = false, features = ["websocket", "quic", "mplex", "yamux", "tcp", "tokio", "ping", "noise", "tls", "dns", "rsa", "macros", "webrtc"] } libp2p = { path = "../", features = ["websocket", "quic", "mplex", "yamux", "tcp", "tokio", "ping", "noise", "tls", "dns", "rsa", "macros", "webrtc"] }
log = "0.4" log = "0.4"
rand = "0.8.5" rand = "0.8.5"
redis = { version = "0.22.1", features = ["tokio-native-tls-comp", "tokio-comp"] } redis = { version = "0.22.1", features = ["tokio-native-tls-comp", "tokio-comp"] }
strum = { version = "0.24.1", features = ["derive"] } strum = { version = "0.24.1", features = ["derive"] }
tokio = { version = "1.24.1", features = ["full"] } tokio = { version = "1.24.1", features = ["full"] }

View File

@ -5,96 +5,20 @@ use std::time::Duration;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use either::Either; use either::Either;
use env_logger::{Env, Target}; use env_logger::{Env, Target};
use futures::{AsyncRead, AsyncWrite, StreamExt}; use futures::{future, AsyncRead, AsyncWrite, StreamExt};
use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::Boxed; use libp2p::core::upgrade::{MapInboundUpgrade, MapOutboundUpgrade, Version};
use libp2p::noise::{NoiseOutput, X25519Spec, XX};
use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent}; use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent};
use libp2p::tls::TlsStream;
use libp2p::websocket::WsConfig; use libp2p::websocket::WsConfig;
use libp2p::{ use libp2p::{
core, identity, mplex, noise, ping, webrtc, yamux, Multiaddr, PeerId, Swarm, Transport as _, identity, mplex, noise, ping, quic, tcp, tls, webrtc, yamux, InboundUpgradeExt, Multiaddr,
OutboundUpgradeExt, PeerId, Swarm, Transport as _,
}; };
use redis::AsyncCommands; use redis::AsyncCommands;
use strum::EnumString; use strum::EnumString;
/// Supported transports by rust-libp2p.
#[derive(Clone, Debug, EnumString)]
#[strum(serialize_all = "kebab-case")]
pub enum Transport {
Tcp,
QuicV1,
Webrtc,
Ws,
}
/// Supported stream multiplexers by rust-libp2p.
#[derive(Clone, Debug, EnumString)]
#[strum(serialize_all = "kebab-case")]
pub enum Muxer {
Mplex,
Yamux,
}
/// Supported security protocols by rust-libp2p.
#[derive(Clone, Debug, EnumString)]
#[strum(serialize_all = "kebab-case")]
pub enum SecProtocol {
Noise,
Tls,
}
#[derive(NetworkBehaviour)]
struct Behaviour {
ping: ping::Behaviour,
keep_alive: keep_alive::Behaviour,
}
/// Helper function to get a ENV variable into an test parameter like `Transport`.
pub fn from_env<T>(env_var: &str) -> Result<T>
where
T: FromStr,
T::Err: std::error::Error + Send + Sync + 'static,
{
env::var(env_var)
.with_context(|| format!("{env_var} environment variable is not set"))?
.parse()
.map_err(Into::into)
}
/// Build the Tcp and Ws transports multiplexer and security protocol.
fn build_builder<T, C>(
builder: core::transport::upgrade::Builder<T>,
secure_channel_param: SecProtocol,
muxer_param: Muxer,
local_key: &identity::Keypair,
) -> Boxed<(libp2p::PeerId, StreamMuxerBox)>
where
T: libp2p::Transport<Output = C> + Send + Unpin + 'static,
<T as libp2p::Transport>::Error: Sync + Send + 'static,
<T as libp2p::Transport>::ListenerUpgrade: Send,
<T as libp2p::Transport>::Dial: Send,
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
let mux_upgrade = match muxer_param {
Muxer::Yamux => Either::Left(yamux::YamuxConfig::default()),
Muxer::Mplex => Either::Right(mplex::MplexConfig::default()),
};
let timeout = Duration::from_secs(5);
match secure_channel_param {
SecProtocol::Noise => builder
.authenticate(noise::NoiseAuthenticated::xx(local_key).unwrap())
.multiplex(mux_upgrade)
.timeout(timeout)
.boxed(),
SecProtocol::Tls => builder
.authenticate(libp2p::tls::Config::new(local_key).unwrap())
.multiplex(mux_upgrade)
.timeout(timeout)
.boxed(),
}
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
let local_key = identity::Keypair::generate_ed25519(); let local_key = identity::Keypair::generate_ed25519();
@ -120,42 +44,30 @@ async fn main() -> Result<()> {
// Build the transport from the passed ENV var. // Build the transport from the passed ENV var.
let (boxed_transport, local_addr) = match transport_param { let (boxed_transport, local_addr) = match transport_param {
Transport::QuicV1 => { Transport::QuicV1 => (
let builder = quic::tokio::Transport::new(quic::Config::new(&local_key))
libp2p::quic::tokio::Transport::new(libp2p::quic::Config::new(&local_key)) .map(|(p, c), _| (p, StreamMuxerBox::new(c)))
.map(|(p, c), _| (p, StreamMuxerBox::new(c))); .boxed(),
(builder.boxed(), format!("/ip4/{ip}/udp/0/quic-v1")) format!("/ip4/{ip}/udp/0/quic-v1"),
} ),
Transport::Tcp => { Transport::Tcp => (
let builder = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::new()) tcp::tokio::Transport::new(tcp::Config::new())
.upgrade(libp2p::core::upgrade::Version::V1Lazy); .upgrade(Version::V1Lazy)
.authenticate(secure_channel_protocol_from_env(&local_key)?)
let secure_channel_param: SecProtocol = .multiplex(muxer_protocol_from_env()?)
from_env("security").context("unsupported secure channel")?; .timeout(Duration::from_secs(5))
.boxed(),
let muxer_param: Muxer = from_env("muxer").context("unsupported multiplexer")?;
(
build_builder(builder, secure_channel_param, muxer_param, &local_key),
format!("/ip4/{ip}/tcp/0"), format!("/ip4/{ip}/tcp/0"),
) ),
} Transport::Ws => (
Transport::Ws => { WsConfig::new(tcp::tokio::Transport::new(tcp::Config::new()))
let builder = WsConfig::new(libp2p::tcp::tokio::Transport::new( .upgrade(Version::V1Lazy)
libp2p::tcp::Config::new(), .authenticate(secure_channel_protocol_from_env(&local_key)?)
)) .multiplex(muxer_protocol_from_env()?)
.upgrade(libp2p::core::upgrade::Version::V1Lazy); .timeout(Duration::from_secs(5))
.boxed(),
let secure_channel_param: SecProtocol =
from_env("security").context("unsupported secure channel")?;
let muxer_param: Muxer = from_env("muxer").context("unsupported multiplexer")?;
(
build_builder(builder, secure_channel_param, muxer_param, &local_key),
format!("/ip4/{ip}/tcp/0/ws"), format!("/ip4/{ip}/tcp/0/ws"),
) ),
}
Transport::Webrtc => ( Transport::Webrtc => (
webrtc::tokio::Transport::new( webrtc::tokio::Transport::new(
local_key, local_key,
@ -246,3 +158,93 @@ async fn main() -> Result<()> {
Ok(()) Ok(())
} }
fn secure_channel_protocol_from_env<C: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
identity: &identity::Keypair,
) -> Result<
MapOutboundUpgrade<
MapInboundUpgrade<
Either<noise::NoiseAuthenticated<XX, X25519Spec, ()>, tls::Config>,
MapSecOutputFn<C>,
>,
MapSecOutputFn<C>,
>,
> {
let either_sec_upgrade = match from_env("security").context("unsupported secure channel")? {
SecProtocol::Noise => Either::Left(
noise::NoiseAuthenticated::xx(identity).context("failed to intialise noise")?,
),
SecProtocol::Tls => {
Either::Right(tls::Config::new(identity).context("failed to initialise tls")?)
}
};
Ok(either_sec_upgrade
.map_inbound(factor_peer_id as MapSecOutputFn<C>)
.map_outbound(factor_peer_id as MapSecOutputFn<C>))
}
type SecOutput<C> = future::Either<(PeerId, NoiseOutput<C>), (PeerId, TlsStream<C>)>;
type MapSecOutputFn<C> = fn(SecOutput<C>) -> (PeerId, future::Either<NoiseOutput<C>, TlsStream<C>>);
fn factor_peer_id<C>(
output: SecOutput<C>,
) -> (PeerId, future::Either<NoiseOutput<C>, TlsStream<C>>) {
match output {
future::Either::Left((peer, stream)) => (peer, future::Either::Left(stream)),
future::Either::Right((peer, stream)) => (peer, future::Either::Right(stream)),
}
}
fn muxer_protocol_from_env() -> Result<Either<yamux::YamuxConfig, mplex::MplexConfig>> {
Ok(
match from_env("muxer").context("unsupported multiplexer")? {
Muxer::Yamux => Either::Left(yamux::YamuxConfig::default()),
Muxer::Mplex => Either::Right(mplex::MplexConfig::new()),
},
)
}
/// Supported transports by rust-libp2p.
#[derive(Clone, Debug, EnumString)]
#[strum(serialize_all = "kebab-case")]
pub enum Transport {
Tcp,
QuicV1,
Webrtc,
Ws,
}
/// Supported stream multiplexers by rust-libp2p.
#[derive(Clone, Debug, EnumString)]
#[strum(serialize_all = "kebab-case")]
pub enum Muxer {
Mplex,
Yamux,
}
/// Supported security protocols by rust-libp2p.
#[derive(Clone, Debug, EnumString)]
#[strum(serialize_all = "kebab-case")]
pub enum SecProtocol {
Noise,
Tls,
}
#[derive(NetworkBehaviour)]
struct Behaviour {
ping: ping::Behaviour,
keep_alive: keep_alive::Behaviour,
}
/// Helper function to get a ENV variable into an test parameter like `Transport`.
pub fn from_env<T>(env_var: &str) -> Result<T>
where
T: FromStr,
T::Err: std::error::Error + Send + Sync + 'static,
{
env::var(env_var)
.with_context(|| format!("{env_var} environment variable is not set"))?
.parse()
.map_err(Into::into)
}