diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 71d0ea3e..bd37a53d 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.36.2 [unreleased] + +- Extend log message when exceeding inbound negotiating streams with peer ID and limit. See [PR 2716]. + +[PR 2716]: https://github.com/libp2p/rust-libp2p/pull/2716/ + # 0.36.1 - Limit negotiating inbound substreams per connection. See [PR 2697]. diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index ddd8c82c..abe67a15 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-swarm" edition = "2021" rust-version = "1.56.1" description = "The libp2p swarm" -version = "0.36.1" +version = "0.36.2" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index f55f0367..e49c2ae5 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -35,6 +35,7 @@ pub use pool::{EstablishedConnection, PendingConnection}; pub use substream::{Close, Substream, SubstreamEndpoint}; use crate::handler::ConnectionHandler; +use crate::IntoConnectionHandler; use handler_wrapper::HandlerWrapper; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; @@ -94,12 +95,16 @@ where /// Builds a new `Connection` from the given substream multiplexer /// and connection handler. pub fn new( + peer_id: PeerId, + endpoint: ConnectedPoint, muxer: StreamMuxerBox, - handler: THandler, + handler: impl IntoConnectionHandler, substream_upgrade_protocol_override: Option, max_negotiating_inbound_streams: usize, ) -> Self { let wrapped_handler = HandlerWrapper::new( + peer_id, + endpoint, handler, substream_upgrade_protocol_override, max_negotiating_inbound_streams, diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index 05e389d6..a97b2a06 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -23,6 +23,7 @@ use crate::handler::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, }; use crate::upgrade::SendWrapper; +use crate::IntoConnectionHandler; use futures::prelude::*; use futures::stream::FuturesUnordered; @@ -33,6 +34,7 @@ use libp2p_core::{ upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply, UpgradeError}, Multiaddr, }; +use libp2p_core::{ConnectedPoint, PeerId}; use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration}; /// A wrapper for an underlying [`ConnectionHandler`]. @@ -46,6 +48,7 @@ pub struct HandlerWrapper where TConnectionHandler: ConnectionHandler, { + remote_peer_id: PeerId, /// The underlying handler. handler: TConnectionHandler, /// Futures that upgrade incoming substreams. @@ -106,12 +109,15 @@ impl std::fmt::Debug for HandlerWrapper HandlerWrapper { pub(crate) fn new( - handler: TConnectionHandler, + remote_peer_id: PeerId, + endpoint: ConnectedPoint, + handler: impl IntoConnectionHandler, substream_upgrade_protocol_override: Option, max_negotiating_inbound_streams: usize, ) -> Self { Self { - handler, + remote_peer_id, + handler: handler.into_handler(&remote_peer_id, &endpoint), negotiating_in: Default::default(), negotiating_out: Default::default(), queued_dial_upgrades: Vec::new(), @@ -257,8 +263,11 @@ where SubstreamEndpoint::Listener => { if self.negotiating_in.len() == self.max_negotiating_inbound_streams { log::warn!( - "Incoming substream exceeding maximum number of \ - negotiating inbound streams. Dropping." + "Incoming substream from {} exceeding maximum number \ + of negotiating inbound streams {} on connection. \ + Dropping. See PoolConfig::with_max_negotiating_inbound_streams.", + self.remote_peer_id, + self.max_negotiating_inbound_streams, ); return; } diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index a8f28bee..10fc29a8 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -747,8 +747,10 @@ where ); let connection = super::Connection::new( + obtained_peer_id, + endpoint, muxer, - handler.into_handler(&obtained_peer_id, &endpoint), + handler, self.substream_upgrade_protocol_override, self.max_negotiating_inbound_streams, );