fix(pnet): flush nonce in handshake

Previously, we did not flush the nonce within the handshake to the stream. This caused problems when `pnet` was composed with the websocket transport. Inserting a flush fixes these compatibility problems.

Resolves https://github.com/libp2p/rust-libp2p/issues/3475.

Pull-Request: #3476.
This commit is contained in:
hanabi1224 2023-02-28 12:34:54 +08:00 committed by GitHub
parent b8d95ff974
commit 52a32ffac1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 157 additions and 2 deletions

9
Cargo.lock generated
View File

@ -2549,15 +2549,22 @@ dependencies = [
[[package]]
name = "libp2p-pnet"
version = "0.22.2"
version = "0.22.3"
dependencies = [
"futures",
"libp2p-core",
"libp2p-noise",
"libp2p-swarm",
"libp2p-tcp",
"libp2p-websocket",
"libp2p-yamux",
"log",
"pin-project",
"quickcheck-ext",
"rand 0.8.5",
"salsa20",
"sha3",
"tokio",
]
[[package]]

View File

@ -1,3 +1,9 @@
# 0.22.3 - unreleased
- Fix handshake over websocket. See [PR 3476]
[PR 3476]: https://github.com/libp2p/rust-libp2p/pull/3476
# 0.22.2
- Update `rust-version` to reflect the actual MSRV: 1.60.0. See [PR 3090].

View File

@ -3,7 +3,7 @@ name = "libp2p-pnet"
edition = "2021"
rust-version = "1.60.0"
description = "Private swarm support for libp2p"
version = "0.22.2"
version = "0.22.3"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -19,7 +19,14 @@ rand = "0.8"
pin-project = "1.0.2"
[dev-dependencies]
libp2p-core = { path = "../../core", features = ["rsa", "ecdsa", "secp256k1"] }
libp2p-noise = { path = "../noise" }
libp2p-swarm = { path = "../../swarm", features = ["tokio"] }
libp2p-tcp = { path = "../tcp", features = ["tokio"] }
libp2p-websocket = { path = "../websocket" }
libp2p-yamux = { path = "../../muxers/yamux" }
quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" }
tokio = { version = "1.21.1", features = ["full"] }
# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling

View File

@ -217,6 +217,7 @@ impl PnetConfig {
.write_all(&local_nonce)
.await
.map_err(PnetError::HandshakeError)?;
socket.flush().await?;
socket
.read_exact(&mut remote_nonce)
.await

View File

@ -0,0 +1,134 @@
use std::time::Duration;
use futures::{future, AsyncRead, AsyncWrite, StreamExt};
use libp2p_core::transport::MemoryTransport;
use libp2p_core::upgrade::Version;
use libp2p_core::Transport;
use libp2p_core::{multiaddr::Protocol, Multiaddr};
use libp2p_pnet::{PnetConfig, PreSharedKey};
use libp2p_swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent};
const TIMEOUT: Duration = Duration::from_secs(5);
#[tokio::test]
async fn can_establish_connection_memory() {
can_establish_connection_inner_with_timeout(
MemoryTransport::default,
Protocol::Memory(0).into(),
)
.await
}
#[tokio::test]
async fn can_establish_connection_tcp() {
can_establish_connection_inner_with_timeout(
libp2p_tcp::tokio::Transport::default,
"/ip4/127.0.0.1/tcp/0".parse().unwrap(),
)
.await
}
#[tokio::test]
async fn can_establish_connection_websocket() {
can_establish_connection_inner_with_timeout(
|| libp2p_websocket::WsConfig::new(libp2p_tcp::tokio::Transport::default()),
"/ip4/127.0.0.1/tcp/0/ws".parse().unwrap(),
)
.await
}
async fn can_establish_connection_inner_with_timeout<F, T>(
build_transport: F,
listen_addr: Multiaddr,
) where
F: Fn() -> T,
T: Transport + Send + Unpin + 'static,
<T as libp2p_core::Transport>::Error: Send + Sync + 'static,
<T as libp2p_core::Transport>::Output: AsyncRead + AsyncWrite + Send + Unpin,
<T as libp2p_core::Transport>::ListenerUpgrade: Send,
<T as libp2p_core::Transport>::Dial: Send,
{
let task = can_establish_connection_inner(build_transport, listen_addr);
tokio::time::timeout(TIMEOUT, task).await.unwrap();
}
async fn can_establish_connection_inner<F, T>(build_transport: F, listen_addr: Multiaddr)
where
F: Fn() -> T,
T: Transport + Send + Unpin + 'static,
<T as libp2p_core::Transport>::Error: Send + Sync + 'static,
<T as libp2p_core::Transport>::Output: AsyncRead + AsyncWrite + Send + Unpin,
<T as libp2p_core::Transport>::ListenerUpgrade: Send,
<T as libp2p_core::Transport>::Dial: Send,
{
let pnet = PnetConfig::new(PreSharedKey::new([0; 32]));
let mut swarm1 = make_swarm(build_transport(), pnet);
let mut swarm2 = make_swarm(build_transport(), pnet);
let listen_address = listen_on(&mut swarm1, listen_addr).await;
swarm2.dial(listen_address).unwrap();
let await_inbound_connection = async {
loop {
match swarm1.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } => break peer_id,
SwarmEvent::IncomingConnectionError { error, .. } => {
panic!("Incoming connection failed: {error}")
}
_ => continue,
};
}
};
let await_outbound_connection = async {
loop {
match swarm2.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } => break peer_id,
SwarmEvent::OutgoingConnectionError { error, .. } => {
panic!("Failed to dial: {error}")
}
_ => continue,
};
}
};
let (inbound_peer_id, outbound_peer_id) =
future::join(await_inbound_connection, await_outbound_connection).await;
assert_eq!(&inbound_peer_id, swarm2.local_peer_id());
assert_eq!(&outbound_peer_id, swarm1.local_peer_id());
}
fn make_swarm<T>(transport: T, pnet: PnetConfig) -> Swarm<keep_alive::Behaviour>
where
T: Transport + Send + Unpin + 'static,
<T as libp2p_core::Transport>::Error: Send + Sync + 'static,
<T as libp2p_core::Transport>::Output: AsyncRead + AsyncWrite + Send + Unpin,
<T as libp2p_core::Transport>::ListenerUpgrade: Send,
<T as libp2p_core::Transport>::Dial: Send,
{
let identity = libp2p_core::identity::Keypair::generate_ed25519();
let transport = transport
.and_then(move |socket, _| pnet.handshake(socket))
.upgrade(Version::V1)
.authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed();
Swarm::with_tokio_executor(
transport,
keep_alive::Behaviour,
identity.public().to_peer_id(),
)
}
async fn listen_on<B: NetworkBehaviour>(swarm: &mut Swarm<B>, addr: Multiaddr) -> Multiaddr {
let expected_listener_id = swarm.listen_on(addr).unwrap();
loop {
match swarm.select_next_some().await {
SwarmEvent::NewListenAddr {
address,
listener_id,
} if listener_id == expected_listener_id => break address,
_ => continue,
};
}
}