[mplex] Refactoring with Patches (#1769)

* Refactor Mplex.

Thereby addressing the following issues:

  * Send a `Reset` frame when open substreams get dropped (#313).
  * Avoid stalls caused by a read operation on one substream
    reading (and buffering) frames for another substream without
    notifying the corresponding task. That is, the tracked read-interest
    must be scoped to a substream (see the sketch after this list).
  * Remove dropped substreams from the tracked set of open
    substreams, to avoid artificially running into substream
    limits.
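
The sketch referenced above, illustrating scoped read-interest in
isolation. `ReadInterest` and the bare `u32` stream IDs are illustrative
simplifications of the `NotifierRead` registry introduced in
`muxers/mplex/src/io.rs` below, not the actual types from this patch:

```rust
use std::collections::HashMap;
use std::task::Waker;

/// Read-interest registry, keyed by substream ID.
/// The key `None` encodes interest in frames for *any* substream.
struct ReadInterest {
    pending: HashMap<Option<u32>, Waker>,
}

impl ReadInterest {
    /// Registers the current task's interest in reading from `stream`.
    fn register(&mut self, stream: Option<u32>, waker: &Waker) {
        self.pending.insert(stream, waker.clone());
    }

    /// Wakes the task (if any) waiting on substream `id`, as well as any
    /// task waiting on *any* substream. Returns `true` if a task was
    /// woken. Called when a frame for `id` is buffered, so that a read
    /// on one substream cannot silently starve the task reading `id`.
    fn wake_by_id(&mut self, id: u32) -> bool {
        let mut woken = false;
        if let Some(w) = self.pending.remove(&None) {
            w.wake();
            woken = true;
        }
        if let Some(w) = self.pending.remove(&Some(id)) {
            w.wake();
            woken = true;
        }
        woken
    }
}
```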

* Update CHANGELOG.

* Refine behaviour of dropping substreams.

The substream state is now taken into account. The refined
behaviour is modeled after that of Yamux, as sketched below.
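
As a sketch of these semantics: the states mirror `SubstreamState` from
this patch, while `FinalFrame` and `frame_on_drop` are illustrative
helpers, not part of the actual code:

```rust
/// Substream states as tracked by the multiplexer.
enum SubstreamState {
    /// Fully open in both directions.
    Open,
    /// We already sent a `Close` (local half-close).
    SendClosed,
    /// The remote already sent a `Close` (remote half-close).
    RecvClosed,
}

enum FinalFrame { Reset, Close }

/// The frame (if any) scheduled for the remote when a substream
/// in `state` is dropped.
fn frame_on_drop(state: SubstreamState) -> Option<FinalFrame> {
    match state {
        // Open on both sides: abort abruptly with a `Reset`.
        SubstreamState::Open => Some(FinalFrame::Reset),
        // We already half-closed our side: nothing more to send.
        SubstreamState::SendClosed => None,
        // Remote half-closed: complete the close of our sending side.
        SubstreamState::RecvClosed => Some(FinalFrame::Close),
    }
}
```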

* Tweak docs and recv buffer retention.

* Further small tweaks.

 * Make the pending frames a FIFO queue.
 * Take more care to avoid keeping read-wakers around
   and to notify them when streams close.

* Prefer wake over unregister.

It is probably safer to always wake pending wakers.

* Update muxers/mplex/src/codec.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Update muxers/mplex/src/io.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Some review feedback and cosmetics.

* Update muxers/mplex/src/io.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Revise read control flow for clarity.

While seemingly duplicating some control flow between
`poll_next_stream` and `poll_read_stream`, the individual
control flow of each read operation is easier to follow.

* CI

* Rename Status::Ok to Status::Open.

* Rename pending_flush to pending_flush_open.

* Finishing touches.

* Tweak changelog.

Co-authored-by: Max Inden <mail@max-inden.de>
Commit 0b18b864f2 (parent 9365be711f), authored by Roman Borschel
on 2020-09-28 10:30:49 +02:00 and committed via GitHub.
7 changed files with 1106 additions and 614 deletions

Cargo.toml

@@ -68,7 +68,7 @@ libp2p-floodsub = { version = "0.22.0", path = "protocols/floodsub", optional =
libp2p-gossipsub = { version = "0.22.1", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.22.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.23.1", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.22.1", path = "muxers/mplex", optional = true }
libp2p-mplex = { version = "0.23.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.24.1", path = "protocols/noise", optional = true }
libp2p-ping = { version = "0.22.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.23.0", path = "protocols/plaintext", optional = true }

muxers/mplex/CHANGELOG.md

@@ -1,4 +1,20 @@
# 0.22.1 [unreleased]
# 0.23.0 [unreleased]
- Address a potential stall when reading from substreams.
- Send a `Reset` or `Close` to the remote when a substream is dropped,
as appropriate for the current state of the substream,
removing that substream from the tracked open substreams,
to avoid artificially running into substream limits.
- Change the semantics of the `max_substreams` configuration. Now,
outbound substream attempts beyond the configured limit are delayed,
with a task wakeup once an existing substream closes, i.e. the limit
results in back-pressure for new outbound substreams. New inbound
substreams beyond the limit are immediately answered with a `Reset`.
If too many (by some internal threshold) pending frames accumulate,
e.g. as a result of an aggressive number of inbound substreams being
opened beyond the configured limit, the connection is closed ("DoS protection").
- Update dependencies.
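
A minimal usage sketch for the configuration surface described above;
the chosen values are arbitrary examples:

```rust
use libp2p_mplex::{MplexConfig, MaxBufferBehaviour};

fn main() {
    let mut config = MplexConfig::new();
    config
        // Outbound attempts beyond this limit are delayed (back-pressure);
        // inbound substreams beyond it are answered with a `Reset`.
        .max_substreams(64)
        // At most this many received frames are buffered unconsumed.
        .max_buffer_len(1024)
        // Stop reading from the connection while the buffer is full,
        // instead of failing all substreams.
        .max_buffer_len_behaviour(MaxBufferBehaviour::Block)
        // Split outgoing data into frames of at most 8 KiB
        // (capped at 1 MiB, as per the Mplex spec).
        .split_send_size(8 * 1024);
}
```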

muxers/mplex/Cargo.toml

@@ -2,7 +2,7 @@
name = "libp2p-mplex"
edition = "2018"
description = "Mplex multiplexing protocol for libp2p"
version = "0.22.1"
version = "0.23.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"

muxers/mplex/src/codec.rs

@@ -21,7 +21,7 @@
use libp2p_core::Endpoint;
use futures_codec::{Decoder, Encoder};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::mem;
use std::{fmt, mem};
use bytes::{BufMut, Bytes, BytesMut};
use unsigned_varint::{codec, encode};
@@ -30,51 +30,103 @@ use unsigned_varint::{codec, encode};
// send a 4 TB-long packet full of zeroes that we kill our process with an OOM error.
pub(crate) const MAX_FRAME_SIZE: usize = 1024 * 1024;
#[derive(Debug, Clone)]
pub enum Elem {
Open { substream_id: u32 },
Data { substream_id: u32, endpoint: Endpoint, data: Bytes },
Close { substream_id: u32, endpoint: Endpoint },
Reset { substream_id: u32, endpoint: Endpoint },
/// A unique identifier used by the local node for a substream.
///
/// `LocalStreamId`s are sent with frames to the remote, where
/// they are received as `RemoteStreamId`s.
///
/// > **Note**: Streams are identified by a number and a role encoded as a flag
/// > on each frame that is either odd (for receivers) or even (for initiators).
/// > `Open` frames do not have a flag, but are sent unidirectionally. As a
/// > consequence, we need to remember if a stream was initiated by us or remotely
/// > and we store the information from our point of view as a `LocalStreamId`,
/// > i.e. receiving an `Open` frame results in a local ID with role `Endpoint::Listener`,
/// > whilst sending an `Open` frame results in a local ID with role `Endpoint::Dialer`.
/// > Receiving a frame with a flag identifying the remote as a "receiver" means that
/// > we initiated the stream, so the local ID has the role `Endpoint::Dialer`.
/// > Conversely, when receiving a frame with a flag identifying the remote as a "sender",
/// > the corresponding local ID has the role `Endpoint::Listener`.
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub struct LocalStreamId {
num: u32,
role: Endpoint,
}
impl Elem {
/// Returns the ID of the substream of the message.
pub fn substream_id(&self) -> u32 {
impl fmt::Display for LocalStreamId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.role {
Endpoint::Dialer => write!(f, "({}/initiator)", self.num),
Endpoint::Listener => write!(f, "({}/receiver)", self.num),
}
}
}
/// A unique identifier used by the remote node for a substream.
///
/// `RemoteStreamId`s are received with frames from the remote
/// and mapped by the receiver to `LocalStreamId`s via
/// [`RemoteStreamId::into_local()`].
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub struct RemoteStreamId {
num: u32,
role: Endpoint,
}
impl LocalStreamId {
pub fn dialer(num: u32) -> Self {
Self { num, role: Endpoint::Dialer }
}
pub fn next(self) -> Self {
Self {
num: self.num.checked_add(1).expect("Mplex substream ID overflowed"),
.. self
}
}
}
impl RemoteStreamId {
fn dialer(num: u32) -> Self {
Self { num, role: Endpoint::Dialer }
}
fn listener(num: u32) -> Self {
Self { num, role: Endpoint::Listener }
}
/// Converts this `RemoteStreamId` into the corresponding `LocalStreamId`
/// that identifies the same substream.
pub fn into_local(self) -> LocalStreamId {
LocalStreamId {
num: self.num,
role: !self.role,
}
}
}
/// An Mplex protocol frame.
#[derive(Debug, Clone)]
pub enum Frame<T> {
Open { stream_id: T },
Data { stream_id: T, data: Bytes },
Close { stream_id: T },
Reset { stream_id: T },
}
impl Frame<RemoteStreamId> {
fn remote_id(&self) -> RemoteStreamId {
match *self {
Elem::Open { substream_id } => substream_id,
Elem::Data { substream_id, .. } => substream_id,
Elem::Close { substream_id, .. } => substream_id,
Elem::Reset { substream_id, .. } => substream_id,
Frame::Open { stream_id } => stream_id,
Frame::Data { stream_id, .. } => stream_id,
Frame::Close { stream_id, .. } => stream_id,
Frame::Reset { stream_id, .. } => stream_id,
}
}
pub fn endpoint(&self) -> Option<Endpoint> {
match *self {
Elem::Open { .. } => None,
Elem::Data { endpoint, .. } => Some(endpoint),
Elem::Close { endpoint, .. } => Some(endpoint),
Elem::Reset { endpoint, .. } => Some(endpoint)
}
}
/// Returns true if this message is `Close` or `Reset`.
#[inline]
pub fn is_close_or_reset_msg(&self) -> bool {
match self {
Elem::Close { .. } | Elem::Reset { .. } => true,
_ => false,
}
}
/// Returns true if this message is `Open`.
#[inline]
pub fn is_open_msg(&self) -> bool {
if let Elem::Open { .. } = self {
true
} else {
false
}
/// Gets the `LocalStreamId` corresponding to the `RemoteStreamId`
/// received with this frame.
pub fn local_id(&self) -> LocalStreamId {
self.remote_id().into_local()
}
}
@@ -101,7 +153,7 @@ impl Codec {
}
impl Decoder for Codec {
type Item = Elem;
type Item = Frame<RemoteStreamId>;
type Error = IoError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
@@ -143,15 +195,15 @@ impl Decoder for Codec {
}
let buf = src.split_to(len);
let substream_id = (header >> 3) as u32;
let num = (header >> 3) as u32;
let out = match header & 7 {
0 => Elem::Open { substream_id },
1 => Elem::Data { substream_id, endpoint: Endpoint::Listener, data: buf.freeze() },
2 => Elem::Data { substream_id, endpoint: Endpoint::Dialer, data: buf.freeze() },
3 => Elem::Close { substream_id, endpoint: Endpoint::Listener },
4 => Elem::Close { substream_id, endpoint: Endpoint::Dialer },
5 => Elem::Reset { substream_id, endpoint: Endpoint::Listener },
6 => Elem::Reset { substream_id, endpoint: Endpoint::Dialer },
0 => Frame::Open { stream_id: RemoteStreamId::dialer(num) },
1 => Frame::Data { stream_id: RemoteStreamId::listener(num), data: buf.freeze() },
2 => Frame::Data { stream_id: RemoteStreamId::dialer(num), data: buf.freeze() },
3 => Frame::Close { stream_id: RemoteStreamId::listener(num) },
4 => Frame::Close { stream_id: RemoteStreamId::dialer(num) },
5 => Frame::Reset { stream_id: RemoteStreamId::listener(num) },
6 => Frame::Reset { stream_id: RemoteStreamId::dialer(num) },
_ => {
let msg = format!("Invalid mplex header value 0x{:x}", header);
return Err(IoError::new(IoErrorKind::InvalidData, msg));
@@ -171,31 +223,31 @@ impl Decoder for Codec {
}
impl Encoder for Codec {
type Item = Elem;
type Item = Frame<LocalStreamId>;
type Error = IoError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let (header, data) = match item {
Elem::Open { substream_id } => {
(u64::from(substream_id) << 3, Bytes::new())
Frame::Open { stream_id } => {
(u64::from(stream_id.num) << 3, Bytes::new())
},
Elem::Data { substream_id, endpoint: Endpoint::Listener, data } => {
(u64::from(substream_id) << 3 | 1, data)
Frame::Data { stream_id: LocalStreamId { num, role: Endpoint::Listener }, data } => {
(u64::from(num) << 3 | 1, data)
},
Elem::Data { substream_id, endpoint: Endpoint::Dialer, data } => {
(u64::from(substream_id) << 3 | 2, data)
Frame::Data { stream_id: LocalStreamId { num, role: Endpoint::Dialer }, data } => {
(u64::from(num) << 3 | 2, data)
},
Elem::Close { substream_id, endpoint: Endpoint::Listener } => {
(u64::from(substream_id) << 3 | 3, Bytes::new())
Frame::Close { stream_id: LocalStreamId { num, role: Endpoint::Listener } } => {
(u64::from(num) << 3 | 3, Bytes::new())
},
Elem::Close { substream_id, endpoint: Endpoint::Dialer } => {
(u64::from(substream_id) << 3 | 4, Bytes::new())
Frame::Close { stream_id: LocalStreamId { num, role: Endpoint::Dialer } } => {
(u64::from(num) << 3 | 4, Bytes::new())
},
Elem::Reset { substream_id, endpoint: Endpoint::Listener } => {
(u64::from(substream_id) << 3 | 5, Bytes::new())
Frame::Reset { stream_id: LocalStreamId { num, role: Endpoint::Listener } } => {
(u64::from(num) << 3 | 5, Bytes::new())
},
Elem::Reset { substream_id, endpoint: Endpoint::Dialer } => {
(u64::from(substream_id) << 3 | 6, Bytes::new())
Frame::Reset { stream_id: LocalStreamId { num, role: Endpoint::Dialer } } => {
(u64::from(num) << 3 | 6, Bytes::new())
},
};
@@ -225,9 +277,9 @@ mod tests {
#[test]
fn encode_large_messages_fails() {
let mut enc = Codec::new();
let endpoint = Endpoint::Dialer;
let role = Endpoint::Dialer;
let data = Bytes::from(&[123u8; MAX_FRAME_SIZE + 1][..]);
let bad_msg = Elem::Data{ substream_id: 123, endpoint, data };
let bad_msg = Frame::Data { stream_id: LocalStreamId { num: 123, role }, data };
let mut out = BytesMut::new();
match enc.encode(bad_msg, &mut out) {
Err(e) => assert_eq!(e.to_string(), "data size exceed maximum"),
@@ -235,7 +287,7 @@ mod tests {
}
let data = Bytes::from(&[123u8; MAX_FRAME_SIZE][..]);
let ok_msg = Elem::Data{ substream_id: 123, endpoint, data };
let ok_msg = Frame::Data { stream_id: LocalStreamId { num: 123, role }, data };
assert!(enc.encode(ok_msg, &mut out).is_ok());
}
}
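
For concreteness, a standalone worked example of the frame-header
layout implemented by `encode` and `decode` above
(`header = num << 3 | flag`):

```rust
fn main() {
    let num: u64 = 123;

    // Encoding: stream number in the upper bits, frame type ("flag")
    // in the low three bits.
    let open = num << 3; // flag 0: `Open`
    let data_from_initiator = num << 3 | 2; // flag 2: `Data` sent by the initiator
    assert_eq!(open, 984);
    assert_eq!(data_from_initiator, 986);

    // Decoding inverts the layout.
    assert_eq!(data_from_initiator >> 3, num); // stream number
    assert_eq!(data_from_initiator & 7, 2); // frame type

    // Role inversion: a received frame with flag 2 was sent by the
    // remote dialer, so the corresponding *local* ID has the role
    // `Listener` -- this is what `RemoteStreamId::into_local()` encodes.
}
```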

muxers/mplex/src/config.rs (new file, 106 lines)

@@ -0,0 +1,106 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::codec::MAX_FRAME_SIZE;
use std::cmp;
/// Configuration for the multiplexer.
#[derive(Debug, Clone)]
pub struct MplexConfig {
/// Maximum number of simultaneously-open substreams.
pub(crate) max_substreams: usize,
/// Maximum number of frames in the internal buffer.
pub(crate) max_buffer_len: usize,
/// Behaviour when the buffer size limit is reached.
pub(crate) max_buffer_behaviour: MaxBufferBehaviour,
/// When sending data, split it into frames whose maximum size is this value
/// (max 1MByte, as per the Mplex spec).
pub(crate) split_send_size: usize,
}
impl MplexConfig {
/// Builds the default configuration.
pub fn new() -> MplexConfig {
Default::default()
}
/// Sets the maximum number of simultaneously open substreams.
///
/// When the limit is reached, opening of outbound substreams
/// is delayed until another substream closes, whereas new
/// inbound substreams are immediately answered with a `Reset`.
/// If the number of inbound substreams that need to be reset
/// accumulates too quickly (judged by internal bounds), the
/// connection is closed with an error due to the
/// misbehaving remote.
pub fn max_substreams(&mut self, max: usize) -> &mut Self {
self.max_substreams = max;
self
}
/// Sets the maximum number of frames buffered that have
/// not yet been consumed.
///
/// A limit is necessary in order to avoid DoS attacks.
pub fn max_buffer_len(&mut self, max: usize) -> &mut Self {
self.max_buffer_len = max;
self
}
/// Sets the behaviour when the maximum buffer length has been reached.
///
/// See the documentation of `MaxBufferBehaviour`.
pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self {
self.max_buffer_behaviour = behaviour;
self
}
/// Sets the frame size used when sending data. Capped at 1Mbyte as per the
/// Mplex spec.
pub fn split_send_size(&mut self, size: usize) -> &mut Self {
let size = cmp::min(size, MAX_FRAME_SIZE);
self.split_send_size = size;
self
}
}
/// Behaviour when the maximum length of the buffer is reached.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MaxBufferBehaviour {
/// Produce an error on all the substreams.
CloseAll,
/// No new message will be read from the underlying connection if the buffer is full.
///
/// This can potentially introduce a deadlock if you are waiting for a message from a substream
/// before processing the messages received on another substream.
Block,
}
impl Default for MplexConfig {
fn default() -> MplexConfig {
MplexConfig {
max_substreams: 128,
max_buffer_len: 4096,
max_buffer_behaviour: MaxBufferBehaviour::CloseAll,
split_send_size: 1024,
}
}
}

muxers/mplex/src/io.rs (new file, 793 lines)

@@ -0,0 +1,793 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use crate::{MplexConfig, MaxBufferBehaviour};
use crate::codec::{Codec, Frame, LocalStreamId, RemoteStreamId};
use log::{debug, trace};
use fnv::FnvHashMap;
use futures::{prelude::*, ready, stream::Fuse};
use futures::task::{ArcWake, waker_ref, WakerRef};
use futures_codec::Framed;
use parking_lot::Mutex;
use std::collections::{VecDeque, hash_map::Entry};
use std::{cmp, io, mem, sync::Arc, task::{Context, Poll, Waker}};
pub use std::io::{Result, Error, ErrorKind};
/// A multiplexed I/O stream.
pub struct Multiplexed<C> {
/// The current operating status.
status: Status,
/// The underlying I/O stream.
io: Fuse<Framed<C, Codec>>,
/// The configuration.
config: MplexConfig,
/// Buffer of received frames that have not yet been consumed.
buffer: Vec<Frame<RemoteStreamId>>,
/// Whether a flush is pending due to one or more new outbound
/// `Open` frames, before reading frames can proceed.
pending_flush_open: bool,
/// Pending frames to send at the next opportunity.
///
/// An opportunity for sending pending frames is every flush
/// or read operation. In the former case, sending of all
/// pending frames must complete before the flush can complete.
/// In the latter case, the read operation can proceed even
/// if some or all of the pending frames cannot be sent.
pending_frames: VecDeque<Frame<LocalStreamId>>,
/// The substreams that are considered at least half-open.
open_substreams: FnvHashMap<LocalStreamId, SubstreamState>,
/// The ID for the next outbound substream.
next_outbound_stream_id: LocalStreamId,
/// Registry of wakers for pending tasks interested in reading.
notifier_read: Arc<NotifierRead>,
/// Registry of wakers for pending tasks interested in writing.
notifier_write: Arc<NotifierWrite>,
/// Registry of wakers for pending tasks interested in opening
/// an outbound substream, when the configured limit is reached.
notifier_open: Arc<NotifierOpen>,
}
/// The operation status of a `Multiplexed` I/O stream.
#[derive(Debug)]
enum Status {
/// The stream is considered open and healthy.
Open,
/// The stream has been actively closed.
Closed,
/// The stream has encountered a fatal error.
Err(io::Error),
}
impl<C> Multiplexed<C>
where
C: AsyncRead + AsyncWrite + Unpin
{
/// Creates a new multiplexed I/O stream.
pub fn new(io: C, config: MplexConfig) -> Self {
let max_buffer_len = config.max_buffer_len;
Multiplexed {
config,
status: Status::Open,
io: Framed::new(io, Codec::new()).fuse(),
buffer: Vec::with_capacity(cmp::min(max_buffer_len, 512)),
open_substreams: Default::default(),
pending_flush_open: false,
pending_frames: Default::default(),
next_outbound_stream_id: LocalStreamId::dialer(0),
notifier_read: Arc::new(NotifierRead {
pending: Mutex::new(Default::default()),
}),
notifier_write: Arc::new(NotifierWrite {
pending: Mutex::new(Default::default()),
}),
notifier_open: Arc::new(NotifierOpen {
pending: Mutex::new(Default::default())
})
}
}
/// Flushes the underlying I/O stream.
pub fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &self.status {
Status::Closed => return Poll::Ready(Ok(())),
Status::Err(e) => return Poll::Ready(Err(io::Error::new(e.kind(), e.to_string()))),
Status::Open => {}
}
// Send any pending frames.
ready!(self.send_pending_frames(cx))?;
// Flush the underlying I/O stream.
let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
match ready!(self.io.poll_flush_unpin(&mut Context::from_waker(&waker))) {
Err(e) => Poll::Ready(self.on_error(e)),
Ok(()) => {
self.pending_flush_open = false;
Poll::Ready(Ok(()))
}
}
}
/// Closes the underlying I/O stream.
///
/// > **Note**: No `Close` or `Reset` frames are sent on open substreams
/// > before closing the underlying connection. However, the connection
/// > close implies a flush of any frames already sent.
pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &self.status {
Status::Closed => return Poll::Ready(Ok(())),
Status::Err(e) => return Poll::Ready(Err(io::Error::new(e.kind(), e.to_string()))),
Status::Open => {}
}
// Note: We do not make the effort to send pending `Reset` frames
// here; we only close (and thus flush) the underlying I/O stream.
let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
match self.io.poll_close_unpin(&mut Context::from_waker(&waker)) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(self.on_error(e)),
Poll::Ready(Ok(())) => {
self.pending_frames = VecDeque::new();
// We do not support read-after-close on the underlying
// I/O stream, hence clearing the buffer and substreams.
self.buffer = Default::default();
self.open_substreams = Default::default();
self.status = Status::Closed;
Poll::Ready(Ok(()))
}
}
}
/// Waits for a new inbound substream, returning the corresponding `LocalStreamId`.
pub fn poll_next_stream(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<LocalStreamId>> {
self.guard_open()?;
// Try to read from the buffer first.
while let Some((pos, stream_id)) = self.buffer.iter()
.enumerate()
.find_map(|(pos, frame)| match frame {
Frame::Open { stream_id } => Some((pos, stream_id.into_local())),
_ => None
})
{
if self.buffer.len() == self.config.max_buffer_len {
// The buffer is full but is about to shrink, so notify all pending readers.
ArcWake::wake_by_ref(&self.notifier_read);
}
self.buffer.remove(pos);
if let Some(id) = self.on_open(stream_id)? {
log::debug!("New inbound stream: {}", id);
return Poll::Ready(Ok(id));
}
}
loop {
// Wait for the next inbound `Open` frame.
match ready!(self.poll_read_frame(cx, None))? {
Frame::Open { stream_id } => {
if let Some(id) = self.on_open(stream_id.into_local())? {
log::debug!("New inbound stream: {}", id);
return Poll::Ready(Ok(id))
}
}
frame @ Frame::Data { .. } => {
let id = frame.local_id();
if self.can_read(&id) {
trace!("Buffering {:?} (total: {})", frame, self.buffer.len() + 1);
self.buffer.push(frame);
self.notifier_read.wake_by_id(id);
} else {
trace!("Dropping {:?} for closed or unknown substream {}", frame, id);
}
}
Frame::Close { stream_id } => {
self.on_close(stream_id.into_local())?;
}
Frame::Reset { stream_id } => {
self.on_reset(stream_id.into_local())
}
}
}
}
/// Creates a new (outbound) substream, returning the allocated stream ID.
pub fn poll_open_stream(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<LocalStreamId>> {
self.guard_open()?;
// Check the stream limits.
if self.open_substreams.len() >= self.config.max_substreams {
debug!("Maximum number of substreams reached: {}", self.config.max_substreams);
let _ = NotifierOpen::register(&self.notifier_open, cx.waker());
return Poll::Pending
}
// Send the `Open` frame.
let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
match ready!(self.io.poll_ready_unpin(&mut Context::from_waker(&waker))) {
Ok(()) => {
let stream_id = self.next_outbound_stream_id();
let frame = Frame::Open { stream_id };
match self.io.start_send_unpin(frame) {
Ok(()) => {
self.open_substreams.insert(stream_id, SubstreamState::Open);
// The flush is delayed and the `Open` frame may be sent
// together with other frames in the same transport packet.
self.pending_flush_open = true;
Poll::Ready(Ok(stream_id))
}
Err(e) => Poll::Ready(self.on_error(e)),
}
},
Err(e) => Poll::Ready(self.on_error(e))
}
}
/// Immediately drops a substream.
///
/// All locally allocated resources for the dropped substream
/// are freed and the substream becomes unavailable for both
/// reading and writing immediately. The remote is informed
/// based on the current state of the substream:
///
/// * If the substream was open, a `Reset` frame is sent at
/// the next opportunity.
/// * If the substream was half-closed, i.e. a `Close` frame
/// has already been sent, nothing further happens.
/// * If the substream was half-closed by the remote, i.e.
/// a `Close` frame has already been received, a `Close`
/// frame is sent at the next opportunity.
///
/// If the multiplexed stream is closed or encountered
/// an error earlier, or there is no known substream with
/// the given ID, this is a no-op.
///
/// > **Note**: If a substream is not read until EOF,
/// > `drop_stream` _must_ eventually be called to avoid
/// > leaving unread frames in the receive buffer.
pub fn drop_stream(&mut self, id: LocalStreamId) {
// Check if the underlying stream is ok.
match self.status {
Status::Closed | Status::Err(_) => return,
Status::Open => {},
}
// Remove any frames still buffered for that stream. The stream
// may already be fully closed (i.e. not in `open_substreams`)
// but still have unread buffered frames.
self.buffer.retain(|frame| frame.local_id() != id);
// If there is still a task waker interested in reading from that
// stream, wake it to avoid leaving it dangling and notice that
// the stream is gone. In contrast, wakers for write operations
// are all woken on every new write opportunity.
self.notifier_read.wake_by_id(id);
// Remove the substream, scheduling pending frames as necessary.
match self.open_substreams.remove(&id) {
None => return,
Some(state) => {
// If we fell below the substream limit, notify tasks that had
// interest in opening a substream earlier.
let below_limit = self.open_substreams.len() == self.config.max_substreams - 1;
if below_limit {
ArcWake::wake_by_ref(&self.notifier_open);
}
// Schedule any pending final frames to send, if necessary.
match state {
SubstreamState::SendClosed => {}
SubstreamState::RecvClosed => {
if self.check_max_pending_frames().is_err() {
return
}
log::trace!("Pending close for stream {}", id);
self.pending_frames.push_front(Frame::Close { stream_id: id });
}
SubstreamState::Open => {
if self.check_max_pending_frames().is_err() {
return
}
log::trace!("Pending reset for stream {}", id);
self.pending_frames.push_front(Frame::Reset { stream_id: id });
}
}
}
}
}
/// Writes data to a substream.
pub fn poll_write_stream(&mut self, cx: &mut Context<'_>, id: LocalStreamId, buf: &[u8])
-> Poll<io::Result<usize>>
{
self.guard_open()?;
// Check if the stream is open for writing.
match self.open_substreams.get(&id) {
None => return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
Some(SubstreamState::SendClosed) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
_ => {}
}
// Determine the size of the frame to send.
let frame_len = cmp::min(buf.len(), self.config.split_send_size);
// Send the data frame.
ready!(self.poll_send_frame(cx, || {
let data = Bytes::copy_from_slice(&buf[.. frame_len]);
Frame::Data { stream_id: id, data }
}))?;
Poll::Ready(Ok(frame_len))
}
/// Reads data from a substream.
pub fn poll_read_stream(&mut self, cx: &mut Context<'_>, id: LocalStreamId)
-> Poll<io::Result<Option<Bytes>>>
{
self.guard_open()?;
// Try to read from the buffer first.
if let Some((pos, data)) = self.buffer.iter()
.enumerate()
.find_map(|(pos, frame)| match frame {
Frame::Data { stream_id, data }
if stream_id.into_local() == id => Some((pos, data.clone())),
_ => None
})
{
if self.buffer.len() == self.config.max_buffer_len {
// The buffer is full but is about to shrink, so notify all pending readers.
ArcWake::wake_by_ref(&self.notifier_read);
}
self.buffer.remove(pos);
return Poll::Ready(Ok(Some(data)));
}
loop {
// Check if the targeted substream (if any) reached EOF.
if !self.can_read(&id) {
return Poll::Ready(Ok(None))
}
match ready!(self.poll_read_frame(cx, Some(id)))? {
Frame::Data { data, stream_id } if stream_id.into_local() == id => {
return Poll::Ready(Ok(Some(data.clone())))
},
frame @ Frame::Open { .. } | frame @ Frame::Data { .. } => {
let id = frame.local_id();
trace!("Buffering {:?} (total: {})", frame, self.buffer.len() + 1);
self.buffer.push(frame);
self.notifier_read.wake_by_id(id);
}
Frame::Close { stream_id } => {
let stream_id = stream_id.into_local();
self.on_close(stream_id)?;
if id == stream_id {
return Poll::Ready(Ok(None))
}
}
Frame::Reset { stream_id } => {
let stream_id = stream_id.into_local();
self.on_reset(stream_id);
if id == stream_id {
return Poll::Ready(Ok(None))
}
}
}
}
}
/// Flushes a substream.
///
/// > **Note**: This is equivalent to `poll_flush()`, i.e. to flushing
/// > all substreams, except that this operation returns an error if
/// > the underlying I/O stream is already closed.
pub fn poll_flush_stream(&mut self, cx: &mut Context<'_>, id: LocalStreamId)
-> Poll<io::Result<()>>
{
self.guard_open()?;
ready!(self.poll_flush(cx))?;
trace!("Flushed substream {}", id);
Poll::Ready(Ok(()))
}
/// Closes a stream for writing.
///
/// > **Note**: As opposed to `poll_close()`, a flush is not implied.
pub fn poll_close_stream(&mut self, cx: &mut Context<'_>, id: LocalStreamId)
-> Poll<io::Result<()>>
{
self.guard_open()?;
match self.open_substreams.get(&id) {
None | Some(SubstreamState::SendClosed) => Poll::Ready(Ok(())),
Some(&state) => {
ready!(self.poll_send_frame(cx, || Frame::Close { stream_id: id }))?;
if state == SubstreamState::Open {
debug!("Closed substream {} (half-close)", id);
self.open_substreams.insert(id, SubstreamState::SendClosed);
} else if state == SubstreamState::RecvClosed {
debug!("Closed substream {}", id);
self.open_substreams.remove(&id);
let below_limit = self.open_substreams.len() == self.config.max_substreams - 1;
if below_limit {
ArcWake::wake_by_ref(&self.notifier_open);
}
}
Poll::Ready(Ok(()))
}
}
}
/// Sends a (lazily constructed) mplex frame on the underlying I/O stream.
///
/// The frame is only constructed if the underlying sink is ready to
/// send another frame.
fn poll_send_frame<F>(&mut self, cx: &mut Context<'_>, frame: F)
-> Poll<io::Result<()>>
where
F: FnOnce() -> Frame<LocalStreamId>
{
let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
match ready!(self.io.poll_ready_unpin(&mut Context::from_waker(&waker))) {
Ok(()) => {
let frame = frame();
trace!("Sending {:?}", frame);
match self.io.start_send_unpin(frame) {
Ok(()) => Poll::Ready(Ok(())),
Err(e) => Poll::Ready(self.on_error(e))
}
},
Err(e) => Poll::Ready(self.on_error(e))
}
}
/// Reads the next frame from the underlying I/O stream.
///
/// The given `stream_id` identifies the substream in which
/// the current task is interested and wants to be woken up for,
/// in case new frames can be read. `None` means interest in
/// frames for any substream.
fn poll_read_frame(&mut self, cx: &mut Context<'_>, stream_id: Option<LocalStreamId>)
-> Poll<io::Result<Frame<RemoteStreamId>>>
{
// Try to send pending frames, if there are any, without blocking.
if let Poll::Ready(Err(e)) = self.send_pending_frames(cx) {
return Poll::Ready(Err(e))
}
// Perform any pending flush before reading.
if self.pending_flush_open {
trace!("Executing pending flush.");
ready!(self.poll_flush(cx))?;
debug_assert!(!self.pending_flush_open);
}
// Check if the inbound frame buffer is full.
debug_assert!(self.buffer.len() <= self.config.max_buffer_len);
if self.buffer.len() == self.config.max_buffer_len {
debug!("Frame buffer full ({} frames).", self.buffer.len());
match self.config.max_buffer_behaviour {
MaxBufferBehaviour::CloseAll => {
return Poll::Ready(self.on_error(io::Error::new(io::ErrorKind::Other,
format!("Frame buffer full ({} frames).", self.buffer.len()))))
},
MaxBufferBehaviour::Block => {
// If there are any pending tasks for frames in the buffer,
// use this opportunity to try to wake one of them.
let mut woken = false;
for frame in self.buffer.iter() {
woken = self.notifier_read.wake_by_id(frame.local_id());
if woken {
// The current task is still interested in another frame,
// so we register it for a wakeup some time after the
// already `woken` task.
let _ = NotifierRead::register(&self.notifier_read, cx.waker(), stream_id);
break
}
}
if !woken {
// No task was woken, thus the current task _must_ poll
// again to guarantee (an attempt at) making progress.
cx.waker().clone().wake();
}
return Poll::Pending
},
}
}
// Try to read another frame from the underlying I/O stream.
let waker = NotifierRead::register(&self.notifier_read, cx.waker(), stream_id);
match ready!(self.io.poll_next_unpin(&mut Context::from_waker(&waker))) {
Some(Ok(frame)) => {
trace!("Received {:?}", frame);
Poll::Ready(Ok(frame))
}
Some(Err(e)) => Poll::Ready(self.on_error(e)),
None => Poll::Ready(self.on_error(io::ErrorKind::UnexpectedEof.into()))
}
}
/// Processes an inbound `Open` frame.
fn on_open(&mut self, id: LocalStreamId) -> io::Result<Option<LocalStreamId>> {
if self.open_substreams.contains_key(&id) {
debug!("Received unexpected `Open` frame for open substream {}", id);
return self.on_error(io::Error::new(io::ErrorKind::Other,
"Protocol error: Received `Open` frame for open substream."))
}
if self.open_substreams.len() >= self.config.max_substreams {
debug!("Maximum number of substreams exceeded: {}", self.config.max_substreams);
self.check_max_pending_frames()?;
debug!("Pending reset for new stream {}", id);
self.pending_frames.push_front(Frame::Reset {
stream_id: id
});
return Ok(None)
}
self.open_substreams.insert(id, SubstreamState::Open);
Ok(Some(id))
}
/// Processes an inbound `Reset` frame.
fn on_reset(&mut self, id: LocalStreamId) {
if let Some(state) = self.open_substreams.remove(&id) {
debug!("Substream {} in state {:?} reset by remote.", id, state);
let below_limit = self.open_substreams.len() == self.config.max_substreams - 1;
if below_limit {
ArcWake::wake_by_ref(&self.notifier_open);
}
// Notify tasks interested in reading, so they may read the EOF.
NotifierRead::wake_by_id(&self.notifier_read, id);
} else {
trace!("Ignoring `Reset` for unknown stream {}. Possibly dropped earlier.", id);
}
}
/// Processes an inbound `Close` frame.
fn on_close(&mut self, id: LocalStreamId) -> io::Result<()> {
if let Entry::Occupied(mut e) = self.open_substreams.entry(id) {
match e.get() {
SubstreamState::RecvClosed => {
debug!("Received unexpected `Close` frame for closed substream {}", id);
return self.on_error(
io::Error::new(io::ErrorKind::Other,
"Protocol error: Received `Close` frame for closed substream."))
},
SubstreamState::SendClosed => {
debug!("Substream {} closed by remote (SendClosed -> Closed).", id);
e.remove();
// Notify tasks interested in opening new streams, if we fell
// below the limit.
let below_limit = self.open_substreams.len() == self.config.max_substreams - 1;
if below_limit {
ArcWake::wake_by_ref(&self.notifier_open);
}
// Notify tasks interested in reading, so they may read the EOF.
NotifierRead::wake_by_id(&self.notifier_read, id);
},
SubstreamState::Open => {
debug!("Substream {} closed by remote (Open -> RecvClosed)", id);
e.insert(SubstreamState::RecvClosed);
// Notify tasks interested in reading, so they may read the EOF.
NotifierRead::wake_by_id(&self.notifier_read, id);
},
}
} else {
trace!("Ignoring `Close` for unknown stream {}. Possibly dropped earlier.", id);
}
Ok(())
}
/// Generates the next outbound stream ID.
fn next_outbound_stream_id(&mut self) -> LocalStreamId {
let id = self.next_outbound_stream_id;
self.next_outbound_stream_id = self.next_outbound_stream_id.next();
id
}
/// Checks whether a substream is open for reading.
fn can_read(&self, id: &LocalStreamId) -> bool {
match self.open_substreams.get(id) {
Some(SubstreamState::Open) | Some(SubstreamState::SendClosed) => true,
_ => false,
}
}
/// Sends pending frames, without flushing.
fn send_pending_frames(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
while let Some(frame) = self.pending_frames.pop_back() {
if self.poll_send_frame(cx, || {
frame.clone()
})?.is_pending() {
self.pending_frames.push_back(frame);
return Poll::Pending
}
}
Poll::Ready(Ok(()))
}
/// Records a fatal error for the multiplexed I/O stream.
fn on_error<T>(&mut self, e: io::Error) -> io::Result<T> {
log::debug!("Multiplexed connection failed: {:?}", e);
self.status = Status::Err(io::Error::new(e.kind(), e.to_string()));
self.pending_frames = Default::default();
self.open_substreams = Default::default();
self.buffer = Default::default();
Err(e)
}
/// Checks that the multiplexed stream has status `Open`,
/// i.e. is not closed and did not encounter a fatal error.
fn guard_open(&self) -> io::Result<()> {
match &self.status {
Status::Closed => Err(io::Error::new(io::ErrorKind::Other, "Connection is closed")),
Status::Err(e) => Err(io::Error::new(e.kind(), e.to_string())),
Status::Open => Ok(())
}
}
/// Checks that the permissible limit for pending outgoing frames
/// has not been reached.
fn check_max_pending_frames(&mut self) -> io::Result<()> {
if self.pending_frames.len() >= self.config.max_substreams + EXTRA_PENDING_FRAMES {
return self.on_error(io::Error::new(io::ErrorKind::Other,
"Too many pending frames."));
}
Ok(())
}
}
/// The operating states of a substream.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum SubstreamState {
/// An `Open` frame has been received or sent.
Open,
/// A `Close` frame has been sent, but the stream is still open
/// for reading (half-close).
SendClosed,
/// A `Close` frame has been received but the stream is still
/// open for writing (remote half-close).
RecvClosed
}
struct NotifierRead {
/// List of wakers to wake when read operations can proceed
/// on a substream (or in general, for the key `None`).
pending: Mutex<FnvHashMap<Option<LocalStreamId>, Waker>>,
}
impl NotifierRead {
/// Registers interest of a task in reading from a particular
/// stream, or any stream if `stream` is `None`.
///
/// The returned waker should be passed to an I/O read operation
/// that schedules a wakeup, if necessary.
#[must_use]
fn register<'a>(self: &'a Arc<Self>, waker: &Waker, stream: Option<LocalStreamId>)
-> WakerRef<'a>
{
let mut pending = self.pending.lock();
pending.insert(stream, waker.clone());
waker_ref(self)
}
/// Wakes the last task that has previously registered interest
/// in reading data from a particular stream (or any stream).
///
/// Returns `true` if a task has been woken.
fn wake_by_id(&self, id: LocalStreamId) -> bool {
let mut woken = false;
let mut pending = self.pending.lock();
if let Some(waker) = pending.remove(&None) {
waker.wake();
woken = true;
}
if let Some(waker) = pending.remove(&Some(id)) {
waker.wake();
woken = true;
}
woken
}
}
impl ArcWake for NotifierRead {
fn wake_by_ref(this: &Arc<Self>) {
let wakers = mem::replace(&mut *this.pending.lock(), Default::default());
for (_, waker) in wakers {
waker.wake();
}
}
}
struct NotifierWrite {
/// List of wakers to wake when write operations on the
/// underlying I/O stream can proceed.
pending: Mutex<Vec<Waker>>,
}
impl NotifierWrite {
/// Registers interest of a task in writing to some substream.
///
/// The returned waker should be passed to an I/O write operation
/// that schedules a wakeup, if necessary.
#[must_use]
fn register<'a>(self: &'a Arc<Self>, waker: &Waker) -> WakerRef<'a> {
let mut pending = self.pending.lock();
if pending.iter().all(|w| !w.will_wake(waker)) {
pending.push(waker.clone());
}
waker_ref(self)
}
}
impl ArcWake for NotifierWrite {
fn wake_by_ref(this: &Arc<Self>) {
let wakers = mem::replace(&mut *this.pending.lock(), Default::default());
for waker in wakers {
waker.wake();
}
}
}
struct NotifierOpen {
/// List of wakers to wake when a new substream can be opened.
pending: Mutex<Vec<Waker>>,
}
impl NotifierOpen {
/// Registers interest of a task in opening a new substream.
fn register<'a>(self: &'a Arc<Self>, waker: &Waker) -> WakerRef<'a> {
let mut pending = self.pending.lock();
if pending.iter().all(|w| !w.will_wake(waker)) {
pending.push(waker.clone());
}
waker_ref(self)
}
}
impl ArcWake for NotifierOpen {
fn wake_by_ref(this: &Arc<Self>) {
let wakers = mem::replace(&mut *this.pending.lock(), Default::default());
for waker in wakers {
waker.wake();
}
}
}
/// The maximum number of pending `Reset` or `Close` frames we are
/// willing to buffer beyond the configured substream limit. This
/// extra leeway bounds resource usage while allowing some
/// back-pressure when sending out these frames.
///
/// If too many pending frames accumulate, the multiplexed stream is
/// considered unhealthy and terminates with an error.
const EXTRA_PENDING_FRAMES: usize = 1000;
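
A condensed, self-contained sketch of the notifier pattern shared by
`NotifierRead`, `NotifierWrite` and `NotifierOpen` above: tasks register
their wakers in a shared registry, and the registry itself, turned into
a `Waker` via `ArcWake`/`waker_ref`, is what gets handed to the
underlying I/O operation, so a single readiness event fans out to every
registered task. `Notifier` and `poll_shared` are illustrative names,
not part of the patch:

```rust
use futures::task::{ArcWake, waker_ref};
use parking_lot::Mutex;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};

struct Notifier {
    /// Wakers of all tasks blocked on the same underlying resource.
    pending: Mutex<Vec<Waker>>,
}

impl ArcWake for Notifier {
    fn wake_by_ref(this: &Arc<Self>) {
        // A single readiness event wakes every registered task.
        for waker in this.pending.lock().drain(..) {
            waker.wake();
        }
    }
}

/// Polls some I/O operation on behalf of the current task, registering
/// its waker in the shared registry first. `poll_io` stands in for e.g.
/// `self.io.poll_flush_unpin`.
fn poll_shared<F>(notifier: &Arc<Notifier>, cx: &mut Context<'_>, poll_io: F) -> Poll<()>
where
    F: FnOnce(&mut Context<'_>) -> Poll<()>,
{
    // Remember the current task...
    notifier.pending.lock().push(cx.waker().clone());
    // ...and poll the resource with a waker backed by the registry
    // instead of the task's own waker.
    let waker = waker_ref(notifier);
    poll_io(&mut Context::from_waker(&waker))
}
```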

muxers/mplex/src/lib.rs

@@ -19,125 +19,21 @@
// DEALINGS IN THE SOFTWARE.
mod codec;
mod config;
mod io;
use std::{cmp, iter, mem, pin::Pin, task::Context, task::Poll};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::Arc;
use std::task::Waker;
pub use config::{MplexConfig, MaxBufferBehaviour};
use codec::LocalStreamId;
use std::{cmp, iter, task::Context, task::Poll};
use bytes::Bytes;
use libp2p_core::{
Endpoint,
StreamMuxer,
muxing::StreamMuxerEvent,
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
};
use log::{debug, trace};
use parking_lot::Mutex;
use fnv::FnvHashSet;
use futures::{prelude::*, future, ready, stream::Fuse};
use futures::task::{ArcWake, waker_ref};
use futures_codec::Framed;
/// Configuration for the multiplexer.
#[derive(Debug, Clone)]
pub struct MplexConfig {
/// Maximum number of simultaneously-open substreams.
max_substreams: usize,
/// Maximum number of elements in the internal buffer.
max_buffer_len: usize,
/// Behaviour when the buffer size limit is reached.
max_buffer_behaviour: MaxBufferBehaviour,
/// When sending data, split it into frames whose maximum size is this value
/// (max 1MByte, as per the Mplex spec).
split_send_size: usize,
}
impl MplexConfig {
/// Builds the default configuration.
pub fn new() -> MplexConfig {
Default::default()
}
/// Sets the maximum number of simultaneously opened substreams, after which an error is
/// generated and the connection closes.
///
/// A limit is necessary in order to avoid DoS attacks.
pub fn max_substreams(&mut self, max: usize) -> &mut Self {
self.max_substreams = max;
self
}
/// Sets the maximum number of pending incoming messages.
///
/// A limit is necessary in order to avoid DoS attacks.
pub fn max_buffer_len(&mut self, max: usize) -> &mut Self {
self.max_buffer_len = max;
self
}
/// Sets the behaviour when the maximum buffer length has been reached.
///
/// See the documentation of `MaxBufferBehaviour`.
pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self {
self.max_buffer_behaviour = behaviour;
self
}
/// Sets the frame size used when sending data. Capped at 1Mbyte as per the
/// Mplex spec.
pub fn split_send_size(&mut self, size: usize) -> &mut Self {
let size = cmp::min(size, codec::MAX_FRAME_SIZE);
self.split_send_size = size;
self
}
fn upgrade<C>(self, i: C) -> Multiplex<C>
where
C: AsyncRead + AsyncWrite + Unpin
{
let max_buffer_len = self.max_buffer_len;
Multiplex {
inner: Mutex::new(MultiplexInner {
error: Ok(()),
inner: Framed::new(i, codec::Codec::new()).fuse(),
config: self,
buffer: Vec::with_capacity(cmp::min(max_buffer_len, 512)),
opened_substreams: Default::default(),
next_outbound_stream_id: 0,
notifier_read: Arc::new(Notifier {
to_wake: Mutex::new(Default::default()),
}),
notifier_write: Arc::new(Notifier {
to_wake: Mutex::new(Default::default()),
}),
is_shutdown: false,
})
}
}
}
impl Default for MplexConfig {
fn default() -> MplexConfig {
MplexConfig {
max_substreams: 128,
max_buffer_len: 4096,
max_buffer_behaviour: MaxBufferBehaviour::CloseAll,
split_send_size: 1024,
}
}
}
/// Behaviour when the maximum length of the buffer is reached.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MaxBufferBehaviour {
/// Produce an error on all the substreams.
CloseAll,
/// No new message will be read from the underlying connection if the buffer is full.
///
/// This can potentially introduce a deadlock if you are waiting for a message from a substream
/// before processing the messages received on another substream.
Block,
}
use futures::{prelude::*, future, ready};
impl UpgradeInfo for MplexConfig {
type Info = &'static [u8];
@@ -153,11 +49,13 @@ where
C: AsyncRead + AsyncWrite + Unpin,
{
type Output = Multiplex<C>;
type Error = IoError;
type Future = future::Ready<Result<Self::Output, IoError>>;
type Error = io::Error;
type Future = future::Ready<Result<Self::Output, io::Error>>;
fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
future::ready(Ok(self.upgrade(socket)))
future::ready(Ok(Multiplex {
io: Mutex::new(io::Multiplexed::new(socket, self)),
}))
}
}
@@ -166,11 +64,13 @@ where
C: AsyncRead + AsyncWrite + Unpin,
{
type Output = Multiplex<C>;
type Error = IoError;
type Future = future::Ready<Result<Self::Output, IoError>>;
type Error = io::Error;
type Future = future::Ready<Result<Self::Output, io::Error>>;
fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
future::ready(Ok(self.upgrade(socket)))
future::ready(Ok(Multiplex {
io: Mutex::new(io::Multiplexed::new(socket, self))
}))
}
}
@@ -179,478 +79,103 @@ where
/// This implementation isn't capable of detecting when the underlying socket changes its address,
/// and no [`StreamMuxerEvent::AddressChange`] event is ever emitted.
pub struct Multiplex<C> {
inner: Mutex<MultiplexInner<C>>,
}
// Struct shared throughout the implementation.
struct MultiplexInner<C> {
// Error that happened earlier. Should poison any attempt to use this `MultiplexError`.
error: Result<(), IoError>,
// Underlying stream.
inner: Fuse<Framed<C, codec::Codec>>,
/// The original configuration.
config: MplexConfig,
// Buffer of elements pulled from the stream but not processed yet.
buffer: Vec<codec::Elem>,
// List of Ids of opened substreams. Used to filter out messages that don't belong to any
// substream. Note that this is handled exclusively by `next_match`.
// The `Endpoint` value denotes who initiated the substream from our point of view
// (see note [StreamId]).
opened_substreams: FnvHashSet<(u32, Endpoint)>,
// Id of the next outgoing substream.
next_outbound_stream_id: u32,
/// List of wakers to wake when a read event happens on the underlying stream.
notifier_read: Arc<Notifier>,
/// List of wakers to wake when a write event happens on the underlying stream.
notifier_write: Arc<Notifier>,
/// If true, the connection has been shut down. We need to be careful not to accidentally
/// call `Sink::poll_complete` or `Sink::start_send` after `Sink::close`.
is_shutdown: bool,
}
struct Notifier {
/// List of wakers to wake.
to_wake: Mutex<Vec<Waker>>,
}
impl Notifier {
fn insert(&self, waker: &Waker) {
let mut to_wake = self.to_wake.lock();
if to_wake.iter().all(|w| !w.will_wake(waker)) {
to_wake.push(waker.clone());
}
}
}
impl ArcWake for Notifier {
fn wake_by_ref(arc_self: &Arc<Self>) {
let wakers = mem::replace(&mut *arc_self.to_wake.lock(), Default::default());
for waker in wakers {
waker.wake();
}
}
}
// Note [StreamId]: mplex no longer partitions stream IDs into odd (for initiators) and
// even ones (for receivers). Streams are instead identified by a number and whether the flag
// is odd (for receivers) or even (for initiators). `Open` frames do not have a flag, but are
// sent unidirectional. As a consequence, we need to remember if the stream was initiated by us
// or remotely and we store the information from our point of view, i.e. receiving an `Open` frame
// is stored as `(<u32>, Listener)`, sending an `Open` frame as `(<u32>, Dialer)`. Receiving
// a `Data` frame with flag `MessageReceiver` (= 1) means that we initiated the stream, so the
// entry has been stored as `(<u32>, Dialer)`. So, when looking up streams based on frames
// received, we have to invert the `Endpoint`, except for `Open`.
/// Processes elements in `inner` until one matching `filter` is found.
///
/// If `Pending` is returned, the waker is kept and notified later, just like with any `Poll`.
/// `Ready(Ok())` is almost always returned. An error is returned if the stream is EOF.
fn next_match<C, F, O>(inner: &mut MultiplexInner<C>, cx: &mut Context<'_>, mut filter: F) -> Poll<Result<O, IoError>>
where C: AsyncRead + AsyncWrite + Unpin,
F: FnMut(&codec::Elem) -> Option<O>,
{
// If an error happened earlier, immediately return it.
if let Err(ref err) = inner.error {
return Poll::Ready(Err(IoError::new(err.kind(), err.to_string())));
}
if let Some((offset, out)) = inner.buffer.iter().enumerate().filter_map(|(n, v)| filter(v).map(|v| (n, v))).next() {
// Found a matching entry in the existing buffer!
// The buffer was full and no longer is, so let's notify everything.
if inner.buffer.len() == inner.config.max_buffer_len {
ArcWake::wake_by_ref(&inner.notifier_read);
}
inner.buffer.remove(offset);
return Poll::Ready(Ok(out));
}
loop {
// Check if we reached max buffer length first.
debug_assert!(inner.buffer.len() <= inner.config.max_buffer_len);
if inner.buffer.len() == inner.config.max_buffer_len {
debug!("Reached mplex maximum buffer length");
match inner.config.max_buffer_behaviour {
MaxBufferBehaviour::CloseAll => {
inner.error = Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length"));
return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length")));
},
MaxBufferBehaviour::Block => {
inner.notifier_read.insert(cx.waker());
return Poll::Pending
},
}
}
inner.notifier_read.insert(cx.waker());
let elem = match Stream::poll_next(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_read))) {
Poll::Ready(Some(Ok(item))) => item,
Poll::Ready(None) => return Poll::Ready(Err(IoErrorKind::BrokenPipe.into())),
Poll::Pending => return Poll::Pending,
Poll::Ready(Some(Err(err))) => {
let err2 = IoError::new(err.kind(), err.to_string());
inner.error = Err(err);
return Poll::Ready(Err(err2));
},
};
trace!("Received message: {:?}", elem);
// Handle substreams opening/closing.
match elem {
codec::Elem::Open { substream_id } => {
if !inner.opened_substreams.insert((substream_id, Endpoint::Listener)) {
debug!("Received open message for substream {} which was already open", substream_id)
}
}
codec::Elem::Close { substream_id, endpoint, .. } | codec::Elem::Reset { substream_id, endpoint, .. } => {
inner.opened_substreams.remove(&(substream_id, !endpoint));
}
_ => ()
}
if let Some(out) = filter(&elem) {
return Poll::Ready(Ok(out));
} else {
let endpoint = elem.endpoint().unwrap_or(Endpoint::Dialer);
if inner.opened_substreams.contains(&(elem.substream_id(), !endpoint)) || elem.is_open_msg() {
inner.buffer.push(elem);
} else if !elem.is_close_or_reset_msg() {
debug!("Ignored message {:?} because the substream wasn't open", elem);
}
}
}
}
// Small convenience function that tries to write `elem` to the stream.
fn poll_send<C>(inner: &mut MultiplexInner<C>, cx: &mut Context<'_>, elem: codec::Elem) -> Poll<Result<(), IoError>>
where C: AsyncRead + AsyncWrite + Unpin
{
ensure_no_error_no_close(inner)?;
inner.notifier_write.insert(cx.waker());
match Sink::poll_ready(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))) {
Poll::Ready(Ok(())) => {
match Sink::start_send(Pin::new(&mut inner.inner), elem) {
Ok(()) => Poll::Ready(Ok(())),
Err(err) => Poll::Ready(Err(err))
}
},
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
inner.error = Err(IoError::new(err.kind(), err.to_string()));
Poll::Ready(Err(err))
}
}
}
fn ensure_no_error_no_close<C>(inner: &mut MultiplexInner<C>) -> Result<(), IoError>
where
C: AsyncRead + AsyncWrite + Unpin
{
if inner.is_shutdown {
return Err(IoError::new(IoErrorKind::Other, "connection is shut down"))
}
if let Err(ref e) = inner.error {
return Err(IoError::new(e.kind(), e.to_string()))
}
Ok(())
io: Mutex<io::Multiplexed<C>>
}
impl<C> StreamMuxer for Multiplex<C>
where C: AsyncRead + AsyncWrite + Unpin
where
C: AsyncRead + AsyncWrite + Unpin
{
type Substream = Substream;
type OutboundSubstream = OutboundSubstream;
type Error = IoError;
type Error = io::Error;
fn poll_event(&self, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent<Self::Substream>, IoError>> {
let mut inner = self.inner.lock();
if inner.opened_substreams.len() >= inner.config.max_substreams {
debug!("Refused substream; reached maximum number of substreams {}", inner.config.max_substreams);
return Poll::Ready(Err(IoError::new(IoErrorKind::ConnectionRefused,
"exceeded maximum number of open substreams")));
}
let num = ready!(next_match(&mut inner, cx, |elem| {
match elem {
codec::Elem::Open { substream_id } => Some(*substream_id),
_ => None,
}
}));
let num = match num {
Ok(n) => n,
Err(err) => return Poll::Ready(Err(err)),
};
debug!("Successfully opened inbound substream {}", num);
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(Substream {
current_data: Bytes::new(),
num,
endpoint: Endpoint::Listener,
local_open: true,
remote_open: true,
})))
fn poll_event(&self, cx: &mut Context<'_>)
-> Poll<io::Result<StreamMuxerEvent<Self::Substream>>>
{
let stream_id = ready!(self.io.lock().poll_next_stream(cx))?;
let stream = Substream::new(stream_id);
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream)))
}
fn open_outbound(&self) -> Self::OutboundSubstream {
let mut inner = self.inner.lock();
// Assign a substream ID now.
let substream_id = {
let n = inner.next_outbound_stream_id;
inner.next_outbound_stream_id = inner.next_outbound_stream_id.checked_add(1)
.expect("Mplex substream ID overflowed");
n
};
inner.opened_substreams.insert((substream_id, Endpoint::Dialer));
OutboundSubstream {
num: substream_id,
state: OutboundSubstreamState::SendElem(codec::Elem::Open { substream_id }),
}
OutboundSubstream {}
}
fn poll_outbound(&self, cx: &mut Context<'_>, substream: &mut Self::OutboundSubstream) -> Poll<Result<Self::Substream, IoError>> {
loop {
let mut inner = self.inner.lock();
let polling = match substream.state {
OutboundSubstreamState::SendElem(ref elem) => {
poll_send(&mut inner, cx, elem.clone())
},
OutboundSubstreamState::Flush => {
ensure_no_error_no_close(&mut inner)?;
let inner = &mut *inner; // Avoids borrow errors
inner.notifier_write.insert(cx.waker());
Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)))
},
OutboundSubstreamState::Done => {
panic!("Polling outbound substream after it's been succesfully open");
},
};
match polling {
Poll::Ready(Ok(())) => (),
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => {
debug!("Failed to open outbound substream {}", substream.num);
inner.buffer.retain(|elem| {
elem.substream_id() != substream.num || elem.endpoint() == Some(Endpoint::Dialer)
});
inner.error = Err(IoError::new(err.kind(), err.to_string()));
return Poll::Ready(Err(err));
},
};
drop(inner);
// Going to next step.
match substream.state {
OutboundSubstreamState::SendElem(_) => {
substream.state = OutboundSubstreamState::Flush;
},
OutboundSubstreamState::Flush => {
debug!("Successfully opened outbound substream {}", substream.num);
substream.state = OutboundSubstreamState::Done;
return Poll::Ready(Ok(Substream {
num: substream.num,
current_data: Bytes::new(),
endpoint: Endpoint::Dialer,
local_open: true,
remote_open: true,
}));
},
OutboundSubstreamState::Done => unreachable!(),
}
}
fn poll_outbound(&self, cx: &mut Context<'_>, _: &mut Self::OutboundSubstream)
-> Poll<Result<Self::Substream, io::Error>>
{
let stream_id = ready!(self.io.lock().poll_open_stream(cx))?;
return Poll::Ready(Ok(Substream::new(stream_id)))
}
fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
// Nothing to do.
// Nothing to do, since `open_outbound` creates no new local state.
}
fn read_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream, buf: &mut [u8]) -> Poll<Result<usize, IoError>> {
fn read_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream, buf: &mut [u8])
-> Poll<Result<usize, io::Error>>
{
loop {
// First, transfer from `current_data`.
// Try to read from the current (i.e. last received) frame.
if !substream.current_data.is_empty() {
let len = cmp::min(substream.current_data.len(), buf.len());
buf[..len].copy_from_slice(&substream.current_data.split_to(len));
return Poll::Ready(Ok(len));
}
// If the remote writing side is closed, return EOF.
if !substream.remote_open {
return Poll::Ready(Ok(0));
}
// Try to find a packet of data in the buffer.
let mut inner = self.inner.lock();
let next_data_poll = next_match(&mut inner, cx, |elem| {
match elem {
codec::Elem::Data { substream_id, endpoint, data, .. }
if *substream_id == substream.num && *endpoint != substream.endpoint => // see note [StreamId]
{
Some(Some(data.clone()))
}
codec::Elem::Close { substream_id, endpoint }
if *substream_id == substream.num && *endpoint != substream.endpoint => // see note [StreamId]
{
Some(None)
}
_ => None
}
});
// We're in a loop, so all we need to do is set `substream.current_data` to the data we
// just read and wait for the next iteration.
match next_data_poll {
Poll::Ready(Ok(Some(data))) => substream.current_data = data,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Ready(Ok(None)) => {
substream.remote_open = false;
return Poll::Ready(Ok(0));
},
Poll::Pending => {
// There was no data packet in the buffer about this substream; maybe it's
// because it has been closed.
if inner.opened_substreams.contains(&(substream.num, substream.endpoint)) {
return Poll::Pending
} else {
return Poll::Ready(Ok(0))
}
},
// Read the next data frame from the multiplexed stream.
match ready!(self.io.lock().poll_read_stream(cx, substream.id))? {
Some(data) => { substream.current_data = data; }
None => { return Poll::Ready(Ok(0)) }
}
}
}
fn write_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream, buf: &[u8]) -> Poll<Result<usize, IoError>> {
if !substream.local_open {
return Poll::Ready(Err(IoErrorKind::BrokenPipe.into()));
}
let mut inner = self.inner.lock();
let to_write = cmp::min(buf.len(), inner.config.split_send_size);
let elem = codec::Elem::Data {
substream_id: substream.num,
data: Bytes::copy_from_slice(&buf[..to_write]),
endpoint: substream.endpoint,
};
match poll_send(&mut inner, cx, elem) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(to_write)),
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => Poll::Pending,
}
fn write_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream, buf: &[u8])
-> Poll<Result<usize, io::Error>>
{
self.io.lock().poll_write_stream(cx, substream.id, buf)
}
fn flush_substream(&self, cx: &mut Context<'_>, _substream: &mut Self::Substream) -> Poll<Result<(), IoError>> {
let mut inner = self.inner.lock();
ensure_no_error_no_close(&mut inner)?;
let inner = &mut *inner; // Avoids borrow errors
inner.notifier_write.insert(cx.waker());
let result = Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)));
if let Poll::Ready(Err(err)) = &result {
inner.error = Err(IoError::new(err.kind(), err.to_string()));
}
result
fn flush_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream)
-> Poll<Result<(), io::Error>>
{
self.io.lock().poll_flush_stream(cx, substream.id)
}
fn shutdown_substream(&self, cx: &mut Context<'_>, sub: &mut Self::Substream) -> Poll<Result<(), IoError>> {
if !sub.local_open {
return Poll::Ready(Ok(()));
}
let elem = codec::Elem::Close {
substream_id: sub.num,
endpoint: sub.endpoint,
};
let mut inner = self.inner.lock();
let result = poll_send(&mut inner, cx, elem);
if let Poll::Ready(Ok(())) = result {
sub.local_open = false;
}
result
fn shutdown_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream)
-> Poll<Result<(), io::Error>>
{
self.io.lock().poll_close_stream(cx, substream.id)
}
fn destroy_substream(&self, sub: Self::Substream) {
self.inner.lock().buffer.retain(|elem| {
elem.substream_id() != sub.num || elem.endpoint() == Some(sub.endpoint)
})
self.io.lock().drop_stream(sub.id);
}
fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), IoError>> {
let inner = &mut *self.inner.lock();
if inner.is_shutdown {
return Poll::Ready(Ok(()))
}
if let Err(ref e) = inner.error {
return Poll::Ready(Err(IoError::new(e.kind(), e.to_string())))
}
inner.notifier_write.insert(cx.waker());
match Sink::poll_close(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))) {
Poll::Ready(Ok(())) => {
inner.is_shutdown = true;
Poll::Ready(Ok(()))
}
Poll::Ready(Err(err)) => {
inner.error = Err(IoError::new(err.kind(), err.to_string()));
Poll::Ready(Err(err))
}
Poll::Pending => Poll::Pending,
}
fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.io.lock().poll_close(cx)
}
fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), IoError>> {
let inner = &mut *self.inner.lock();
if inner.is_shutdown {
return Poll::Ready(Ok(()))
}
if let Err(ref e) = inner.error {
return Poll::Ready(Err(IoError::new(e.kind(), e.to_string())))
}
inner.notifier_write.insert(cx.waker());
let result = Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)));
if let Poll::Ready(Err(err)) = &result {
inner.error = Err(IoError::new(err.kind(), err.to_string()));
}
result
fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.io.lock().poll_flush(cx)
}
}
/// Active attempt to open an outbound substream.
pub struct OutboundSubstream {
/// Substream number.
num: u32,
state: OutboundSubstreamState,
}
enum OutboundSubstreamState {
/// We need to send `Elem` on the underlying stream.
SendElem(codec::Elem),
/// We need to flush the underlying stream.
Flush,
/// The substream is open and the `OutboundSubstream` is now useless.
Done,
}
pub struct OutboundSubstream {}
/// Active substream to the remote.
pub struct Substream {
/// Substream number.
num: u32,
// Read buffer. Contains data read from `inner` but not yet dispatched by a call to `read()`.
/// The unique, local identifier of the substream.
id: LocalStreamId,
/// The current data frame the substream is reading from.
current_data: Bytes,
endpoint: Endpoint,
/// If true, our writing side is still open.
local_open: bool,
/// If true, the remote writing side is still open.
remote_open: bool,
}
impl Substream {
fn new(id: LocalStreamId) -> Self {
Self { id, current_data: Bytes::new() }
}
}