From b766d45b1c8659a41b9f7f6f1ae8894b6811084e Mon Sep 17 00:00:00 2001
From: Roman Borschel
Date: Tue, 13 Oct 2020 17:58:18 +0200
Subject: [PATCH] [mplex] Split the receive buffer per substream. (#1784)

* Split the receive buffer per substream.

This split allows more efficient reading from the buffer for a
particular substream and allows resetting only the offending substream
if it reaches its buffer limit with `MaxBufferBehaviour::ResetStream`.
Previously this was implemented as `MaxBufferBehaviour::CloseAll` and
resulted in the entire connection closing.

The buffer split should be advantageous whenever not all substreams
are read at the same pace and some temporarily fall behind in
consuming inbound data frames.

* Tweak logging.

* Oops.

* Update muxers/mplex/src/io.rs

Co-authored-by: Max Inden

* Rename field as per review suggestion.

* Adjust and clarify max-buffer-behaviour.

* Set max_buffer_len to 32.

Since the limit is now per substream and the default `max_substreams`
is `128`, this new limit retains the previous overall resource bounds
for the buffers.

* Expand tests and small cleanup.

Co-authored-by: Max Inden
---
 muxers/mplex/Cargo.toml    |   5 +
 muxers/mplex/src/codec.rs  |  23 +-
 muxers/mplex/src/config.rs |  44 ++-
 muxers/mplex/src/io.rs     | 747 +++++++++++++++++++++++++++----------
 4 files changed, 591 insertions(+), 228 deletions(-)

diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml
index 39262631..79a144dd 100644
--- a/muxers/mplex/Cargo.toml
+++ b/muxers/mplex/Cargo.toml
@@ -17,8 +17,13 @@ futures_codec = "0.4"
 libp2p-core = { version = "0.22.0", path = "../../core" }
 log = "0.4"
 parking_lot = "0.11"
+smallvec = "1.4"
 unsigned-varint = { version = "0.5", features = ["futures-codec"] }
 
 [dev-dependencies]
 async-std = "1.6.2"
+env_logger = "0.6"
+futures = "0.3"
 libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] }
+quickcheck = "0.9"
+rand = "0.7"
diff --git a/muxers/mplex/src/codec.rs b/muxers/mplex/src/codec.rs
index 22316c3b..268860a5 100644
--- a/muxers/mplex/src/codec.rs
+++ b/muxers/mplex/src/codec.rs
@@ -77,12 +77,25 @@ impl LocalStreamId {
         Self { num, role: Endpoint::Dialer }
     }
 
+    #[cfg(test)]
+    pub fn listener(num: u32) -> Self {
+        Self { num, role: Endpoint::Listener }
+    }
+
     pub fn next(self) -> Self {
         Self {
             num: self.num.checked_add(1).expect("Mplex substream ID overflowed"),
             .. self
         }
     }
+
+    #[cfg(test)]
+    pub fn into_remote(self) -> RemoteStreamId {
+        RemoteStreamId {
+            num: self.num,
+            role: !self.role,
+        }
+    }
 }
 
 impl RemoteStreamId {
@@ -105,7 +118,7 @@ impl RemoteStreamId {
 }
 
 /// An Mplex protocol frame.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub enum Frame<T> {
     Open { stream_id: T },
     Data { stream_id: T, data: Bytes },
@@ -114,7 +127,7 @@ pub enum Frame<T> {
 }
 
 impl Frame<RemoteStreamId> {
-    fn remote_id(&self) -> RemoteStreamId {
+    pub fn remote_id(&self) -> RemoteStreamId {
         match *self {
             Frame::Open { stream_id } => stream_id,
             Frame::Data { stream_id, .. } => stream_id,
@@ -122,12 +135,6 @@ impl Frame<RemoteStreamId> {
             Frame::Reset { stream_id, .. } => stream_id,
         }
     }
-
-    /// Gets the `LocalStreamId` corresponding to the `RemoteStreamId`
-    /// received with this frame.
-    pub fn local_id(&self) -> LocalStreamId {
-        self.remote_id().into_local()
-    }
 }
 
 pub struct Codec {
diff --git a/muxers/mplex/src/config.rs b/muxers/mplex/src/config.rs
index 04d7d0b3..e5f9d807 100644
--- a/muxers/mplex/src/config.rs
+++ b/muxers/mplex/src/config.rs
@@ -24,11 +24,11 @@ use std::cmp;
 
 /// Configuration for the multiplexer.
 #[derive(Debug, Clone)]
 pub struct MplexConfig {
-    /// Maximum number of simultaneously-open substreams.
+    /// Maximum number of simultaneously used substreams.
     pub(crate) max_substreams: usize,
-    /// Maximum number of frames in the internal buffer.
+    /// Maximum number of frames buffered per substream.
     pub(crate) max_buffer_len: usize,
-    /// Behaviour when the buffer size limit is reached.
+    /// Behaviour when the buffer size limit is reached for a substream.
     pub(crate) max_buffer_behaviour: MaxBufferBehaviour,
     /// When sending data, split it into frames whose maximum size is this value
     /// (max 1MByte, as per the Mplex spec).
     pub(crate) split_send_size: usize,
 }
 
@@ -41,22 +41,26 @@ impl MplexConfig {
         Default::default()
     }
 
-    /// Sets the maximum number of simultaneously open substreams.
+    /// Sets the maximum number of simultaneously used substreams.
+    ///
+    /// A substream is used as long as it has not been dropped,
+    /// even if it may already be closed or reset at the protocol
+    /// level (in which case it may still have buffered data that
+    /// can be read before the `StreamMuxer` API signals EOF).
     ///
     /// When the limit is reached, opening of outbound substreams
-    /// is delayed until another substream closes, whereas new
+    /// is delayed until another substream is dropped, whereas new
     /// inbound substreams are immediately answered with a `Reset`.
     /// If the number of inbound substreams that need to be reset
     /// accumulates too quickly (judged by internal bounds), the
-    /// connection is closed, the connection is closed with an error
-    /// due to the misbehaved remote.
+    /// connection is closed with an error due to the misbehaved
+    /// remote.
     pub fn max_substreams(&mut self, max: usize) -> &mut Self {
         self.max_substreams = max;
         self
     }
 
-    /// Sets the maximum number of frames buffered that have
-    /// not yet been consumed.
+    /// Sets the maximum number of frames buffered per substream.
     ///
     /// A limit is necessary in order to avoid DoS attacks.
     pub fn max_buffer_len(&mut self, max: usize) -> &mut Self {
@@ -64,9 +68,10 @@ impl MplexConfig {
         self
     }
 
-    /// Sets the behaviour when the maximum buffer length has been reached.
+    /// Sets the behaviour when the maximum buffer size is reached
+    /// for a substream.
     ///
-    /// See the documentation of `MaxBufferBehaviour`.
+    /// See the documentation of [`MaxBufferBehaviour`].
     pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self {
         self.max_buffer_behaviour = behaviour;
         self
@@ -84,12 +89,15 @@ impl MplexConfig {
 }
 
 /// Behaviour when the maximum length of the buffer is reached.
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
 pub enum MaxBufferBehaviour {
-    /// Produce an error on all the substreams.
-    CloseAll,
-    /// No new message will be read from the underlying connection if the buffer is full.
+    /// Reset the substream whose frame buffer overflowed.
+    ResetStream,
+    /// No new message can be read from any substream as long as the buffer
+    /// for a single substream is full.
     ///
-    /// This can potentially introduce a deadlock if you are waiting for a message from a substream
-    /// before processing the messages received on another substream.
+    /// This can potentially introduce a deadlock if you are waiting for a
+    /// message from a substream before processing the messages received
+    /// on another substream, i.e. if there are data dependencies across
+    /// substreams.
     Block,
 }
 
@@ -97,8 +105,8 @@ impl Default for MplexConfig {
     fn default() -> MplexConfig {
         MplexConfig {
             max_substreams: 128,
-            max_buffer_len: 4096,
-            max_buffer_behaviour: MaxBufferBehaviour::CloseAll,
+            max_buffer_len: 32,
+            max_buffer_behaviour: MaxBufferBehaviour::ResetStream,
             split_send_size: 1024,
         }
     }
diff --git a/muxers/mplex/src/io.rs b/muxers/mplex/src/io.rs
index eb324da2..223641a5 100644
--- a/muxers/mplex/src/io.rs
+++ b/muxers/mplex/src/io.rs
@@ -24,10 +24,11 @@
 use crate::codec::{Codec, Frame, LocalStreamId, RemoteStreamId};
 use log::{debug, trace};
 use fnv::FnvHashMap;
 use futures::{prelude::*, ready, stream::Fuse};
-use futures::task::{ArcWake, waker_ref, WakerRef};
+use futures::task::{AtomicWaker, ArcWake, waker_ref, WakerRef};
 use futures_codec::Framed;
 use parking_lot::Mutex;
-use std::collections::{VecDeque, hash_map::Entry};
+use smallvec::SmallVec;
+use std::collections::VecDeque;
 use std::{cmp, io, mem, sync::Arc, task::{Context, Poll, Waker}};
 pub use std::io::{Result, Error, ErrorKind};
@@ -40,11 +41,17 @@ pub struct Multiplexed<C> {
     io: Fuse<Framed<C, Codec>>,
     /// The configuration.
     config: MplexConfig,
-    /// Buffer of received frames that have not yet been consumed.
-    buffer: Vec<Frame<RemoteStreamId>>,
+    /// The buffer of new inbound substreams that have not yet
+    /// been drained by `poll_next_stream`. This buffer is
+    /// effectively bounded by `max_substreams - substreams.len()`.
+    open_buffer: VecDeque<LocalStreamId>,
     /// Whether a flush is pending due to one or more new outbound
     /// `Open` frames, before reading frames can proceed.
     pending_flush_open: bool,
+    /// The stream that currently blocks reading for all streams
+    /// due to a full buffer, if any. Only applicable for use
+    /// with [`MaxBufferBehaviour::Block`].
+    blocking_stream: Option<LocalStreamId>,
     /// Pending frames to send at the next opportunity.
     ///
     /// An opportunity for sending pending frames is every flush
@@ -53,8 +60,8 @@ pub struct Multiplexed<C> {
     /// In the latter case, the read operation can proceed even
     /// if some or all of the pending frames cannot be sent.
     pending_frames: VecDeque<Frame<LocalStreamId>>,
-    /// The substreams that are considered at least half-open.
-    open_substreams: FnvHashMap<LocalStreamId, SubstreamState>,
+    /// The managed substreams.
+    substreams: FnvHashMap<LocalStreamId, SubstreamState>,
     /// The ID for the next outbound substream.
     next_outbound_stream_id: LocalStreamId,
     /// Registry of wakers for pending tasks interested in reading.
     notifier_read: Arc<NotifierRead>,
     /// Registry of wakers for pending tasks interested in writing.
     notifier_write: Arc<NotifierWrite>,
     /// Registry of wakers for pending tasks interested in opening
     /// an outbound substream, when the configured limit is reached.
-    notifier_open: Arc<NotifierOpen>,
+    ///
+    /// As soon as the number of substreams drops below this limit,
+    /// these tasks are woken.
+    notifier_open: NotifierOpen,
 }
 
 /// The operation status of a `Multiplexed` I/O stream.
@@ -83,25 +93,26 @@ where
 {
     /// Creates a new multiplexed I/O stream.
     pub fn new(io: C, config: MplexConfig) -> Self {
-        let max_buffer_len = config.max_buffer_len;
         Multiplexed {
             config,
             status: Status::Open,
             io: Framed::new(io, Codec::new()).fuse(),
-            buffer: Vec::with_capacity(cmp::min(max_buffer_len, 512)),
-            open_substreams: Default::default(),
+            open_buffer: Default::default(),
+            substreams: Default::default(),
             pending_flush_open: false,
             pending_frames: Default::default(),
+            blocking_stream: None,
            next_outbound_stream_id: LocalStreamId::dialer(0),
             notifier_read: Arc::new(NotifierRead {
-                pending: Mutex::new(Default::default()),
+                read_stream: Mutex::new(Default::default()),
+                next_stream: AtomicWaker::new(),
             }),
             notifier_write: Arc::new(NotifierWrite {
                 pending: Mutex::new(Default::default()),
             }),
-            notifier_open: Arc::new(NotifierOpen {
-                pending: Mutex::new(Default::default())
-            })
+            notifier_open: NotifierOpen {
+                pending: Default::default()
+            }
         }
     }
 
@@ -150,8 +161,8 @@
                     self.pending_frames = VecDeque::new();
                     // We do not support read-after-close on the underlying
                     // I/O stream, hence clearing the buffer and substreams.
-                    self.buffer = Default::default();
-                    self.open_substreams = Default::default();
+                    self.open_buffer = Default::default();
+                    self.substreams = Default::default();
                     self.status = Status::Closed;
                     Poll::Ready(Ok(()))
                 }
             }
         }
     }
 
     /// Waits for a new inbound substream, returning the corresponding `LocalStreamId`.
+    ///
+    /// If the number of already used substreams (i.e. substreams that have not
+    /// yet been dropped via `drop_substream`) reaches the configured
+    /// `max_substreams`, any further inbound substreams are immediately reset
+    /// until existing substreams are dropped.
+    ///
+    /// Data frames read for existing substreams in the context of this
+    /// method call are buffered and tasks interested in reading from
+    /// these substreams are woken. If a substream buffer is full and
+    /// [`MaxBufferBehaviour::Block`] is used, this method is blocked
+    /// (i.e. `Pending`) on some task reading from the substream whose
+    /// buffer is full.
     pub fn poll_next_stream(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<LocalStreamId>> {
         self.guard_open()?;
 
         // Try to read from the buffer first.
-        while let Some((pos, stream_id)) = self.buffer.iter()
-            .enumerate()
-            .find_map(|(pos, frame)| match frame {
-                Frame::Open { stream_id } => Some((pos, stream_id.into_local())),
-                _ => None
-            })
-        {
-            if self.buffer.len() == self.config.max_buffer_len {
-                // The buffer is full and no longer will be, so notify all pending readers.
-                ArcWake::wake_by_ref(&self.notifier_read);
-            }
-            self.buffer.remove(pos);
-            if let Some(id) = self.on_open(stream_id)? {
-                log::debug!("New inbound stream: {}", id);
-                return Poll::Ready(Ok(id));
-            }
+        if let Some(stream_id) = self.open_buffer.pop_back() {
+            return Poll::Ready(Ok(stream_id));
         }
 
+        debug_assert!(self.open_buffer.is_empty());
+
         loop {
             // Wait for the next inbound `Open` frame.
             match ready!(self.poll_read_frame(cx, None))? {
                 Frame::Open { stream_id } => {
-                    if let Some(id) = self.on_open(stream_id.into_local())? {
-                        log::debug!("New inbound stream: {}", id);
+                    if let Some(id) = self.on_open(stream_id)? {
                         return Poll::Ready(Ok(id))
                     }
                 }
-                frame @ Frame::Data { .. } => {
-                    let id = frame.local_id();
-                    if self.can_read(&id) {
-                        trace!("Buffering {:?} (total: {})", frame, self.buffer.len() + 1);
-                        self.buffer.push(frame);
-                        self.notifier_read.wake_by_id(id);
-                    } else {
-                        trace!("Dropping {:?} for closed or unknown substream {}", frame, id);
-                    }
+                Frame::Data { stream_id, data } => {
+                    self.buffer(stream_id.into_local(), data)?;
                 }
                 Frame::Close { stream_id } => {
                     self.on_close(stream_id.into_local())?;
@@ -215,9 +218,9 @@
         self.guard_open()?;
 
         // Check the stream limits.
-        if self.open_substreams.len() >= self.config.max_substreams {
+        if self.substreams.len() >= self.config.max_substreams {
             debug!("Maximum number of substreams reached: {}", self.config.max_substreams);
-            let _ = NotifierOpen::register(&self.notifier_open, cx.waker());
+            self.notifier_open.register(cx.waker());
             return Poll::Pending
         }
 
@@ -229,7 +232,11 @@
         let frame = Frame::Open { stream_id };
         match self.io.start_send_unpin(frame) {
             Ok(()) => {
-                self.open_substreams.insert(stream_id, SubstreamState::Open);
+                self.substreams.insert(stream_id, SubstreamState::Open {
+                    buf: Default::default()
+                });
+                log::debug!("New outbound substream: {} (total {})",
+                    stream_id, self.substreams.len());
                 // The flush is delayed and the `Open` frame may be sent
                 // together with other frames in the same transport packet.
                 self.pending_flush_open = true;
@@ -261,9 +268,9 @@
     /// an error earlier, or there is no known substream with
     /// the given ID, this is a no-op.
     ///
-    /// > **Note**: If a substream is not read until EOF,
-    /// > `drop_substream` _must_ eventually be called to avoid
-    /// > leaving unread frames in the receive buffer.
+    /// > **Note**: All substreams obtained via `poll_next_stream`
+    /// > or `poll_open_stream` must eventually be "dropped" by
+    /// > calling this method when they are no longer used.
     pub fn drop_stream(&mut self, id: LocalStreamId) {
         // Check if the underlying stream is ok.
         match self.status {
@@ -271,38 +278,35 @@
             Status::Open => {},
         }
 
-        // Remove any frames still buffered for that stream. The stream
-        // may already be fully closed (i.e. not in `open_substreams`)
-        // but still have unread buffered frames.
-        self.buffer.retain(|frame| frame.local_id() != id);
-
         // If there is still a task waker interested in reading from that
         // stream, wake it to avoid leaving it dangling and notice that
         // the stream is gone. In contrast, wakers for write operations
         // are all woken on every new write opportunity.
-        self.notifier_read.wake_by_id(id);
+        self.notifier_read.wake_read_stream(id);
 
         // Remove the substream, scheduling pending frames as necessary.
-        match self.open_substreams.remove(&id) {
+        match self.substreams.remove(&id) {
             None => return,
             Some(state) => {
                 // If we fell below the substream limit, notify tasks that had
-                // interest in opening a substream earlier.
-                let below_limit = self.open_substreams.len() == self.config.max_substreams - 1;
+                // interest in opening an outbound substream earlier.
+                let below_limit = self.substreams.len() == self.config.max_substreams - 1;
                 if below_limit {
-                    ArcWake::wake_by_ref(&self.notifier_open);
+                    self.notifier_open.wake_all();
                 }
 
                 // Schedule any pending final frames to send, if necessary.
                 match state {
-                    SubstreamState::SendClosed => {}
-                    SubstreamState::RecvClosed => {
+                    SubstreamState::Closed { .. } => {}
+                    SubstreamState::SendClosed { .. } => {}
+                    SubstreamState::Reset { .. } => {}
+                    SubstreamState::RecvClosed { .. } => {
                         if self.check_max_pending_frames().is_err() {
                             return
                         }
                         log::trace!("Pending close for stream {}", id);
                         self.pending_frames.push_front(Frame::Close { stream_id: id });
                     }
-                    SubstreamState::Open => {
+                    SubstreamState::Open { .. } => {
                         if self.check_max_pending_frames().is_err() {
                             return
                         }
@@ -321,10 +325,14 @@
         self.guard_open()?;
 
         // Check if the stream is open for writing.
-        match self.open_substreams.get(&id) {
-            None => return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
-            Some(SubstreamState::SendClosed) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
-            _ => {}
+        match self.substreams.get(&id) {
+            None | Some(SubstreamState::Reset { .. }) =>
+                return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
+            Some(SubstreamState::SendClosed { .. }) | Some(SubstreamState::Closed { .. }) =>
+                return Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
+            Some(SubstreamState::Open { .. }) | Some(SubstreamState::RecvClosed { .. }) => {
+                // Substream is writeable. Continue.
+            }
         }
 
         // Determine the size of the frame to send.
@@ -340,43 +348,68 @@
     }
 
     /// Reads data from a substream.
+    ///
+    /// Data frames read for substreams other than `id` in the context
+    /// of this method call are buffered and tasks interested in reading
+    /// from these substreams are woken. If a substream buffer is full
+    /// and [`MaxBufferBehaviour::Block`] is used, reading the next data
+    /// frame for `id` is blocked on some task reading from the blocking
+    /// stream's full buffer first.
+    ///
+    /// New inbound substreams (i.e. `Open` frames) read in the context of
+    /// this method call are buffered up to the configured `max_substreams`
+    /// and under consideration of the number of already used substreams,
+    /// thereby waking the task that last called `poll_next_stream`, if any.
+    /// Inbound substreams received in excess of that limit are immediately reset.
     pub fn poll_read_stream(&mut self, cx: &mut Context<'_>, id: LocalStreamId) -> Poll<io::Result<Option<Bytes>>> {
         self.guard_open()?;
 
         // Try to read from the buffer first.
-        if let Some((pos, data)) = self.buffer.iter()
-            .enumerate()
-            .find_map(|(pos, frame)| match frame {
-                Frame::Data { stream_id, data }
-                    if stream_id.into_local() == id => Some((pos, data.clone())),
-                _ => None
-            })
-        {
-            if self.buffer.len() == self.config.max_buffer_len {
-                // The buffer is full and no longer will be, so notify all pending readers.
-                ArcWake::wake_by_ref(&self.notifier_read);
+        if let Some(state) = self.substreams.get_mut(&id) {
+            let buf = state.recv_buf();
+            if !buf.is_empty() {
+                if self.blocking_stream == Some(id) {
+                    // Unblock reading new frames.
+                    self.blocking_stream = None;
+                    ArcWake::wake_by_ref(&self.notifier_read);
+                }
+                let data = buf.remove(0);
+                return Poll::Ready(Ok(Some(data)))
             }
-            self.buffer.remove(pos);
-            return Poll::Ready(Ok(Some(data)));
+            // If the stream buffer "spilled" onto the heap, free that memory.
+            buf.shrink_to_fit();
         }
 
         loop {
             // Check if the targeted substream (if any) reached EOF.
             if !self.can_read(&id) {
+                // Note: Contrary to what is recommended by the spec, we must
+                // return "EOF" also when the stream has been reset by the
+                // remote, as the `StreamMuxer::read_substream` contract only
+                // permits errors on "terminal" conditions, e.g. if the connection
+                // has been closed or on protocol misbehaviour.
                 return Poll::Ready(Ok(None))
             }
 
+            // Read the next frame.
             match ready!(self.poll_read_frame(cx, Some(id)))? {
                 Frame::Data { data, stream_id } if stream_id.into_local() == id => {
                     return Poll::Ready(Ok(Some(data.clone())))
                 },
-                frame @ Frame::Open { .. } | frame @ Frame::Data { .. } => {
-                    let id = frame.local_id();
-                    trace!("Buffering {:?} (total: {})", frame, self.buffer.len() + 1);
-                    self.buffer.push(frame);
-                    self.notifier_read.wake_by_id(id);
+                Frame::Data { stream_id, data } => {
+                    // The data frame is for a different stream than the one
+                    // currently being polled, so it needs to be buffered and
+                    // the interested tasks notified.
+                    self.buffer(stream_id.into_local(), data)?;
+                }
+                frame @ Frame::Open { .. } => {
+                    if let Some(id) = self.on_open(frame.remote_id())? {
+                        self.open_buffer.push_front(id);
+                        trace!("Buffered new inbound stream {} (total: {})", id, self.open_buffer.len());
+                        self.notifier_read.wake_next_stream();
+                    }
                 }
                 Frame::Close { stream_id } => {
                     let stream_id = stream_id.into_local();
@@ -420,23 +453,40 @@
     {
         self.guard_open()?;
 
-        match self.open_substreams.get(&id) {
-            None | Some(SubstreamState::SendClosed) => Poll::Ready(Ok(())),
-            Some(&state) => {
-                ready!(self.poll_send_frame(cx, || Frame::Close { stream_id: id }))?;
-                if state == SubstreamState::Open {
-                    debug!("Closed substream {} (half-close)", id);
-                    self.open_substreams.insert(id, SubstreamState::SendClosed);
-                } else if state == SubstreamState::RecvClosed {
-                    debug!("Closed substream {}", id);
-                    self.open_substreams.remove(&id);
-                    let below_limit = self.open_substreams.len() == self.config.max_substreams - 1;
-                    if below_limit {
-                        ArcWake::wake_by_ref(&self.notifier_open);
-                    }
-                }
+        match self.substreams.remove(&id) {
+            None => Poll::Ready(Ok(())),
+            Some(SubstreamState::SendClosed { buf }) => {
+                self.substreams.insert(id, SubstreamState::SendClosed { buf });
                 Poll::Ready(Ok(()))
             }
+            Some(SubstreamState::Closed { buf }) => {
+                self.substreams.insert(id, SubstreamState::Closed { buf });
+                Poll::Ready(Ok(()))
+            }
+            Some(SubstreamState::Reset { buf }) => {
+                self.substreams.insert(id, SubstreamState::Reset { buf });
+                Poll::Ready(Ok(()))
+            }
+            Some(SubstreamState::Open { buf }) => {
+                if self.poll_send_frame(cx, || Frame::Close { stream_id: id })?.is_pending() {
+                    self.substreams.insert(id, SubstreamState::Open { buf });
+                    Poll::Pending
+                } else {
+                    debug!("Closed substream {} (half-close)", id);
+                    self.substreams.insert(id, SubstreamState::SendClosed { buf });
+                    Poll::Ready(Ok(()))
+                }
+            }
+            Some(SubstreamState::RecvClosed { buf }) => {
+                if self.poll_send_frame(cx, || Frame::Close { stream_id: id })?.is_pending() {
+                    self.substreams.insert(id, SubstreamState::RecvClosed { buf });
+                    Poll::Pending
+                } else {
+                    debug!("Closed substream {}", id);
+                    self.substreams.insert(id, SubstreamState::Closed { buf });
+                    Poll::Ready(Ok(()))
+                }
+            }
         }
     }
 
@@ -484,41 +534,40 @@
             debug_assert!(!self.pending_flush_open);
         }
 
-        // Check if the inbound frame buffer is full.
-        debug_assert!(self.buffer.len() <= self.config.max_buffer_len);
-        if self.buffer.len() == self.config.max_buffer_len {
-            debug!("Frame buffer full ({} frames).", self.buffer.len());
-            match self.config.max_buffer_behaviour {
-                MaxBufferBehaviour::CloseAll => {
-                    return Poll::Ready(self.on_error(io::Error::new(io::ErrorKind::Other,
-                        format!("Frame buffer full ({} frames).", self.buffer.len()))))
-                },
-                MaxBufferBehaviour::Block => {
-                    // If there are any pending tasks for frames in the buffer,
-                    // use this opportunity to try to wake one of them.
-                    let mut woken = false;
-                    for frame in self.buffer.iter() {
-                        woken = self.notifier_read.wake_by_id(frame.local_id());
-                        if woken {
-                            // The current task is still interested in another frame,
-                            // so we register it for a wakeup some time after the
-                            // already `woken` task.
-                            let _ = NotifierRead::register(&self.notifier_read, cx.waker(), stream_id);
-                            break
-                        }
-                    }
-                    if !woken {
-                        // No task was woken, thus the current task _must_ poll
-                        // again to guarantee (an attempt at) making progress.
-                        cx.waker().clone().wake();
-                    }
-                    return Poll::Pending
-                },
+        // Check if there is a blocked stream.
+        if let Some(blocked_id) = &self.blocking_stream {
+            // We have a blocked stream and cannot continue reading
+            // new frames until frames are taken from the blocked stream's
+            // buffer.
+
+            // Try to wake a pending reader of the blocked stream.
+            if !self.notifier_read.wake_read_stream(*blocked_id) {
+                // No task dedicated to the blocked stream woken, so schedule
+                // this task again to have a chance at progress.
+                trace!("No task to read from blocked stream. Waking current task.");
+                cx.waker().clone().wake();
+            } else {
+                if let Some(id) = stream_id {
+                    // We woke some other task, but are still interested in
+                    // reading `Data` frames from the current stream when unblocked.
+                    debug_assert!(blocked_id != &id, "Unexpected attempt at reading a new \
+                        frame from a substream with a full buffer.");
+                    let _ = NotifierRead::register_read_stream(&self.notifier_read, cx.waker(), id);
+                } else {
+                    // We woke some other task but are still interested in
+                    // reading new `Open` frames when unblocked.
+                    let _ = NotifierRead::register_next_stream(&self.notifier_read, cx.waker());
+                }
             }
+
+            return Poll::Pending
         }
 
         // Try to read another frame from the underlying I/O stream.
-        let waker = NotifierRead::register(&self.notifier_read, cx.waker(), stream_id);
+        let waker = match stream_id {
+            Some(id) => NotifierRead::register_read_stream(&self.notifier_read, cx.waker(), id),
+            None => NotifierRead::register_next_stream(&self.notifier_read, cx.waker())
+        };
         match ready!(self.io.poll_next_unpin(&mut Context::from_waker(&waker))) {
             Some(Ok(frame)) => {
                 trace!("Received {:?}", frame);
@@ -530,14 +579,16 @@
     }
 
     /// Processes an inbound `Open` frame.
-    fn on_open(&mut self, id: LocalStreamId) -> io::Result<Option<LocalStreamId>> {
-        if self.open_substreams.contains_key(&id) {
+    fn on_open(&mut self, id: RemoteStreamId) -> io::Result<Option<LocalStreamId>> {
+        let id = id.into_local();
+
+        if self.substreams.contains_key(&id) {
             debug!("Received unexpected `Open` frame for open substream {}", id);
             return self.on_error(io::Error::new(io::ErrorKind::Other,
                 "Protocol error: Received `Open` frame for open substream."))
         }
 
-        if self.open_substreams.len() >= self.config.max_substreams {
+        if self.substreams.len() >= self.config.max_substreams {
             debug!("Maximum number of substreams exceeded: {}", self.config.max_substreams);
             self.check_max_pending_frames()?;
             debug!("Pending reset for new stream {}", id);
@@ -547,57 +598,69 @@
             return Ok(None)
         }
 
-        self.open_substreams.insert(id, SubstreamState::Open);
+        self.substreams.insert(id, SubstreamState::Open {
+            buf: Default::default()
+        });
+
+        log::debug!("New inbound substream: {} (total {})", id, self.substreams.len());
 
         Ok(Some(id))
     }
 
     /// Processes an inbound `Reset` frame.
     fn on_reset(&mut self, id: LocalStreamId) {
-        if let Some(state) = self.open_substreams.remove(&id) {
-            debug!("Substream {} in state {:?} reset by remote.", id, state);
-            let below_limit = self.open_substreams.len() == self.config.max_substreams - 1;
-            if below_limit {
-                ArcWake::wake_by_ref(&self.notifier_open);
+        if let Some(state) = self.substreams.remove(&id) {
+            match state {
+                SubstreamState::Closed { .. } => {
+                    trace!("Ignoring reset for mutually closed substream {}.", id);
+                }
+                SubstreamState::Reset { .. } => {
+                    trace!("Ignoring redundant reset for already reset substream {}", id);
+                }
+                SubstreamState::RecvClosed { buf } |
+                SubstreamState::SendClosed { buf } |
+                SubstreamState::Open { buf } => {
+                    debug!("Substream {} reset by remote.", id);
+                    self.substreams.insert(id, SubstreamState::Reset { buf });
+                    // Notify tasks interested in reading from that stream,
+                    // so they may read the EOF.
+                    NotifierRead::wake_read_stream(&self.notifier_read, id);
+                }
             }
-            // Notify tasks interested in reading, so they may read the EOF.
-            NotifierRead::wake_by_id(&self.notifier_read, id);
         } else {
-            trace!("Ignoring `Reset` for unknown stream {}. Possibly dropped earlier.", id);
+            trace!("Ignoring `Reset` for unknown substream {}. Possibly dropped earlier.", id);
         }
     }
 
     /// Processes an inbound `Close` frame.
     fn on_close(&mut self, id: LocalStreamId) -> io::Result<()> {
-        if let Entry::Occupied(mut e) = self.open_substreams.entry(id) {
-            match e.get() {
-                SubstreamState::RecvClosed => {
+        if let Some(state) = self.substreams.remove(&id) {
+            match state {
+                SubstreamState::RecvClosed { .. } | SubstreamState::Closed { .. } => {
                     debug!("Received unexpected `Close` frame for closed substream {}", id);
                     return self.on_error(
                         io::Error::new(io::ErrorKind::Other,
                             "Protocol error: Received `Close` frame for closed substream."))
                 },
-                SubstreamState::SendClosed => {
+                SubstreamState::Reset { buf } => {
+                    debug!("Ignoring `Close` frame for already reset substream {}", id);
+                    self.substreams.insert(id, SubstreamState::Reset { buf });
+                }
+                SubstreamState::SendClosed { buf } => {
                     debug!("Substream {} closed by remote (SendClosed -> Closed).", id);
-                    e.remove();
-                    // Notify tasks interested in opening new streams, if we fell
-                    // below the limit.
-                    let below_limit = self.open_substreams.len() == self.config.max_substreams - 1;
-                    if below_limit {
-                        ArcWake::wake_by_ref(&self.notifier_open);
-                    }
+                    self.substreams.insert(id, SubstreamState::Closed { buf });
                     // Notify tasks interested in reading, so they may read the EOF.
-                    NotifierRead::wake_by_id(&self.notifier_read, id);
+                    self.notifier_read.wake_read_stream(id);
                 },
-                SubstreamState::Open => {
+                SubstreamState::Open { buf } => {
                     debug!("Substream {} closed by remote (Open -> RecvClosed)", id);
-                    e.insert(SubstreamState::RecvClosed);
+                    self.substreams.insert(id, SubstreamState::RecvClosed { buf });
                     // Notify tasks interested in reading, so they may read the EOF.
-                    NotifierRead::wake_by_id(&self.notifier_read, id);
+                    self.notifier_read.wake_read_stream(id);
                 },
             }
         } else {
-            trace!("Ignoring `Close` for unknown stream {}. Possibly dropped earlier.", id);
+            trace!("Ignoring `Close` for unknown substream {}. Possibly dropped earlier.", id);
        }
 
         Ok(())
     }
@@ -612,8 +675,8 @@
 
     /// Checks whether a substream is open for reading.
     fn can_read(&self, id: &LocalStreamId) -> bool {
-        match self.open_substreams.get(id) {
-            Some(SubstreamState::Open) | Some(SubstreamState::SendClosed) => true,
+        match self.substreams.get(id) {
+            Some(SubstreamState::Open { .. }) | Some(SubstreamState::SendClosed { .. }) => true,
             _ => false,
         }
     }
@@ -637,8 +700,8 @@
         log::debug!("Multiplexed connection failed: {:?}", e);
         self.status = Status::Err(io::Error::new(e.kind(), e.to_string()));
         self.pending_frames = Default::default();
-        self.open_substreams = Default::default();
-        self.buffer = Default::default();
+        self.substreams = Default::default();
+        self.open_buffer = Default::default();
         Err(e)
     }
@@ -661,70 +724,164 @@
         }
         Ok(())
     }
+
+    /// Buffers a data frame for a particular substream, if possible.
+    ///
+    /// If the new data frame exceeds the `max_buffer_len` for the buffer
+    /// of the substream, the behaviour depends on the configured
+    /// [`MaxBufferBehaviour`]. Note that the excess frame is still
+    /// buffered in that case (but no further frames will be).
+    ///
+    /// Fails the entire multiplexed stream if too many pending `Reset`
+    /// frames accumulate when using [`MaxBufferBehaviour::ResetStream`].
+    fn buffer(&mut self, id: LocalStreamId, data: Bytes) -> io::Result<()> {
+        let state = if let Some(state) = self.substreams.get_mut(&id) {
+            state
+        } else {
+            trace!("Dropping data {:?} for unknown substream {}", data, id);
+            return Ok(())
+        };
+
+        let buf = if let Some(buf) = state.recv_buf_open() {
+            buf
+        } else {
+            trace!("Dropping data {:?} for closed or reset substream {}", data, id);
+            return Ok(())
+        };
+
+        debug_assert!(buf.len() <= self.config.max_buffer_len);
+        trace!("Buffering {:?} for stream {} (total: {})", data, id, buf.len() + 1);
+        buf.push(data);
+        self.notifier_read.wake_read_stream(id);
+        if buf.len() > self.config.max_buffer_len {
+            debug!("Frame buffer of stream {} is full.", id);
+            match self.config.max_buffer_behaviour {
+                MaxBufferBehaviour::ResetStream => {
+                    let buf = buf.clone();
+                    self.check_max_pending_frames()?;
+                    self.substreams.insert(id, SubstreamState::Reset { buf });
+                    debug!("Pending reset for stream {}", id);
+                    self.pending_frames.push_front(Frame::Reset {
+                        stream_id: id
+                    });
+                }
+                MaxBufferBehaviour::Block => {
+                    self.blocking_stream = Some(id);
+                }
+            }
+        }
+
+        Ok(())
+    }
 }
 
+type RecvBuf = SmallVec<[Bytes; 10]>;
+
 /// The operating states of a substream.
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+#[derive(Clone, Debug)]
 enum SubstreamState {
     /// An `Open` frame has been received or sent.
-    Open,
+    Open { buf: RecvBuf },
     /// A `Close` frame has been sent, but the stream is still open
     /// for reading (half-close).
-    SendClosed,
+    SendClosed { buf: RecvBuf },
     /// A `Close` frame has been received but the stream is still
     /// open for writing (remote half-close).
-    RecvClosed
+    RecvClosed { buf: RecvBuf },
+    /// A `Close` frame has been sent and received but the stream
+    /// has not yet been dropped and may still have buffered
+    /// frames to read.
+    Closed { buf: RecvBuf },
+    /// The stream has been reset by the local or remote peer but has
+    /// not yet been dropped and may still have buffered frames to read.
+    Reset { buf: RecvBuf }
+}
+
+impl SubstreamState {
+    /// Mutably borrows the substream's receive buffer.
+    fn recv_buf(&mut self) -> &mut RecvBuf {
+        match self {
+            SubstreamState::Open { buf } => buf,
+            SubstreamState::SendClosed { buf } => buf,
+            SubstreamState::RecvClosed { buf } => buf,
+            SubstreamState::Closed { buf } => buf,
+            SubstreamState::Reset { buf } => buf,
+        }
+    }
+
+    /// Mutably borrows the substream's receive buffer if the substream
+    /// is still open for reading, `None` otherwise.
+    fn recv_buf_open(&mut self) -> Option<&mut RecvBuf> {
+        match self {
+            SubstreamState::Open { buf } => Some(buf),
+            SubstreamState::SendClosed { buf } => Some(buf),
+            SubstreamState::RecvClosed { .. } => None,
+            SubstreamState::Closed { .. } => None,
+            SubstreamState::Reset { .. } => None,
+        }
+    }
 }
 
 struct NotifierRead {
-    /// List of wakers to wake when read operations can proceed
-    /// on a substream (or in general, for the key `None`).
-    pending: Mutex<FnvHashMap<Option<LocalStreamId>, Waker>>,
+    /// The waker of the currently pending task that last
+    /// called `poll_next_stream`, if any.
+    next_stream: AtomicWaker,
+    /// The wakers of currently pending tasks that last
+    /// called `poll_read_stream` for a particular substream.
+    read_stream: Mutex<FnvHashMap<LocalStreamId, Waker>>,
 }
 
 impl NotifierRead {
-    /// Registers interest of a task in reading from a particular
-    /// stream, or any stream if `stream` is `None`.
+    /// Registers a task to be woken up when new `Data` frames for a particular
+    /// stream can be read.
     ///
     /// The returned waker should be passed to an I/O read operation
-    /// that schedules a wakeup, if necessary.
+    /// that schedules a wakeup, if the operation is pending.
     #[must_use]
-    fn register<'a>(self: &'a Arc<Self>, waker: &Waker, stream: Option<LocalStreamId>)
+    fn register_read_stream<'a>(self: &'a Arc<Self>, waker: &Waker, id: LocalStreamId)
         -> WakerRef<'a>
     {
-        let mut pending = self.pending.lock();
-        pending.insert(stream, waker.clone());
+        let mut pending = self.read_stream.lock();
+        pending.insert(id, waker.clone());
         waker_ref(self)
     }
 
-    /// Wakes the last task that has previously registered interest
-    /// in reading data from a particular stream (or any stream).
+    /// Registers a task to be woken up when new `Open` frames can be read.
     ///
-    /// Returns `true` if a task has been woken.
-    fn wake_by_id(&self, id: LocalStreamId) -> bool {
-        let mut woken = false;
-        let mut pending = self.pending.lock();
+    /// The returned waker should be passed to an I/O read operation
+    /// that schedules a wakeup, if the operation is pending.
+    #[must_use]
+    fn register_next_stream<'a>(self: &'a Arc<Self>, waker: &Waker) -> WakerRef<'a> {
+        self.next_stream.register(waker);
+        waker_ref(self)
+    }
 
-        if let Some(waker) = pending.remove(&None) {
+    /// Wakes the task pending on `poll_read_stream` for the
+    /// specified stream, if any.
+    fn wake_read_stream(&self, id: LocalStreamId) -> bool {
+        let mut pending = self.read_stream.lock();
+
+        if let Some(waker) = pending.remove(&id) {
             waker.wake();
-            woken = true;
+            return true
         }
 
-        if let Some(waker) = pending.remove(&Some(id)) {
-            waker.wake();
-            woken = true;
-        }
+        false
+    }
 
-        woken
+    /// Wakes the task pending on `poll_next_stream`, if any.
+    fn wake_next_stream(&self) {
+        self.next_stream.wake();
     }
 }
 
 impl ArcWake for NotifierRead {
     fn wake_by_ref(this: &Arc<Self>) {
-        let wakers = mem::replace(&mut *this.pending.lock(), Default::default());
+        let wakers = mem::replace(&mut *this.read_stream.lock(), Default::default());
         for (_, waker) in wakers {
             waker.wake();
         }
+        this.wake_next_stream();
     }
 }
 
@@ -738,7 +895,7 @@ impl NotifierWrite {
     /// Registers interest of a task in writing to some substream.
     ///
     /// The returned waker should be passed to an I/O write operation
-    /// that schedules a wakeup, if necessary.
+    /// that schedules a wakeup if the operation is pending.
     #[must_use]
     fn register<'a>(self: &'a Arc<Self>, waker: &Waker) -> WakerRef<'a> {
         let mut pending = self.pending.lock();
@@ -759,24 +916,21 @@
 }
 
 struct NotifierOpen {
-    /// List of wakers to wake when a new substream can be opened.
-    pending: Mutex<Vec<Waker>>,
+    /// Wakers of pending tasks interested in creating new
+    /// outbound substreams.
+    pending: Vec<Waker>,
 }
 
 impl NotifierOpen {
-    /// Registers interest of a task in opening a new substream.
-    fn register<'a>(self: &'a Arc<Self>, waker: &Waker) -> WakerRef<'a> {
-        let mut pending = self.pending.lock();
-        if pending.iter().all(|w| !w.will_wake(waker)) {
-            pending.push(waker.clone());
+    /// Registers interest of a task in opening a new outbound substream.
+    fn register(&mut self, waker: &Waker) {
+        if self.pending.iter().all(|w| !w.will_wake(waker)) {
+            self.pending.push(waker.clone());
         }
-        waker_ref(self)
     }
-}
 
-impl ArcWake for NotifierOpen {
-    fn wake_by_ref(this: &Arc<Self>) {
-        let wakers = mem::replace(&mut *this.pending.lock(), Default::default());
+    fn wake_all(&mut self) {
+        let wakers = mem::replace(&mut self.pending, Default::default());
         for waker in wakers {
             waker.wake();
         }
     }
@@ -791,3 +945,192 @@ impl ArcWake for NotifierOpen {
 /// If too many pending frames accumulate, the multiplexed stream is
 /// considered unhealthy and terminates with an error.
 const EXTRA_PENDING_FRAMES: usize = 1000;
+
+#[cfg(test)]
+mod tests {
+    use async_std::task;
+    use bytes::BytesMut;
+    use futures::prelude::*;
+    use futures_codec::{Decoder, Encoder};
+    use quickcheck::*;
+    use rand::prelude::*;
+    use std::num::NonZeroU8;
+    use std::ops::DerefMut;
+    use std::pin::Pin;
+    use super::*;
+
+    impl Arbitrary for MaxBufferBehaviour {
+        fn arbitrary<G: Gen>(g: &mut G) -> MaxBufferBehaviour {
+            *[MaxBufferBehaviour::Block, MaxBufferBehaviour::ResetStream].choose(g).unwrap()
+        }
+    }
+
+    impl Arbitrary for MplexConfig {
+        fn arbitrary<G: Gen>(g: &mut G) -> MplexConfig {
+            MplexConfig {
+                max_substreams: g.gen_range(1, 100),
+                max_buffer_len: g.gen_range(1, 1000),
+                max_buffer_behaviour: MaxBufferBehaviour::arbitrary(g),
+                split_send_size: g.gen_range(1, 10000),
+            }
+        }
+    }
+
+    /// Memory-backed "connection".
+    struct Connection {
+        /// The buffer that the `Multiplexed` stream reads from.
+        r_buf: BytesMut,
+        /// The buffer that the `Multiplexed` stream writes to.
+        w_buf: BytesMut,
+    }
+
+    impl AsyncRead for Connection {
+        fn poll_read(
+            mut self: Pin<&mut Self>,
+            _: &mut Context<'_>,
+            buf: &mut [u8]
+        ) -> Poll<io::Result<usize>> {
+            let n = std::cmp::min(buf.len(), self.r_buf.len());
+            let data = self.r_buf.split_to(n);
+            buf[..n].copy_from_slice(&data[..]);
+            if n == 0 {
+                Poll::Pending
+            } else {
+                Poll::Ready(Ok(n))
+            }
+        }
+    }
+
+    impl AsyncWrite for Connection {
+        fn poll_write(
+            mut self: Pin<&mut Self>,
+            _: &mut Context<'_>,
+            buf: &[u8]
+        ) -> Poll<io::Result<usize>> {
+            self.w_buf.extend_from_slice(buf);
+            Poll::Ready(Ok(buf.len()))
+        }
+
+        fn poll_flush(
+            self: Pin<&mut Self>,
+            _: &mut Context<'_>
+        ) -> Poll<io::Result<()>> {
+            Poll::Ready(Ok(()))
+        }
+
+        fn poll_close(
+            self: Pin<&mut Self>,
+            _: &mut Context<'_>
+        ) -> Poll<io::Result<()>> {
+            Poll::Ready(Ok(()))
+        }
+    }
+
+    #[test]
+    fn max_buffer_behaviour() {
+        let _ = env_logger::try_init();
+
+        fn prop(cfg: MplexConfig, overflow: NonZeroU8) {
+            let mut r_buf = BytesMut::new();
+            let mut codec = Codec::new();
+
+            // Open the maximum number of inbound streams.
+            for i in 0 .. cfg.max_substreams {
+                let stream_id = LocalStreamId::dialer(i as u32);
+                codec.encode(Frame::Open { stream_id }, &mut r_buf).unwrap();
+            }
+
+            // Send more data on stream 0 than the buffer permits.
+            let stream_id = LocalStreamId::dialer(0);
+            let data = Bytes::from("Hello world");
+            for _ in 0 .. cfg.max_buffer_len + overflow.get() as usize {
+                codec.encode(Frame::Data { stream_id, data: data.clone() }, &mut r_buf).unwrap();
+            }
+
+            // Setup the multiplexed connection.
+            let conn = Connection { r_buf, w_buf: BytesMut::new() };
+            let mut m = Multiplexed::new(conn, cfg.clone());
+
+            task::block_on(future::poll_fn(move |cx| {
+                // Receive all inbound streams.
+                for i in 0 .. cfg.max_substreams {
+                    match m.poll_next_stream(cx) {
+                        Poll::Pending => panic!("Expected new inbound stream."),
+                        Poll::Ready(Err(e)) => panic!("{:?}", e),
+                        Poll::Ready(Ok(id)) => {
+                            assert_eq!(id, LocalStreamId::listener(i as u32));
+                        }
+                    };
+                }
+
+                // Polling again for an inbound stream should yield `Pending`
+                // after reading and buffering data frames up to the limit.
+                let id = LocalStreamId::listener(0);
+                match m.poll_next_stream(cx) {
+                    Poll::Ready(r) => panic!("Unexpected result for next stream: {:?}", r),
+                    Poll::Pending => {}
+                }
+
+                // Expect the buffer for stream 0 to be just 1 over the limit.
+                assert_eq!(
+                    m.substreams.get_mut(&id).unwrap().recv_buf().len(),
+                    cfg.max_buffer_len + 1
+                );
+
+                // Expect either a `Reset` to be sent or all reads to be
+                // blocked `Pending`, depending on the `MaxBufferBehaviour`.
+                match cfg.max_buffer_behaviour {
+                    MaxBufferBehaviour::ResetStream => {
+                        let _ = m.poll_flush_stream(cx, id);
+                        let w_buf = &mut m.io.get_mut().deref_mut().w_buf;
+                        let frame = codec.decode(w_buf).unwrap();
+                        let stream_id = stream_id.into_remote();
+                        assert_eq!(frame, Some(Frame::Reset { stream_id }));
+                    }
+                    MaxBufferBehaviour::Block => {
+                        assert!(m.poll_next_stream(cx).is_pending());
+                        for i in 1 .. cfg.max_substreams {
+                            let id = LocalStreamId::listener(i as u32);
+                            assert!(m.poll_read_stream(cx, id).is_pending());
+                        }
+                    }
+                }
+
+                // Drain the buffer by reading from the stream.
+                for _ in 0 .. cfg.max_buffer_len + 1 {
+                    match m.poll_read_stream(cx, id) {
+                        Poll::Ready(Ok(Some(bytes))) => {
+                            assert_eq!(bytes, data);
+                        }
+                        x => panic!("Unexpected: {:?}", x)
+                    }
+                }
+
+                // Read from the stream after the buffer has been drained,
+                // expecting either EOF or further data, depending on
+                // the `MaxBufferBehaviour`.
+                match cfg.max_buffer_behaviour {
+                    MaxBufferBehaviour::ResetStream => {
+                        // Expect to read EOF.
+                        match m.poll_read_stream(cx, id) {
+                            Poll::Ready(Ok(None)) => {},
+                            poll => panic!("Unexpected: {:?}", poll)
+                        }
+                    }
+                    MaxBufferBehaviour::Block => {
+                        // Expect to be able to continue reading.
+                        match m.poll_read_stream(cx, id) {
+                            Poll::Ready(Ok(Some(bytes))) => assert_eq!(bytes, data),
+                            Poll::Pending => assert_eq!(overflow.get(), 1),
+                            poll => panic!("Unexpected: {:?}", poll)
+                        }
+                    }
+                }
+
+                Poll::Ready(())
+            }));
+        }
+
+        quickcheck(prop as fn(_,_))
+    }
+}
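
For reference, a minimal sketch of how the changed configuration surface is used, assuming the usual crate-root re-exports of `MplexConfig` and `MaxBufferBehaviour` from `libp2p-mplex`; the values shown are the new defaults introduced by this patch:

    use libp2p_mplex::{MaxBufferBehaviour, MplexConfig};

    // Allow up to 128 simultaneously used substreams, each buffering at
    // most 32 inbound data frames. On overflow, only the offending
    // substream is reset, instead of the old `CloseAll` behaviour that
    // failed the entire connection.
    let mut config = MplexConfig::new();
    config
        .max_substreams(128)
        .max_buffer_len(32)
        .max_buffer_len_behaviour(MaxBufferBehaviour::ResetStream);

    // Alternatively, `MaxBufferBehaviour::Block` applies backpressure by
    // pausing reads for all substreams until the full buffer is drained,
    // at the risk of a deadlock if data dependencies exist across substreams:
    // config.max_buffer_len_behaviour(MaxBufferBehaviour::Block);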