mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-03 05:01:24 +00:00
swarm/src/connection: Test max_negotiating_inbound_streams (#2785)
Test that `HandlerWrapper` upholds the provided `max_negotiating_inbound_streams` limit.
This commit is contained in:
parent
6a9fa3d930
commit
8dc0188a1d
@ -64,6 +64,7 @@ mod error;
|
|||||||
mod from_fn;
|
mod from_fn;
|
||||||
mod map;
|
mod map;
|
||||||
mod optional;
|
mod optional;
|
||||||
|
mod pending;
|
||||||
mod select;
|
mod select;
|
||||||
mod transfer;
|
mod transfer;
|
||||||
|
|
||||||
@ -77,6 +78,7 @@ pub use self::{
|
|||||||
from_fn::{from_fn, FromFnUpgrade},
|
from_fn::{from_fn, FromFnUpgrade},
|
||||||
map::{MapInboundUpgrade, MapInboundUpgradeErr, MapOutboundUpgrade, MapOutboundUpgradeErr},
|
map::{MapInboundUpgrade, MapInboundUpgradeErr, MapOutboundUpgrade, MapOutboundUpgradeErr},
|
||||||
optional::OptionalUpgrade,
|
optional::OptionalUpgrade,
|
||||||
|
pending::PendingUpgrade,
|
||||||
select::SelectUpgrade,
|
select::SelectUpgrade,
|
||||||
transfer::{read_length_prefixed, read_varint, write_length_prefixed, write_varint},
|
transfer::{read_length_prefixed, read_varint, write_length_prefixed, write_varint},
|
||||||
};
|
};
|
||||||
|
76
core/src/upgrade/pending.rs
Normal file
76
core/src/upgrade/pending.rs
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
// Copyright 2022 Protocol Labs.
|
||||||
|
// Copyright 2017-2018 Parity Technologies (UK) Ltd.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the "Software"),
|
||||||
|
// to deal in the Software without restriction, including without limitation
|
||||||
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||||
|
// and/or sell copies of the Software, and to permit persons to whom the
|
||||||
|
// Software is furnished to do so, subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in
|
||||||
|
// all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo};
|
||||||
|
use futures::future;
|
||||||
|
use std::iter;
|
||||||
|
use void::Void;
|
||||||
|
|
||||||
|
/// Implementation of [`UpgradeInfo`], [`InboundUpgrade`] and [`OutboundUpgrade`] that always
|
||||||
|
/// returns a pending upgrade.
|
||||||
|
#[derive(Debug, Copy, Clone)]
|
||||||
|
pub struct PendingUpgrade<P> {
|
||||||
|
protocol_name: P,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<P> PendingUpgrade<P> {
|
||||||
|
pub fn new(protocol_name: P) -> Self {
|
||||||
|
Self { protocol_name }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<P> UpgradeInfo for PendingUpgrade<P>
|
||||||
|
where
|
||||||
|
P: ProtocolName + Clone,
|
||||||
|
{
|
||||||
|
type Info = P;
|
||||||
|
type InfoIter = iter::Once<P>;
|
||||||
|
|
||||||
|
fn protocol_info(&self) -> Self::InfoIter {
|
||||||
|
iter::once(self.protocol_name.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<C, P> InboundUpgrade<C> for PendingUpgrade<P>
|
||||||
|
where
|
||||||
|
P: ProtocolName + Clone,
|
||||||
|
{
|
||||||
|
type Output = Void;
|
||||||
|
type Error = Void;
|
||||||
|
type Future = future::Pending<Result<Self::Output, Self::Error>>;
|
||||||
|
|
||||||
|
fn upgrade_inbound(self, _: C, _: Self::Info) -> Self::Future {
|
||||||
|
future::pending()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<C, P> OutboundUpgrade<C> for PendingUpgrade<P>
|
||||||
|
where
|
||||||
|
P: ProtocolName + Clone,
|
||||||
|
{
|
||||||
|
type Output = Void;
|
||||||
|
type Error = Void;
|
||||||
|
type Future = future::Pending<Result<Self::Output, Self::Error>>;
|
||||||
|
|
||||||
|
fn upgrade_outbound(self, _: C, _: Self::Info) -> Self::Future {
|
||||||
|
future::pending()
|
||||||
|
}
|
||||||
|
}
|
@ -440,3 +440,82 @@ pub enum Event<TOutboundOpenInfo, TCustom> {
|
|||||||
/// Other event.
|
/// Other event.
|
||||||
Custom(TCustom),
|
Custom(TCustom),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::handler::PendingConnectionHandler;
|
||||||
|
use quickcheck::*;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn max_negotiating_inbound_streams() {
|
||||||
|
fn prop(max_negotiating_inbound_streams: u8) {
|
||||||
|
let max_negotiating_inbound_streams: usize = max_negotiating_inbound_streams.into();
|
||||||
|
let mut wrapper = HandlerWrapper::new(
|
||||||
|
PeerId::random(),
|
||||||
|
ConnectedPoint::Listener {
|
||||||
|
local_addr: Multiaddr::empty(),
|
||||||
|
send_back_addr: Multiaddr::empty(),
|
||||||
|
},
|
||||||
|
PendingConnectionHandler::new("test".to_string()),
|
||||||
|
None,
|
||||||
|
max_negotiating_inbound_streams,
|
||||||
|
);
|
||||||
|
let alive_substreams_counter = Arc::new(());
|
||||||
|
|
||||||
|
for _ in 0..max_negotiating_inbound_streams {
|
||||||
|
let substream =
|
||||||
|
SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone()));
|
||||||
|
wrapper.inject_substream(substream, SubstreamEndpoint::Listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
Arc::strong_count(&alive_substreams_counter),
|
||||||
|
max_negotiating_inbound_streams + 1,
|
||||||
|
"Expect none of the substreams up to the limit to be dropped."
|
||||||
|
);
|
||||||
|
|
||||||
|
let substream = SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone()));
|
||||||
|
wrapper.inject_substream(substream, SubstreamEndpoint::Listener);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
Arc::strong_count(&alive_substreams_counter),
|
||||||
|
max_negotiating_inbound_streams + 1,
|
||||||
|
"Expect substream exceeding the limit to be dropped."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
QuickCheck::new().quickcheck(prop as fn(_));
|
||||||
|
}
|
||||||
|
|
||||||
|
struct PendingSubstream(Arc<()>);
|
||||||
|
|
||||||
|
impl AsyncRead for PendingSubstream {
|
||||||
|
fn poll_read(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
_cx: &mut Context<'_>,
|
||||||
|
_buf: &mut [u8],
|
||||||
|
) -> Poll<std::io::Result<usize>> {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncWrite for PendingSubstream {
|
||||||
|
fn poll_write(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
_cx: &mut Context<'_>,
|
||||||
|
_buf: &[u8],
|
||||||
|
) -> Poll<std::io::Result<usize>> {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -44,6 +44,7 @@ mod map_in;
|
|||||||
mod map_out;
|
mod map_out;
|
||||||
pub mod multi;
|
pub mod multi;
|
||||||
mod one_shot;
|
mod one_shot;
|
||||||
|
mod pending;
|
||||||
mod select;
|
mod select;
|
||||||
|
|
||||||
pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend};
|
pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend};
|
||||||
@ -56,6 +57,7 @@ pub use dummy::DummyConnectionHandler;
|
|||||||
pub use map_in::MapInEvent;
|
pub use map_in::MapInEvent;
|
||||||
pub use map_out::MapOutEvent;
|
pub use map_out::MapOutEvent;
|
||||||
pub use one_shot::{OneShotHandler, OneShotHandlerConfig};
|
pub use one_shot::{OneShotHandler, OneShotHandlerConfig};
|
||||||
|
pub use pending::PendingConnectionHandler;
|
||||||
pub use select::{ConnectionHandlerSelect, IntoConnectionHandlerSelect};
|
pub use select::{ConnectionHandlerSelect, IntoConnectionHandlerSelect};
|
||||||
|
|
||||||
/// A handler for a set of protocols used on a connection with a remote.
|
/// A handler for a set of protocols used on a connection with a remote.
|
||||||
|
120
swarm/src/handler/pending.rs
Normal file
120
swarm/src/handler/pending.rs
Normal file
@ -0,0 +1,120 @@
|
|||||||
|
// Copyright 2022 Protocol Labs.
|
||||||
|
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the "Software"),
|
||||||
|
// to deal in the Software without restriction, including without limitation
|
||||||
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||||
|
// and/or sell copies of the Software, and to permit persons to whom the
|
||||||
|
// Software is furnished to do so, subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in
|
||||||
|
// all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
use crate::handler::{
|
||||||
|
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
|
||||||
|
SubstreamProtocol,
|
||||||
|
};
|
||||||
|
use crate::NegotiatedSubstream;
|
||||||
|
use libp2p_core::{
|
||||||
|
upgrade::{InboundUpgrade, OutboundUpgrade, PendingUpgrade},
|
||||||
|
Multiaddr,
|
||||||
|
};
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use void::Void;
|
||||||
|
|
||||||
|
/// Implementation of [`ConnectionHandler`] that returns a pending upgrade.
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct PendingConnectionHandler {
|
||||||
|
protocol_name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PendingConnectionHandler {
|
||||||
|
pub fn new(protocol_name: String) -> Self {
|
||||||
|
PendingConnectionHandler { protocol_name }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionHandler for PendingConnectionHandler {
|
||||||
|
type InEvent = Void;
|
||||||
|
type OutEvent = Void;
|
||||||
|
type Error = Void;
|
||||||
|
type InboundProtocol = PendingUpgrade<String>;
|
||||||
|
type OutboundProtocol = PendingUpgrade<String>;
|
||||||
|
type OutboundOpenInfo = Void;
|
||||||
|
type InboundOpenInfo = ();
|
||||||
|
|
||||||
|
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||||
|
SubstreamProtocol::new(PendingUpgrade::new(self.protocol_name.clone()), ())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_fully_negotiated_inbound(
|
||||||
|
&mut self,
|
||||||
|
protocol: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
|
||||||
|
_: Self::InboundOpenInfo,
|
||||||
|
) {
|
||||||
|
void::unreachable(protocol)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_fully_negotiated_outbound(
|
||||||
|
&mut self,
|
||||||
|
protocol: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
|
||||||
|
_info: Self::OutboundOpenInfo,
|
||||||
|
) {
|
||||||
|
void::unreachable(protocol);
|
||||||
|
#[allow(unreachable_code)]
|
||||||
|
{
|
||||||
|
void::unreachable(_info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_event(&mut self, v: Self::InEvent) {
|
||||||
|
void::unreachable(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_address_change(&mut self, _: &Multiaddr) {}
|
||||||
|
|
||||||
|
fn inject_dial_upgrade_error(
|
||||||
|
&mut self,
|
||||||
|
_: Self::OutboundOpenInfo,
|
||||||
|
_: ConnectionHandlerUpgrErr<
|
||||||
|
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
|
||||||
|
>,
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_listen_upgrade_error(
|
||||||
|
&mut self,
|
||||||
|
_: Self::InboundOpenInfo,
|
||||||
|
_: ConnectionHandlerUpgrErr<
|
||||||
|
<Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Error,
|
||||||
|
>,
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
|
||||||
|
fn connection_keep_alive(&self) -> KeepAlive {
|
||||||
|
KeepAlive::No
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll(
|
||||||
|
&mut self,
|
||||||
|
_: &mut Context<'_>,
|
||||||
|
) -> Poll<
|
||||||
|
ConnectionHandlerEvent<
|
||||||
|
Self::OutboundProtocol,
|
||||||
|
Self::OutboundOpenInfo,
|
||||||
|
Self::OutEvent,
|
||||||
|
Self::Error,
|
||||||
|
>,
|
||||||
|
> {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user