From 234a0d24dbb211e0264afe34f41afdfb873f74ce Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 12 May 2023 05:01:06 +0200 Subject: [PATCH] feat(yamux): future-proof public API With this set of changes, we prepare the public API of `libp2p-yamux` to be as minimal as possible and allow for upgrades of the underlying `yamux` library in patch releases. Related: #3013. Pull-Request: #3908. --- muxers/yamux/CHANGELOG.md | 4 + muxers/yamux/src/lib.rs | 227 +++++++++++++------------------------- 2 files changed, 81 insertions(+), 150 deletions(-) diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index e4c72580..126b2613 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -6,8 +6,12 @@ - Remove deprecated items. See [PR 3897]. +- Remove `Incoming`, `LocalIncoming` and `LocalConfig` as well as anything from the underlying `yamux` crate from the public API. + See [PR 3908]. + [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 [PR 3897]: https://github.com/libp2p/rust-libp2p/pull/3897 +[PR 3908]: https://github.com/libp2p/rust-libp2p/pull/3908 ## 0.43.1 diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index f398c022..fcfa068d 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -22,15 +22,11 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -use futures::{ - future, - prelude::*, - ready, - stream::{BoxStream, LocalBoxStream}, -}; +use futures::{future, prelude::*, ready, stream::BoxStream}; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use std::collections::VecDeque; +use std::io::{IoSlice, IoSliceMut}; use std::task::Waker; use std::{ fmt, io, iter, mem, @@ -41,23 +37,25 @@ use thiserror::Error; use yamux::ConnectionError; /// A Yamux connection. -pub struct Muxer { +pub struct Muxer { /// The [`futures::stream::Stream`] of incoming substreams. - incoming: S, + incoming: BoxStream<'static, Result>, /// Handle to control the connection. control: yamux::Control, /// Temporarily buffers inbound streams in case our node is performing backpressure on the remote. /// - /// The only way how yamux can make progress is by driving the [`Incoming`] stream. However, the + /// The only way how yamux can make progress is by driving the stream. However, the /// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via /// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general /// [`StreamMuxer::poll`] is designed to make progress on existing streams etc. /// /// This buffer stores inbound streams that are created whilst [`StreamMuxer::poll`] is called. /// Once the buffer is full, new inbound streams are dropped. - inbound_stream_buffer: VecDeque, + inbound_stream_buffer: VecDeque, /// Waker to be called when new inbound streams are available. inbound_stream_waker: Option, + + _phantom: std::marker::PhantomData, } const MAX_BUFFERED_INBOUND_STREAMS: usize = 25; @@ -68,7 +66,7 @@ impl fmt::Debug for Muxer { } } -impl Muxer> +impl Muxer where C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { @@ -78,43 +76,20 @@ where let ctrl = conn.control(); Self { - incoming: Incoming { - stream: yamux::into_stream(conn).err_into().boxed(), - _marker: std::marker::PhantomData, - }, + incoming: yamux::into_stream(conn).err_into().boxed(), control: ctrl, inbound_stream_buffer: VecDeque::default(), inbound_stream_waker: None, + _phantom: Default::default(), } } } -impl Muxer> +impl StreamMuxer for Muxer where - C: AsyncRead + AsyncWrite + Unpin + 'static, + C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - /// Create a new Yamux connection (which is ![`Send`]). - fn local(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { - let conn = yamux::Connection::new(io, cfg, mode); - let ctrl = conn.control(); - - Self { - incoming: LocalIncoming { - stream: yamux::into_stream(conn).err_into().boxed_local(), - _marker: std::marker::PhantomData, - }, - control: ctrl, - inbound_stream_buffer: VecDeque::default(), - inbound_stream_waker: None, - } - } -} - -impl StreamMuxer for Muxer -where - S: Stream> + Unpin, -{ - type Substream = yamux::Stream; + type Substream = Stream; type Error = Error; fn poll_inbound( @@ -136,6 +111,7 @@ where ) -> Poll> { Pin::new(&mut self.control) .poll_open_stream(cx) + .map_ok(Stream) .map_err(Error) } @@ -148,7 +124,7 @@ where let inbound_stream = ready!(this.poll_inner(cx))?; if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS { - log::warn!("dropping {inbound_stream} because buffer is full"); + log::warn!("dropping {} because buffer is full", inbound_stream.0); drop(inbound_stream); } else { this.inbound_stream_buffer.push_back(inbound_stream); @@ -168,7 +144,9 @@ where return Poll::Ready(Ok(())); } - while let Poll::Ready(maybe_inbound_stream) = self.incoming.poll_next_unpin(c)? { + while let Poll::Ready(maybe_inbound_stream) = + self.incoming.poll_next_unpin(c).map_err(Error)? + { match maybe_inbound_stream { Some(inbound_stream) => mem::drop(inbound_stream), None => return Poll::Ready(Ok(())), @@ -179,14 +157,64 @@ where } } -impl Muxer +/// A stream produced by the yamux multiplexer. +#[derive(Debug)] +pub struct Stream(yamux::Stream); + +impl AsyncRead for Stream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_read(cx, buf) + } + + fn poll_read_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + Pin::new(&mut self.0).poll_read_vectored(cx, bufs) + } +} + +impl AsyncWrite for Stream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.0).poll_write_vectored(cx, bufs) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_close(cx) + } +} + +impl Muxer where - S: Stream> + Unpin, + C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll> { self.incoming.poll_next_unpin(cx).map(|maybe_stream| { let stream = maybe_stream - .transpose()? + .transpose() + .map_err(Error)? + .map(Stream) .ok_or(Error(ConnectionError::Closed))?; Ok(stream) @@ -241,10 +269,6 @@ impl WindowUpdateMode { } } -/// The yamux configuration for upgrading I/O resources which are ![`Send`]. -#[derive(Clone)] -pub struct LocalConfig(Config); - impl Config { /// Creates a new `YamuxConfig` in client mode, regardless of whether /// it will be used for an inbound or outbound upgrade. @@ -288,12 +312,6 @@ impl Config { self.inner.set_window_update_mode(mode.0); self } - - /// Converts the config into a [`LocalConfig`] for use with upgrades - /// of I/O streams that are ![`Send`]. - pub fn into_local(self) -> LocalConfig { - LocalConfig(self) - } } impl Default for Config { @@ -315,20 +333,11 @@ impl UpgradeInfo for Config { } } -impl UpgradeInfo for LocalConfig { - type Info = &'static str; - type InfoIter = iter::Once; - - fn protocol_info(&self) -> Self::InfoIter { - iter::once("/yamux/1.0.0") - } -} - impl InboundUpgrade for Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - type Output = Muxer>; + type Output = Muxer; type Error = io::Error; type Future = future::Ready>; @@ -338,26 +347,11 @@ where } } -impl InboundUpgrade for LocalConfig -where - C: AsyncRead + AsyncWrite + Unpin + 'static, -{ - type Output = Muxer>; - type Error = io::Error; - type Future = future::Ready>; - - fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future { - let cfg = self.0; - let mode = cfg.mode.unwrap_or(yamux::Mode::Server); - future::ready(Ok(Muxer::local(io, cfg.inner, mode))) - } -} - impl OutboundUpgrade for Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - type Output = Muxer>; + type Output = Muxer; type Error = io::Error; type Future = future::Ready>; @@ -367,25 +361,10 @@ where } } -impl OutboundUpgrade for LocalConfig -where - C: AsyncRead + AsyncWrite + Unpin + 'static, -{ - type Output = Muxer>; - type Error = io::Error; - type Future = future::Ready>; - - fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future { - let cfg = self.0; - let mode = cfg.mode.unwrap_or(yamux::Mode::Client); - future::ready(Ok(Muxer::local(io, cfg.inner, mode))) - } -} - /// The Yamux [`StreamMuxer`] error type. #[derive(Debug, Error)] -#[error("yamux error: {0}")] -pub struct Error(#[from] yamux::ConnectionError); +#[error(transparent)] +pub struct Error(yamux::ConnectionError); impl From for io::Error { fn from(err: Error) -> Self { @@ -395,55 +374,3 @@ impl From for io::Error { } } } - -/// The [`futures::stream::Stream`] of incoming substreams. -pub struct Incoming { - stream: BoxStream<'static, Result>, - _marker: std::marker::PhantomData, -} - -impl fmt::Debug for Incoming { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("Incoming") - } -} - -/// The [`futures::stream::Stream`] of incoming substreams (`!Send`). -pub struct LocalIncoming { - stream: LocalBoxStream<'static, Result>, - _marker: std::marker::PhantomData, -} - -impl fmt::Debug for LocalIncoming { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("LocalIncoming") - } -} - -impl Stream for Incoming { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.stream.as_mut().poll_next_unpin(cx) - } - - fn size_hint(&self) -> (usize, Option) { - self.stream.size_hint() - } -} - -impl Unpin for Incoming {} - -impl Stream for LocalIncoming { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.stream.as_mut().poll_next_unpin(cx) - } - - fn size_hint(&self) -> (usize, Option) { - self.stream.size_hint() - } -} - -impl Unpin for LocalIncoming {}