Custom yamux mode (#1691)

* Allow override the yamux connection mode.

* Add `multiplex_ext` to transport `Builder`.

This method exposes the connection info and connected point to a provided
function which creates the upgrade and can base the decision on `PeerId`
or other connection information such as IP address.

* Re-export `yamux::Mode`.

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>
This commit is contained in:
Toralf Wittner 2020-08-17 10:07:41 +02:00 committed by GitHub
parent d1024d272c
commit 91d50b2723
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 10 deletions

View File

@ -168,6 +168,37 @@ where
Multiplex { info: Some(i), upgrade }
})
}
/// Like [`Builder::multiplex`] but accepts a function which returns the upgrade.
///
/// The supplied function is applied to [`ConnectionInfo`] and [`ConnectedPoint`]
/// and returns an upgrade which receives the I/O resource `C` and must
/// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
/// This ends the (regular) transport upgrade process, yielding the underlying,
/// configured transport.
///
/// ## Transitions
///
/// * I/O upgrade: `C -> M`.
/// * Transport output: `(I, C) -> (I, M)`.
pub fn multiplex_ext<C, M, U, I, E, F>(self, up: F)
-> AndThen<T, impl FnOnce((I, C), ConnectedPoint) -> Multiplex<C, U, I> + Clone>
where
T: Transport<Output = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
I: ConnectionInfo,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
E: Error + 'static,
F: for<'a> FnOnce(&'a I, &'a ConnectedPoint) -> U + Clone
{
let version = self.version;
self.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, up(&i, &endpoint), endpoint, version);
Multiplex { info: Some(i), upgrade }
})
}
}
/// An upgrade that authenticates the remote peer, typically

View File

@ -28,7 +28,7 @@ use parking_lot::Mutex;
use std::{fmt, io, iter, ops::{Deref, DerefMut}, pin::Pin, task::Context};
use thiserror::Error;
pub use yamux::WindowUpdateMode;
pub use yamux::{Mode, WindowUpdateMode};
/// A Yamux connection.
///
@ -165,7 +165,10 @@ where
/// The yamux configuration.
#[derive(Clone)]
pub struct Config(yamux::Config);
pub struct Config {
config: yamux::Config,
mode: Option<yamux::Mode>
}
/// The yamux configuration for upgrading I/O resources which are ![`Send`].
#[derive(Clone)]
@ -173,7 +176,15 @@ pub struct LocalConfig(Config);
impl Config {
pub fn new(cfg: yamux::Config) -> Self {
Config(cfg)
Config { config: cfg, mode: None }
}
/// Override the connection mode.
///
/// This will always use the provided mode during the connection upgrade,
/// irrespective of whether an inbound or outbound upgrade happens.
pub fn override_mode(&mut self, mode: yamux::Mode) {
self.mode = Some(mode)
}
/// Turn this into a [`LocalConfig`] for use with upgrades of ![`Send`] resources.
@ -184,7 +195,7 @@ impl Config {
impl Default for Config {
fn default() -> Self {
Config(yamux::Config::default())
Config::new(yamux::Config::default())
}
}
@ -192,13 +203,13 @@ impl Deref for Config {
type Target = yamux::Config;
fn deref(&self) -> &Self::Target {
&self.0
&self.config
}
}
impl DerefMut for Config {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
&mut self.config
}
}
@ -229,7 +240,7 @@ where
type Future = future::Ready<Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::new(io, self.0, yamux::Mode::Server)))
future::ready(Ok(Yamux::new(io, self.config, self.mode.unwrap_or(yamux::Mode::Server))))
}
}
@ -242,7 +253,8 @@ where
type Future = future::Ready<Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::local(io, (self.0).0, yamux::Mode::Server)))
let cfg = self.0;
future::ready(Ok(Yamux::local(io, cfg.config, cfg.mode.unwrap_or(yamux::Mode::Server))))
}
}
@ -255,7 +267,7 @@ where
type Future = future::Ready<Result<Self::Output, Self::Error>>;
fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::new(io, self.0, yamux::Mode::Client)))
future::ready(Ok(Yamux::new(io, self.config, self.mode.unwrap_or(yamux::Mode::Client))))
}
}
@ -268,7 +280,8 @@ where
type Future = future::Ready<Result<Self::Output, Self::Error>>;
fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::local(io, (self.0).0, yamux::Mode::Client)))
let cfg = self.0;
future::ready(Ok(Yamux::local(io, cfg.config, cfg.mode.unwrap_or(yamux::Mode::Client))))
}
}