diff --git a/CHANGELOG.md b/CHANGELOG.md index f6b32c35..77da18ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,10 +32,11 @@ - [`libp2p-pnet` CHANGELOG](transports/pnet/CHANGELOG.md) - [`libp2p-quic` CHANGELOG](transports/quic/CHANGELOG.md) - [`libp2p-tcp` CHANGELOG](transports/tcp/CHANGELOG.md) +- [`libp2p-tls` CHANGELOG](transports/tls/CHANGELOG.md) - [`libp2p-uds` CHANGELOG](transports/uds/CHANGELOG.md) - [`libp2p-wasm-ext` CHANGELOG](transports/wasm-ext/CHANGELOG.md) - [`libp2p-websocket` CHANGELOG](transports/websocket/CHANGELOG.md) -- [`libp2p-tls` CHANGELOG](transports/tls/CHANGELOG.md) +- [`libp2p-websocket-websys` CHANGELOG](transports/websocket-websys/CHANGELOG.md) ## Multiplexers diff --git a/Cargo.lock b/Cargo.lock index 22f62f8f..27023d16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2359,7 +2359,7 @@ checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "libp2p" -version = "0.52.3" +version = "0.52.4" dependencies = [ "async-std", "async-trait", @@ -2401,6 +2401,7 @@ dependencies = [ "libp2p-upnp", "libp2p-wasm-ext", "libp2p-websocket", + "libp2p-websocket-websys", "libp2p-webtransport-websys", "libp2p-yamux", "multiaddr", @@ -3281,6 +3282,25 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "libp2p-websocket-websys" +version = "0.2.0" +dependencies = [ + "bytes", + "futures", + "js-sys", + "libp2p-core", + "libp2p-identity", + "libp2p-noise", + "libp2p-yamux", + "log", + "parking_lot", + "send_wrapper 0.6.0", + "thiserror", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "libp2p-webtransport-websys" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 55129a8e..a0871d4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ members = [ "transports/webrtc-websys", "transports/websocket", "transports/webtransport-websys", + "transports/websocket-websys", "wasm-tests/webtransport-tests", ] resolver = "2" @@ -71,7 +72,7 @@ rust-version = "1.65.0" [workspace.dependencies] futures-bounded = { version = "0.1.0", path = "misc/futures-bounded" } -libp2p = { version = "0.52.3", path = "libp2p" } +libp2p = { version = "0.52.4", path = "libp2p" } libp2p-allow-block-list = { version = "0.2.0", path = "misc/allow-block-list" } libp2p-autonat = { version = "0.11.0", path = "protocols/autonat" } libp2p-connection-limits = { version = "0.2.1", path = "misc/connection-limits" } @@ -111,6 +112,7 @@ libp2p-webrtc = { version = "0.6.1-alpha", path = "transports/webrtc" } libp2p-webrtc-utils = { version = "0.1.0", path = "misc/webrtc-utils" } libp2p-webrtc-websys = { version = "0.1.0-alpha", path = "transports/webrtc-websys" } libp2p-websocket = { version = "0.42.1", path = "transports/websocket" } +libp2p-websocket-websys = { version = "0.2.0", path = "transports/websocket-websys" } libp2p-webtransport-websys = { version = "0.1.0", path = "transports/webtransport-websys" } libp2p-yamux = { version = "0.44.1", path = "muxers/yamux" } multistream-select = { version = "0.13.0", path = "misc/multistream-select" } diff --git a/interop-tests/Cargo.toml b/interop-tests/Cargo.toml index ac046da6..59ff1f37 100644 --- a/interop-tests/Cargo.toml +++ b/interop-tests/Cargo.toml @@ -35,7 +35,8 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } [target.'cfg(target_arch = "wasm32")'.dependencies] -libp2p = { path = "../libp2p", features = [ "ping", "macros", "webtransport-websys", "wasm-bindgen", "identify"] } +libp2p = { path = "../libp2p", features = [ "ping", "macros", "webtransport-websys", "wasm-bindgen", "identify", "websocket-websys", "yamux", "noise"] } +libp2p-mplex = { path = "../muxers/mplex" } libp2p-webrtc-websys = { workspace = true } wasm-bindgen = { version = "0.2" } wasm-bindgen-futures = { version = "0.4" } diff --git a/interop-tests/chromium-ping-version.json b/interop-tests/chromium-ping-version.json index ae5c6e10..6ee0a075 100644 --- a/interop-tests/chromium-ping-version.json +++ b/interop-tests/chromium-ping-version.json @@ -3,8 +3,9 @@ "containerImageID": "chromium-rust-libp2p-head", "transports": [ { "name": "webtransport", "onlyDial": true }, - { "name": "webrtc-direct", "onlyDial": true } + { "name": "webrtc-direct", "onlyDial": true }, + { "name": "ws", "onlyDial": true } ], - "secureChannels": [], - "muxers": [] + "secureChannels": ["noise"], + "muxers": ["mplex", "yamux"] } diff --git a/interop-tests/src/arch.rs b/interop-tests/src/arch.rs index b30c3ad1..c1adb119 100644 --- a/interop-tests/src/arch.rs +++ b/interop-tests/src/arch.rs @@ -17,7 +17,6 @@ pub(crate) mod native { use std::time::Duration; use anyhow::{bail, Context, Result}; - use either::Either; use env_logger::{Env, Target}; use futures::future::BoxFuture; use futures::FutureExt; @@ -31,7 +30,7 @@ pub(crate) mod native { use libp2p_webrtc as webrtc; use redis::AsyncCommands; - use crate::{from_env, Muxer, SecProtocol, Transport}; + use crate::{Muxer, SecProtocol, Transport}; use super::BoxedTransport; @@ -47,66 +46,103 @@ pub(crate) mod native { tokio::time::sleep(duration).boxed() } - fn muxer_protocol_from_env() -> Result> { - Ok(match from_env("muxer")? { - Muxer::Yamux => Either::Left(yamux::Config::default()), - Muxer::Mplex => Either::Right(mplex::MplexConfig::new()), - }) - } - pub(crate) fn build_transport( local_key: Keypair, ip: &str, transport: Transport, + sec_protocol: Option, + muxer: Option, ) -> Result<(BoxedTransport, String)> { - let (transport, addr) = match (transport, from_env::("security")) { - (Transport::QuicV1, _) => ( + let (transport, addr) = match (transport, sec_protocol, muxer) { + (Transport::QuicV1, _, _) => ( quic::tokio::Transport::new(quic::Config::new(&local_key)) .map(|(p, c), _| (p, StreamMuxerBox::new(c))) .boxed(), format!("/ip4/{ip}/udp/0/quic-v1"), ), - (Transport::Tcp, Ok(SecProtocol::Tls)) => ( + (Transport::Tcp, Some(SecProtocol::Tls), Some(Muxer::Mplex)) => ( tcp::tokio::Transport::new(tcp::Config::new()) .upgrade(Version::V1Lazy) .authenticate(tls::Config::new(&local_key).context("failed to initialise tls")?) - .multiplex(muxer_protocol_from_env()?) + .multiplex(mplex::MplexConfig::new()) .timeout(Duration::from_secs(5)) .boxed(), format!("/ip4/{ip}/tcp/0"), ), - (Transport::Tcp, Ok(SecProtocol::Noise)) => ( + (Transport::Tcp, Some(SecProtocol::Tls), Some(Muxer::Yamux)) => ( + tcp::tokio::Transport::new(tcp::Config::new()) + .upgrade(Version::V1Lazy) + .authenticate(tls::Config::new(&local_key).context("failed to initialise tls")?) + .multiplex(yamux::Config::default()) + .timeout(Duration::from_secs(5)) + .boxed(), + format!("/ip4/{ip}/tcp/0"), + ), + (Transport::Tcp, Some(SecProtocol::Noise), Some(Muxer::Mplex)) => ( tcp::tokio::Transport::new(tcp::Config::new()) .upgrade(Version::V1Lazy) .authenticate( noise::Config::new(&local_key).context("failed to intialise noise")?, ) - .multiplex(muxer_protocol_from_env()?) + .multiplex(mplex::MplexConfig::new()) .timeout(Duration::from_secs(5)) .boxed(), format!("/ip4/{ip}/tcp/0"), ), - (Transport::Ws, Ok(SecProtocol::Tls)) => ( + (Transport::Tcp, Some(SecProtocol::Noise), Some(Muxer::Yamux)) => ( + tcp::tokio::Transport::new(tcp::Config::new()) + .upgrade(Version::V1Lazy) + .authenticate( + noise::Config::new(&local_key).context("failed to intialise noise")?, + ) + .multiplex(yamux::Config::default()) + .timeout(Duration::from_secs(5)) + .boxed(), + format!("/ip4/{ip}/tcp/0"), + ), + (Transport::Ws, Some(SecProtocol::Tls), Some(Muxer::Mplex)) => ( WsConfig::new(tcp::tokio::Transport::new(tcp::Config::new())) .upgrade(Version::V1Lazy) .authenticate(tls::Config::new(&local_key).context("failed to initialise tls")?) - .multiplex(muxer_protocol_from_env()?) + .multiplex(mplex::MplexConfig::new()) .timeout(Duration::from_secs(5)) .boxed(), format!("/ip4/{ip}/tcp/0/ws"), ), - (Transport::Ws, Ok(SecProtocol::Noise)) => ( + (Transport::Ws, Some(SecProtocol::Tls), Some(Muxer::Yamux)) => ( + WsConfig::new(tcp::tokio::Transport::new(tcp::Config::new())) + .upgrade(Version::V1Lazy) + .authenticate( + tls::Config::new(&local_key).context("failed to intialise noise")?, + ) + .multiplex(yamux::Config::default()) + .timeout(Duration::from_secs(5)) + .boxed(), + format!("/ip4/{ip}/tcp/0/ws"), + ), + (Transport::Ws, Some(SecProtocol::Noise), Some(Muxer::Mplex)) => ( + WsConfig::new(tcp::tokio::Transport::new(tcp::Config::new())) + .upgrade(Version::V1Lazy) + .authenticate( + noise::Config::new(&local_key).context("failed to initialise tls")?, + ) + .multiplex(mplex::MplexConfig::new()) + .timeout(Duration::from_secs(5)) + .boxed(), + format!("/ip4/{ip}/tcp/0/ws"), + ), + (Transport::Ws, Some(SecProtocol::Noise), Some(Muxer::Yamux)) => ( WsConfig::new(tcp::tokio::Transport::new(tcp::Config::new())) .upgrade(Version::V1Lazy) .authenticate( noise::Config::new(&local_key).context("failed to intialise noise")?, ) - .multiplex(muxer_protocol_from_env()?) + .multiplex(yamux::Config::default()) .timeout(Duration::from_secs(5)) .boxed(), format!("/ip4/{ip}/tcp/0/ws"), ), - (Transport::WebRtcDirect, _) => ( + (Transport::WebRtcDirect, _, _) => ( webrtc::tokio::Transport::new( local_key, webrtc::tokio::Certificate::generate(&mut rand::thread_rng())?, @@ -115,9 +151,13 @@ pub(crate) mod native { .boxed(), format!("/ip4/{ip}/udp/0/webrtc-direct"), ), - (Transport::Tcp, Err(_)) => bail!("Missing security protocol for TCP transport"), - (Transport::Ws, Err(_)) => bail!("Missing security protocol for Websocket transport"), - (Transport::Webtransport, _) => bail!("Webtransport can only be used with wasm"), + (Transport::Webtransport, _, _) => bail!("Webtransport can only be used with wasm"), + (Transport::Tcp | Transport::Ws, None, _) => { + bail!("Missing security protocol for {transport:?}") + } + (Transport::Tcp | Transport::Ws, _, None) => { + bail!("Missing muxer protocol for {transport:?}") + } }; Ok((transport, addr)) } @@ -154,15 +194,17 @@ pub(crate) mod native { #[cfg(target_arch = "wasm32")] pub(crate) mod wasm { - use anyhow::{bail, Result}; + use anyhow::{bail, Context, Result}; use futures::future::{BoxFuture, FutureExt}; + use libp2p::core::upgrade::Version; use libp2p::identity::Keypair; use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; - use libp2p::PeerId; + use libp2p::{noise, yamux, PeerId, Transport as _}; + use libp2p_mplex as mplex; use libp2p_webrtc_websys as webrtc; use std::time::Duration; - use crate::{BlpopRequest, Transport}; + use crate::{BlpopRequest, Muxer, SecProtocol, Transport}; use super::BoxedTransport; @@ -181,21 +223,56 @@ pub(crate) mod wasm { local_key: Keypair, ip: &str, transport: Transport, + sec_protocol: Option, + muxer: Option, ) -> Result<(BoxedTransport, String)> { - match transport { - Transport::Webtransport => Ok(( + Ok(match (transport, sec_protocol, muxer) { + (Transport::Webtransport, _, _) => ( libp2p::webtransport_websys::Transport::new( libp2p::webtransport_websys::Config::new(&local_key), ) .boxed(), format!("/ip4/{ip}/udp/0/quic/webtransport"), - )), - Transport::WebRtcDirect => Ok(( + ), + (Transport::Ws, Some(SecProtocol::Noise), Some(Muxer::Mplex)) => ( + libp2p::websocket_websys::Transport::default() + .upgrade(Version::V1Lazy) + .authenticate( + noise::Config::new(&local_key).context("failed to initialise noise")?, + ) + .multiplex(mplex::MplexConfig::new()) + .timeout(Duration::from_secs(5)) + .boxed(), + format!("/ip4/{ip}/tcp/0/wss"), + ), + (Transport::Ws, Some(SecProtocol::Noise), Some(Muxer::Yamux)) => ( + libp2p::websocket_websys::Transport::default() + .upgrade(Version::V1Lazy) + .authenticate( + noise::Config::new(&local_key).context("failed to initialise noise")?, + ) + .multiplex(yamux::Config::default()) + .timeout(Duration::from_secs(5)) + .boxed(), + format!("/ip4/{ip}/tcp/0/wss"), + ), + (Transport::Ws, None, _) => { + bail!("Missing security protocol for WS") + } + (Transport::Ws, Some(SecProtocol::Tls), _) => { + bail!("TLS not supported in WASM") + } + (Transport::Ws, _, None) => { + bail!("Missing muxer protocol for WS") + } + (Transport::WebRtcDirect, _, _) => ( webrtc::Transport::new(webrtc::Config::new(&local_key)).boxed(), format!("/ip4/{ip}/udp/0/webrtc-direct"), - )), - _ => bail!("Only webtransport and webrtc-direct are supported with wasm"), - } + ), + (Transport::QuicV1 | Transport::Tcp, _, _) => { + bail!("{transport:?} is not supported in WASM") + } + }) } pub(crate) fn swarm_builder( diff --git a/interop-tests/src/bin/config/mod.rs b/interop-tests/src/bin/config/mod.rs index 82747e82..dff297ef 100644 --- a/interop-tests/src/bin/config/mod.rs +++ b/interop-tests/src/bin/config/mod.rs @@ -5,6 +5,8 @@ use anyhow::{Context, Result}; #[derive(Debug, Clone)] pub(crate) struct Config { pub(crate) transport: String, + pub(crate) sec_protocol: Option, + pub(crate) muxer: Option, pub(crate) ip: String, pub(crate) is_dialer: bool, pub(crate) test_timeout: u64, @@ -26,8 +28,13 @@ impl Config { .map(|addr| format!("redis://{addr}")) .unwrap_or_else(|_| "redis://redis:6379".into()); + let sec_protocol = env::var("security").ok(); + let muxer = env::var("muxer").ok(); + Ok(Self { transport, + sec_protocol, + muxer, ip, is_dialer, test_timeout, diff --git a/interop-tests/src/bin/native_ping.rs b/interop-tests/src/bin/native_ping.rs index 88905803..2fb6ce12 100644 --- a/interop-tests/src/bin/native_ping.rs +++ b/interop-tests/src/bin/native_ping.rs @@ -12,6 +12,8 @@ async fn main() -> Result<()> { config.is_dialer, config.test_timeout, &config.redis_addr, + config.sec_protocol, + config.muxer, ) .await?; diff --git a/interop-tests/src/bin/wasm_ping.rs b/interop-tests/src/bin/wasm_ping.rs index b3a91819..a228b913 100644 --- a/interop-tests/src/bin/wasm_ping.rs +++ b/interop-tests/src/bin/wasm_ping.rs @@ -179,8 +179,18 @@ async fn serve_index_html(state: State) -> Result @@ -200,7 +210,9 @@ async fn serve_index_html(state: State) -> Result diff --git a/interop-tests/src/lib.rs b/interop-tests/src/lib.rs index 40c06b57..43a0b643 100644 --- a/interop-tests/src/lib.rs +++ b/interop-tests/src/lib.rs @@ -18,18 +18,35 @@ pub async fn run_test( is_dialer: bool, test_timeout_seconds: u64, redis_addr: &str, + sec_protocol: Option, + muxer: Option, ) -> Result { init_logger(); let test_timeout = Duration::from_secs(test_timeout_seconds); let transport = transport.parse().context("Couldn't parse transport")?; + let sec_protocol = sec_protocol + .map(|sec_protocol| { + sec_protocol + .parse() + .context("Couldn't parse security protocol") + }) + .transpose()?; + let muxer = muxer + .map(|sec_protocol| { + sec_protocol + .parse() + .context("Couldn't parse muxer protocol") + }) + .transpose()?; let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); let redis_client = RedisClient::new(redis_addr).context("Could not connect to redis")?; // Build the transport from the passed ENV var. - let (boxed_transport, local_addr) = build_transport(local_key.clone(), ip, transport)?; + let (boxed_transport, local_addr) = + build_transport(local_key.clone(), ip, transport, sec_protocol, muxer)?; let mut swarm = swarm_builder( boxed_transport, Behaviour { @@ -113,7 +130,8 @@ pub async fn run_test( } if listener_id == id { let ma = format!("{address}/p2p/{local_peer_id}"); - redis_client.rpush("listenerAddr", ma).await?; + redis_client.rpush("listenerAddr", ma.clone()).await?; + break; } } @@ -123,7 +141,9 @@ pub async fn run_test( futures::future::select( async move { loop { - swarm.next().await; + let event = swarm.next().await.unwrap(); + + log::debug!("{event:?}"); } } .boxed(), @@ -145,8 +165,19 @@ pub async fn run_test_wasm( is_dialer: bool, test_timeout_secs: u64, base_url: &str, + sec_protocol: Option, + muxer: Option, ) -> Result<(), JsValue> { - let result = run_test(transport, ip, is_dialer, test_timeout_secs, base_url).await; + let result = run_test( + transport, + ip, + is_dialer, + test_timeout_secs, + base_url, + sec_protocol, + muxer, + ) + .await; log::info!("Sending test result: {result:?}"); reqwest::Client::new() .post(&format!("http://{}/results", base_url)) @@ -244,14 +275,3 @@ struct Behaviour { ping: ping::Behaviour, identify: identify::Behaviour, } - -/// Helper function to get a ENV variable into an test parameter like `Transport`. -pub fn from_env(env_var: &str) -> Result -where - T: FromStr, -{ - std::env::var(env_var) - .with_context(|| format!("{env_var} environment variable is not set"))? - .parse() - .map_err(Into::into) -} diff --git a/libp2p/CHANGELOG.md b/libp2p/CHANGELOG.md index 9ae0d1c2..f60ccc07 100644 --- a/libp2p/CHANGELOG.md +++ b/libp2p/CHANGELOG.md @@ -1,3 +1,11 @@ +## 0.52.4 - unreleased + +- Introduce `libp2p::websocket_websys` module behind `websocket-websys` feature flag. + This supersedes the existing `libp2p::wasm_ext` module which is now deprecated. + See [PR 3679]. + +[PR 3679]: https://github.com/libp2p/rust-libp2p/pull/3679 + ## 0.52.3 - Add `libp2p-quic` stable release. @@ -64,8 +72,8 @@ ## 0.51.3 - Deprecate the `mplex` feature. -The recommended baseline stream multiplexer is `yamux`. -See [PR 3689]. + The recommended baseline stream multiplexer is `yamux`. + See [PR 3689]. [PR 3689]: https://github.com/libp2p/rust-libp2p/pull/3689 diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index 1264a050..d57653bc 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p" edition = "2021" rust-version = { workspace = true } description = "Peer-to-peer networking library" -version = "0.52.3" +version = "0.52.4" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -47,6 +47,7 @@ full = [ "wasm-bindgen", "wasm-ext", "wasm-ext-websocket", + "websocket-websys", "websocket", "webtransport-websys", "yamux", @@ -88,6 +89,7 @@ uds = ["dep:libp2p-uds"] wasm-bindgen = [ "futures-timer/wasm-bindgen", "instant/wasm-bindgen", "getrandom/js", "libp2p-swarm/wasm-bindgen", "libp2p-gossipsub?/wasm-bindgen",] wasm-ext = ["dep:libp2p-wasm-ext"] wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext?/websocket"] +websocket-websys = ["dep:libp2p-websocket-websys"] websocket = ["dep:libp2p-websocket"] webtransport-websys = ["dep:libp2p-webtransport-websys"] yamux = ["dep:libp2p-yamux"] @@ -120,9 +122,9 @@ libp2p-rendezvous = { workspace = true, optional = true } libp2p-request-response = { workspace = true, optional = true } libp2p-swarm = { workspace = true } libp2p-wasm-ext = { workspace = true, optional = true } +libp2p-websocket-websys = { workspace = true, optional = true } libp2p-webtransport-websys = { workspace = true, optional = true } libp2p-yamux = { workspace = true, optional = true } - multiaddr = { workspace = true } pin-project = "1.0.0" diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index e3934e6b..bcfdd41e 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -135,12 +135,20 @@ pub use libp2p_uds as uds; #[doc(inline)] pub use libp2p_upnp as upnp; #[cfg(feature = "wasm-ext")] -#[doc(inline)] -pub use libp2p_wasm_ext as wasm_ext; +#[deprecated( + note = "`wasm-ext` is deprecated and will be removed in favor of `libp2p-websocket-websys`." +)] +pub mod wasm_ext { + #[doc(inline)] + pub use libp2p_wasm_ext::*; +} #[cfg(feature = "websocket")] #[cfg(not(target_arch = "wasm32"))] #[doc(inline)] pub use libp2p_websocket as websocket; +#[cfg(feature = "websocket-websys")] +#[doc(inline)] +pub use libp2p_websocket_websys as websocket_websys; #[cfg(feature = "webtransport-websys")] #[cfg_attr(docsrs, doc(cfg(feature = "webtransport-websys")))] #[doc(inline)] diff --git a/transports/websocket-websys/CHANGELOG.md b/transports/websocket-websys/CHANGELOG.md new file mode 100644 index 00000000..22728548 --- /dev/null +++ b/transports/websocket-websys/CHANGELOG.md @@ -0,0 +1,7 @@ +# 0.2.0 - unreleased + +- Add Websys Websocket transport. + +# 0.1.0 + +- Crate claimed. diff --git a/transports/websocket-websys/Cargo.toml b/transports/websocket-websys/Cargo.toml new file mode 100644 index 00000000..985896ad --- /dev/null +++ b/transports/websocket-websys/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "libp2p-websocket-websys" +edition = "2021" +rust-version = "1.60.0" +description = "WebSocket for libp2p under WASM environment" +version = "0.2.0" +authors = ["Vince Vasta "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +bytes = "1.4.0" +futures = "0.3.26" +js-sys = "0.3.61" +libp2p-core = { workspace = true } +log = "0.4.19" +parking_lot = "0.12.1" +send_wrapper = "0.6.0" +thiserror = "1.0.38" +wasm-bindgen = "0.2.84" +web-sys = { version = "0.3.61", features = ["BinaryType", "CloseEvent", "MessageEvent", "WebSocket", "Window"] } + +# Passing arguments to the docsrs builder in order to properly document cfg's. +# More information: https://docs.rs/about/builds#cross-compiling +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] +rustc-args = ["--cfg", "docsrs"] + +[dev-dependencies] +libp2p-yamux = { workspace = true } +libp2p-noise = { workspace = true } +libp2p-identity = { workspace = true } diff --git a/transports/websocket-websys/src/lib.rs b/transports/websocket-websys/src/lib.rs new file mode 100644 index 00000000..24ca4fdc --- /dev/null +++ b/transports/websocket-websys/src/lib.rs @@ -0,0 +1,446 @@ +// Copyright (C) 2023 Vince Vasta +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +//! Libp2p websocket transports built on [web-sys](https://rustwasm.github.io/wasm-bindgen/web-sys/index.html). + +use bytes::BytesMut; +use futures::task::AtomicWaker; +use futures::{future::Ready, io, prelude::*}; +use js_sys::Array; +use libp2p_core::{ + multiaddr::{Multiaddr, Protocol}, + transport::{ListenerId, TransportError, TransportEvent}, +}; +use send_wrapper::SendWrapper; +use std::cmp::min; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; +use std::{pin::Pin, task::Context, task::Poll}; +use wasm_bindgen::{prelude::*, JsCast}; +use web_sys::{window, CloseEvent, Event, MessageEvent, WebSocket}; + +/// A Websocket transport that can be used in a wasm environment. +/// +/// ## Example +/// +/// To create an authenticated transport instance with Noise protocol and Yamux: +/// +/// ``` +/// # use libp2p_core::{upgrade::Version, Transport}; +/// # use libp2p_identity::Keypair; +/// # use libp2p_yamux as yamux; +/// # use libp2p_noise as noise; +/// let local_key = Keypair::generate_ed25519(); +/// let transport = libp2p_websocket_websys::Transport::default() +/// .upgrade(Version::V1) +/// .authenticate(noise::Config::new(&local_key).unwrap()) +/// .multiplex(yamux::Config::default()) +/// .boxed(); +/// ``` +/// +#[derive(Default)] +pub struct Transport { + _private: (), +} + +/// Arbitrary, maximum amount we are willing to buffer before we throttle our user. +const MAX_BUFFER: usize = 1024 * 1024; + +impl libp2p_core::Transport for Transport { + type Output = Connection; + type Error = Error; + type ListenerUpgrade = Ready>; + type Dial = Pin> + Send>>; + + fn listen_on( + &mut self, + _: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + Err(TransportError::MultiaddrNotSupported(addr)) + } + + fn remove_listener(&mut self, _id: ListenerId) -> bool { + false + } + + fn dial(&mut self, addr: Multiaddr) -> Result> { + let url = extract_websocket_url(&addr) + .ok_or_else(|| TransportError::MultiaddrNotSupported(addr))?; + + Ok(async move { + let socket = match WebSocket::new(&url) { + Ok(ws) => ws, + Err(_) => return Err(Error::invalid_websocket_url(&url)), + }; + + Ok(Connection::new(socket)) + } + .boxed()) + } + + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + Err(TransportError::MultiaddrNotSupported(addr)) + } + + fn poll( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> std::task::Poll> { + Poll::Pending + } + + fn address_translation(&self, _listen: &Multiaddr, _observed: &Multiaddr) -> Option { + None + } +} + +// Try to convert Multiaddr to a Websocket url. +fn extract_websocket_url(addr: &Multiaddr) -> Option { + let mut protocols = addr.iter(); + let host_port = match (protocols.next(), protocols.next()) { + (Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port))) => { + format!("{ip}:{port}") + } + (Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port))) => { + format!("[{ip}]:{port}") + } + (Some(Protocol::Dns(h)), Some(Protocol::Tcp(port))) + | (Some(Protocol::Dns4(h)), Some(Protocol::Tcp(port))) + | (Some(Protocol::Dns6(h)), Some(Protocol::Tcp(port))) + | (Some(Protocol::Dnsaddr(h)), Some(Protocol::Tcp(port))) => { + format!("{}:{}", &h, port) + } + _ => return None, + }; + + let (scheme, wspath) = match protocols.next() { + Some(Protocol::Ws(path)) => ("ws", path.into_owned()), + Some(Protocol::Wss(path)) => ("wss", path.into_owned()), + _ => return None, + }; + + Some(format!("{scheme}://{host_port}{wspath}")) +} + +#[derive(thiserror::Error, Debug)] +#[error("{msg}")] +pub struct Error { + msg: String, +} + +impl Error { + fn invalid_websocket_url(url: &str) -> Self { + Self { + msg: format!("Invalid websocket url: {url}"), + } + } +} + +/// A Websocket connection created by the [`Transport`]. +pub struct Connection { + inner: SendWrapper, +} + +struct Inner { + socket: WebSocket, + + new_data_waker: Rc, + read_buffer: Rc>, + + /// Waker for when we are waiting for the WebSocket to be opened. + open_waker: Rc, + + /// Waker for when we are waiting to write (again) to the WebSocket because we previously exceeded the [`MAX_BUFFER`] threshold. + write_waker: Rc, + + /// Waker for when we are waiting for the WebSocket to be closed. + close_waker: Rc, + + /// Whether the connection errored. + errored: Rc, + + // Store the closures for proper garbage collection. + // These are wrapped in an [`Rc`] so we can implement [`Clone`]. + _on_open_closure: Rc>, + _on_buffered_amount_low_closure: Rc>, + _on_close_closure: Rc>, + _on_error_closure: Rc>, + _on_message_closure: Rc>, + buffered_amount_low_interval: i32, +} + +impl Inner { + fn ready_state(&self) -> ReadyState { + match self.socket.ready_state() { + 0 => ReadyState::Connecting, + 1 => ReadyState::Open, + 2 => ReadyState::Closing, + 3 => ReadyState::Closed, + unknown => unreachable!("invalid `ReadyState` value: {unknown}"), + } + } + + fn poll_open(&mut self, cx: &Context<'_>) -> Poll> { + match self.ready_state() { + ReadyState::Connecting => { + self.open_waker.register(cx.waker()); + Poll::Pending + } + ReadyState::Open => Poll::Ready(Ok(())), + ReadyState::Closed | ReadyState::Closing => { + Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())) + } + } + } + + fn error_barrier(&self) -> io::Result<()> { + if self.errored.load(Ordering::SeqCst) { + return Err(io::ErrorKind::BrokenPipe.into()); + } + + Ok(()) + } +} + +/// The state of the WebSocket. +/// +/// See . +#[derive(PartialEq)] +enum ReadyState { + Connecting, + Open, + Closing, + Closed, +} + +impl Connection { + fn new(socket: WebSocket) -> Self { + socket.set_binary_type(web_sys::BinaryType::Arraybuffer); + + let open_waker = Rc::new(AtomicWaker::new()); + let onopen_closure = Closure::::new({ + let open_waker = open_waker.clone(); + move |_| { + open_waker.wake(); + } + }); + socket.set_onopen(Some(onopen_closure.as_ref().unchecked_ref())); + + let close_waker = Rc::new(AtomicWaker::new()); + let onclose_closure = Closure::::new({ + let close_waker = close_waker.clone(); + move |_| { + close_waker.wake(); + } + }); + socket.set_onclose(Some(onclose_closure.as_ref().unchecked_ref())); + + let errored = Rc::new(AtomicBool::new(false)); + let onerror_closure = Closure::::new({ + let errored = errored.clone(); + move |_| { + errored.store(true, Ordering::SeqCst); + } + }); + socket.set_onerror(Some(onerror_closure.as_ref().unchecked_ref())); + + let read_buffer = Rc::new(Mutex::new(BytesMut::new())); + let new_data_waker = Rc::new(AtomicWaker::new()); + let onmessage_closure = Closure::::new({ + let read_buffer = read_buffer.clone(); + let new_data_waker = new_data_waker.clone(); + let errored = errored.clone(); + move |e: MessageEvent| { + let data = js_sys::Uint8Array::new(&e.data()); + + let mut read_buffer = read_buffer.lock().unwrap(); + + if read_buffer.len() + data.length() as usize > MAX_BUFFER { + log::warn!("Remote is overloading us with messages, closing connection"); + errored.store(true, Ordering::SeqCst); + + return; + } + + read_buffer.extend_from_slice(&data.to_vec()); + new_data_waker.wake(); + } + }); + socket.set_onmessage(Some(onmessage_closure.as_ref().unchecked_ref())); + + let write_waker = Rc::new(AtomicWaker::new()); + let on_buffered_amount_low_closure = Closure::::new({ + let write_waker = write_waker.clone(); + let socket = socket.clone(); + move |_| { + if socket.buffered_amount() == 0 { + write_waker.wake(); + } + } + }); + let buffered_amount_low_interval = window() + .expect("to have a window") + .set_interval_with_callback_and_timeout_and_arguments( + on_buffered_amount_low_closure.as_ref().unchecked_ref(), + 100, // Chosen arbitrarily and likely worth tuning. Due to low impact of the /ws transport, no further effort was invested at the time. + &Array::new(), + ) + .expect("to be able to set an interval"); + + Self { + inner: SendWrapper::new(Inner { + socket, + new_data_waker, + read_buffer, + open_waker, + write_waker, + close_waker, + errored, + _on_open_closure: Rc::new(onopen_closure), + _on_buffered_amount_low_closure: Rc::new(on_buffered_amount_low_closure), + _on_close_closure: Rc::new(onclose_closure), + _on_error_closure: Rc::new(onerror_closure), + _on_message_closure: Rc::new(onmessage_closure), + buffered_amount_low_interval, + }), + } + } + + fn buffered_amount(&self) -> usize { + self.inner.socket.buffered_amount() as usize + } +} + +impl AsyncRead for Connection { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let this = self.get_mut(); + this.inner.error_barrier()?; + futures::ready!(this.inner.poll_open(cx))?; + + let mut read_buffer = this.inner.read_buffer.lock().unwrap(); + + if read_buffer.is_empty() { + this.inner.new_data_waker.register(cx.waker()); + return Poll::Pending; + } + + // Ensure that we: + // - at most return what the caller can read (`buf.len()`) + // - at most what we have (`read_buffer.len()`) + let split_index = min(buf.len(), read_buffer.len()); + + let bytes_to_return = read_buffer.split_to(split_index); + let len = bytes_to_return.len(); + buf[..len].copy_from_slice(&bytes_to_return); + + Poll::Ready(Ok(len)) + } +} + +impl AsyncWrite for Connection { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.get_mut(); + + this.inner.error_barrier()?; + futures::ready!(this.inner.poll_open(cx))?; + + debug_assert!(this.buffered_amount() <= MAX_BUFFER); + let remaining_space = MAX_BUFFER - this.buffered_amount(); + + if remaining_space == 0 { + this.inner.write_waker.register(cx.waker()); + return Poll::Pending; + } + + let bytes_to_send = min(buf.len(), remaining_space); + + if this + .inner + .socket + .send_with_u8_array(&buf[..bytes_to_send]) + .is_err() + { + return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())); + } + + Poll::Ready(Ok(bytes_to_send)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.buffered_amount() == 0 { + return Poll::Ready(Ok(())); + } + + self.inner.error_barrier()?; + + self.inner.write_waker.register(cx.waker()); + Poll::Pending + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + const REGULAR_CLOSE: u16 = 1000; // See https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1. + + if self.inner.ready_state() == ReadyState::Closed { + return Poll::Ready(Ok(())); + } + + self.inner.error_barrier()?; + + if self.inner.ready_state() != ReadyState::Closing { + let _ = self + .inner + .socket + .close_with_code_and_reason(REGULAR_CLOSE, "user initiated"); + } + + self.inner.close_waker.register(cx.waker()); + Poll::Pending + } +} + +impl Drop for Connection { + fn drop(&mut self) { + const GO_AWAY_STATUS_CODE: u16 = 1001; // See https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1. + + if let ReadyState::Connecting | ReadyState::Open = self.inner.ready_state() { + let _ = self + .inner + .socket + .close_with_code_and_reason(GO_AWAY_STATUS_CODE, "connection dropped"); + } + + window() + .expect("to have a window") + .clear_interval_with_handle(self.inner.buffered_amount_low_interval) + } +}