From 8dc0188a1d6f62c59b1ea07088d6e1c651419006 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 16 Aug 2022 10:15:31 +0200 Subject: [PATCH] swarm/src/connection: Test max_negotiating_inbound_streams (#2785) Test that `HandlerWrapper` upholds the provided `max_negotiating_inbound_streams` limit. --- core/src/upgrade.rs | 2 + core/src/upgrade/pending.rs | 76 +++++++++++++++ swarm/src/connection/handler_wrapper.rs | 79 ++++++++++++++++ swarm/src/handler.rs | 2 + swarm/src/handler/pending.rs | 120 ++++++++++++++++++++++++ 5 files changed, 279 insertions(+) create mode 100644 core/src/upgrade/pending.rs create mode 100644 swarm/src/handler/pending.rs diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs index 2cb3c060..34a27cdf 100644 --- a/core/src/upgrade.rs +++ b/core/src/upgrade.rs @@ -64,6 +64,7 @@ mod error; mod from_fn; mod map; mod optional; +mod pending; mod select; mod transfer; @@ -77,6 +78,7 @@ pub use self::{ from_fn::{from_fn, FromFnUpgrade}, map::{MapInboundUpgrade, MapInboundUpgradeErr, MapOutboundUpgrade, MapOutboundUpgradeErr}, optional::OptionalUpgrade, + pending::PendingUpgrade, select::SelectUpgrade, transfer::{read_length_prefixed, read_varint, write_length_prefixed, write_varint}, }; diff --git a/core/src/upgrade/pending.rs b/core/src/upgrade/pending.rs new file mode 100644 index 00000000..15d3c31d --- /dev/null +++ b/core/src/upgrade/pending.rs @@ -0,0 +1,76 @@ +// Copyright 2022 Protocol Labs. +// Copyright 2017-2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; +use futures::future; +use std::iter; +use void::Void; + +/// Implementation of [`UpgradeInfo`], [`InboundUpgrade`] and [`OutboundUpgrade`] that always +/// returns a pending upgrade. +#[derive(Debug, Copy, Clone)] +pub struct PendingUpgrade

{ + protocol_name: P, +} + +impl

PendingUpgrade

{ + pub fn new(protocol_name: P) -> Self { + Self { protocol_name } + } +} + +impl

UpgradeInfo for PendingUpgrade

+where + P: ProtocolName + Clone, +{ + type Info = P; + type InfoIter = iter::Once

; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.protocol_name.clone()) + } +} + +impl InboundUpgrade for PendingUpgrade

+where + P: ProtocolName + Clone, +{ + type Output = Void; + type Error = Void; + type Future = future::Pending>; + + fn upgrade_inbound(self, _: C, _: Self::Info) -> Self::Future { + future::pending() + } +} + +impl OutboundUpgrade for PendingUpgrade

+where + P: ProtocolName + Clone, +{ + type Output = Void; + type Error = Void; + type Future = future::Pending>; + + fn upgrade_outbound(self, _: C, _: Self::Info) -> Self::Future { + future::pending() + } +} diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index 77eb1d79..03d09b3f 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -440,3 +440,82 @@ pub enum Event { /// Other event. Custom(TCustom), } + +#[cfg(test)] +mod tests { + use super::*; + use crate::handler::PendingConnectionHandler; + use quickcheck::*; + use std::sync::Arc; + + #[test] + fn max_negotiating_inbound_streams() { + fn prop(max_negotiating_inbound_streams: u8) { + let max_negotiating_inbound_streams: usize = max_negotiating_inbound_streams.into(); + let mut wrapper = HandlerWrapper::new( + PeerId::random(), + ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }, + PendingConnectionHandler::new("test".to_string()), + None, + max_negotiating_inbound_streams, + ); + let alive_substreams_counter = Arc::new(()); + + for _ in 0..max_negotiating_inbound_streams { + let substream = + SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone())); + wrapper.inject_substream(substream, SubstreamEndpoint::Listener); + } + + assert_eq!( + Arc::strong_count(&alive_substreams_counter), + max_negotiating_inbound_streams + 1, + "Expect none of the substreams up to the limit to be dropped." + ); + + let substream = SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone())); + wrapper.inject_substream(substream, SubstreamEndpoint::Listener); + + assert_eq!( + Arc::strong_count(&alive_substreams_counter), + max_negotiating_inbound_streams + 1, + "Expect substream exceeding the limit to be dropped." + ); + } + + QuickCheck::new().quickcheck(prop as fn(_)); + } + + struct PendingSubstream(Arc<()>); + + impl AsyncRead for PendingSubstream { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &mut [u8], + ) -> Poll> { + Poll::Pending + } + } + + impl AsyncWrite for PendingSubstream { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &[u8], + ) -> Poll> { + Poll::Pending + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Pending + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Pending + } + } +} diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 7060d33b..c6125f27 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -44,6 +44,7 @@ mod map_in; mod map_out; pub mod multi; mod one_shot; +mod pending; mod select; pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend}; @@ -56,6 +57,7 @@ pub use dummy::DummyConnectionHandler; pub use map_in::MapInEvent; pub use map_out::MapOutEvent; pub use one_shot::{OneShotHandler, OneShotHandlerConfig}; +pub use pending::PendingConnectionHandler; pub use select::{ConnectionHandlerSelect, IntoConnectionHandlerSelect}; /// A handler for a set of protocols used on a connection with a remote. diff --git a/swarm/src/handler/pending.rs b/swarm/src/handler/pending.rs new file mode 100644 index 00000000..04c16965 --- /dev/null +++ b/swarm/src/handler/pending.rs @@ -0,0 +1,120 @@ +// Copyright 2022 Protocol Labs. +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::handler::{ + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + SubstreamProtocol, +}; +use crate::NegotiatedSubstream; +use libp2p_core::{ + upgrade::{InboundUpgrade, OutboundUpgrade, PendingUpgrade}, + Multiaddr, +}; +use std::task::{Context, Poll}; +use void::Void; + +/// Implementation of [`ConnectionHandler`] that returns a pending upgrade. +#[derive(Clone, Debug)] +pub struct PendingConnectionHandler { + protocol_name: String, +} + +impl PendingConnectionHandler { + pub fn new(protocol_name: String) -> Self { + PendingConnectionHandler { protocol_name } + } +} + +impl ConnectionHandler for PendingConnectionHandler { + type InEvent = Void; + type OutEvent = Void; + type Error = Void; + type InboundProtocol = PendingUpgrade; + type OutboundProtocol = PendingUpgrade; + type OutboundOpenInfo = Void; + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(PendingUpgrade::new(self.protocol_name.clone()), ()) + } + + fn inject_fully_negotiated_inbound( + &mut self, + protocol: >::Output, + _: Self::InboundOpenInfo, + ) { + void::unreachable(protocol) + } + + fn inject_fully_negotiated_outbound( + &mut self, + protocol: >::Output, + _info: Self::OutboundOpenInfo, + ) { + void::unreachable(protocol); + #[allow(unreachable_code)] + { + void::unreachable(_info); + } + } + + fn inject_event(&mut self, v: Self::InEvent) { + void::unreachable(v) + } + + fn inject_address_change(&mut self, _: &Multiaddr) {} + + fn inject_dial_upgrade_error( + &mut self, + _: Self::OutboundOpenInfo, + _: ConnectionHandlerUpgrErr< + >::Error, + >, + ) { + } + + fn inject_listen_upgrade_error( + &mut self, + _: Self::InboundOpenInfo, + _: ConnectionHandlerUpgrErr< + >::Error, + >, + ) { + } + + fn connection_keep_alive(&self) -> KeepAlive { + KeepAlive::No + } + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + Poll::Pending + } +}