mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-20 21:36:31 +00:00
refactor(swarm)!: don't share event buffer for established connections (#3188)
Currently, we only have a single channel for all established connections. This requires us to construct the channel ahead of time, before we even have a connection. As it turns out, sharing this buffer across all connections actually has downsides. In particular, this means a single, very busy connection can starve others by filling up this buffer, forcing other connections to wait until they can emit an event.
This commit is contained in:
@ -3,6 +3,7 @@
|
|||||||
- Update to `libp2p-core` `v0.39.0`.
|
- Update to `libp2p-core` `v0.39.0`.
|
||||||
|
|
||||||
- Removed deprecated Swarm constructors. For transition notes see [0.41.0](#0.41.0). See [PR 3170].
|
- Removed deprecated Swarm constructors. For transition notes see [0.41.0](#0.41.0). See [PR 3170].
|
||||||
|
|
||||||
- Deprecate functions on `PollParameters` in preparation for `PollParameters` to be removed entirely eventually. See [PR 3153].
|
- Deprecate functions on `PollParameters` in preparation for `PollParameters` to be removed entirely eventually. See [PR 3153].
|
||||||
|
|
||||||
- Add `estblished_in` to `SwarmEvent::ConnectionEstablished`. See [PR 3134].
|
- Add `estblished_in` to `SwarmEvent::ConnectionEstablished`. See [PR 3134].
|
||||||
@ -15,11 +16,18 @@
|
|||||||
- Remove type parameter from `PendingOutboundConnectionError` and `PendingInboundConnectionError`.
|
- Remove type parameter from `PendingOutboundConnectionError` and `PendingInboundConnectionError`.
|
||||||
These two types are always used with `std::io::Error`. See [PR 3272].
|
These two types are always used with `std::io::Error`. See [PR 3272].
|
||||||
|
|
||||||
|
- Replace `SwarmBuilder::connection_event_buffer_size` with `SwarmBuilder::per_connection_event_buffer_size` .
|
||||||
|
The configured value now applies _per_ connection.
|
||||||
|
The default values remains 7.
|
||||||
|
If you have previously set `connection_event_buffer_size` you should re-evaluate what a good size for a _per connection_ buffer is.
|
||||||
|
See [PR 3188].
|
||||||
|
|
||||||
[PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170
|
[PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170
|
||||||
[PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134
|
[PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134
|
||||||
[PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153
|
[PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153
|
||||||
[PR 3264]: https://github.com/libp2p/rust-libp2p/pull/3264
|
[PR 3264]: https://github.com/libp2p/rust-libp2p/pull/3264
|
||||||
[PR 3272]: https://github.com/libp2p/rust-libp2p/pull/3272
|
[PR 3272]: https://github.com/libp2p/rust-libp2p/pull/3272
|
||||||
|
[PR 3188]: https://github.com/libp2p/rust-libp2p/pull/3188
|
||||||
|
|
||||||
# 0.41.1
|
# 0.41.1
|
||||||
|
|
||||||
|
@ -32,6 +32,7 @@ use crate::{
|
|||||||
use concurrent_dial::ConcurrentDial;
|
use concurrent_dial::ConcurrentDial;
|
||||||
use fnv::FnvHashMap;
|
use fnv::FnvHashMap;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
|
use futures::stream::SelectAll;
|
||||||
use futures::{
|
use futures::{
|
||||||
channel::{mpsc, oneshot},
|
channel::{mpsc, oneshot},
|
||||||
future::{poll_fn, BoxFuture, Either},
|
future::{poll_fn, BoxFuture, Either},
|
||||||
@ -41,6 +42,7 @@ use futures::{
|
|||||||
use instant::Instant;
|
use instant::Instant;
|
||||||
use libp2p_core::connection::Endpoint;
|
use libp2p_core::connection::Endpoint;
|
||||||
use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt};
|
use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt};
|
||||||
|
use std::task::Waker;
|
||||||
use std::{
|
use std::{
|
||||||
collections::{hash_map, HashMap},
|
collections::{hash_map, HashMap},
|
||||||
convert::TryFrom as _,
|
convert::TryFrom as _,
|
||||||
@ -117,6 +119,9 @@ where
|
|||||||
/// See [`Connection::max_negotiating_inbound_streams`].
|
/// See [`Connection::max_negotiating_inbound_streams`].
|
||||||
max_negotiating_inbound_streams: usize,
|
max_negotiating_inbound_streams: usize,
|
||||||
|
|
||||||
|
/// How many [`task::EstablishedConnectionEvent`]s can be buffered before the connection is back-pressured.
|
||||||
|
per_connection_event_buffer_size: usize,
|
||||||
|
|
||||||
/// The executor to use for running connection tasks. Can either be a global executor
|
/// The executor to use for running connection tasks. Can either be a global executor
|
||||||
/// or a local queue.
|
/// or a local queue.
|
||||||
executor: ExecSwitch,
|
executor: ExecSwitch,
|
||||||
@ -128,14 +133,12 @@ where
|
|||||||
/// Receiver for events reported from pending tasks.
|
/// Receiver for events reported from pending tasks.
|
||||||
pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent>,
|
pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent>,
|
||||||
|
|
||||||
/// Sender distributed to established tasks for reporting events back
|
/// Waker in case we haven't established any connections yet.
|
||||||
/// to the pool.
|
no_established_connections_waker: Option<Waker>,
|
||||||
established_connection_events_tx:
|
|
||||||
mpsc::Sender<task::EstablishedConnectionEvent<THandler::Handler>>,
|
|
||||||
|
|
||||||
/// Receiver for events reported from established tasks.
|
/// Receivers for events reported from established connections.
|
||||||
established_connection_events_rx:
|
established_connection_events:
|
||||||
mpsc::Receiver<task::EstablishedConnectionEvent<THandler::Handler>>,
|
SelectAll<mpsc::Receiver<task::EstablishedConnectionEvent<THandler::Handler>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -315,8 +318,6 @@ where
|
|||||||
/// Creates a new empty `Pool`.
|
/// Creates a new empty `Pool`.
|
||||||
pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self {
|
pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self {
|
||||||
let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0);
|
let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0);
|
||||||
let (established_connection_events_tx, established_connection_events_rx) =
|
|
||||||
mpsc::channel(config.task_event_buffer_size);
|
|
||||||
let executor = match config.executor {
|
let executor = match config.executor {
|
||||||
Some(exec) => ExecSwitch::Executor(exec),
|
Some(exec) => ExecSwitch::Executor(exec),
|
||||||
None => ExecSwitch::LocalSpawn(Default::default()),
|
None => ExecSwitch::LocalSpawn(Default::default()),
|
||||||
@ -331,11 +332,12 @@ where
|
|||||||
dial_concurrency_factor: config.dial_concurrency_factor,
|
dial_concurrency_factor: config.dial_concurrency_factor,
|
||||||
substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
|
substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
|
||||||
max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
|
max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
|
||||||
|
per_connection_event_buffer_size: config.per_connection_event_buffer_size,
|
||||||
executor,
|
executor,
|
||||||
pending_connection_events_tx,
|
pending_connection_events_tx,
|
||||||
pending_connection_events_rx,
|
pending_connection_events_rx,
|
||||||
established_connection_events_tx,
|
no_established_connections_waker: None,
|
||||||
established_connection_events_rx,
|
established_connection_events: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -547,9 +549,11 @@ where
|
|||||||
//
|
//
|
||||||
// Note that established connections are polled before pending connections, thus
|
// Note that established connections are polled before pending connections, thus
|
||||||
// prioritizing established connections over pending connections.
|
// prioritizing established connections over pending connections.
|
||||||
match self.established_connection_events_rx.poll_next_unpin(cx) {
|
match self.established_connection_events.poll_next_unpin(cx) {
|
||||||
Poll::Pending => {}
|
Poll::Pending => {}
|
||||||
Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."),
|
Poll::Ready(None) => {
|
||||||
|
self.no_established_connections_waker = Some(cx.waker().clone());
|
||||||
|
}
|
||||||
|
|
||||||
Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => {
|
Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => {
|
||||||
return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event });
|
return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event });
|
||||||
@ -750,6 +754,9 @@ where
|
|||||||
|
|
||||||
let (command_sender, command_receiver) =
|
let (command_sender, command_receiver) =
|
||||||
mpsc::channel(self.task_command_buffer_size);
|
mpsc::channel(self.task_command_buffer_size);
|
||||||
|
let (event_sender, event_receiver) =
|
||||||
|
mpsc::channel(self.per_connection_event_buffer_size);
|
||||||
|
|
||||||
conns.insert(
|
conns.insert(
|
||||||
id,
|
id,
|
||||||
EstablishedConnection {
|
EstablishedConnection {
|
||||||
@ -757,6 +764,10 @@ where
|
|||||||
sender: command_sender,
|
sender: command_sender,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
self.established_connection_events.push(event_receiver);
|
||||||
|
if let Some(waker) = self.no_established_connections_waker.take() {
|
||||||
|
waker.wake();
|
||||||
|
}
|
||||||
|
|
||||||
let connection = Connection::new(
|
let connection = Connection::new(
|
||||||
muxer,
|
muxer,
|
||||||
@ -764,13 +775,14 @@ where
|
|||||||
self.substream_upgrade_protocol_override,
|
self.substream_upgrade_protocol_override,
|
||||||
self.max_negotiating_inbound_streams,
|
self.max_negotiating_inbound_streams,
|
||||||
);
|
);
|
||||||
|
|
||||||
self.spawn(
|
self.spawn(
|
||||||
task::new_for_established_connection(
|
task::new_for_established_connection(
|
||||||
id,
|
id,
|
||||||
obtained_peer_id,
|
obtained_peer_id,
|
||||||
connection,
|
connection,
|
||||||
command_receiver,
|
command_receiver,
|
||||||
self.established_connection_events_tx.clone(),
|
event_sender,
|
||||||
)
|
)
|
||||||
.boxed(),
|
.boxed(),
|
||||||
);
|
);
|
||||||
@ -1069,7 +1081,7 @@ pub struct PoolConfig {
|
|||||||
|
|
||||||
/// Size of the pending connection task event buffer and the established connection task event
|
/// Size of the pending connection task event buffer and the established connection task event
|
||||||
/// buffer.
|
/// buffer.
|
||||||
pub task_event_buffer_size: usize,
|
pub per_connection_event_buffer_size: usize,
|
||||||
|
|
||||||
/// Number of addresses concurrently dialed for a single outbound connection attempt.
|
/// Number of addresses concurrently dialed for a single outbound connection attempt.
|
||||||
pub dial_concurrency_factor: NonZeroU8,
|
pub dial_concurrency_factor: NonZeroU8,
|
||||||
@ -1088,7 +1100,7 @@ impl PoolConfig {
|
|||||||
Self {
|
Self {
|
||||||
executor,
|
executor,
|
||||||
task_command_buffer_size: 32,
|
task_command_buffer_size: 32,
|
||||||
task_event_buffer_size: 7,
|
per_connection_event_buffer_size: 7,
|
||||||
dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"),
|
dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"),
|
||||||
substream_upgrade_protocol_override: None,
|
substream_upgrade_protocol_override: None,
|
||||||
max_negotiating_inbound_streams: 128,
|
max_negotiating_inbound_streams: 128,
|
||||||
@ -1113,8 +1125,8 @@ impl PoolConfig {
|
|||||||
/// When the buffer is full, the background tasks of all connections will stall.
|
/// When the buffer is full, the background tasks of all connections will stall.
|
||||||
/// In this way, the consumers of network events exert back-pressure on
|
/// In this way, the consumers of network events exert back-pressure on
|
||||||
/// the network connection I/O.
|
/// the network connection I/O.
|
||||||
pub fn with_connection_event_buffer_size(mut self, n: usize) -> Self {
|
pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
|
||||||
self.task_event_buffer_size = n;
|
self.per_connection_event_buffer_size = n;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1489,31 +1489,19 @@ where
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Configures the number of extra events from the [`ConnectionHandler`] in
|
/// Configures the size of the buffer for events sent by a [`ConnectionHandler`] to the
|
||||||
/// destination to the [`NetworkBehaviour`] that can be buffered before
|
/// [`NetworkBehaviour`].
|
||||||
/// the [`ConnectionHandler`] has to go to sleep.
|
|
||||||
///
|
///
|
||||||
/// There exists a buffer of events received from [`ConnectionHandler`]s
|
/// Each connection has its own buffer.
|
||||||
/// that the [`NetworkBehaviour`] has yet to process. This buffer is
|
|
||||||
/// shared between all instances of [`ConnectionHandler`]. Each instance of
|
|
||||||
/// [`ConnectionHandler`] is guaranteed one slot in this buffer, meaning
|
|
||||||
/// that delivering an event for the first time is guaranteed to be
|
|
||||||
/// instantaneous. Any extra event delivery, however, must wait for that
|
|
||||||
/// first event to be delivered or for an "extra slot" to be available.
|
|
||||||
///
|
///
|
||||||
/// This option configures the number of such "extra slots" in this
|
/// The ideal value depends on the executor used, the CPU speed and the volume of events.
|
||||||
/// shared buffer. These extra slots are assigned in a first-come,
|
/// If this value is too low, then the [`ConnectionHandler`]s will be sleeping more often
|
||||||
/// first-served basis.
|
|
||||||
///
|
|
||||||
/// The ideal value depends on the executor used, the CPU speed, the
|
|
||||||
/// average number of connections, and the volume of events. If this value
|
|
||||||
/// is too low, then the [`ConnectionHandler`]s will be sleeping more often
|
|
||||||
/// than necessary. Increasing this value increases the overall memory
|
/// than necessary. Increasing this value increases the overall memory
|
||||||
/// usage, and more importantly the latency between the moment when an
|
/// usage, and more importantly the latency between the moment when an
|
||||||
/// event is emitted and the moment when it is received by the
|
/// event is emitted and the moment when it is received by the
|
||||||
/// [`NetworkBehaviour`].
|
/// [`NetworkBehaviour`].
|
||||||
pub fn connection_event_buffer_size(mut self, n: usize) -> Self {
|
pub fn per_connection_event_buffer_size(mut self, n: usize) -> Self {
|
||||||
self.pool_config = self.pool_config.with_connection_event_buffer_size(n);
|
self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user