diff --git a/Cargo.lock b/Cargo.lock index e8f83408..8707f0a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2549,15 +2549,22 @@ dependencies = [ [[package]] name = "libp2p-pnet" -version = "0.22.2" +version = "0.22.3" dependencies = [ "futures", + "libp2p-core", + "libp2p-noise", + "libp2p-swarm", + "libp2p-tcp", + "libp2p-websocket", + "libp2p-yamux", "log", "pin-project", "quickcheck-ext", "rand 0.8.5", "salsa20", "sha3", + "tokio", ] [[package]] diff --git a/transports/pnet/CHANGELOG.md b/transports/pnet/CHANGELOG.md index 1d66f1bc..fd76f412 100644 --- a/transports/pnet/CHANGELOG.md +++ b/transports/pnet/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.22.3 - unreleased + +- Fix handshake over websocket. See [PR 3476] + +[PR 3476]: https://github.com/libp2p/rust-libp2p/pull/3476 + # 0.22.2 - Update `rust-version` to reflect the actual MSRV: 1.60.0. See [PR 3090]. diff --git a/transports/pnet/Cargo.toml b/transports/pnet/Cargo.toml index 8dec21dd..7c7c70f3 100644 --- a/transports/pnet/Cargo.toml +++ b/transports/pnet/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-pnet" edition = "2021" rust-version = "1.60.0" description = "Private swarm support for libp2p" -version = "0.22.2" +version = "0.22.3" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -19,7 +19,14 @@ rand = "0.8" pin-project = "1.0.2" [dev-dependencies] +libp2p-core = { path = "../../core", features = ["rsa", "ecdsa", "secp256k1"] } +libp2p-noise = { path = "../noise" } +libp2p-swarm = { path = "../../swarm", features = ["tokio"] } +libp2p-tcp = { path = "../tcp", features = ["tokio"] } +libp2p-websocket = { path = "../websocket" } +libp2p-yamux = { path = "../../muxers/yamux" } quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" } +tokio = { version = "1.21.1", features = ["full"] } # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/transports/pnet/src/lib.rs b/transports/pnet/src/lib.rs index 7b2aa073..15f42556 100644 --- a/transports/pnet/src/lib.rs +++ b/transports/pnet/src/lib.rs @@ -217,6 +217,7 @@ impl PnetConfig { .write_all(&local_nonce) .await .map_err(PnetError::HandshakeError)?; + socket.flush().await?; socket .read_exact(&mut remote_nonce) .await diff --git a/transports/pnet/tests/smoke.rs b/transports/pnet/tests/smoke.rs new file mode 100644 index 00000000..9084c4e6 --- /dev/null +++ b/transports/pnet/tests/smoke.rs @@ -0,0 +1,134 @@ +use std::time::Duration; + +use futures::{future, AsyncRead, AsyncWrite, StreamExt}; +use libp2p_core::transport::MemoryTransport; +use libp2p_core::upgrade::Version; +use libp2p_core::Transport; +use libp2p_core::{multiaddr::Protocol, Multiaddr}; +use libp2p_pnet::{PnetConfig, PreSharedKey}; +use libp2p_swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; + +const TIMEOUT: Duration = Duration::from_secs(5); + +#[tokio::test] +async fn can_establish_connection_memory() { + can_establish_connection_inner_with_timeout( + MemoryTransport::default, + Protocol::Memory(0).into(), + ) + .await +} + +#[tokio::test] +async fn can_establish_connection_tcp() { + can_establish_connection_inner_with_timeout( + libp2p_tcp::tokio::Transport::default, + "/ip4/127.0.0.1/tcp/0".parse().unwrap(), + ) + .await +} + +#[tokio::test] +async fn can_establish_connection_websocket() { + can_establish_connection_inner_with_timeout( + || libp2p_websocket::WsConfig::new(libp2p_tcp::tokio::Transport::default()), + "/ip4/127.0.0.1/tcp/0/ws".parse().unwrap(), + ) + .await +} + +async fn can_establish_connection_inner_with_timeout( + build_transport: F, + listen_addr: Multiaddr, +) where + F: Fn() -> T, + T: Transport + Send + Unpin + 'static, + ::Error: Send + Sync + 'static, + ::Output: AsyncRead + AsyncWrite + Send + Unpin, + ::ListenerUpgrade: Send, + ::Dial: Send, +{ + let task = can_establish_connection_inner(build_transport, listen_addr); + tokio::time::timeout(TIMEOUT, task).await.unwrap(); +} + +async fn can_establish_connection_inner(build_transport: F, listen_addr: Multiaddr) +where + F: Fn() -> T, + T: Transport + Send + Unpin + 'static, + ::Error: Send + Sync + 'static, + ::Output: AsyncRead + AsyncWrite + Send + Unpin, + ::ListenerUpgrade: Send, + ::Dial: Send, +{ + let pnet = PnetConfig::new(PreSharedKey::new([0; 32])); + + let mut swarm1 = make_swarm(build_transport(), pnet); + let mut swarm2 = make_swarm(build_transport(), pnet); + + let listen_address = listen_on(&mut swarm1, listen_addr).await; + swarm2.dial(listen_address).unwrap(); + let await_inbound_connection = async { + loop { + match swarm1.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => break peer_id, + SwarmEvent::IncomingConnectionError { error, .. } => { + panic!("Incoming connection failed: {error}") + } + _ => continue, + }; + } + }; + let await_outbound_connection = async { + loop { + match swarm2.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => break peer_id, + SwarmEvent::OutgoingConnectionError { error, .. } => { + panic!("Failed to dial: {error}") + } + _ => continue, + }; + } + }; + + let (inbound_peer_id, outbound_peer_id) = + future::join(await_inbound_connection, await_outbound_connection).await; + + assert_eq!(&inbound_peer_id, swarm2.local_peer_id()); + assert_eq!(&outbound_peer_id, swarm1.local_peer_id()); +} + +fn make_swarm(transport: T, pnet: PnetConfig) -> Swarm +where + T: Transport + Send + Unpin + 'static, + ::Error: Send + Sync + 'static, + ::Output: AsyncRead + AsyncWrite + Send + Unpin, + ::ListenerUpgrade: Send, + ::Dial: Send, +{ + let identity = libp2p_core::identity::Keypair::generate_ed25519(); + let transport = transport + .and_then(move |socket, _| pnet.handshake(socket)) + .upgrade(Version::V1) + .authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap()) + .multiplex(libp2p_yamux::YamuxConfig::default()) + .boxed(); + Swarm::with_tokio_executor( + transport, + keep_alive::Behaviour, + identity.public().to_peer_id(), + ) +} + +async fn listen_on(swarm: &mut Swarm, addr: Multiaddr) -> Multiaddr { + let expected_listener_id = swarm.listen_on(addr).unwrap(); + loop { + match swarm.select_next_some().await { + SwarmEvent::NewListenAddr { + address, + listener_id, + } if listener_id == expected_listener_id => break address, + _ => continue, + }; + } +}