Mirror of https://github.com/fluencelabs/rust-libp2p, synced 2025-06-11 09:01:22 +00:00
[mplex] Tweak default config and yield before exceeding buffer limits. (#1825)
* [mplex] Tweak default config and yield before exceeding buffer limits.
* Update CHANGELOG
@@ -1,5 +1,13 @@
 # 0.24.0 [unreleased]
 
+- Change the default configuration to use `MaxBufferBehaviour::Block`
+  and yield from waiting for the next substream or reading from a
+  particular substream whenever the current read loop may have
+  already filled a substream buffer, to give the current task a
+  chance to read from the buffer(s) before the `MaxBufferBehaviour`
+  takes effect. This is primarily relevant for
+  `MaxBufferBehaviour::ResetStream`.
+
 - Tweak the naming in the `MplexConfig` API for better
   consistency with `libp2p-yamux`.
   [PR 1822](https://github.com/libp2p/rust-libp2p/pull/1822).
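For orientation, here is a minimal sketch of configuring these options explicitly under the new defaults. Only `MplexConfig`, `MaxBufferBehaviour`, and `MplexConfig::set_max_buffer_size()` appear in this diff; `set_max_buffer_behaviour` is an assumed companion setter following the same naming.

```rust
// Sketch only: make the new default (`Block`) explicit, or opt back into
// the previous behaviour by passing `ResetStream` instead.
use libp2p_mplex::{MaxBufferBehaviour, MplexConfig};

fn mplex_config() -> MplexConfig {
    let mut cfg = MplexConfig::default();
    cfg.set_max_buffer_size(32); // max frames buffered per substream
    // Assumed setter name, mirroring `set_max_buffer_size` above.
    cfg.set_max_buffer_behaviour(MaxBufferBehaviour::Block); // new default
    cfg
}
```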
@@ -90,14 +90,26 @@ impl MplexConfig {
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
 pub enum MaxBufferBehaviour {
     /// Reset the substream whose frame buffer overflowed.
-    ResetStream,
-    /// No new message can be read from any substream as long as the buffer
-    /// for a single substream is full.
     ///
-    /// This can potentially introduce a deadlock if you are waiting for a
-    /// message from a substream before processing the messages received
-    /// on another substream, i.e. if there are data dependencies across
-    /// substreams.
+    /// > **Note**: If more than [`MplexConfig::set_max_buffer_size()`] frames
+    /// > are received in succession for a substream in the context of
+    /// > trying to read data from a different substream, the former substream
+    /// > may be reset before application code had a chance to read from the
+    /// > buffer. The max. buffer size needs to be sized appropriately when
+    /// > using this option to balance maximum resource usage and the
+    /// > probability of premature termination of a substream.
+    ResetStream,
+    /// No new message can be read from the underlying connection from any
+    /// substream as long as the buffer for a single substream is full,
+    /// i.e. application code is expected to read from the full buffer.
+    ///
+    /// > **Note**: To avoid blocking without making progress, application
+    /// > tasks should ensure that, when woken, they always try to read (i.e.
+    /// > make progress) from every substream on which data is expected.
+    /// > This is imperative in general, as a woken task never knows for
+    /// > which substream it has been woken, but failure to do so with
+    /// > [`MaxBufferBehaviour::Block`] in particular may lead to stalled
+    /// > execution or spinning of a task without progress.
     Block,
 }
 
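The note on `MaxBufferBehaviour::Block` above prescribes a concrete pattern for application code: on every wakeup, attempt to read from every substream. A rough sketch, assuming substreams that implement `futures::io::AsyncRead` (the helper `drive_reads` and the buffer size are illustrative, not part of this diff):

```rust
use futures::io::AsyncRead;
use std::pin::Pin;
use std::task::{Context, Poll};

/// On every wakeup, attempt to read from *every* substream: a woken task
/// cannot know which substream the wakeup was for, and with
/// `MaxBufferBehaviour::Block` an unread full buffer stalls all reads.
fn drive_reads<S: AsyncRead + Unpin>(substreams: &mut [S], cx: &mut Context<'_>) {
    let mut buf = [0u8; 4096];
    for s in substreams.iter_mut() {
        // Drain whatever is currently buffered; stop on `Pending` or EOF.
        while let Poll::Ready(Ok(n)) = Pin::new(&mut *s).poll_read(cx, &mut buf) {
            if n == 0 {
                break; // EOF
            }
            // ... hand `buf[..n]` to the application ...
        }
    }
}
```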
@@ -106,9 +118,8 @@ impl Default for MplexConfig {
         MplexConfig {
             max_substreams: 128,
             max_buffer_len: 32,
-            max_buffer_behaviour: MaxBufferBehaviour::ResetStream,
+            max_buffer_behaviour: MaxBufferBehaviour::Block,
             split_send_size: 1024,
         }
     }
 }
@@ -214,8 +214,18 @@ where
         }
 
         debug_assert!(self.open_buffer.is_empty());
+        let mut num_buffered = 0;
 
         loop {
+            // Whenever we may have completely filled a substream
+            // buffer while waiting for the next inbound stream,
+            // yield to give the current task a chance to read
+            // from the respective substreams.
+            if num_buffered == self.config.max_buffer_len {
+                cx.waker().clone().wake();
+                return Poll::Pending
+            }
+
             // Wait for the next inbound `Open` frame.
             match ready!(self.poll_read_frame(cx, None))? {
                 Frame::Open { stream_id } => {
@@ -225,6 +235,7 @@ where
                 }
                 Frame::Data { stream_id, data } => {
                     self.buffer(stream_id.into_local(), data)?;
+                    num_buffered += 1;
                 }
                 Frame::Close { stream_id } => {
                     self.on_close(stream_id.into_local())?;
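The `cx.waker().clone().wake(); return Poll::Pending` pair added above is a cooperative yield: the task schedules itself to be polled again immediately, but first returns control to the executor so the filled substream buffers can be read. The same pattern in isolation (`YieldOnce` is an illustrative type, not part of this codebase):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Resolves on the second poll: the first poll wakes the task and returns
/// `Pending`, handing control back to the executor so sibling code (here:
/// readers of the filled substream buffers) can run before we continue.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref(); // schedule an immediate re-poll
            Poll::Pending
        }
    }
}
```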
@@ -406,7 +417,18 @@ where
             buf.shrink_to_fit();
         }
 
+        let mut num_buffered = 0;
+
         loop {
+            // Whenever we may have completely filled a substream
+            // buffer of another substream while waiting for the
+            // next frame for `id`, yield to give the current task
+            // a chance to read from the other substream(s).
+            if num_buffered == self.config.max_buffer_len {
+                cx.waker().clone().wake();
+                return Poll::Pending
+            }
+
             // Check if the targeted substream (if any) reached EOF.
             if !self.can_read(&id) {
                 // Note: Contrary to what is recommended by the spec, we must
@@ -427,6 +449,7 @@ where
                     // currently being polled, so it needs to be buffered and
                     // the interested tasks notified.
                     self.buffer(stream_id.into_local(), data)?;
+                    num_buffered += 1;
                 }
                 frame @ Frame::Open { .. } => {
                     if let Some(id) = self.on_open(frame.remote_id())? {
@@ -1106,15 +1129,30 @@ mod tests {
             let id = LocalStreamId::listener(0);
             match m.poll_next_stream(cx) {
                 Poll::Ready(r) => panic!("Unexpected result for next stream: {:?}", r),
-                Poll::Pending => {}
+                Poll::Pending => {
+                    // We expect the implementation to yield when the buffer
+                    // is full but before it is exceeded and the max buffer
+                    // behaviour takes effect, giving the current task a
+                    // chance to read from the buffer. Here we just read
+                    // again to provoke the max buffer behaviour.
+                    assert_eq!(
+                        m.substreams.get_mut(&id).unwrap().recv_buf().len(),
+                        cfg.max_buffer_len
+                    );
+                    match m.poll_next_stream(cx) {
+                        Poll::Ready(r) => panic!("Unexpected result for next stream: {:?}", r),
+                        Poll::Pending => {
+                            // Expect the buffer for stream 0 to be exceeded, triggering
+                            // the max. buffer behaviour.
+                            assert_eq!(
+                                m.substreams.get_mut(&id).unwrap().recv_buf().len(),
+                                cfg.max_buffer_len + 1
+                            );
+                        }
+                    }
+                }
             }
 
-            // Expect the buffer for stream 0 to be just 1 over the limit.
-            assert_eq!(
-                m.substreams.get_mut(&id).unwrap().recv_buf().len(),
-                cfg.max_buffer_len + 1
-            );
-
             // Expect either a `Reset` to be sent or all reads to be
             // blocked `Pending`, depending on the `MaxBufferBehaviour`.
             match cfg.max_buffer_behaviour {
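A side note on the test code: polling `poll_next_stream` by hand requires a `Context`. A minimal sketch using the `futures` crate's no-op waker; the multiplexer value `m` is a placeholder for whatever the test constructs:

```rust
use futures::task::{noop_waker, Context};

fn poll_manually() {
    // A waker that does nothing: sufficient when the test loop itself
    // decides when to poll again, as in the buffer-limit tests above.
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    // let _ = m.poll_next_stream(&mut cx); // `m`: the multiplexer under test
}
```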