[mplex, yamux] Streamline configuration API. (#1822)

* Streamline mplex and yamux configurations.

  * For all configuration options that exist for both multiplexers
    and have the same semantics, use the same names for the
    configuration.
  * Rename `Config` to `YamuxConfig` for consistentcy with
    the majority of other protocols, e.g. `MplexConfig`, `PingConfig`,
    `KademliaConfig`, etc.
  * Completely hide `yamux` APIs within `libp2p-yamux`. This allows
    to fully control the libp2p API and streamline it with other
    muxer APIs, consciously choosing e.g. which configuration options
    to make configurable in libp2p and which to fix to certain values.
    It does also not necessarily prescribe new incompatible version bumps of
    yamux for `libp2p-yamux`, as no `yamux` types are exposed. The cost
    is some more duplication of configuration options in the API, as well
    as the need to update `libp2p-yamux` if `yamux` introduces new
    configuration options that `libp2p-yamux` wants to expose as well.

* Update CHANGELOGs.
This commit is contained in:
Roman Borschel
2020-11-06 09:46:22 +01:00
committed by GitHub
parent 689eeaf533
commit 2ba78b4ce7
10 changed files with 158 additions and 77 deletions

View File

@ -44,7 +44,7 @@ use libp2p::{
noise,
swarm::NetworkBehaviourEventProcess,
tcp::TcpConfig,
yamux::Config as YamuxConfig,
yamux::YamuxConfig,
Multiaddr, NetworkBehaviour, PeerId, Swarm, Transport,
};
use std::{

View File

@ -1,5 +1,9 @@
# 0.24.0 [unreleased]
- Tweak the naming in the `MplexConfig` API for better
consistency with `libp2p-yamux`.
[PR 1822](https://github.com/libp2p/rust-libp2p/pull/1822).
- Update dependencies.
# 0.23.1 [2020-10-28]

View File

@ -55,7 +55,7 @@ impl MplexConfig {
/// accumulates too quickly (judged by internal bounds), the
/// connection is closed with an error due to the misbehaved
/// remote.
pub fn max_substreams(&mut self, max: usize) -> &mut Self {
pub fn set_max_num_streams(&mut self, max: usize) -> &mut Self {
self.max_substreams = max;
self
}
@ -63,7 +63,7 @@ impl MplexConfig {
/// Sets the maximum number of frames buffered per substream.
///
/// A limit is necessary in order to avoid DoS attacks.
pub fn max_buffer_len(&mut self, max: usize) -> &mut Self {
pub fn set_max_buffer_size(&mut self, max: usize) -> &mut Self {
self.max_buffer_len = max;
self
}
@ -72,14 +72,14 @@ impl MplexConfig {
/// for a substream.
///
/// See the documentation of [`MaxBufferBehaviour`].
pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self {
pub fn set_max_buffer_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self {
self.max_buffer_behaviour = behaviour;
self
}
/// Sets the frame size used when sending data. Capped at 1Mbyte as per the
/// Mplex spec.
pub fn split_send_size(&mut self, size: usize) -> &mut Self {
pub fn set_split_send_size(&mut self, size: usize) -> &mut Self {
let size = cmp::min(size, MAX_FRAME_SIZE);
self.split_send_size = size;
self

View File

@ -1,5 +1,9 @@
# 0.27.0 [unreleased]
- Tweak the naming in the `MplexConfig` API for better
consistency with `libp2p-mplex`.
[PR 1822](https://github.com/libp2p/rust-libp2p/pull/1822).
- Update dependencies.
# 0.26.0 [2020-10-16]

View File

@ -22,18 +22,13 @@
//! [specification](https://github.com/hashicorp/yamux/blob/master/spec.md).
use futures::{future, prelude::*, ready, stream::{BoxStream, LocalBoxStream}};
use libp2p_core::muxing::StreamMuxerEvent;
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use parking_lot::Mutex;
use std::{fmt, io, iter, ops::{Deref, DerefMut}, pin::Pin, task::Context};
use std::{fmt, io, iter, pin::Pin, task::{Context, Poll}};
use thiserror::Error;
pub use yamux::{Mode, WindowUpdateMode};
/// A Yamux connection.
///
/// This implementation isn't capable of detecting when the underlying socket changes its address,
/// and no [`StreamMuxerEvent::AddressChange`] event is ever emitted.
pub struct Yamux<S>(Mutex<Inner<S>>);
impl<S> fmt::Debug for Yamux<S> {
@ -58,8 +53,7 @@ where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
/// Create a new Yamux connection.
pub fn new(io: C, mut cfg: yamux::Config, mode: yamux::Mode) -> Self {
cfg.set_read_after_close(false);
fn new(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self {
let conn = yamux::Connection::new(io, cfg, mode);
let ctrl = conn.control();
let inner = Inner {
@ -78,8 +72,7 @@ where
C: AsyncRead + AsyncWrite + Unpin + 'static
{
/// Create a new Yamux connection (which is ![`Send`]).
pub fn local(io: C, mut cfg: yamux::Config, mode: yamux::Mode) -> Self {
cfg.set_read_after_close(false);
fn local(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self {
let conn = yamux::Connection::new(io, cfg, mode);
let ctrl = conn.control();
let inner = Inner {
@ -93,9 +86,10 @@ where
}
}
type Poll<T> = std::task::Poll<Result<T, YamuxError>>;
pub type YamuxResult<T> = Result<T, YamuxError>;
impl<S> libp2p_core::StreamMuxer for Yamux<S>
/// > **Note**: This implementation never emits [`StreamMuxerEvent::AddressChange`] events.
impl<S> StreamMuxer for Yamux<S>
where
S: Stream<Item = Result<yamux::Stream, YamuxError>> + Unpin
{
@ -103,7 +97,9 @@ where
type OutboundSubstream = OpenSubstreamToken;
type Error = YamuxError;
fn poll_event(&self, c: &mut Context<'_>) -> Poll<StreamMuxerEvent<Self::Substream>> {
fn poll_event(&self, c: &mut Context<'_>)
-> Poll<YamuxResult<StreamMuxerEvent<Self::Substream>>>
{
let mut inner = self.0.lock();
match ready!(inner.incoming.poll_next_unpin(c)) {
Some(Ok(s)) => Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))),
@ -116,7 +112,9 @@ where
OpenSubstreamToken(())
}
fn poll_outbound(&self, c: &mut Context<'_>, _: &mut OpenSubstreamToken) -> Poll<Self::Substream> {
fn poll_outbound(&self, c: &mut Context<'_>, _: &mut OpenSubstreamToken)
-> Poll<YamuxResult<Self::Substream>>
{
let mut inner = self.0.lock();
Pin::new(&mut inner.control).poll_open_stream(c).map_err(YamuxError)
}
@ -125,25 +123,33 @@ where
self.0.lock().control.abort_open_stream()
}
fn read_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream, b: &mut [u8]) -> Poll<usize> {
fn read_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream, b: &mut [u8])
-> Poll<YamuxResult<usize>>
{
Pin::new(s).poll_read(c, b).map_err(|e| YamuxError(e.into()))
}
fn write_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream, b: &[u8]) -> Poll<usize> {
fn write_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream, b: &[u8])
-> Poll<YamuxResult<usize>>
{
Pin::new(s).poll_write(c, b).map_err(|e| YamuxError(e.into()))
}
fn flush_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream) -> Poll<()> {
fn flush_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream)
-> Poll<YamuxResult<()>>
{
Pin::new(s).poll_flush(c).map_err(|e| YamuxError(e.into()))
}
fn shutdown_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream) -> Poll<()> {
fn shutdown_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream)
-> Poll<YamuxResult<()>>
{
Pin::new(s).poll_close(c).map_err(|e| YamuxError(e.into()))
}
fn destroy_substream(&self, _: Self::Substream) { }
fn close(&self, c: &mut Context<'_>) -> Poll<()> {
fn close(&self, c: &mut Context<'_>) -> Poll<YamuxResult<()>> {
let mut inner = self.0.lock();
if let std::task::Poll::Ready(x) = Pin::new(&mut inner.control).poll_close(c) {
return Poll::Ready(x.map_err(YamuxError))
@ -158,62 +164,122 @@ where
Poll::Pending
}
fn flush_all(&self, _: &mut Context<'_>) -> Poll<()> {
fn flush_all(&self, _: &mut Context<'_>) -> Poll<YamuxResult<()>> {
Poll::Ready(Ok(()))
}
}
/// The yamux configuration.
#[derive(Clone)]
pub struct Config {
config: yamux::Config,
pub struct YamuxConfig {
inner: yamux::Config,
mode: Option<yamux::Mode>
}
/// The window update mode determines when window updates are
/// sent to the remote, giving it new credit to send more data.
pub struct WindowUpdateMode(yamux::WindowUpdateMode);
impl WindowUpdateMode {
/// The window update mode whereby the remote is given
/// new credit via a window update whenever the current
/// receive window is exhausted when data is received,
/// i.e. this mode cannot exert back-pressure from application
/// code that is slow to read from a substream.
///
/// > **Note**: The receive buffer may overflow with this
/// > strategy if the receiver is too slow in reading the
/// > data from the buffer. The maximum receive buffer
/// > size must be tuned appropriately for the desired
/// > throughput and level of tolerance for (temporarily)
/// > slow receivers.
pub fn on_receive() -> Self {
WindowUpdateMode(yamux::WindowUpdateMode::OnReceive)
}
/// The window update mode whereby the remote is given new
/// credit only when the current receive window is exhausted
/// when data is read from the substream's receive buffer,
/// i.e. application code that is slow to read from a substream
/// exerts back-pressure on the remote.
///
/// > **Note**: If the receive window of a substream on
/// > both peers is exhausted and both peers are blocked on
/// > sending data before reading from the stream, a deadlock
/// > occurs. To avoid this situation, reading from a substream
/// > should never be blocked on writing to the same substream.
///
/// > **Note**: With this strategy, there is usually no point in the
/// > receive buffer being larger than the window size.
pub fn on_read() -> Self {
WindowUpdateMode(yamux::WindowUpdateMode::OnRead)
}
}
/// The yamux configuration for upgrading I/O resources which are ![`Send`].
#[derive(Clone)]
pub struct LocalConfig(Config);
pub struct YamuxLocalConfig(YamuxConfig);
impl Config {
pub fn new(cfg: yamux::Config) -> Self {
Config { config: cfg, mode: None }
impl YamuxConfig {
/// Creates a new `YamuxConfig` in client mode, regardless of whether
/// it will be used for an inbound or outbound upgrade.
pub fn client() -> Self {
let mut cfg = Self::default();
cfg.mode = Some(yamux::Mode::Client);
cfg
}
/// Override the connection mode.
///
/// This will always use the provided mode during the connection upgrade,
/// irrespective of whether an inbound or outbound upgrade happens.
pub fn override_mode(&mut self, mode: yamux::Mode) {
self.mode = Some(mode)
/// Creates a new `YamuxConfig` in server mode, regardless of whether
/// it will be used for an inbound or outbound upgrade.
pub fn server() -> Self {
let mut cfg = Self::default();
cfg.mode = Some(yamux::Mode::Server);
cfg
}
/// Turn this into a [`LocalConfig`] for use with upgrades of ![`Send`] resources.
pub fn local(self) -> LocalConfig {
LocalConfig(self)
/// Sets the size (in bytes) of the receive window per substream.
pub fn set_receive_window_size(&mut self, num_bytes: u32) -> &mut Self {
self.inner.set_receive_window(num_bytes);
self
}
/// Sets the maximum size (in bytes) of the receive buffer per substream.
pub fn set_max_buffer_size(&mut self, num_bytes: usize) -> &mut Self {
self.inner.set_max_buffer_size(num_bytes);
self
}
/// Sets the maximum number of concurrent substreams.
pub fn set_max_num_streams(&mut self, num_streams: usize) -> &mut Self {
self.inner.set_max_num_streams(num_streams);
self
}
/// Sets the window update mode that determines when the remote
/// is given new credit for sending more data.
pub fn set_window_update_mode(&mut self, mode: WindowUpdateMode) -> &mut Self {
self.inner.set_window_update_mode(mode.0);
self
}
/// Converts the config into a [`YamuxLocalConfig`] for use with upgrades
/// of I/O streams that are ![`Send`].
pub fn into_local(self) -> YamuxLocalConfig {
YamuxLocalConfig(self)
}
}
impl Default for Config {
impl Default for YamuxConfig {
fn default() -> Self {
Config::new(yamux::Config::default())
let mut inner = yamux::Config::default();
// For conformity with mplex, read-after-close on a multiplexed
// connection is never permitted and not configurable.
inner.set_read_after_close(false);
YamuxConfig { inner, mode: None }
}
}
impl Deref for Config {
type Target = yamux::Config;
fn deref(&self) -> &Self::Target {
&self.config
}
}
impl DerefMut for Config {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.config
}
}
impl UpgradeInfo for Config {
impl UpgradeInfo for YamuxConfig {
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;
@ -222,7 +288,7 @@ impl UpgradeInfo for Config {
}
}
impl UpgradeInfo for LocalConfig {
impl UpgradeInfo for YamuxLocalConfig {
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;
@ -231,7 +297,7 @@ impl UpgradeInfo for LocalConfig {
}
}
impl<C> InboundUpgrade<C> for Config
impl<C> InboundUpgrade<C> for YamuxConfig
where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
@ -240,11 +306,12 @@ where
type Future = future::Ready<Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::new(io, self.config, self.mode.unwrap_or(yamux::Mode::Server))))
let mode = self.mode.unwrap_or(yamux::Mode::Server);
future::ready(Ok(Yamux::new(io, self.inner, mode)))
}
}
impl<C> InboundUpgrade<C> for LocalConfig
impl<C> InboundUpgrade<C> for YamuxLocalConfig
where
C: AsyncRead + AsyncWrite + Unpin + 'static
{
@ -254,11 +321,12 @@ where
fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future {
let cfg = self.0;
future::ready(Ok(Yamux::local(io, cfg.config, cfg.mode.unwrap_or(yamux::Mode::Server))))
let mode = cfg.mode.unwrap_or(yamux::Mode::Server);
future::ready(Ok(Yamux::local(io, cfg.inner, mode)))
}
}
impl<C> OutboundUpgrade<C> for Config
impl<C> OutboundUpgrade<C> for YamuxConfig
where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
@ -267,11 +335,12 @@ where
type Future = future::Ready<Result<Self::Output, Self::Error>>;
fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::new(io, self.config, self.mode.unwrap_or(yamux::Mode::Client))))
let mode = self.mode.unwrap_or(yamux::Mode::Client);
future::ready(Ok(Yamux::new(io, self.inner, mode)))
}
}
impl<C> OutboundUpgrade<C> for LocalConfig
impl<C> OutboundUpgrade<C> for YamuxLocalConfig
where
C: AsyncRead + AsyncWrite + Unpin + 'static
{
@ -281,18 +350,22 @@ where
fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future {
let cfg = self.0;
future::ready(Ok(Yamux::local(io, cfg.config, cfg.mode.unwrap_or(yamux::Mode::Client))))
let mode = cfg.mode.unwrap_or(yamux::Mode::Client);
future::ready(Ok(Yamux::local(io, cfg.inner, mode)))
}
}
/// The Yamux [`libp2p_core::StreamMuxer`] error type.
/// The Yamux [`StreamMuxer`] error type.
#[derive(Debug, Error)]
#[error("yamux error: {0}")]
pub struct YamuxError(#[from] pub yamux::ConnectionError);
pub struct YamuxError(#[from] yamux::ConnectionError);
impl Into<io::Error> for YamuxError {
fn into(self: YamuxError) -> io::Error {
io::Error::new(io::ErrorKind::Other, self.to_string())
match self.0 {
yamux::ConnectionError::Io(e) => e,
e => io::Error::new(io::ErrorKind::Other, e)
}
}
}

View File

@ -150,7 +150,7 @@ fn build_node() -> (Multiaddr, Swarm<Gossipsub>) {
.authenticate(PlainText2Config {
local_public_key: public_key.clone(),
})
.multiplex(yamux::Config::default())
.multiplex(yamux::YamuxConfig::default())
.boxed();
let peer_id = public_key.clone().into_peer_id();

View File

@ -61,7 +61,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) {
let transport = MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(yamux::Config::default())
.multiplex(yamux::YamuxConfig::default())
.boxed();
let local_id = local_public_key.clone().into_peer_id();

View File

@ -207,7 +207,7 @@ fn mk_transport(muxer: MuxerChoice) -> (
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(match muxer {
MuxerChoice::Yamux =>
upgrade::EitherUpgrade::A(yamux::Config::default()),
upgrade::EitherUpgrade::A(yamux::YamuxConfig::default()),
MuxerChoice::Mplex =>
upgrade::EitherUpgrade::B(mplex::MplexConfig::default()),
})

View File

@ -221,7 +221,7 @@ fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) {
.nodelay(true)
.upgrade(upgrade::Version::V1)
.authenticate(NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(libp2p_yamux::Config::default())
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed())
}

View File

@ -91,7 +91,7 @@
//! let id_keys = Keypair::generate_ed25519();
//! let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys).unwrap();
//! let noise = noise::NoiseConfig::xx(noise_keys).into_authenticated();
//! let yamux = yamux::Config::default();
//! let yamux = yamux::YamuxConfig::default();
//! let transport = tcp.upgrade(upgrade::Version::V1).authenticate(noise).multiplex(yamux);
//! # }
//! ```
@ -302,7 +302,7 @@ pub fn build_tcp_ws_noise_mplex_yamux(keypair: identity::Keypair)
Ok(transport
.upgrade(core::upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new()))
.multiplex(core::upgrade::SelectUpgrade::new(yamux::YamuxConfig::default(), mplex::MplexConfig::default()))
.timeout(std::time::Duration::from_secs(20))
.boxed())
}
@ -334,7 +334,7 @@ pub fn build_tcp_ws_pnet_noise_mplex_yamux(keypair: identity::Keypair, psk: PreS
.and_then(move |socket, _| PnetConfig::new(psk).handshake(socket))
.upgrade(core::upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new()))
.multiplex(core::upgrade::SelectUpgrade::new(yamux::YamuxConfig::default(), mplex::MplexConfig::default()))
.timeout(std::time::Duration::from_secs(20))
.boxed())
}