swarm/: Limit negotiating inbound substreams per connection (#2697)

This limit is shared across all `ConnectionHandler`s on a single connection. It
only enforces a limit on the number of negotiating substreams. Once negotiated a
`ConnectionHandler` manages the lifecycle of the substream and has to enforce
limits themselves.
This commit is contained in:
Max Inden 2022-06-08 11:48:46 +02:00 committed by GitHub
parent 59a74b4083
commit 2acbb457cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 62 additions and 2 deletions

View File

@ -1,3 +1,9 @@
# 0.36.1 - unreleased
- Limit negotiating inbound substreams per connection. See [PR 2697].
[PR 2697]: https://github.com/libp2p/rust-libp2p/pull/2697
# 0.36.0
- Don't require `Transport` to be `Clone`. See [PR 2529].

View File

@ -3,7 +3,7 @@ name = "libp2p-swarm"
edition = "2021"
rust-version = "1.56.1"
description = "The libp2p swarm"
version = "0.36.0"
version = "0.36.1"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"

View File

@ -97,8 +97,13 @@ where
muxer: StreamMuxerBox,
handler: THandler,
substream_upgrade_protocol_override: Option<upgrade::Version>,
max_negotiating_inbound_streams: usize,
) -> Self {
let wrapped_handler = HandlerWrapper::new(handler, substream_upgrade_protocol_override);
let wrapped_handler = HandlerWrapper::new(
handler,
substream_upgrade_protocol_override,
max_negotiating_inbound_streams,
);
Connection {
muxing: Muxing::new(muxer),
handler: wrapped_handler,

View File

@ -77,6 +77,16 @@ where
shutdown: Shutdown,
/// The substream upgrade protocol override, if any.
substream_upgrade_protocol_override: Option<upgrade::Version>,
/// The maximum number of inbound streams concurrently negotiating on a
/// connection. New inbound streams exceeding the limit are dropped and thus
/// reset.
///
/// Note: This only enforces a limit on the number of concurrently
/// negotiating inbound streams. The total number of inbound streams on a
/// connection is the sum of negotiating and negotiated streams. A limit on
/// the total number of streams can be enforced at the [`StreamMuxerBox`]
/// level.
max_negotiating_inbound_streams: usize,
}
impl<TConnectionHandler: ConnectionHandler> std::fmt::Debug for HandlerWrapper<TConnectionHandler> {
@ -98,6 +108,7 @@ impl<TConnectionHandler: ConnectionHandler> HandlerWrapper<TConnectionHandler> {
pub(crate) fn new(
handler: TConnectionHandler,
substream_upgrade_protocol_override: Option<upgrade::Version>,
max_negotiating_inbound_streams: usize,
) -> Self {
Self {
handler,
@ -107,6 +118,7 @@ impl<TConnectionHandler: ConnectionHandler> HandlerWrapper<TConnectionHandler> {
unique_dial_upgrade_id: 0,
shutdown: Shutdown::None,
substream_upgrade_protocol_override,
max_negotiating_inbound_streams,
}
}
@ -243,6 +255,14 @@ where
) {
match endpoint {
SubstreamEndpoint::Listener => {
if self.negotiating_in.len() == self.max_negotiating_inbound_streams {
log::warn!(
"Incoming substream exceeding maximum number of \
negotiating inbound streams. Dropping."
);
return;
}
let protocol = self.handler.listen_protocol();
let timeout = *protocol.timeout();
let (upgrade, user_data) = protocol.into_upgrade();

View File

@ -87,6 +87,11 @@ where
/// The configured override for substream protocol upgrades, if any.
substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
/// The maximum number of inbound streams concurrently negotiating on a connection.
///
/// See [`super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams`].
max_negotiating_inbound_streams: usize,
/// The executor to use for running the background tasks. If `None`,
/// the tasks are kept in `local_spawns` instead and polled on the
/// current thread when the [`Pool`] is polled for new events.
@ -263,6 +268,7 @@ where
task_command_buffer_size: config.task_command_buffer_size,
dial_concurrency_factor: config.dial_concurrency_factor,
substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
executor: config.executor,
local_spawns: FuturesUnordered::new(),
pending_connection_events_tx,
@ -744,6 +750,7 @@ where
muxer,
handler.into_handler(&obtained_peer_id, &endpoint),
self.substream_upgrade_protocol_override,
self.max_negotiating_inbound_streams,
);
self.spawn(
task::new_for_established_connection(
@ -1153,6 +1160,11 @@ pub struct PoolConfig {
/// The configured override for substream protocol upgrades, if any.
substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
/// The maximum number of inbound streams concurrently negotiating on a connection.
///
/// See [super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams].
max_negotiating_inbound_streams: usize,
}
impl Default for PoolConfig {
@ -1164,6 +1176,7 @@ impl Default for PoolConfig {
// By default, addresses of a single connection attempt are dialed in sequence.
dial_concurrency_factor: NonZeroU8::new(1).expect("1 > 0"),
substream_upgrade_protocol_override: None,
max_negotiating_inbound_streams: 128,
}
}
}
@ -1222,6 +1235,14 @@ impl PoolConfig {
self.substream_upgrade_protocol_override = Some(v);
self
}
/// The maximum number of inbound streams concurrently negotiating on a connection.
///
/// See [`super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams`].
pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
self.max_negotiating_inbound_streams = v;
self
}
}
trait EntryExt<'a, K, V> {

View File

@ -1364,6 +1364,14 @@ where
self
}
/// The maximum number of inbound streams concurrently negotiating on a connection.
///
/// See [`PoolConfig::with_max_negotiating_inbound_streams`].
pub fn max_negotiating_inbound_streams(mut self, v: usize) -> Self {
self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
self
}
/// Builds a `Swarm` with the current configuration.
pub fn build(mut self) -> Swarm<TBehaviour> {
let supported_protocols = self