From c271f6f56b7b5c33b1ced135e01291b1d48e8eb8 Mon Sep 17 00:00:00 2001
From: Pierre Krieger
Date: Fri, 15 May 2020 14:40:10 +0200
Subject: [PATCH] Make the number of events buffered to/from tasks configurable (#1574)

* Make the number of events buffered to/from tasks configurable

* Assign a PR number

* Fix comment

* Apply suggestions from code review

Co-authored-by: Roman Borschel

* Rename variables

* Apply suggestions from code review

Co-authored-by: Roman Borschel

Co-authored-by: Roman Borschel
---
 CHANGELOG.md                   |  8 +++++--
 core/src/connection/manager.rs | 40 +++++++++++++++++++++++++++----
 core/src/connection/pool.rs    |  7 +++---
 core/src/network.rs            | 41 ++++++++++++++++++++++++++-----
 swarm/src/lib.rs               | 44 +++++++++++++++++++++++++++++++++-
 5 files changed, 122 insertions(+), 18 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5c75aef9..b18df2c8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@
   attempts per peer, with a configurable limit.
   [PR 1506](https://github.com/libp2p/rust-libp2p/pull/1506)
 
+- `libp2p-core`: Updated to multihash 0.11.0.
+  [PR 1566](https://github.com/libp2p/rust-libp2p/pull/1566)
+
+- `libp2p-core`: Make the number of events buffered to/from tasks configurable.
+  [PR 1574](https://github.com/libp2p/rust-libp2p/pull/1574)
+
 - `libp2p-noise`: Added the `X25519Spec` protocol suite which uses
   libp2p-noise-spec compliant signatures on static keys as well as the
   `/noise` protocol upgrade, hence providing a libp2p-noise-spec compliant
@@ -27,8 +33,6 @@
   be supported. IPv4 listener addresses are not affected by this change.
   [PR 1555](https://github.com/libp2p/rust-libp2p/pull/1555)
 
-- `libp2p-core`: Updated to multihash 0.11.0.
-
 # Version 0.18.1 (2020-04-17)
 
 - `libp2p-swarm`: Make sure inject_dial_failure is called in all situations.
diff --git a/core/src/connection/manager.rs b/core/src/connection/manager.rs
index c366a071..9dbb644a 100644
--- a/core/src/connection/manager.rs
+++ b/core/src/connection/manager.rs
@@ -99,6 +99,9 @@ pub struct Manager<I, O, H, TE, HE, C> {
     /// Next available identifier for a new connection / task.
     next_task_id: TaskId,
 
+    /// Size of the task command buffer (per task).
+    task_command_buffer_size: usize,
+
     /// The executor to use for running the background tasks. If `None`,
     /// the tasks are kept in `local_spawns` instead and polled on the
     /// current thread when the manager is polled for new events.
@@ -127,6 +130,32 @@ where
     }
 }
 
+/// Configuration options when creating a [`Manager`].
+///
+/// The default configuration specifies no dedicated task executor, a
+/// task event buffer size of 32, and a task command buffer size of 7.
+#[non_exhaustive]
+pub struct ManagerConfig {
+    /// Executor to use to spawn tasks.
+    pub executor: Option<Box<dyn Executor + Send>>,
+
+    /// Size of the task command buffer (per task).
+    pub task_command_buffer_size: usize,
+
+    /// Size of the task event buffer (for all tasks).
+    pub task_event_buffer_size: usize,
+}
+
+impl Default for ManagerConfig {
+    fn default() -> Self {
+        ManagerConfig {
+            executor: None,
+            task_event_buffer_size: 32,
+            task_command_buffer_size: 7,
+        }
+    }
+}
+
 /// Internal information about a running task.
 ///
 /// Contains the sender to deliver event messages to the task, and
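Since `ManagerConfig` is `#[non_exhaustive]`, code outside `libp2p-core` cannot build it with a struct literal; it has to start from `Default::default()` and overwrite individual fields. A minimal sketch of that pattern, under the assumption that the type is reachable at the `connection::manager` path where this patch defines it (it may be re-exported elsewhere):

```rust
// Sketch only: the import path is an assumption based on where this patch
// defines the type, not a confirmed public re-export.
use libp2p_core::connection::manager::ManagerConfig;

fn custom_manager_config() -> ManagerConfig {
    // `#[non_exhaustive]` forbids a struct literal outside the defining
    // crate, so start from the defaults (no executor, task event buffer
    // of 32, task command buffer of 7)...
    let mut config = ManagerConfig::default();
    // ...and overwrite only the fields of interest. A per-task command
    // buffer of 15 corresponds to a total `notify_handler` buffer size of
    // 16, because the channel reserves one extra slot per sender.
    config.task_command_buffer_size = 15;
    config.task_event_buffer_size = 64;
    config
}
```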
@@ -196,12 +225,13 @@ pub enum Event<'a, I, O, H, TE, HE, C> {
 impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
     /// Creates a new connection manager.
-    pub fn new(executor: Option<Box<dyn Executor + Send>>) -> Self {
-        let (tx, rx) = mpsc::channel(1);
+    pub fn new(config: ManagerConfig) -> Self {
+        let (tx, rx) = mpsc::channel(config.task_event_buffer_size);
         Self {
             tasks: FnvHashMap::default(),
             next_task_id: TaskId(0),
-            executor,
+            task_command_buffer_size: config.task_command_buffer_size,
+            executor: config.executor,
             local_spawns: FuturesUnordered::new(),
             events_tx: tx,
             events_rx: rx
@@ -234,7 +264,7 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
         let task_id = self.next_task_id;
         self.next_task_id.0 += 1;
 
-        let (tx, rx) = mpsc::channel(4);
+        let (tx, rx) = mpsc::channel(self.task_command_buffer_size);
         self.tasks.insert(task_id, TaskInfo { sender: tx, state: TaskState::Pending });
 
         let task = Box::pin(Task::pending(task_id, self.events_tx.clone(), rx, future, handler));
@@ -269,7 +299,7 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
         let task_id = self.next_task_id;
         self.next_task_id.0 += 1;
 
-        let (tx, rx) = mpsc::channel(4);
+        let (tx, rx) = mpsc::channel(self.task_command_buffer_size);
         self.tasks.insert(task_id, TaskInfo {
             sender: tx, state: TaskState::Established(info)
         });
diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs
index 7da7efa8..b319ca76 100644
--- a/core/src/connection/pool.rs
+++ b/core/src/connection/pool.rs
@@ -19,7 +19,6 @@
 // DEALINGS IN THE SOFTWARE.
 
 use crate::{
-    Executor,
     ConnectedPoint,
     PeerId,
     connection::{
@@ -36,7 +35,7 @@ use crate::{
         OutgoingInfo,
         Substream,
         PendingConnectionError,
-        manager::{self, Manager},
+        manager::{self, Manager, ManagerConfig},
     },
     muxing::StreamMuxer,
 };
@@ -175,13 +174,13 @@ where
     /// Creates a new empty `Pool`.
     pub fn new(
         local_id: TPeerId,
-        executor: Option<Box<dyn Executor + Send>>,
+        manager_config: ManagerConfig,
         limits: PoolLimits
     ) -> Self {
         Pool {
             local_id,
             limits,
-            manager: Manager::new(executor),
+            manager: Manager::new(manager_config),
             established: Default::default(),
             pending: Default::default(),
         }
diff --git a/core/src/network.rs b/core/src/network.rs
index 73240abd..1e89dcd7 100644
--- a/core/src/network.rs
+++ b/core/src/network.rs
@@ -43,6 +43,7 @@ use crate::{
         ListenersStream,
         PendingConnectionError,
         Substream,
+        manager::ManagerConfig,
         pool::{Pool, PoolEvent, PoolLimits},
     },
     muxing::StreamMuxer,
@@ -57,6 +58,7 @@ use std::{
     error,
     fmt,
     hash::Hash,
+    num::NonZeroUsize,
     pin::Pin,
     task::{Context, Poll},
 };
@@ -154,7 +156,7 @@ where
         Network {
             local_peer_id,
             listeners: ListenersStream::new(transport),
-            pool: Pool::new(pool_local_id, config.executor, config.pool_limits),
+            pool: Pool::new(pool_local_id, config.manager_config, config.pool_limits),
             dialing: Default::default(),
         }
     }
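Both the hard-coded `mpsc::channel(1)` replaced above and the "one free slot per task" remarks further down rest on a documented property of `futures::channel::mpsc`: a bounded channel of capacity `n` accepts `n` messages *plus* one in-flight message per `Sender` clone, so even a tiny capacity guarantees each connection task a slot. A self-contained demonstration of that property using only the `futures` crate (version 0.3 assumed), with no libp2p types involved:

```rust
// Demonstrates the futures-rs mpsc behaviour this patch relies on: a channel
// created with `mpsc::channel(n)` holds `n` messages plus one "guaranteed"
// slot per sender. With capacity 0, each sender can still buffer one message.
use futures::channel::mpsc;
use futures::{executor, SinkExt, StreamExt};

fn main() {
    executor::block_on(async {
        let (tx, mut rx) = mpsc::channel::<&str>(0);
        let mut task_a = tx.clone(); // e.g. one sender per connection task
        let mut task_b = tx;

        // Both sends complete immediately: each sender owns one slot.
        task_a.send("event from task A").await.unwrap();
        task_b.send("event from task B").await.unwrap();

        // A second send on the same sender must wait ("stall") until the
        // receiver drains the channel -- this is the back-pressure that the
        // buffer-size options introduced by this patch let callers tune.
        assert!(task_a.try_send("another event").is_err());

        assert_eq!(rx.next().await, Some("event from task A"));
        assert_eq!(rx.next().await, Some("event from task B"));
    });
}
```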
@@ -598,17 +600,21 @@ pub struct NetworkInfo {
 
 /// The (optional) configuration for a [`Network`].
 ///
-/// The default configuration specifies no dedicated task executor
-/// and no connection limits.
+/// The default configuration specifies no dedicated task executor, no
+/// connection limits, a connection event buffer size of 32, and a
+/// `notify_handler` buffer size of 8.
 #[derive(Default)]
 pub struct NetworkConfig {
-    executor: Option<Box<dyn Executor + Send>>,
+    /// Note that the `ManagerConfig`'s task command buffer always provides
+    /// one "free" slot per task. Thus the given total `notify_handler_buffer_size`
+    /// exposed for configuration on the `Network` is reduced by one.
+    manager_config: ManagerConfig,
     pool_limits: PoolLimits,
 }
 
 impl NetworkConfig {
     pub fn set_executor(&mut self, e: Box<dyn Executor + Send>) -> &mut Self {
-        self.executor = Some(e);
+        self.manager_config.executor = Some(e);
         self
     }
@@ -625,7 +631,30 @@
     }
 
     pub fn executor(&self) -> Option<&Box<dyn Executor + Send>> {
-        self.executor.as_ref()
+        self.manager_config.executor.as_ref()
+    }
+
+    /// Sets the maximum number of events sent to a connection's background task
+    /// that may be buffered, if the task cannot keep up with their consumption and
+    /// delivery to the connection handler.
+    ///
+    /// When the buffer for a particular connection is full, `notify_handler` will no
+    /// longer be able to deliver events to the associated `ConnectionHandler`,
+    /// thus exerting back-pressure on the connection and peer API.
+    pub fn set_notify_handler_buffer_size(&mut self, n: NonZeroUsize) -> &mut Self {
+        self.manager_config.task_command_buffer_size = n.get() - 1;
+        self
+    }
+
+    /// Sets the maximum number of buffered connection events (beyond a guaranteed
+    /// buffer of 1 event per connection).
+    ///
+    /// When the buffer is full, the background tasks of all connections will stall.
+    /// In this way, the consumers of network events exert back-pressure on
+    /// the network connection I/O.
+    pub fn set_connection_event_buffer_size(&mut self, n: usize) -> &mut Self {
+        self.manager_config.task_event_buffer_size = n;
+        self
     }
 
     pub fn set_incoming_limit(&mut self, n: usize) -> &mut Self {
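Taken together, the two new setters give users of `Network` direct control over both buffers without touching `ManagerConfig` themselves. A usage sketch; the `libp2p_core::network` import path is an assumption, and the setters are exactly the ones added above:

```rust
// Sketch: configure both buffer sizes on a NetworkConfig before building a
// Network. The import path is an assumption; adjust it to your crate version.
use std::num::NonZeroUsize;
use libp2p_core::network::NetworkConfig;

fn tuned_network_config() -> NetworkConfig {
    let mut config = NetworkConfig::default();
    // A total per-connection buffer of 16 for behaviour -> handler events;
    // it is stored internally as 15, because every task command channel
    // already reserves one guaranteed slot per sender.
    config.set_notify_handler_buffer_size(NonZeroUsize::new(16).expect("16 != 0"));
    // 64 "extra slots" shared by all connections for handler -> behaviour
    // events, on top of the guaranteed slot per connection.
    config.set_connection_event_buffer_size(64);
    config
}
```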
diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs
index d1d3ea6a..f848f64f 100644
--- a/swarm/src/lib.rs
+++ b/swarm/src/lib.rs
@@ -123,7 +123,7 @@ use registry::{Addresses, AddressIntoIter};
 use smallvec::SmallVec;
 use std::{error, fmt, hash::Hash, io, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}};
 use std::collections::HashSet;
-use std::num::NonZeroU32;
+use std::num::{NonZeroU32, NonZeroUsize};
 use upgrade::UpgradeInfoSend as _;
 
 /// Contains the state of the network, plus the way it should behave.
@@ -1002,6 +1002,48 @@ where TBehaviour: NetworkBehaviour,
         self
     }
 
+    /// Configures the number of events from the [`NetworkBehaviour`] destined
+    /// for the [`ProtocolsHandler`] that can be buffered before
+    /// the [`Swarm`] has to wait. An individual buffer with this number of
+    /// events exists for each individual connection.
+    ///
+    /// The ideal value depends on the executor used, the CPU speed, and the
+    /// volume of events. If this value is too low, then the [`Swarm`] will
+    /// be sleeping more often than necessary. Increasing this value increases
+    /// the overall memory usage.
+    pub fn notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
+        self.network_config.set_notify_handler_buffer_size(n);
+        self
+    }
+
+    /// Configures the number of extra events from the [`ProtocolsHandler`]
+    /// destined for the [`NetworkBehaviour`] that can be buffered before
+    /// the [`ProtocolsHandler`] has to go to sleep.
+    ///
+    /// There exists a buffer of events received from [`ProtocolsHandler`]s
+    /// that the [`NetworkBehaviour`] has yet to process. This buffer is
+    /// shared between all instances of [`ProtocolsHandler`]. Each instance of
+    /// [`ProtocolsHandler`] is guaranteed one slot in this buffer, meaning
+    /// that delivering an event for the first time is guaranteed to be
+    /// instantaneous. Any extra event delivery, however, must wait for that
+    /// first event to be delivered or for an "extra slot" to be available.
+    ///
+    /// This option configures the number of such "extra slots" in this
+    /// shared buffer. These extra slots are assigned on a first-come,
+    /// first-served basis.
+    ///
+    /// The ideal value depends on the executor used, the CPU speed, the
+    /// average number of connections, and the volume of events. If this value
+    /// is too low, then the [`ProtocolsHandler`]s will be sleeping more often
+    /// than necessary. Increasing this value increases the overall memory
+    /// usage, and more importantly the latency between the moment when an
+    /// event is emitted and the moment when it is received by the
+    /// [`NetworkBehaviour`].
+    pub fn connection_event_buffer_size(mut self, n: usize) -> Self {
+        self.network_config.set_connection_event_buffer_size(n);
+        self
+    }
+
     /// Configures a limit for the number of simultaneous incoming
     /// connection attempts.
     pub fn incoming_connection_limit(mut self, n: usize) -> Self {
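At the `Swarm` level, the same two knobs surface as builder methods. A sketch of a typical builder chain follows; `transport`, `behaviour`, and `local_peer_id` are placeholders assumed to be constructed elsewhere, and the `libp2p::swarm` facade path is likewise an assumption, so this fragment illustrates the call shape rather than a complete program:

```rust
use std::num::NonZeroUsize;
use libp2p::swarm::SwarmBuilder; // path assumption for the facade crate

// `transport`, `behaviour` and `local_peer_id` are assumed to exist in
// scope; only the two new buffer-size knobs are the point of this fragment.
let mut swarm = SwarmBuilder::new(transport, behaviour, local_peer_id)
    // Per-connection buffer of 8 events flowing behaviour -> handler.
    .notify_handler_buffer_size(NonZeroUsize::new(8).expect("8 != 0"))
    // 32 extra shared slots for events flowing handler -> behaviour,
    // on top of the one guaranteed slot per connection.
    .connection_event_buffer_size(32)
    .build();
```

Note the asymmetry the patch establishes: the behaviour-to-handler buffer is per connection, while the handler-to-behaviour buffer is shared across all connections, so the second value is best sized relative to the expected number of concurrent connections.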