diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs
index 2cb3c060..34a27cdf 100644
--- a/core/src/upgrade.rs
+++ b/core/src/upgrade.rs
@@ -64,6 +64,7 @@ mod error;
mod from_fn;
mod map;
mod optional;
+mod pending;
mod select;
mod transfer;
@@ -77,6 +78,7 @@ pub use self::{
from_fn::{from_fn, FromFnUpgrade},
map::{MapInboundUpgrade, MapInboundUpgradeErr, MapOutboundUpgrade, MapOutboundUpgradeErr},
optional::OptionalUpgrade,
+ pending::PendingUpgrade,
select::SelectUpgrade,
transfer::{read_length_prefixed, read_varint, write_length_prefixed, write_varint},
};
diff --git a/core/src/upgrade/pending.rs b/core/src/upgrade/pending.rs
new file mode 100644
index 00000000..15d3c31d
--- /dev/null
+++ b/core/src/upgrade/pending.rs
@@ -0,0 +1,76 @@
+// Copyright 2022 Protocol Labs.
+// Copyright 2017-2018 Parity Technologies (UK) Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo};
+use futures::future;
+use std::iter;
+use void::Void;
+
+/// Implementation of [`UpgradeInfo`], [`InboundUpgrade`] and [`OutboundUpgrade`] that always
+/// returns a pending upgrade.
+#[derive(Debug, Copy, Clone)]
+pub struct PendingUpgrade
{
+ protocol_name: P,
+}
+
+impl
PendingUpgrade
{
+ pub fn new(protocol_name: P) -> Self {
+ Self { protocol_name }
+ }
+}
+
+impl
UpgradeInfo for PendingUpgrade
+where
+ P: ProtocolName + Clone,
+{
+ type Info = P;
+ type InfoIter = iter::Once
;
+
+ fn protocol_info(&self) -> Self::InfoIter {
+ iter::once(self.protocol_name.clone())
+ }
+}
+
+impl InboundUpgrade for PendingUpgrade
+where
+ P: ProtocolName + Clone,
+{
+ type Output = Void;
+ type Error = Void;
+ type Future = future::Pending>;
+
+ fn upgrade_inbound(self, _: C, _: Self::Info) -> Self::Future {
+ future::pending()
+ }
+}
+
+impl OutboundUpgrade for PendingUpgrade
+where
+ P: ProtocolName + Clone,
+{
+ type Output = Void;
+ type Error = Void;
+ type Future = future::Pending>;
+
+ fn upgrade_outbound(self, _: C, _: Self::Info) -> Self::Future {
+ future::pending()
+ }
+}
diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs
index 77eb1d79..03d09b3f 100644
--- a/swarm/src/connection/handler_wrapper.rs
+++ b/swarm/src/connection/handler_wrapper.rs
@@ -440,3 +440,82 @@ pub enum Event {
/// Other event.
Custom(TCustom),
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::handler::PendingConnectionHandler;
+ use quickcheck::*;
+ use std::sync::Arc;
+
+ #[test]
+ fn max_negotiating_inbound_streams() {
+ fn prop(max_negotiating_inbound_streams: u8) {
+ let max_negotiating_inbound_streams: usize = max_negotiating_inbound_streams.into();
+ let mut wrapper = HandlerWrapper::new(
+ PeerId::random(),
+ ConnectedPoint::Listener {
+ local_addr: Multiaddr::empty(),
+ send_back_addr: Multiaddr::empty(),
+ },
+ PendingConnectionHandler::new("test".to_string()),
+ None,
+ max_negotiating_inbound_streams,
+ );
+ let alive_substreams_counter = Arc::new(());
+
+ for _ in 0..max_negotiating_inbound_streams {
+ let substream =
+ SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone()));
+ wrapper.inject_substream(substream, SubstreamEndpoint::Listener);
+ }
+
+ assert_eq!(
+ Arc::strong_count(&alive_substreams_counter),
+ max_negotiating_inbound_streams + 1,
+ "Expect none of the substreams up to the limit to be dropped."
+ );
+
+ let substream = SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone()));
+ wrapper.inject_substream(substream, SubstreamEndpoint::Listener);
+
+ assert_eq!(
+ Arc::strong_count(&alive_substreams_counter),
+ max_negotiating_inbound_streams + 1,
+ "Expect substream exceeding the limit to be dropped."
+ );
+ }
+
+ QuickCheck::new().quickcheck(prop as fn(_));
+ }
+
+ struct PendingSubstream(Arc<()>);
+
+ impl AsyncRead for PendingSubstream {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ _buf: &mut [u8],
+ ) -> Poll> {
+ Poll::Pending
+ }
+ }
+
+ impl AsyncWrite for PendingSubstream {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ _buf: &[u8],
+ ) -> Poll> {
+ Poll::Pending
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> {
+ Poll::Pending
+ }
+
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> {
+ Poll::Pending
+ }
+ }
+}
diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs
index 7060d33b..c6125f27 100644
--- a/swarm/src/handler.rs
+++ b/swarm/src/handler.rs
@@ -44,6 +44,7 @@ mod map_in;
mod map_out;
pub mod multi;
mod one_shot;
+mod pending;
mod select;
pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend};
@@ -56,6 +57,7 @@ pub use dummy::DummyConnectionHandler;
pub use map_in::MapInEvent;
pub use map_out::MapOutEvent;
pub use one_shot::{OneShotHandler, OneShotHandlerConfig};
+pub use pending::PendingConnectionHandler;
pub use select::{ConnectionHandlerSelect, IntoConnectionHandlerSelect};
/// A handler for a set of protocols used on a connection with a remote.
diff --git a/swarm/src/handler/pending.rs b/swarm/src/handler/pending.rs
new file mode 100644
index 00000000..04c16965
--- /dev/null
+++ b/swarm/src/handler/pending.rs
@@ -0,0 +1,120 @@
+// Copyright 2022 Protocol Labs.
+// Copyright 2018 Parity Technologies (UK) Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use crate::handler::{
+ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
+ SubstreamProtocol,
+};
+use crate::NegotiatedSubstream;
+use libp2p_core::{
+ upgrade::{InboundUpgrade, OutboundUpgrade, PendingUpgrade},
+ Multiaddr,
+};
+use std::task::{Context, Poll};
+use void::Void;
+
+/// Implementation of [`ConnectionHandler`] that returns a pending upgrade.
+#[derive(Clone, Debug)]
+pub struct PendingConnectionHandler {
+ protocol_name: String,
+}
+
+impl PendingConnectionHandler {
+ pub fn new(protocol_name: String) -> Self {
+ PendingConnectionHandler { protocol_name }
+ }
+}
+
+impl ConnectionHandler for PendingConnectionHandler {
+ type InEvent = Void;
+ type OutEvent = Void;
+ type Error = Void;
+ type InboundProtocol = PendingUpgrade;
+ type OutboundProtocol = PendingUpgrade;
+ type OutboundOpenInfo = Void;
+ type InboundOpenInfo = ();
+
+ fn listen_protocol(&self) -> SubstreamProtocol {
+ SubstreamProtocol::new(PendingUpgrade::new(self.protocol_name.clone()), ())
+ }
+
+ fn inject_fully_negotiated_inbound(
+ &mut self,
+ protocol: >::Output,
+ _: Self::InboundOpenInfo,
+ ) {
+ void::unreachable(protocol)
+ }
+
+ fn inject_fully_negotiated_outbound(
+ &mut self,
+ protocol: >::Output,
+ _info: Self::OutboundOpenInfo,
+ ) {
+ void::unreachable(protocol);
+ #[allow(unreachable_code)]
+ {
+ void::unreachable(_info);
+ }
+ }
+
+ fn inject_event(&mut self, v: Self::InEvent) {
+ void::unreachable(v)
+ }
+
+ fn inject_address_change(&mut self, _: &Multiaddr) {}
+
+ fn inject_dial_upgrade_error(
+ &mut self,
+ _: Self::OutboundOpenInfo,
+ _: ConnectionHandlerUpgrErr<
+ >::Error,
+ >,
+ ) {
+ }
+
+ fn inject_listen_upgrade_error(
+ &mut self,
+ _: Self::InboundOpenInfo,
+ _: ConnectionHandlerUpgrErr<
+ >::Error,
+ >,
+ ) {
+ }
+
+ fn connection_keep_alive(&self) -> KeepAlive {
+ KeepAlive::No
+ }
+
+ fn poll(
+ &mut self,
+ _: &mut Context<'_>,
+ ) -> Poll<
+ ConnectionHandlerEvent<
+ Self::OutboundProtocol,
+ Self::OutboundOpenInfo,
+ Self::OutEvent,
+ Self::Error,
+ >,
+ > {
+ Poll::Pending
+ }
+}