swarm/src/connection: Extend log when exceeding streams limit (#2716)

Log peer ID and stream limit as well as reference config option when limit is
exceeded. This should help folks running into this limit debug what is going on.
This commit is contained in:
Max Inden 2022-06-22 06:02:55 +02:00 committed by GitHub
parent e2bef93bd7
commit 7eaa9c7bb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 29 additions and 7 deletions

View File

@ -1,3 +1,9 @@
# 0.36.2 [unreleased]
- Extend log message when exceeding inbound negotiating streams with peer ID and limit. See [PR 2716].
[PR 2716]: https://github.com/libp2p/rust-libp2p/pull/2716/
# 0.36.1 # 0.36.1
- Limit negotiating inbound substreams per connection. See [PR 2697]. - Limit negotiating inbound substreams per connection. See [PR 2697].

View File

@ -3,7 +3,7 @@ name = "libp2p-swarm"
edition = "2021" edition = "2021"
rust-version = "1.56.1" rust-version = "1.56.1"
description = "The libp2p swarm" description = "The libp2p swarm"
version = "0.36.1" version = "0.36.2"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT" license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p" repository = "https://github.com/libp2p/rust-libp2p"

View File

@ -35,6 +35,7 @@ pub use pool::{EstablishedConnection, PendingConnection};
pub use substream::{Close, Substream, SubstreamEndpoint}; pub use substream::{Close, Substream, SubstreamEndpoint};
use crate::handler::ConnectionHandler; use crate::handler::ConnectionHandler;
use crate::IntoConnectionHandler;
use handler_wrapper::HandlerWrapper; use handler_wrapper::HandlerWrapper;
use libp2p_core::connection::ConnectedPoint; use libp2p_core::connection::ConnectedPoint;
use libp2p_core::multiaddr::Multiaddr; use libp2p_core::multiaddr::Multiaddr;
@ -94,12 +95,16 @@ where
/// Builds a new `Connection` from the given substream multiplexer /// Builds a new `Connection` from the given substream multiplexer
/// and connection handler. /// and connection handler.
pub fn new( pub fn new(
peer_id: PeerId,
endpoint: ConnectedPoint,
muxer: StreamMuxerBox, muxer: StreamMuxerBox,
handler: THandler, handler: impl IntoConnectionHandler<Handler = THandler>,
substream_upgrade_protocol_override: Option<upgrade::Version>, substream_upgrade_protocol_override: Option<upgrade::Version>,
max_negotiating_inbound_streams: usize, max_negotiating_inbound_streams: usize,
) -> Self { ) -> Self {
let wrapped_handler = HandlerWrapper::new( let wrapped_handler = HandlerWrapper::new(
peer_id,
endpoint,
handler, handler,
substream_upgrade_protocol_override, substream_upgrade_protocol_override,
max_negotiating_inbound_streams, max_negotiating_inbound_streams,

View File

@ -23,6 +23,7 @@ use crate::handler::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
}; };
use crate::upgrade::SendWrapper; use crate::upgrade::SendWrapper;
use crate::IntoConnectionHandler;
use futures::prelude::*; use futures::prelude::*;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
@ -33,6 +34,7 @@ use libp2p_core::{
upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply, UpgradeError}, upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply, UpgradeError},
Multiaddr, Multiaddr,
}; };
use libp2p_core::{ConnectedPoint, PeerId};
use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration}; use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration};
/// A wrapper for an underlying [`ConnectionHandler`]. /// A wrapper for an underlying [`ConnectionHandler`].
@ -46,6 +48,7 @@ pub struct HandlerWrapper<TConnectionHandler>
where where
TConnectionHandler: ConnectionHandler, TConnectionHandler: ConnectionHandler,
{ {
remote_peer_id: PeerId,
/// The underlying handler. /// The underlying handler.
handler: TConnectionHandler, handler: TConnectionHandler,
/// Futures that upgrade incoming substreams. /// Futures that upgrade incoming substreams.
@ -106,12 +109,15 @@ impl<TConnectionHandler: ConnectionHandler> std::fmt::Debug for HandlerWrapper<T
impl<TConnectionHandler: ConnectionHandler> HandlerWrapper<TConnectionHandler> { impl<TConnectionHandler: ConnectionHandler> HandlerWrapper<TConnectionHandler> {
pub(crate) fn new( pub(crate) fn new(
handler: TConnectionHandler, remote_peer_id: PeerId,
endpoint: ConnectedPoint,
handler: impl IntoConnectionHandler<Handler = TConnectionHandler>,
substream_upgrade_protocol_override: Option<upgrade::Version>, substream_upgrade_protocol_override: Option<upgrade::Version>,
max_negotiating_inbound_streams: usize, max_negotiating_inbound_streams: usize,
) -> Self { ) -> Self {
Self { Self {
handler, remote_peer_id,
handler: handler.into_handler(&remote_peer_id, &endpoint),
negotiating_in: Default::default(), negotiating_in: Default::default(),
negotiating_out: Default::default(), negotiating_out: Default::default(),
queued_dial_upgrades: Vec::new(), queued_dial_upgrades: Vec::new(),
@ -257,8 +263,11 @@ where
SubstreamEndpoint::Listener => { SubstreamEndpoint::Listener => {
if self.negotiating_in.len() == self.max_negotiating_inbound_streams { if self.negotiating_in.len() == self.max_negotiating_inbound_streams {
log::warn!( log::warn!(
"Incoming substream exceeding maximum number of \ "Incoming substream from {} exceeding maximum number \
negotiating inbound streams. Dropping." of negotiating inbound streams {} on connection. \
Dropping. See PoolConfig::with_max_negotiating_inbound_streams.",
self.remote_peer_id,
self.max_negotiating_inbound_streams,
); );
return; return;
} }

View File

@ -747,8 +747,10 @@ where
); );
let connection = super::Connection::new( let connection = super::Connection::new(
obtained_peer_id,
endpoint,
muxer, muxer,
handler.into_handler(&obtained_peer_id, &endpoint), handler,
self.substream_upgrade_protocol_override, self.substream_upgrade_protocol_override,
self.max_negotiating_inbound_streams, self.max_negotiating_inbound_streams,
); );