feat(yamux): future-proof public API

With this set of changes, we prepare the public API of `libp2p-yamux` to be as minimal as possible and allow for upgrades of the underlying `yamux` library in patch releases.

Related: #3013.

Pull-Request: #3908.
This commit is contained in:
Thomas Eizinger 2023-05-12 05:01:06 +02:00 committed by GitHub
parent 25958a2f8c
commit 234a0d24db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 81 additions and 150 deletions

View File

@ -6,8 +6,12 @@
- Remove deprecated items. - Remove deprecated items.
See [PR 3897]. See [PR 3897].
- Remove `Incoming`, `LocalIncoming` and `LocalConfig` as well as anything from the underlying `yamux` crate from the public API.
See [PR 3908].
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3897]: https://github.com/libp2p/rust-libp2p/pull/3897 [PR 3897]: https://github.com/libp2p/rust-libp2p/pull/3897
[PR 3908]: https://github.com/libp2p/rust-libp2p/pull/3908
## 0.43.1 ## 0.43.1

View File

@ -22,15 +22,11 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use futures::{ use futures::{future, prelude::*, ready, stream::BoxStream};
future,
prelude::*,
ready,
stream::{BoxStream, LocalBoxStream},
};
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::{IoSlice, IoSliceMut};
use std::task::Waker; use std::task::Waker;
use std::{ use std::{
fmt, io, iter, mem, fmt, io, iter, mem,
@ -41,23 +37,25 @@ use thiserror::Error;
use yamux::ConnectionError; use yamux::ConnectionError;
/// A Yamux connection. /// A Yamux connection.
pub struct Muxer<S> { pub struct Muxer<C> {
/// The [`futures::stream::Stream`] of incoming substreams. /// The [`futures::stream::Stream`] of incoming substreams.
incoming: S, incoming: BoxStream<'static, Result<yamux::Stream, yamux::ConnectionError>>,
/// Handle to control the connection. /// Handle to control the connection.
control: yamux::Control, control: yamux::Control,
/// Temporarily buffers inbound streams in case our node is performing backpressure on the remote. /// Temporarily buffers inbound streams in case our node is performing backpressure on the remote.
/// ///
/// The only way how yamux can make progress is by driving the [`Incoming`] stream. However, the /// The only way how yamux can make progress is by driving the stream. However, the
/// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via /// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via
/// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general /// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general
/// [`StreamMuxer::poll`] is designed to make progress on existing streams etc. /// [`StreamMuxer::poll`] is designed to make progress on existing streams etc.
/// ///
/// This buffer stores inbound streams that are created whilst [`StreamMuxer::poll`] is called. /// This buffer stores inbound streams that are created whilst [`StreamMuxer::poll`] is called.
/// Once the buffer is full, new inbound streams are dropped. /// Once the buffer is full, new inbound streams are dropped.
inbound_stream_buffer: VecDeque<yamux::Stream>, inbound_stream_buffer: VecDeque<Stream>,
/// Waker to be called when new inbound streams are available. /// Waker to be called when new inbound streams are available.
inbound_stream_waker: Option<Waker>, inbound_stream_waker: Option<Waker>,
_phantom: std::marker::PhantomData<C>,
} }
const MAX_BUFFERED_INBOUND_STREAMS: usize = 25; const MAX_BUFFERED_INBOUND_STREAMS: usize = 25;
@ -68,7 +66,7 @@ impl<S> fmt::Debug for Muxer<S> {
} }
} }
impl<C> Muxer<Incoming<C>> impl<C> Muxer<C>
where where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static, C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{ {
@ -78,43 +76,20 @@ where
let ctrl = conn.control(); let ctrl = conn.control();
Self { Self {
incoming: Incoming { incoming: yamux::into_stream(conn).err_into().boxed(),
stream: yamux::into_stream(conn).err_into().boxed(),
_marker: std::marker::PhantomData,
},
control: ctrl, control: ctrl,
inbound_stream_buffer: VecDeque::default(), inbound_stream_buffer: VecDeque::default(),
inbound_stream_waker: None, inbound_stream_waker: None,
_phantom: Default::default(),
} }
} }
} }
impl<C> Muxer<LocalIncoming<C>> impl<C> StreamMuxer for Muxer<C>
where where
C: AsyncRead + AsyncWrite + Unpin + 'static, C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{ {
/// Create a new Yamux connection (which is ![`Send`]). type Substream = Stream;
fn local(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self {
let conn = yamux::Connection::new(io, cfg, mode);
let ctrl = conn.control();
Self {
incoming: LocalIncoming {
stream: yamux::into_stream(conn).err_into().boxed_local(),
_marker: std::marker::PhantomData,
},
control: ctrl,
inbound_stream_buffer: VecDeque::default(),
inbound_stream_waker: None,
}
}
}
impl<S> StreamMuxer for Muxer<S>
where
S: Stream<Item = Result<yamux::Stream, Error>> + Unpin,
{
type Substream = yamux::Stream;
type Error = Error; type Error = Error;
fn poll_inbound( fn poll_inbound(
@ -136,6 +111,7 @@ where
) -> Poll<Result<Self::Substream, Self::Error>> { ) -> Poll<Result<Self::Substream, Self::Error>> {
Pin::new(&mut self.control) Pin::new(&mut self.control)
.poll_open_stream(cx) .poll_open_stream(cx)
.map_ok(Stream)
.map_err(Error) .map_err(Error)
} }
@ -148,7 +124,7 @@ where
let inbound_stream = ready!(this.poll_inner(cx))?; let inbound_stream = ready!(this.poll_inner(cx))?;
if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS { if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS {
log::warn!("dropping {inbound_stream} because buffer is full"); log::warn!("dropping {} because buffer is full", inbound_stream.0);
drop(inbound_stream); drop(inbound_stream);
} else { } else {
this.inbound_stream_buffer.push_back(inbound_stream); this.inbound_stream_buffer.push_back(inbound_stream);
@ -168,7 +144,9 @@ where
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }
while let Poll::Ready(maybe_inbound_stream) = self.incoming.poll_next_unpin(c)? { while let Poll::Ready(maybe_inbound_stream) =
self.incoming.poll_next_unpin(c).map_err(Error)?
{
match maybe_inbound_stream { match maybe_inbound_stream {
Some(inbound_stream) => mem::drop(inbound_stream), Some(inbound_stream) => mem::drop(inbound_stream),
None => return Poll::Ready(Ok(())), None => return Poll::Ready(Ok(())),
@ -179,14 +157,64 @@ where
} }
} }
impl<S> Muxer<S> /// A stream produced by the yamux multiplexer.
#[derive(Debug)]
pub struct Stream(yamux::Stream);
impl AsyncRead for Stream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
fn poll_read_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.0).poll_read_vectored(cx, bufs)
}
}
impl AsyncWrite for Stream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}
fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.0).poll_write_vectored(cx, bufs)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.0).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.0).poll_close(cx)
}
}
impl<C> Muxer<C>
where where
S: Stream<Item = Result<yamux::Stream, Error>> + Unpin, C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{ {
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Result<yamux::Stream, Error>> { fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream, Error>> {
self.incoming.poll_next_unpin(cx).map(|maybe_stream| { self.incoming.poll_next_unpin(cx).map(|maybe_stream| {
let stream = maybe_stream let stream = maybe_stream
.transpose()? .transpose()
.map_err(Error)?
.map(Stream)
.ok_or(Error(ConnectionError::Closed))?; .ok_or(Error(ConnectionError::Closed))?;
Ok(stream) Ok(stream)
@ -241,10 +269,6 @@ impl WindowUpdateMode {
} }
} }
/// The yamux configuration for upgrading I/O resources which are ![`Send`].
#[derive(Clone)]
pub struct LocalConfig(Config);
impl Config { impl Config {
/// Creates a new `YamuxConfig` in client mode, regardless of whether /// Creates a new `YamuxConfig` in client mode, regardless of whether
/// it will be used for an inbound or outbound upgrade. /// it will be used for an inbound or outbound upgrade.
@ -288,12 +312,6 @@ impl Config {
self.inner.set_window_update_mode(mode.0); self.inner.set_window_update_mode(mode.0);
self self
} }
/// Converts the config into a [`LocalConfig`] for use with upgrades
/// of I/O streams that are ![`Send`].
pub fn into_local(self) -> LocalConfig {
LocalConfig(self)
}
} }
impl Default for Config { impl Default for Config {
@ -315,20 +333,11 @@ impl UpgradeInfo for Config {
} }
} }
impl UpgradeInfo for LocalConfig {
type Info = &'static str;
type InfoIter = iter::Once<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
iter::once("/yamux/1.0.0")
}
}
impl<C> InboundUpgrade<C> for Config impl<C> InboundUpgrade<C> for Config
where where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static, C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{ {
type Output = Muxer<Incoming<C>>; type Output = Muxer<C>;
type Error = io::Error; type Error = io::Error;
type Future = future::Ready<Result<Self::Output, Self::Error>>; type Future = future::Ready<Result<Self::Output, Self::Error>>;
@ -338,26 +347,11 @@ where
} }
} }
impl<C> InboundUpgrade<C> for LocalConfig
where
C: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Output = Muxer<LocalIncoming<C>>;
type Error = io::Error;
type Future = future::Ready<Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future {
let cfg = self.0;
let mode = cfg.mode.unwrap_or(yamux::Mode::Server);
future::ready(Ok(Muxer::local(io, cfg.inner, mode)))
}
}
impl<C> OutboundUpgrade<C> for Config impl<C> OutboundUpgrade<C> for Config
where where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static, C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{ {
type Output = Muxer<Incoming<C>>; type Output = Muxer<C>;
type Error = io::Error; type Error = io::Error;
type Future = future::Ready<Result<Self::Output, Self::Error>>; type Future = future::Ready<Result<Self::Output, Self::Error>>;
@ -367,25 +361,10 @@ where
} }
} }
impl<C> OutboundUpgrade<C> for LocalConfig
where
C: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Output = Muxer<LocalIncoming<C>>;
type Error = io::Error;
type Future = future::Ready<Result<Self::Output, Self::Error>>;
fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future {
let cfg = self.0;
let mode = cfg.mode.unwrap_or(yamux::Mode::Client);
future::ready(Ok(Muxer::local(io, cfg.inner, mode)))
}
}
/// The Yamux [`StreamMuxer`] error type. /// The Yamux [`StreamMuxer`] error type.
#[derive(Debug, Error)] #[derive(Debug, Error)]
#[error("yamux error: {0}")] #[error(transparent)]
pub struct Error(#[from] yamux::ConnectionError); pub struct Error(yamux::ConnectionError);
impl From<Error> for io::Error { impl From<Error> for io::Error {
fn from(err: Error) -> Self { fn from(err: Error) -> Self {
@ -395,55 +374,3 @@ impl From<Error> for io::Error {
} }
} }
} }
/// The [`futures::stream::Stream`] of incoming substreams.
pub struct Incoming<T> {
stream: BoxStream<'static, Result<yamux::Stream, Error>>,
_marker: std::marker::PhantomData<T>,
}
impl<T> fmt::Debug for Incoming<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Incoming")
}
}
/// The [`futures::stream::Stream`] of incoming substreams (`!Send`).
pub struct LocalIncoming<T> {
stream: LocalBoxStream<'static, Result<yamux::Stream, Error>>,
_marker: std::marker::PhantomData<T>,
}
impl<T> fmt::Debug for LocalIncoming<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("LocalIncoming")
}
}
impl<T> Stream for Incoming<T> {
type Item = Result<yamux::Stream, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.as_mut().poll_next_unpin(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
impl<T> Unpin for Incoming<T> {}
impl<T> Stream for LocalIncoming<T> {
type Item = Result<yamux::Stream, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.as_mut().poll_next_unpin(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
impl<T> Unpin for LocalIncoming<T> {}