deps(yamux): update yamux to v0.12

Pull-Request: #3013.
This commit is contained in:
Thomas Eizinger
2023-08-02 17:02:54 +02:00
committed by GitHub
parent 37d57f41de
commit cb8a92026a
8 changed files with 56 additions and 66 deletions

7
Cargo.lock generated
View File

@ -3354,7 +3354,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-yamux" name = "libp2p-yamux"
version = "0.44.0" version = "0.44.1"
dependencies = [ dependencies = [
"async-std", "async-std",
"futures", "futures",
@ -6729,14 +6729,15 @@ dependencies = [
[[package]] [[package]]
name = "yamux" name = "yamux"
version = "0.10.2" version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5d9ba232399af1783a58d8eb26f6b5006fbefe2dc9ef36bd283324792d03ea5" checksum = "0329ef377816896f014435162bb3711ea7a07729c23d0960e6f8048b21b8fe91"
dependencies = [ dependencies = [
"futures", "futures",
"log", "log",
"nohash-hasher", "nohash-hasher",
"parking_lot", "parking_lot",
"pin-project",
"rand 0.8.5", "rand 0.8.5",
"static_assertions", "static_assertions",
] ]

View File

@ -97,7 +97,7 @@ libp2p-wasm-ext = { version = "0.40.0", path = "transports/wasm-ext" }
libp2p-webrtc = { version = "0.6.0-alpha", path = "transports/webrtc" } libp2p-webrtc = { version = "0.6.0-alpha", path = "transports/webrtc" }
libp2p-websocket = { version = "0.42.0", path = "transports/websocket" } libp2p-websocket = { version = "0.42.0", path = "transports/websocket" }
libp2p-webtransport-websys = { version = "0.1.0", path = "transports/webtransport-websys" } libp2p-webtransport-websys = { version = "0.1.0", path = "transports/webtransport-websys" }
libp2p-yamux = { version = "0.44.0", path = "muxers/yamux" } libp2p-yamux = { version = "0.44.1", path = "muxers/yamux" }
multistream-select = { version = "0.13.0", path = "misc/multistream-select" } multistream-select = { version = "0.13.0", path = "misc/multistream-select" }
quick-protobuf-codec = { version = "0.2.0", path = "misc/quick-protobuf-codec" } quick-protobuf-codec = { version = "0.2.0", path = "misc/quick-protobuf-codec" }
quickcheck = { package = "quickcheck-ext", path = "misc/quickcheck-ext" } quickcheck = { package = "quickcheck-ext", path = "misc/quickcheck-ext" }

View File

@ -19,7 +19,7 @@ rand = "0.8.5"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies] [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
axum = "0.6" axum = "0.6"
libp2p = { path = "../libp2p", features = ["ping", "noise", "tls", "rsa", "macros", "websocket", "tokio", "yamux", "tcp", "dns"] } libp2p = { path = "../libp2p", features = ["ping", "noise", "tls", "rsa", "macros", "websocket", "tokio", "yamux", "tcp", "dns", "identify"] }
libp2p-quic = { workspace = true, features = ["tokio"] } libp2p-quic = { workspace = true, features = ["tokio"] }
libp2p-webrtc = { workspace = true, features = ["tokio"] } libp2p-webrtc = { workspace = true, features = ["tokio"] }
libp2p-mplex = { path = "../muxers/mplex" } libp2p-mplex = { path = "../muxers/mplex" }
@ -34,7 +34,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
[target.'cfg(target_arch = "wasm32")'.dependencies] [target.'cfg(target_arch = "wasm32")'.dependencies]
libp2p = { path = "../libp2p", features = ["ping", "macros", "webtransport-websys", "wasm-bindgen"] } libp2p = { path = "../libp2p", features = ["ping", "macros", "webtransport-websys", "wasm-bindgen", "identify"] }
wasm-bindgen = { version = "0.2" } wasm-bindgen = { version = "0.2" }
wasm-bindgen-futures = { version = "0.4" } wasm-bindgen-futures = { version = "0.4" }
wasm-logger = { version = "0.2.0" } wasm-logger = { version = "0.2.0" }

View File

@ -4,7 +4,7 @@ use std::time::Duration;
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent}; use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent};
use libp2p::{identity, ping, Multiaddr, PeerId}; use libp2p::{identify, identity, ping, Multiaddr, PeerId};
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
@ -29,12 +29,17 @@ pub async fn run_test(
let redis_client = RedisClient::new(redis_addr).context("Could not connect to redis")?; let redis_client = RedisClient::new(redis_addr).context("Could not connect to redis")?;
// Build the transport from the passed ENV var. // Build the transport from the passed ENV var.
let (boxed_transport, local_addr) = build_transport(local_key, ip, transport)?; let (boxed_transport, local_addr) = build_transport(local_key.clone(), ip, transport)?;
let mut swarm = swarm_builder( let mut swarm = swarm_builder(
boxed_transport, boxed_transport,
Behaviour { Behaviour {
ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
keep_alive: keep_alive::Behaviour, keep_alive: keep_alive::Behaviour,
// Need to include identify until https://github.com/status-im/nim-libp2p/issues/924 is resolved.
identify: identify::Behaviour::new(identify::Config::new(
"/interop-tests".to_owned(),
local_key.public(),
)),
}, },
local_peer_id, local_peer_id,
) )
@ -237,6 +242,7 @@ impl FromStr for SecProtocol {
struct Behaviour { struct Behaviour {
ping: ping::Behaviour, ping: ping::Behaviour,
keep_alive: keep_alive::Behaviour, keep_alive: keep_alive::Behaviour,
identify: identify::Behaviour,
} }
/// Helper function to get a ENV variable into an test parameter like `Transport`. /// Helper function to get a ENV variable into an test parameter like `Transport`.

View File

@ -1,3 +1,11 @@
## 0.44.1 - unreleased
- Update to `yamux` `v0.12` which brings performance improvements and introduces an ACK backlog of 256 inbound streams.
When interacting with other libp2p nodes that are also running this or a newer version, the creation of inbound streams will be backpressured once the ACK backlog is hit.
See [PR 3013].
[PR 3013]: https://github.com/libp2p/rust-libp2p/pull/3013
## 0.44.0 ## 0.44.0
- Raise MSRV to 1.65. - Raise MSRV to 1.65.

View File

@ -3,7 +3,7 @@ name = "libp2p-yamux"
edition = "2021" edition = "2021"
rust-version = { workspace = true } rust-version = { workspace = true }
description = "Yamux multiplexing protocol for libp2p" description = "Yamux multiplexing protocol for libp2p"
version = "0.44.0" version = "0.44.1"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT" license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p" repository = "https://github.com/libp2p/rust-libp2p"
@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"]
futures = "0.3.28" futures = "0.3.28"
libp2p-core = { workspace = true } libp2p-core = { workspace = true }
thiserror = "1.0" thiserror = "1.0"
yamux = "0.10.0" yamux = "0.12"
log = "0.4" log = "0.4"
[dev-dependencies] [dev-dependencies]

View File

@ -22,14 +22,14 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use futures::{future, prelude::*, ready, stream::BoxStream}; use futures::{future, prelude::*, ready};
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::{IoSlice, IoSliceMut}; use std::io::{IoSlice, IoSliceMut};
use std::task::Waker; use std::task::Waker;
use std::{ use std::{
fmt, io, iter, mem, io, iter,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
@ -37,14 +37,12 @@ use thiserror::Error;
use yamux::ConnectionError; use yamux::ConnectionError;
/// A Yamux connection. /// A Yamux connection.
#[derive(Debug)]
pub struct Muxer<C> { pub struct Muxer<C> {
/// The [`futures::stream::Stream`] of incoming substreams. connection: yamux::Connection<C>,
incoming: BoxStream<'static, Result<yamux::Stream, yamux::ConnectionError>>,
/// Handle to control the connection.
control: yamux::Control,
/// Temporarily buffers inbound streams in case our node is performing backpressure on the remote. /// Temporarily buffers inbound streams in case our node is performing backpressure on the remote.
/// ///
/// The only way how yamux can make progress is by driving the stream. However, the /// The only way how yamux can make progress is by calling [`yamux::Connection::poll_next_inbound`]. However, the
/// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via /// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via
/// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general /// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general
/// [`StreamMuxer::poll`] is designed to make progress on existing streams etc. /// [`StreamMuxer::poll`] is designed to make progress on existing streams etc.
@ -54,17 +52,13 @@ pub struct Muxer<C> {
inbound_stream_buffer: VecDeque<Stream>, inbound_stream_buffer: VecDeque<Stream>,
/// Waker to be called when new inbound streams are available. /// Waker to be called when new inbound streams are available.
inbound_stream_waker: Option<Waker>, inbound_stream_waker: Option<Waker>,
_phantom: std::marker::PhantomData<C>,
} }
const MAX_BUFFERED_INBOUND_STREAMS: usize = 25; /// How many streams to buffer before we start resetting them.
///
impl<S> fmt::Debug for Muxer<S> { /// This is equal to the ACK BACKLOG in `rust-yamux`.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { /// Thus, for peers running on a recent version of `rust-libp2p`, we should never need to reset streams because they'll voluntarily stop opening them once they hit the ACK backlog.
f.write_str("Yamux") const MAX_BUFFERED_INBOUND_STREAMS: usize = 256;
}
}
impl<C> Muxer<C> impl<C> Muxer<C>
where where
@ -72,22 +66,17 @@ where
{ {
/// Create a new Yamux connection. /// Create a new Yamux connection.
fn new(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { fn new(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self {
let conn = yamux::Connection::new(io, cfg, mode); Muxer {
let ctrl = conn.control(); connection: yamux::Connection::new(io, cfg, mode),
Self {
incoming: yamux::into_stream(conn).err_into().boxed(),
control: ctrl,
inbound_stream_buffer: VecDeque::default(), inbound_stream_buffer: VecDeque::default(),
inbound_stream_waker: None, inbound_stream_waker: None,
_phantom: Default::default(),
} }
} }
} }
impl<C> StreamMuxer for Muxer<C> impl<C> StreamMuxer for Muxer<C>
where where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static, C: AsyncRead + AsyncWrite + Unpin + 'static,
{ {
type Substream = Stream; type Substream = Stream;
type Error = Error; type Error = Error;
@ -112,10 +101,15 @@ where
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> { ) -> Poll<Result<Self::Substream, Self::Error>> {
Pin::new(&mut self.control) let stream = ready!(self.connection.poll_new_outbound(cx).map_err(Error)?);
.poll_open_stream(cx)
.map_ok(Stream) Poll::Ready(Ok(Stream(stream)))
.map_err(Error) }
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.connection.poll_close(cx).map_err(Error)?);
Poll::Ready(Ok(()))
} }
fn poll( fn poll(
@ -141,23 +135,6 @@ where
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();
Poll::Pending Poll::Pending
} }
fn poll_close(mut self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll<Result<(), Error>> {
if let Poll::Ready(()) = Pin::new(&mut self.control).poll_close(c).map_err(Error)? {
return Poll::Ready(Ok(()));
}
while let Poll::Ready(maybe_inbound_stream) =
self.incoming.poll_next_unpin(c).map_err(Error)?
{
match maybe_inbound_stream {
Some(inbound_stream) => mem::drop(inbound_stream),
None => return Poll::Ready(Ok(())),
}
}
Poll::Pending
}
} }
/// A stream produced by the yamux multiplexer. /// A stream produced by the yamux multiplexer.
@ -210,18 +187,16 @@ impl AsyncWrite for Stream {
impl<C> Muxer<C> impl<C> Muxer<C>
where where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static, C: AsyncRead + AsyncWrite + Unpin + 'static,
{ {
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream, Error>> { fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream, Error>> {
self.incoming.poll_next_unpin(cx).map(|maybe_stream| { let stream = ready!(self.connection.poll_next_inbound(cx))
let stream = maybe_stream .transpose()
.transpose() .map_err(Error)?
.map_err(Error)? .map(Stream)
.map(Stream) .ok_or(Error(ConnectionError::Closed))?;
.ok_or(Error(ConnectionError::Closed))?;
Ok(stream) Poll::Ready(Ok(stream))
})
} }
} }

View File

@ -59,11 +59,11 @@ async fn two_servers_add_each_other_to_routing_table() {
match libp2p_swarm_test::drive(&mut server2, &mut server1).await { match libp2p_swarm_test::drive(&mut server2, &mut server1).await {
( (
[Identify(_), Kad(UnroutablePeer { .. }), Identify(_), Kad(RoutingUpdated { peer: peer2, .. }), Identify(_)], [Identify(_), Kad(RoutingUpdated { peer: peer2, .. }), Identify(_)],
[Identify(_), Identify(_)], [Identify(_), Identify(_)],
) )
| ( | (
[Identify(_), Kad(UnroutablePeer { .. }), Identify(_), Identify(_), Kad(RoutingUpdated { peer: peer2, .. })], [Identify(_), Identify(_), Kad(RoutingUpdated { peer: peer2, .. })],
[Identify(_), Identify(_)], [Identify(_), Identify(_)],
) => { ) => {
assert_eq!(peer2, server1_peer_id); assert_eq!(peer2, server1_peer_id);