mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-07-31 00:41:59 +00:00
[mplex] Split the receive buffer per substream. (#1784)
* Split the receive buffer per substream. This split allows more efficient reading from the buffer for a particular substream and to reset only the offending substream if it reaches its buffer limit with `MaxBufferBehaviour::ResetStream`. Previously this was implemented as `MaxBufferBehaviour::CloseAll` and resulted in the entire connection closing. The buffer split should be advantageous whenever not all substreams are read at the same pace and some temporarily fall behind in consuming inbound data frames. * Tweak logging. * Oops. * Update muxers/mplex/src/io.rs Co-authored-by: Max Inden <mail@max-inden.de> * Rename field as per review suggestion. * Adjust and clarify max-buffer-behaviour. * Set max_buffer_len to 32. Since the limit is now per substream and the default `max_substreams` is `128`, this new limit retains the previous overall resource bounds for the buffers. * Expand tests and small cleanup. Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
@@ -17,8 +17,13 @@ futures_codec = "0.4"
|
||||
libp2p-core = { version = "0.22.0", path = "../../core" }
|
||||
log = "0.4"
|
||||
parking_lot = "0.11"
|
||||
smallvec = "1.4"
|
||||
unsigned-varint = { version = "0.5", features = ["futures-codec"] }
|
||||
|
||||
[dev-dependencies]
|
||||
async-std = "1.6.2"
|
||||
env_logger = "0.6"
|
||||
futures = "0.3"
|
||||
libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] }
|
||||
quickcheck = "0.9"
|
||||
rand = "0.7"
|
||||
|
@@ -77,12 +77,25 @@ impl LocalStreamId {
|
||||
Self { num, role: Endpoint::Dialer }
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn listener(num: u32) -> Self {
|
||||
Self { num, role: Endpoint::Listener }
|
||||
}
|
||||
|
||||
pub fn next(self) -> Self {
|
||||
Self {
|
||||
num: self.num.checked_add(1).expect("Mplex substream ID overflowed"),
|
||||
.. self
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn into_remote(self) -> RemoteStreamId {
|
||||
RemoteStreamId {
|
||||
num: self.num,
|
||||
role: !self.role,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteStreamId {
|
||||
@@ -105,7 +118,7 @@ impl RemoteStreamId {
|
||||
}
|
||||
|
||||
/// An Mplex protocol frame.
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum Frame<T> {
|
||||
Open { stream_id: T },
|
||||
Data { stream_id: T, data: Bytes },
|
||||
@@ -114,7 +127,7 @@ pub enum Frame<T> {
|
||||
}
|
||||
|
||||
impl Frame<RemoteStreamId> {
|
||||
fn remote_id(&self) -> RemoteStreamId {
|
||||
pub fn remote_id(&self) -> RemoteStreamId {
|
||||
match *self {
|
||||
Frame::Open { stream_id } => stream_id,
|
||||
Frame::Data { stream_id, .. } => stream_id,
|
||||
@@ -122,12 +135,6 @@ impl Frame<RemoteStreamId> {
|
||||
Frame::Reset { stream_id, .. } => stream_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the `LocalStreamId` corresponding to the `RemoteStreamId`
|
||||
/// received with this frame.
|
||||
pub fn local_id(&self) -> LocalStreamId {
|
||||
self.remote_id().into_local()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Codec {
|
||||
|
@@ -24,11 +24,11 @@ use std::cmp;
|
||||
/// Configuration for the multiplexer.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MplexConfig {
|
||||
/// Maximum number of simultaneously-open substreams.
|
||||
/// Maximum number of simultaneously used substreams.
|
||||
pub(crate) max_substreams: usize,
|
||||
/// Maximum number of frames in the internal buffer.
|
||||
/// Maximum number of frames buffered per substream.
|
||||
pub(crate) max_buffer_len: usize,
|
||||
/// Behaviour when the buffer size limit is reached.
|
||||
/// Behaviour when the buffer size limit is reached for a substream.
|
||||
pub(crate) max_buffer_behaviour: MaxBufferBehaviour,
|
||||
/// When sending data, split it into frames whose maximum size is this value
|
||||
/// (max 1MByte, as per the Mplex spec).
|
||||
@@ -41,22 +41,26 @@ impl MplexConfig {
|
||||
Default::default()
|
||||
}
|
||||
|
||||
/// Sets the maximum number of simultaneously open substreams.
|
||||
/// Sets the maximum number of simultaneously used substreams.
|
||||
///
|
||||
/// A substream is used as long as it has not been dropped,
|
||||
/// even if it may already be closed or reset at the protocol
|
||||
/// level (in which case it may still have buffered data that
|
||||
/// can be read before the `StreamMuxer` API signals EOF).
|
||||
///
|
||||
/// When the limit is reached, opening of outbound substreams
|
||||
/// is delayed until another substream closes, whereas new
|
||||
/// is delayed until another substream is dropped, whereas new
|
||||
/// inbound substreams are immediately answered with a `Reset`.
|
||||
/// If the number of inbound substreams that need to be reset
|
||||
/// accumulates too quickly (judged by internal bounds), the
|
||||
/// connection is closed, the connection is closed with an error
|
||||
/// due to the misbehaved remote.
|
||||
/// connection is closed with an error due to the misbehaved
|
||||
/// remote.
|
||||
pub fn max_substreams(&mut self, max: usize) -> &mut Self {
|
||||
self.max_substreams = max;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the maximum number of frames buffered that have
|
||||
/// not yet been consumed.
|
||||
/// Sets the maximum number of frames buffered per substream.
|
||||
///
|
||||
/// A limit is necessary in order to avoid DoS attacks.
|
||||
pub fn max_buffer_len(&mut self, max: usize) -> &mut Self {
|
||||
@@ -64,9 +68,10 @@ impl MplexConfig {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the behaviour when the maximum buffer length has been reached.
|
||||
/// Sets the behaviour when the maximum buffer size is reached
|
||||
/// for a substream.
|
||||
///
|
||||
/// See the documentation of `MaxBufferBehaviour`.
|
||||
/// See the documentation of [`MaxBufferBehaviour`].
|
||||
pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self {
|
||||
self.max_buffer_behaviour = behaviour;
|
||||
self
|
||||
@@ -84,12 +89,15 @@ impl MplexConfig {
|
||||
/// Behaviour when the maximum length of the buffer is reached.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
pub enum MaxBufferBehaviour {
|
||||
/// Produce an error on all the substreams.
|
||||
CloseAll,
|
||||
/// No new message will be read from the underlying connection if the buffer is full.
|
||||
/// Reset the substream whose frame buffer overflowed.
|
||||
ResetStream,
|
||||
/// No new message can be read from any substream as long as the buffer
|
||||
/// for a single substream is full.
|
||||
///
|
||||
/// This can potentially introduce a deadlock if you are waiting for a message from a substream
|
||||
/// before processing the messages received on another substream.
|
||||
/// This can potentially introduce a deadlock if you are waiting for a
|
||||
/// message from a substream before processing the messages received
|
||||
/// on another substream, i.e. if there are data dependencies across
|
||||
/// substreams.
|
||||
Block,
|
||||
}
|
||||
|
||||
@@ -97,8 +105,8 @@ impl Default for MplexConfig {
|
||||
fn default() -> MplexConfig {
|
||||
MplexConfig {
|
||||
max_substreams: 128,
|
||||
max_buffer_len: 4096,
|
||||
max_buffer_behaviour: MaxBufferBehaviour::CloseAll,
|
||||
max_buffer_len: 32,
|
||||
max_buffer_behaviour: MaxBufferBehaviour::ResetStream,
|
||||
split_send_size: 1024,
|
||||
}
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user