mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-21 22:01:34 +00:00
feat: introduce libp2p-connection-limits
connection management module
This patch deprecates the existing connection limits within `Swarm` and uses the new `NetworkBehaviour` APIs to implement it as a plugin instead. Related #2824. Pull-Request: #3386.
This commit is contained in:
23
Cargo.lock
generated
23
Cargo.lock
generated
@ -2143,7 +2143,7 @@ checksum = "7fc7aa29613bd6a620df431842069224d8bc9011086b1db4c0e0cd47fa03ec9a"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libp2p"
|
name = "libp2p"
|
||||||
version = "0.51.1"
|
version = "0.51.2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-std",
|
"async-std",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
@ -2156,6 +2156,7 @@ dependencies = [
|
|||||||
"getrandom 0.2.8",
|
"getrandom 0.2.8",
|
||||||
"instant",
|
"instant",
|
||||||
"libp2p-autonat",
|
"libp2p-autonat",
|
||||||
|
"libp2p-connection-limits",
|
||||||
"libp2p-core",
|
"libp2p-core",
|
||||||
"libp2p-dcutr",
|
"libp2p-dcutr",
|
||||||
"libp2p-deflate",
|
"libp2p-deflate",
|
||||||
@ -2215,6 +2216,23 @@ dependencies = [
|
|||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "libp2p-connection-limits"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"async-std",
|
||||||
|
"libp2p-core",
|
||||||
|
"libp2p-identify",
|
||||||
|
"libp2p-identity",
|
||||||
|
"libp2p-ping",
|
||||||
|
"libp2p-swarm",
|
||||||
|
"libp2p-swarm-derive",
|
||||||
|
"libp2p-swarm-test",
|
||||||
|
"quickcheck-ext",
|
||||||
|
"rand 0.8.5",
|
||||||
|
"void",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libp2p-core"
|
name = "libp2p-core"
|
||||||
version = "0.39.1"
|
version = "0.39.1"
|
||||||
@ -2752,7 +2770,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libp2p-swarm"
|
name = "libp2p-swarm"
|
||||||
version = "0.42.0"
|
version = "0.42.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-std",
|
"async-std",
|
||||||
"either",
|
"either",
|
||||||
@ -2769,6 +2787,7 @@ dependencies = [
|
|||||||
"libp2p-ping",
|
"libp2p-ping",
|
||||||
"libp2p-plaintext",
|
"libp2p-plaintext",
|
||||||
"libp2p-swarm-derive",
|
"libp2p-swarm-derive",
|
||||||
|
"libp2p-swarm-test",
|
||||||
"libp2p-yamux",
|
"libp2p-yamux",
|
||||||
"log",
|
"log",
|
||||||
"quickcheck-ext",
|
"quickcheck-ext",
|
||||||
|
18
Cargo.toml
18
Cargo.toml
@ -11,43 +11,43 @@ members = [
|
|||||||
"examples/ping-example",
|
"examples/ping-example",
|
||||||
"examples/rendezvous",
|
"examples/rendezvous",
|
||||||
"identity",
|
"identity",
|
||||||
|
"interop-tests",
|
||||||
|
"misc/connection-limits",
|
||||||
|
"misc/keygen",
|
||||||
"misc/metrics",
|
"misc/metrics",
|
||||||
"misc/multistream-select",
|
"misc/multistream-select",
|
||||||
"misc/rw-stream-sink",
|
|
||||||
"misc/keygen",
|
|
||||||
"misc/quick-protobuf-codec",
|
"misc/quick-protobuf-codec",
|
||||||
"misc/quickcheck-ext",
|
"misc/quickcheck-ext",
|
||||||
|
"misc/rw-stream-sink",
|
||||||
"muxers/mplex",
|
"muxers/mplex",
|
||||||
"muxers/yamux",
|
|
||||||
"muxers/test-harness",
|
"muxers/test-harness",
|
||||||
"protocols/dcutr",
|
"muxers/yamux",
|
||||||
"protocols/autonat",
|
"protocols/autonat",
|
||||||
|
"protocols/dcutr",
|
||||||
"protocols/floodsub",
|
"protocols/floodsub",
|
||||||
"protocols/gossipsub",
|
"protocols/gossipsub",
|
||||||
"protocols/rendezvous",
|
|
||||||
"protocols/identify",
|
"protocols/identify",
|
||||||
"protocols/kad",
|
"protocols/kad",
|
||||||
"protocols/mdns",
|
"protocols/mdns",
|
||||||
"protocols/perf",
|
"protocols/perf",
|
||||||
"protocols/ping",
|
"protocols/ping",
|
||||||
"protocols/relay",
|
"protocols/relay",
|
||||||
|
"protocols/rendezvous",
|
||||||
"protocols/request-response",
|
"protocols/request-response",
|
||||||
"swarm",
|
"swarm",
|
||||||
"swarm-derive",
|
"swarm-derive",
|
||||||
"interop-tests",
|
|
||||||
"swarm-test",
|
"swarm-test",
|
||||||
"transports/deflate",
|
"transports/deflate",
|
||||||
"transports/dns",
|
"transports/dns",
|
||||||
"transports/noise",
|
"transports/noise",
|
||||||
"transports/tls",
|
|
||||||
"transports/plaintext",
|
"transports/plaintext",
|
||||||
"transports/pnet",
|
"transports/pnet",
|
||||||
"transports/quic",
|
"transports/quic",
|
||||||
"transports/tcp",
|
"transports/tcp",
|
||||||
|
"transports/tls",
|
||||||
"transports/uds",
|
"transports/uds",
|
||||||
"transports/websocket",
|
|
||||||
"transports/wasm-ext",
|
"transports/wasm-ext",
|
||||||
"transports/webrtc",
|
"transports/webrtc",
|
||||||
"interop-tests"
|
"transports/websocket",
|
||||||
]
|
]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
@ -1,3 +1,10 @@
|
|||||||
|
# 0.52.2 - unreleased
|
||||||
|
|
||||||
|
- Introduce `libp2p::connection_limits` module.
|
||||||
|
See [PR 3386].
|
||||||
|
|
||||||
|
[PR 3386]: https://github.com/libp2p/rust-libp2p/pull/3386
|
||||||
|
|
||||||
# 0.51.1
|
# 0.51.1
|
||||||
|
|
||||||
- Depend on `libp2p-tls` `v0.1.0`.
|
- Depend on `libp2p-tls` `v0.1.0`.
|
||||||
|
@ -3,7 +3,7 @@ name = "libp2p"
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
rust-version = "1.65.0"
|
rust-version = "1.65.0"
|
||||||
description = "Peer-to-peer networking library"
|
description = "Peer-to-peer networking library"
|
||||||
version = "0.51.1"
|
version = "0.51.2"
|
||||||
authors = ["Parity Technologies <admin@parity.io>"]
|
authors = ["Parity Technologies <admin@parity.io>"]
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
repository = "https://github.com/libp2p/rust-libp2p"
|
repository = "https://github.com/libp2p/rust-libp2p"
|
||||||
@ -97,6 +97,7 @@ getrandom = "0.2.3" # Explicit dependency to be used in `wasm-bindgen` feature
|
|||||||
instant = "0.1.11" # Explicit dependency to be used in `wasm-bindgen` feature
|
instant = "0.1.11" # Explicit dependency to be used in `wasm-bindgen` feature
|
||||||
|
|
||||||
libp2p-autonat = { version = "0.10.0", path = "../protocols/autonat", optional = true }
|
libp2p-autonat = { version = "0.10.0", path = "../protocols/autonat", optional = true }
|
||||||
|
libp2p-connection-limits = { version = "0.1.0", path = "../misc/connection-limits" }
|
||||||
libp2p-core = { version = "0.39.0", path = "../core" }
|
libp2p-core = { version = "0.39.0", path = "../core" }
|
||||||
libp2p-dcutr = { version = "0.9.0", path = "../protocols/dcutr", optional = true }
|
libp2p-dcutr = { version = "0.9.0", path = "../protocols/dcutr", optional = true }
|
||||||
libp2p-floodsub = { version = "0.42.0", path = "../protocols/floodsub", optional = true }
|
libp2p-floodsub = { version = "0.42.0", path = "../protocols/floodsub", optional = true }
|
||||||
|
@ -44,6 +44,8 @@ pub use multiaddr;
|
|||||||
#[doc(inline)]
|
#[doc(inline)]
|
||||||
pub use libp2p_autonat as autonat;
|
pub use libp2p_autonat as autonat;
|
||||||
#[doc(inline)]
|
#[doc(inline)]
|
||||||
|
pub use libp2p_connection_limits as connection_limits;
|
||||||
|
#[doc(inline)]
|
||||||
pub use libp2p_core as core;
|
pub use libp2p_core as core;
|
||||||
#[cfg(feature = "dcutr")]
|
#[cfg(feature = "dcutr")]
|
||||||
#[doc(inline)]
|
#[doc(inline)]
|
||||||
|
3
misc/connection-limits/CHANGELOG.md
Normal file
3
misc/connection-limits/CHANGELOG.md
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
# 0.1.0 - unreleased
|
||||||
|
|
||||||
|
- Initial release.
|
25
misc/connection-limits/Cargo.toml
Normal file
25
misc/connection-limits/Cargo.toml
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
[package]
|
||||||
|
name = "libp2p-connection-limits"
|
||||||
|
edition = "2021"
|
||||||
|
rust-version = "1.62.0"
|
||||||
|
description = "Connection limits for libp2p."
|
||||||
|
version = "0.1.0"
|
||||||
|
license = "MIT"
|
||||||
|
repository = "https://github.com/libp2p/rust-libp2p"
|
||||||
|
keywords = ["peer-to-peer", "libp2p", "networking"]
|
||||||
|
categories = ["network-programming", "asynchronous"]
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
libp2p-core = { version = "0.39.0", path = "../../core" }
|
||||||
|
libp2p-swarm = { version = "0.42.0", path = "../../swarm" }
|
||||||
|
libp2p-identity = { version = "0.1.0", path = "../../identity", features = ["peerid"] }
|
||||||
|
void = "1"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
async-std = { version = "1.12.0", features = ["attributes"] }
|
||||||
|
libp2p-identify = { path = "../../protocols/identify" }
|
||||||
|
libp2p-ping = { path = "../../protocols/ping" }
|
||||||
|
libp2p-swarm-derive = { path = "../../swarm-derive" }
|
||||||
|
libp2p-swarm-test = { path = "../../swarm-test" }
|
||||||
|
quickcheck-ext = { path = "../quickcheck-ext" }
|
||||||
|
rand = "0.8.5"
|
483
misc/connection-limits/src/lib.rs
Normal file
483
misc/connection-limits/src/lib.rs
Normal file
@ -0,0 +1,483 @@
|
|||||||
|
// Copyright 2023 Protocol Labs.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the "Software"),
|
||||||
|
// to deal in the Software without restriction, including without limitation
|
||||||
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||||
|
// and/or sell copies of the Software, and to permit persons to whom the
|
||||||
|
// Software is furnished to do so, subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in
|
||||||
|
// all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
use libp2p_core::{Endpoint, Multiaddr};
|
||||||
|
use libp2p_identity::PeerId;
|
||||||
|
use libp2p_swarm::{
|
||||||
|
dummy, ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour,
|
||||||
|
NetworkBehaviourAction, PollParameters, THandler, THandlerInEvent, THandlerOutEvent,
|
||||||
|
};
|
||||||
|
use std::collections::{HashMap, HashSet};
|
||||||
|
use std::fmt;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use void::Void;
|
||||||
|
|
||||||
|
/// A [`NetworkBehaviour`] that enforces a set of [`ConnectionLimits`].
|
||||||
|
///
|
||||||
|
/// For these limits to take effect, this needs to be composed into the behaviour tree of your application.
|
||||||
|
///
|
||||||
|
/// If a connection is denied due to a limit, either a [`SwarmEvent::IncomingConnectionError`](libp2p_swarm::SwarmEvent::IncomingConnectionError)
|
||||||
|
/// or [`SwarmEvent::OutgoingConnectionError`](libp2p_swarm::SwarmEvent::OutgoingConnectionError) will be emitted.
|
||||||
|
/// The [`ListenError::Denied`](libp2p_swarm::ListenError::Denied) and respectively the [`DialError::Denied`](libp2p_swarm::DialError::Denied) variant
|
||||||
|
/// contain a [`ConnectionDenied`](libp2p_swarm::ConnectionDenied) type that can be downcast to [`Exceeded`] error if (and only if) **this**
|
||||||
|
/// behaviour denied the connection.
|
||||||
|
///
|
||||||
|
/// If you employ multiple [`NetworkBehaviour`]s that manage connections, it may also be a different error.
|
||||||
|
///
|
||||||
|
/// # Example
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # use libp2p_identify as identify;
|
||||||
|
/// # use libp2p_ping as ping;
|
||||||
|
/// # use libp2p_swarm_derive::NetworkBehaviour;
|
||||||
|
/// # use libp2p_connection_limits as connection_limits;
|
||||||
|
///
|
||||||
|
/// #[derive(NetworkBehaviour)]
|
||||||
|
/// # #[behaviour(prelude = "libp2p_swarm::derive_prelude")]
|
||||||
|
/// struct MyBehaviour {
|
||||||
|
/// identify: identify::Behaviour,
|
||||||
|
/// ping: ping::Behaviour,
|
||||||
|
/// limits: connection_limits::Behaviour
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
pub struct Behaviour {
|
||||||
|
limits: ConnectionLimits,
|
||||||
|
|
||||||
|
pending_inbound_connections: HashSet<ConnectionId>,
|
||||||
|
pending_outbound_connections: HashSet<ConnectionId>,
|
||||||
|
established_inbound_connections: HashSet<ConnectionId>,
|
||||||
|
established_outbound_connections: HashSet<ConnectionId>,
|
||||||
|
established_per_peer: HashMap<PeerId, HashSet<ConnectionId>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Behaviour {
|
||||||
|
pub fn new(limits: ConnectionLimits) -> Self {
|
||||||
|
Self {
|
||||||
|
limits,
|
||||||
|
pending_inbound_connections: Default::default(),
|
||||||
|
pending_outbound_connections: Default::default(),
|
||||||
|
established_inbound_connections: Default::default(),
|
||||||
|
established_outbound_connections: Default::default(),
|
||||||
|
established_per_peer: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_limit(
|
||||||
|
&mut self,
|
||||||
|
limit: Option<u32>,
|
||||||
|
current: usize,
|
||||||
|
kind: Kind,
|
||||||
|
) -> Result<(), ConnectionDenied> {
|
||||||
|
let limit = limit.unwrap_or(u32::MAX);
|
||||||
|
let current = current as u32;
|
||||||
|
|
||||||
|
if current >= limit {
|
||||||
|
return Err(ConnectionDenied::new(Exceeded { limit, kind }));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A connection limit has been exceeded.
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub struct Exceeded {
|
||||||
|
limit: u32,
|
||||||
|
kind: Kind,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Exceeded {
|
||||||
|
pub fn limit(&self) -> u32 {
|
||||||
|
self.limit
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for Exceeded {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"connection limit exceeded: at most {} {} are allowed",
|
||||||
|
self.limit, self.kind
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
enum Kind {
|
||||||
|
PendingIncoming,
|
||||||
|
PendingOutgoing,
|
||||||
|
EstablishedIncoming,
|
||||||
|
EstablishedOutgoing,
|
||||||
|
EstablishedPerPeer,
|
||||||
|
EstablishedTotal,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for Kind {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
Kind::PendingIncoming => write!(f, "pending incoming connections"),
|
||||||
|
Kind::PendingOutgoing => write!(f, "pending outgoing connections"),
|
||||||
|
Kind::EstablishedIncoming => write!(f, "established incoming connections"),
|
||||||
|
Kind::EstablishedOutgoing => write!(f, "established outgoing connections"),
|
||||||
|
Kind::EstablishedPerPeer => write!(f, "established connections per peer"),
|
||||||
|
Kind::EstablishedTotal => write!(f, "established connections"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for Exceeded {}
|
||||||
|
|
||||||
|
/// The configurable connection limits.
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
pub struct ConnectionLimits {
|
||||||
|
max_pending_incoming: Option<u32>,
|
||||||
|
max_pending_outgoing: Option<u32>,
|
||||||
|
max_established_incoming: Option<u32>,
|
||||||
|
max_established_outgoing: Option<u32>,
|
||||||
|
max_established_per_peer: Option<u32>,
|
||||||
|
max_established_total: Option<u32>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionLimits {
|
||||||
|
/// Configures the maximum number of concurrently incoming connections being established.
|
||||||
|
pub fn with_max_pending_incoming(mut self, limit: Option<u32>) -> Self {
|
||||||
|
self.max_pending_incoming = limit;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Configures the maximum number of concurrently outgoing connections being established.
|
||||||
|
pub fn with_max_pending_outgoing(mut self, limit: Option<u32>) -> Self {
|
||||||
|
self.max_pending_outgoing = limit;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Configures the maximum number of concurrent established inbound connections.
|
||||||
|
pub fn with_max_established_incoming(mut self, limit: Option<u32>) -> Self {
|
||||||
|
self.max_established_incoming = limit;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Configures the maximum number of concurrent established outbound connections.
|
||||||
|
pub fn with_max_established_outgoing(mut self, limit: Option<u32>) -> Self {
|
||||||
|
self.max_established_outgoing = limit;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Configures the maximum number of concurrent established connections (both
|
||||||
|
/// inbound and outbound).
|
||||||
|
///
|
||||||
|
/// Note: This should be used in conjunction with
|
||||||
|
/// [`ConnectionLimits::with_max_established_incoming`] to prevent possible
|
||||||
|
/// eclipse attacks (all connections being inbound).
|
||||||
|
pub fn with_max_established(mut self, limit: Option<u32>) -> Self {
|
||||||
|
self.max_established_total = limit;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Configures the maximum number of concurrent established connections per peer,
|
||||||
|
/// regardless of direction (incoming or outgoing).
|
||||||
|
pub fn with_max_established_per_peer(mut self, limit: Option<u32>) -> Self {
|
||||||
|
self.max_established_per_peer = limit;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NetworkBehaviour for Behaviour {
|
||||||
|
type ConnectionHandler = dummy::ConnectionHandler;
|
||||||
|
type OutEvent = Void;
|
||||||
|
|
||||||
|
fn handle_pending_inbound_connection(
|
||||||
|
&mut self,
|
||||||
|
connection_id: ConnectionId,
|
||||||
|
_: &Multiaddr,
|
||||||
|
_: &Multiaddr,
|
||||||
|
) -> Result<(), ConnectionDenied> {
|
||||||
|
self.check_limit(
|
||||||
|
self.limits.max_pending_incoming,
|
||||||
|
self.pending_inbound_connections.len(),
|
||||||
|
Kind::PendingIncoming,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
self.pending_inbound_connections.insert(connection_id);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_established_inbound_connection(
|
||||||
|
&mut self,
|
||||||
|
connection_id: ConnectionId,
|
||||||
|
peer: PeerId,
|
||||||
|
_: &Multiaddr,
|
||||||
|
_: &Multiaddr,
|
||||||
|
) -> Result<THandler<Self>, ConnectionDenied> {
|
||||||
|
self.pending_inbound_connections.remove(&connection_id);
|
||||||
|
|
||||||
|
self.check_limit(
|
||||||
|
self.limits.max_established_incoming,
|
||||||
|
self.established_inbound_connections.len(),
|
||||||
|
Kind::EstablishedIncoming,
|
||||||
|
)?;
|
||||||
|
self.check_limit(
|
||||||
|
self.limits.max_established_per_peer,
|
||||||
|
self.established_per_peer
|
||||||
|
.get(&peer)
|
||||||
|
.map(|connections| connections.len())
|
||||||
|
.unwrap_or(0),
|
||||||
|
Kind::EstablishedPerPeer,
|
||||||
|
)?;
|
||||||
|
self.check_limit(
|
||||||
|
self.limits.max_established_total,
|
||||||
|
self.established_inbound_connections.len()
|
||||||
|
+ self.established_outbound_connections.len(),
|
||||||
|
Kind::EstablishedTotal,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
self.established_inbound_connections.insert(connection_id);
|
||||||
|
self.established_per_peer
|
||||||
|
.entry(peer)
|
||||||
|
.or_default()
|
||||||
|
.insert(connection_id);
|
||||||
|
|
||||||
|
Ok(dummy::ConnectionHandler)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_pending_outbound_connection(
|
||||||
|
&mut self,
|
||||||
|
connection_id: ConnectionId,
|
||||||
|
_: Option<PeerId>,
|
||||||
|
_: &[Multiaddr],
|
||||||
|
_: Endpoint,
|
||||||
|
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
|
||||||
|
self.check_limit(
|
||||||
|
self.limits.max_pending_outgoing,
|
||||||
|
self.pending_outbound_connections.len(),
|
||||||
|
Kind::PendingOutgoing,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
self.pending_outbound_connections.insert(connection_id);
|
||||||
|
|
||||||
|
Ok(vec![])
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_established_outbound_connection(
|
||||||
|
&mut self,
|
||||||
|
connection_id: ConnectionId,
|
||||||
|
peer: PeerId,
|
||||||
|
_: &Multiaddr,
|
||||||
|
_: Endpoint,
|
||||||
|
) -> Result<THandler<Self>, ConnectionDenied> {
|
||||||
|
self.pending_outbound_connections.remove(&connection_id);
|
||||||
|
|
||||||
|
self.check_limit(
|
||||||
|
self.limits.max_established_outgoing,
|
||||||
|
self.established_outbound_connections.len(),
|
||||||
|
Kind::EstablishedOutgoing,
|
||||||
|
)?;
|
||||||
|
self.check_limit(
|
||||||
|
self.limits.max_established_per_peer,
|
||||||
|
self.established_per_peer
|
||||||
|
.get(&peer)
|
||||||
|
.map(|connections| connections.len())
|
||||||
|
.unwrap_or(0),
|
||||||
|
Kind::EstablishedPerPeer,
|
||||||
|
)?;
|
||||||
|
self.check_limit(
|
||||||
|
self.limits.max_established_total,
|
||||||
|
self.established_inbound_connections.len()
|
||||||
|
+ self.established_outbound_connections.len(),
|
||||||
|
Kind::EstablishedTotal,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
self.established_outbound_connections.insert(connection_id);
|
||||||
|
self.established_per_peer
|
||||||
|
.entry(peer)
|
||||||
|
.or_default()
|
||||||
|
.insert(connection_id);
|
||||||
|
|
||||||
|
Ok(dummy::ConnectionHandler)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
|
||||||
|
match event {
|
||||||
|
FromSwarm::ConnectionClosed(ConnectionClosed {
|
||||||
|
peer_id,
|
||||||
|
connection_id,
|
||||||
|
..
|
||||||
|
}) => {
|
||||||
|
self.established_inbound_connections.remove(&connection_id);
|
||||||
|
self.established_outbound_connections.remove(&connection_id);
|
||||||
|
self.established_per_peer
|
||||||
|
.entry(peer_id)
|
||||||
|
.or_default()
|
||||||
|
.remove(&connection_id);
|
||||||
|
}
|
||||||
|
FromSwarm::ConnectionEstablished(_) => {}
|
||||||
|
FromSwarm::AddressChange(_) => {}
|
||||||
|
FromSwarm::DialFailure(_) => {}
|
||||||
|
FromSwarm::ListenFailure(_) => {}
|
||||||
|
FromSwarm::NewListener(_) => {}
|
||||||
|
FromSwarm::NewListenAddr(_) => {}
|
||||||
|
FromSwarm::ExpiredListenAddr(_) => {}
|
||||||
|
FromSwarm::ListenerError(_) => {}
|
||||||
|
FromSwarm::ListenerClosed(_) => {}
|
||||||
|
FromSwarm::NewExternalAddr(_) => {}
|
||||||
|
FromSwarm::ExpiredExternalAddr(_) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_connection_handler_event(
|
||||||
|
&mut self,
|
||||||
|
_id: PeerId,
|
||||||
|
_: ConnectionId,
|
||||||
|
event: THandlerOutEvent<Self>,
|
||||||
|
) {
|
||||||
|
void::unreachable(event)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll(
|
||||||
|
&mut self,
|
||||||
|
_: &mut Context<'_>,
|
||||||
|
_: &mut impl PollParameters,
|
||||||
|
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use libp2p_swarm::{dial_opts::DialOpts, DialError, ListenError, Swarm, SwarmEvent};
|
||||||
|
use libp2p_swarm_test::SwarmExt;
|
||||||
|
use quickcheck_ext::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn max_outgoing() {
|
||||||
|
use rand::Rng;
|
||||||
|
|
||||||
|
let outgoing_limit = rand::thread_rng().gen_range(1..10);
|
||||||
|
|
||||||
|
let mut network = Swarm::new_ephemeral(|_| {
|
||||||
|
Behaviour::new(
|
||||||
|
ConnectionLimits::default().with_max_pending_outgoing(Some(outgoing_limit)),
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
|
let addr: Multiaddr = "/memory/1234".parse().unwrap();
|
||||||
|
let target = PeerId::random();
|
||||||
|
|
||||||
|
for _ in 0..outgoing_limit {
|
||||||
|
network
|
||||||
|
.dial(
|
||||||
|
DialOpts::peer_id(target)
|
||||||
|
.addresses(vec![addr.clone()])
|
||||||
|
.build(),
|
||||||
|
)
|
||||||
|
.expect("Unexpected connection limit.");
|
||||||
|
}
|
||||||
|
|
||||||
|
match network
|
||||||
|
.dial(DialOpts::peer_id(target).addresses(vec![addr]).build())
|
||||||
|
.expect_err("Unexpected dialing success.")
|
||||||
|
{
|
||||||
|
DialError::Denied { cause } => {
|
||||||
|
let exceeded = cause
|
||||||
|
.downcast::<Exceeded>()
|
||||||
|
.expect("connection denied because of limit");
|
||||||
|
|
||||||
|
assert_eq!(exceeded.limit(), outgoing_limit);
|
||||||
|
}
|
||||||
|
e => panic!("Unexpected error: {e:?}"),
|
||||||
|
}
|
||||||
|
|
||||||
|
let info = network.network_info();
|
||||||
|
assert_eq!(info.num_peers(), 0);
|
||||||
|
assert_eq!(
|
||||||
|
info.connection_counters().num_pending_outgoing(),
|
||||||
|
outgoing_limit
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn max_established_incoming() {
|
||||||
|
fn prop(Limit(limit): Limit) {
|
||||||
|
let mut swarm1 = Swarm::new_ephemeral(|_| {
|
||||||
|
Behaviour::new(
|
||||||
|
ConnectionLimits::default().with_max_established_incoming(Some(limit)),
|
||||||
|
)
|
||||||
|
});
|
||||||
|
let mut swarm2 = Swarm::new_ephemeral(|_| {
|
||||||
|
Behaviour::new(
|
||||||
|
ConnectionLimits::default().with_max_established_incoming(Some(limit)),
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
|
async_std::task::block_on(async {
|
||||||
|
let (listen_addr, _) = swarm1.listen().await;
|
||||||
|
|
||||||
|
for _ in 0..limit {
|
||||||
|
swarm2.connect(&mut swarm1).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
swarm2.dial(listen_addr).unwrap();
|
||||||
|
|
||||||
|
async_std::task::spawn(swarm2.loop_on_next());
|
||||||
|
|
||||||
|
let cause = swarm1
|
||||||
|
.wait(|event| match event {
|
||||||
|
SwarmEvent::IncomingConnectionError {
|
||||||
|
error: ListenError::Denied { cause },
|
||||||
|
..
|
||||||
|
} => Some(cause),
|
||||||
|
_ => None,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert_eq!(cause.downcast::<Exceeded>().unwrap().limit, limit);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct Limit(u32);
|
||||||
|
|
||||||
|
impl Arbitrary for Limit {
|
||||||
|
fn arbitrary(g: &mut Gen) -> Self {
|
||||||
|
Self(g.gen_range(1..10))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
quickcheck(prop as fn(_));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(libp2p_swarm_derive::NetworkBehaviour)]
|
||||||
|
#[behaviour(prelude = "libp2p_swarm::derive_prelude")]
|
||||||
|
struct Behaviour {
|
||||||
|
limits: super::Behaviour,
|
||||||
|
keep_alive: libp2p_swarm::keep_alive::Behaviour,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Behaviour {
|
||||||
|
fn new(limits: ConnectionLimits) -> Self {
|
||||||
|
Self {
|
||||||
|
limits: super::Behaviour::new(limits),
|
||||||
|
keep_alive: libp2p_swarm::keep_alive::Behaviour,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -225,6 +225,7 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
|
|||||||
}
|
}
|
||||||
|
|
||||||
libp2p_swarm::DialError::Banned => record(OutgoingConnectionError::Banned),
|
libp2p_swarm::DialError::Banned => record(OutgoingConnectionError::Banned),
|
||||||
|
#[allow(deprecated)]
|
||||||
libp2p_swarm::DialError::ConnectionLimit(_) => {
|
libp2p_swarm::DialError::ConnectionLimit(_) => {
|
||||||
record(OutgoingConnectionError::ConnectionLimit)
|
record(OutgoingConnectionError::ConnectionLimit)
|
||||||
}
|
}
|
||||||
@ -371,6 +372,7 @@ impl From<&libp2p_swarm::ListenError> for IncomingConnectionError {
|
|||||||
fn from(error: &libp2p_swarm::ListenError) -> Self {
|
fn from(error: &libp2p_swarm::ListenError) -> Self {
|
||||||
match error {
|
match error {
|
||||||
libp2p_swarm::ListenError::WrongPeerId { .. } => IncomingConnectionError::WrongPeerId,
|
libp2p_swarm::ListenError::WrongPeerId { .. } => IncomingConnectionError::WrongPeerId,
|
||||||
|
#[allow(deprecated)]
|
||||||
libp2p_swarm::ListenError::ConnectionLimit(_) => {
|
libp2p_swarm::ListenError::ConnectionLimit(_) => {
|
||||||
IncomingConnectionError::ConnectionLimit
|
IncomingConnectionError::ConnectionLimit
|
||||||
}
|
}
|
||||||
|
@ -1924,7 +1924,6 @@ where
|
|||||||
|
|
||||||
match error {
|
match error {
|
||||||
DialError::Banned
|
DialError::Banned
|
||||||
| DialError::ConnectionLimit(_)
|
|
||||||
| DialError::LocalPeerId { .. }
|
| DialError::LocalPeerId { .. }
|
||||||
| DialError::InvalidPeerId { .. }
|
| DialError::InvalidPeerId { .. }
|
||||||
| DialError::WrongPeerId { .. }
|
| DialError::WrongPeerId { .. }
|
||||||
@ -1951,6 +1950,8 @@ where
|
|||||||
DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
|
DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
|
||||||
unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse.");
|
unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse.");
|
||||||
}
|
}
|
||||||
|
#[allow(deprecated)]
|
||||||
|
DialError::ConnectionLimit(_) => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,6 +27,7 @@ use libp2p_core::{
|
|||||||
};
|
};
|
||||||
use libp2p_identity::PeerId;
|
use libp2p_identity::PeerId;
|
||||||
use libp2p_plaintext::PlainText2Config;
|
use libp2p_plaintext::PlainText2Config;
|
||||||
|
use libp2p_swarm::dial_opts::PeerCondition;
|
||||||
use libp2p_swarm::{
|
use libp2p_swarm::{
|
||||||
dial_opts::DialOpts, AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent,
|
dial_opts::DialOpts, AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent,
|
||||||
THandlerErr,
|
THandlerErr,
|
||||||
@ -235,6 +236,7 @@ where
|
|||||||
|
|
||||||
let dial_opts = DialOpts::peer_id(*other.local_peer_id())
|
let dial_opts = DialOpts::peer_id(*other.local_peer_id())
|
||||||
.addresses(external_addresses)
|
.addresses(external_addresses)
|
||||||
|
.condition(PeerCondition::Always)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
self.dial(dial_opts).unwrap();
|
self.dial(dial_opts).unwrap();
|
||||||
|
@ -1,3 +1,10 @@
|
|||||||
|
# 0.42.1 [unreleased]
|
||||||
|
|
||||||
|
- Deprecate `ConnectionLimits` in favor of `libp2p::connection_limits`.
|
||||||
|
See [PR 3386].
|
||||||
|
|
||||||
|
[PR 3386]: https://github.com/libp2p/rust-libp2p/pull/3386
|
||||||
|
|
||||||
# 0.42.0
|
# 0.42.0
|
||||||
|
|
||||||
- Allow `NetworkBehaviour`s to manage connections.
|
- Allow `NetworkBehaviour`s to manage connections.
|
||||||
|
@ -3,7 +3,7 @@ name = "libp2p-swarm"
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
rust-version = "1.62.0"
|
rust-version = "1.62.0"
|
||||||
description = "The libp2p swarm"
|
description = "The libp2p swarm"
|
||||||
version = "0.42.0"
|
version = "0.42.1"
|
||||||
authors = ["Parity Technologies <admin@parity.io>"]
|
authors = ["Parity Technologies <admin@parity.io>"]
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
repository = "https://github.com/libp2p/rust-libp2p"
|
repository = "https://github.com/libp2p/rust-libp2p"
|
||||||
@ -47,6 +47,7 @@ libp2p-kad = { path = "../protocols/kad" }
|
|||||||
libp2p-ping = { path = "../protocols/ping" }
|
libp2p-ping = { path = "../protocols/ping" }
|
||||||
libp2p-plaintext = { path = "../transports/plaintext" }
|
libp2p-plaintext = { path = "../transports/plaintext" }
|
||||||
libp2p-swarm-derive = { path = "../swarm-derive" }
|
libp2p-swarm-derive = { path = "../swarm-derive" }
|
||||||
|
libp2p-swarm-test = { path = "../swarm-test" }
|
||||||
libp2p-yamux = { path = "../muxers/yamux" }
|
libp2p-yamux = { path = "../muxers/yamux" }
|
||||||
quickcheck = { package = "quickcheck-ext", path = "../misc/quickcheck-ext" }
|
quickcheck = { package = "quickcheck-ext", path = "../misc/quickcheck-ext" }
|
||||||
void = "1"
|
void = "1"
|
||||||
|
@ -377,6 +377,7 @@ impl<'a> IncomingInfo<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Information about a connection limit.
|
/// Information about a connection limit.
|
||||||
|
#[deprecated(note = "Use `libp2p::connection_limits` instead.", since = "0.42.1")]
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
pub struct ConnectionLimit {
|
pub struct ConnectionLimit {
|
||||||
/// The maximum number of connections.
|
/// The maximum number of connections.
|
||||||
@ -385,6 +386,7 @@ pub struct ConnectionLimit {
|
|||||||
pub current: u32,
|
pub current: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(deprecated)]
|
||||||
impl fmt::Display for ConnectionLimit {
|
impl fmt::Display for ConnectionLimit {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
write!(
|
write!(
|
||||||
@ -396,6 +398,7 @@ impl fmt::Display for ConnectionLimit {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// A `ConnectionLimit` can represent an error if it has been exceeded.
|
/// A `ConnectionLimit` can represent an error if it has been exceeded.
|
||||||
|
#[allow(deprecated)]
|
||||||
impl std::error::Error for ConnectionLimit {}
|
impl std::error::Error for ConnectionLimit {}
|
||||||
|
|
||||||
struct SubstreamUpgrade<UserData, Upgrade> {
|
struct SubstreamUpgrade<UserData, Upgrade> {
|
||||||
|
@ -18,9 +18,11 @@
|
|||||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
// DEALINGS IN THE SOFTWARE.
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
#[allow(deprecated)]
|
||||||
|
use crate::connection::ConnectionLimit;
|
||||||
use crate::transport::TransportError;
|
use crate::transport::TransportError;
|
||||||
use crate::Multiaddr;
|
use crate::Multiaddr;
|
||||||
use crate::{connection::ConnectionLimit, ConnectedPoint, PeerId};
|
use crate::{ConnectedPoint, PeerId};
|
||||||
use std::{fmt, io};
|
use std::{fmt, io};
|
||||||
|
|
||||||
/// Errors that can occur in the context of an established `Connection`.
|
/// Errors that can occur in the context of an established `Connection`.
|
||||||
@ -90,6 +92,11 @@ pub enum PendingConnectionError<TTransErr> {
|
|||||||
|
|
||||||
/// The connection was dropped because the connection limit
|
/// The connection was dropped because the connection limit
|
||||||
/// for a peer has been reached.
|
/// for a peer has been reached.
|
||||||
|
#[deprecated(
|
||||||
|
note = "Use `libp2p::connection_limits` instead and handle `{Dial,Listen}Error::Denied::cause`.",
|
||||||
|
since = "0.42.1"
|
||||||
|
)]
|
||||||
|
#[allow(deprecated)]
|
||||||
ConnectionLimit(ConnectionLimit),
|
ConnectionLimit(ConnectionLimit),
|
||||||
|
|
||||||
/// Pending connection attempt has been aborted.
|
/// Pending connection attempt has been aborted.
|
||||||
@ -110,6 +117,7 @@ impl<T> PendingConnectionError<T> {
|
|||||||
pub fn map<U>(self, f: impl FnOnce(T) -> U) -> PendingConnectionError<U> {
|
pub fn map<U>(self, f: impl FnOnce(T) -> U) -> PendingConnectionError<U> {
|
||||||
match self {
|
match self {
|
||||||
PendingConnectionError::Transport(t) => PendingConnectionError::Transport(f(t)),
|
PendingConnectionError::Transport(t) => PendingConnectionError::Transport(f(t)),
|
||||||
|
#[allow(deprecated)]
|
||||||
PendingConnectionError::ConnectionLimit(l) => {
|
PendingConnectionError::ConnectionLimit(l) => {
|
||||||
PendingConnectionError::ConnectionLimit(l)
|
PendingConnectionError::ConnectionLimit(l)
|
||||||
}
|
}
|
||||||
@ -137,6 +145,7 @@ where
|
|||||||
"Pending connection: Transport error on connection: {err}"
|
"Pending connection: Transport error on connection: {err}"
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
#[allow(deprecated)]
|
||||||
PendingConnectionError::ConnectionLimit(l) => {
|
PendingConnectionError::ConnectionLimit(l) => {
|
||||||
write!(f, "Connection error: Connection limit: {l}.")
|
write!(f, "Connection error: Connection limit: {l}.")
|
||||||
}
|
}
|
||||||
@ -163,6 +172,7 @@ where
|
|||||||
PendingConnectionError::WrongPeerId { .. } => None,
|
PendingConnectionError::WrongPeerId { .. } => None,
|
||||||
PendingConnectionError::LocalPeerId { .. } => None,
|
PendingConnectionError::LocalPeerId { .. } => None,
|
||||||
PendingConnectionError::Aborted => None,
|
PendingConnectionError::Aborted => None,
|
||||||
|
#[allow(deprecated)]
|
||||||
PendingConnectionError::ConnectionLimit(..) => None,
|
PendingConnectionError::ConnectionLimit(..) => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,12 +18,13 @@
|
|||||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
// DEALINGS IN THE SOFTWARE.
|
// DEALINGS IN THE SOFTWARE.
|
||||||
use crate::connection::{Connection, ConnectionId, PendingPoint};
|
#[allow(deprecated)]
|
||||||
|
use crate::connection::{Connection, ConnectionId, ConnectionLimit, PendingPoint};
|
||||||
#[allow(deprecated)]
|
#[allow(deprecated)]
|
||||||
use crate::IntoConnectionHandler;
|
use crate::IntoConnectionHandler;
|
||||||
use crate::{
|
use crate::{
|
||||||
connection::{
|
connection::{
|
||||||
Connected, ConnectionError, ConnectionLimit, IncomingInfo, PendingConnectionError,
|
Connected, ConnectionError, IncomingInfo, PendingConnectionError,
|
||||||
PendingInboundConnectionError, PendingOutboundConnectionError,
|
PendingInboundConnectionError, PendingOutboundConnectionError,
|
||||||
},
|
},
|
||||||
transport::TransportError,
|
transport::TransportError,
|
||||||
@ -304,6 +305,7 @@ where
|
|||||||
THandler: ConnectionHandler,
|
THandler: ConnectionHandler,
|
||||||
{
|
{
|
||||||
/// Creates a new empty `Pool`.
|
/// Creates a new empty `Pool`.
|
||||||
|
#[allow(deprecated)]
|
||||||
pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self {
|
pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self {
|
||||||
let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0);
|
let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0);
|
||||||
let executor = match config.executor {
|
let executor = match config.executor {
|
||||||
@ -407,6 +409,7 @@ where
|
|||||||
///
|
///
|
||||||
/// Returns an error if the limit of pending outgoing connections
|
/// Returns an error if the limit of pending outgoing connections
|
||||||
/// has been reached.
|
/// has been reached.
|
||||||
|
#[allow(deprecated)]
|
||||||
pub fn add_outgoing(
|
pub fn add_outgoing(
|
||||||
&mut self,
|
&mut self,
|
||||||
dials: Vec<
|
dials: Vec<
|
||||||
@ -461,6 +464,7 @@ where
|
|||||||
///
|
///
|
||||||
/// Returns an error if the limit of pending incoming connections
|
/// Returns an error if the limit of pending incoming connections
|
||||||
/// has been reached.
|
/// has been reached.
|
||||||
|
#[allow(deprecated)]
|
||||||
pub fn add_incoming<TFut>(
|
pub fn add_incoming<TFut>(
|
||||||
&mut self,
|
&mut self,
|
||||||
future: TFut,
|
future: TFut,
|
||||||
@ -679,6 +683,8 @@ where
|
|||||||
),
|
),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[allow(deprecated)]
|
||||||
|
// Remove once `PendingConnectionError::ConnectionLimit` is gone.
|
||||||
let error = self
|
let error = self
|
||||||
.counters
|
.counters
|
||||||
// Check general established connection limit.
|
// Check general established connection limit.
|
||||||
@ -865,6 +871,7 @@ impl Drop for NewConnection {
|
|||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct ConnectionCounters {
|
pub struct ConnectionCounters {
|
||||||
/// The effective connection limits.
|
/// The effective connection limits.
|
||||||
|
#[allow(deprecated)]
|
||||||
limits: ConnectionLimits,
|
limits: ConnectionLimits,
|
||||||
/// The current number of incoming connections.
|
/// The current number of incoming connections.
|
||||||
pending_incoming: u32,
|
pending_incoming: u32,
|
||||||
@ -877,6 +884,7 @@ pub struct ConnectionCounters {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl ConnectionCounters {
|
impl ConnectionCounters {
|
||||||
|
#[allow(deprecated)]
|
||||||
fn new(limits: ConnectionLimits) -> Self {
|
fn new(limits: ConnectionLimits) -> Self {
|
||||||
Self {
|
Self {
|
||||||
limits,
|
limits,
|
||||||
@ -888,6 +896,8 @@ impl ConnectionCounters {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// The effective connection limits.
|
/// The effective connection limits.
|
||||||
|
#[deprecated(note = "Use the `libp2p::connection_limits` instead.")]
|
||||||
|
#[allow(deprecated)]
|
||||||
pub fn limits(&self) -> &ConnectionLimits {
|
pub fn limits(&self) -> &ConnectionLimits {
|
||||||
&self.limits
|
&self.limits
|
||||||
}
|
}
|
||||||
@ -975,14 +985,17 @@ impl ConnectionCounters {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(deprecated)]
|
||||||
fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> {
|
fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> {
|
||||||
Self::check(self.pending_outgoing, self.limits.max_pending_outgoing)
|
Self::check(self.pending_outgoing, self.limits.max_pending_outgoing)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(deprecated)]
|
||||||
fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> {
|
fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> {
|
||||||
Self::check(self.pending_incoming, self.limits.max_pending_incoming)
|
Self::check(self.pending_incoming, self.limits.max_pending_incoming)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(deprecated)]
|
||||||
fn check_max_established(&self, endpoint: &ConnectedPoint) -> Result<(), ConnectionLimit> {
|
fn check_max_established(&self, endpoint: &ConnectedPoint) -> Result<(), ConnectionLimit> {
|
||||||
// Check total connection limit.
|
// Check total connection limit.
|
||||||
Self::check(self.num_established(), self.limits.max_established_total)?;
|
Self::check(self.num_established(), self.limits.max_established_total)?;
|
||||||
@ -999,10 +1012,12 @@ impl ConnectionCounters {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(deprecated)]
|
||||||
fn check_max_established_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> {
|
fn check_max_established_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> {
|
||||||
Self::check(current, self.limits.max_established_per_peer)
|
Self::check(current, self.limits.max_established_per_peer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(deprecated)]
|
||||||
fn check(current: u32, limit: Option<u32>) -> Result<(), ConnectionLimit> {
|
fn check(current: u32, limit: Option<u32>) -> Result<(), ConnectionLimit> {
|
||||||
if let Some(limit) = limit {
|
if let Some(limit) = limit {
|
||||||
if current >= limit {
|
if current >= limit {
|
||||||
@ -1027,6 +1042,7 @@ fn num_peer_established<TInEvent>(
|
|||||||
///
|
///
|
||||||
/// By default no connection limits apply.
|
/// By default no connection limits apply.
|
||||||
#[derive(Debug, Clone, Default)]
|
#[derive(Debug, Clone, Default)]
|
||||||
|
#[deprecated(note = "Use `libp2p::connection_limits` instead.", since = "0.42.1")]
|
||||||
pub struct ConnectionLimits {
|
pub struct ConnectionLimits {
|
||||||
max_pending_incoming: Option<u32>,
|
max_pending_incoming: Option<u32>,
|
||||||
max_pending_outgoing: Option<u32>,
|
max_pending_outgoing: Option<u32>,
|
||||||
@ -1036,6 +1052,7 @@ pub struct ConnectionLimits {
|
|||||||
max_established_total: Option<u32>,
|
max_established_total: Option<u32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(deprecated)]
|
||||||
impl ConnectionLimits {
|
impl ConnectionLimits {
|
||||||
/// Configures the maximum number of concurrently incoming connections being established.
|
/// Configures the maximum number of concurrently incoming connections being established.
|
||||||
pub fn with_max_pending_incoming(mut self, limit: Option<u32>) -> Self {
|
pub fn with_max_pending_incoming(mut self, limit: Option<u32>) -> Self {
|
||||||
|
@ -104,14 +104,17 @@ pub mod derive_prelude {
|
|||||||
pub use libp2p_identity::PeerId;
|
pub use libp2p_identity::PeerId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(deprecated)]
|
||||||
|
pub use crate::connection::ConnectionLimit;
|
||||||
pub use behaviour::{
|
pub use behaviour::{
|
||||||
AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredExternalAddr,
|
AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredExternalAddr,
|
||||||
ExpiredListenAddr, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
|
ExpiredListenAddr, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
|
||||||
ListenerClosed, ListenerError, NetworkBehaviour, NetworkBehaviourAction, NewExternalAddr,
|
ListenerClosed, ListenerError, NetworkBehaviour, NetworkBehaviourAction, NewExternalAddr,
|
||||||
NewListenAddr, NotifyHandler, PollParameters,
|
NewListenAddr, NotifyHandler, PollParameters,
|
||||||
};
|
};
|
||||||
|
#[allow(deprecated)]
|
||||||
pub use connection::pool::{ConnectionCounters, ConnectionLimits};
|
pub use connection::pool::{ConnectionCounters, ConnectionLimits};
|
||||||
pub use connection::{ConnectionError, ConnectionId, ConnectionLimit};
|
pub use connection::{ConnectionError, ConnectionId};
|
||||||
pub use executor::Executor;
|
pub use executor::Executor;
|
||||||
#[allow(deprecated)]
|
#[allow(deprecated)]
|
||||||
pub use handler::IntoConnectionHandler;
|
pub use handler::IntoConnectionHandler;
|
||||||
@ -651,6 +654,7 @@ where
|
|||||||
) {
|
) {
|
||||||
Ok(()) => Ok(()),
|
Ok(()) => Ok(()),
|
||||||
Err(connection_limit) => {
|
Err(connection_limit) => {
|
||||||
|
#[allow(deprecated)]
|
||||||
let error = DialError::ConnectionLimit(connection_limit);
|
let error = DialError::ConnectionLimit(connection_limit);
|
||||||
self.behaviour
|
self.behaviour
|
||||||
.on_swarm_event(FromSwarm::DialFailure(DialFailure {
|
.on_swarm_event(FromSwarm::DialFailure(DialFailure {
|
||||||
@ -1089,6 +1093,7 @@ where
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
Err(connection_limit) => {
|
Err(connection_limit) => {
|
||||||
|
#[allow(deprecated)]
|
||||||
let error = ListenError::ConnectionLimit(connection_limit);
|
let error = ListenError::ConnectionLimit(connection_limit);
|
||||||
self.behaviour
|
self.behaviour
|
||||||
.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
|
.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
|
||||||
@ -1511,6 +1516,7 @@ pub struct SwarmBuilder<TBehaviour> {
|
|||||||
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
|
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
|
||||||
behaviour: TBehaviour,
|
behaviour: TBehaviour,
|
||||||
pool_config: PoolConfig,
|
pool_config: PoolConfig,
|
||||||
|
#[allow(deprecated)]
|
||||||
connection_limits: ConnectionLimits,
|
connection_limits: ConnectionLimits,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1655,6 +1661,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Configures the connection limits.
|
/// Configures the connection limits.
|
||||||
|
#[allow(deprecated)]
|
||||||
pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self {
|
pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self {
|
||||||
self.connection_limits = limits;
|
self.connection_limits = limits;
|
||||||
self
|
self
|
||||||
@ -1712,6 +1719,11 @@ pub enum DialError {
|
|||||||
Banned,
|
Banned,
|
||||||
/// The configured limit for simultaneous outgoing connections
|
/// The configured limit for simultaneous outgoing connections
|
||||||
/// has been reached.
|
/// has been reached.
|
||||||
|
#[deprecated(
|
||||||
|
note = "Use `libp2p::connection_limits` instead and handle `{Dial,Listen}Error::Denied::cause`.",
|
||||||
|
since = "0.42.1"
|
||||||
|
)]
|
||||||
|
#[allow(deprecated)]
|
||||||
ConnectionLimit(ConnectionLimit),
|
ConnectionLimit(ConnectionLimit),
|
||||||
/// The peer identity obtained on the connection matches the local peer.
|
/// The peer identity obtained on the connection matches the local peer.
|
||||||
LocalPeerId {
|
LocalPeerId {
|
||||||
@ -1742,6 +1754,7 @@ pub enum DialError {
|
|||||||
impl From<PendingOutboundConnectionError> for DialError {
|
impl From<PendingOutboundConnectionError> for DialError {
|
||||||
fn from(error: PendingOutboundConnectionError) -> Self {
|
fn from(error: PendingOutboundConnectionError) -> Self {
|
||||||
match error {
|
match error {
|
||||||
|
#[allow(deprecated)]
|
||||||
PendingConnectionError::ConnectionLimit(limit) => DialError::ConnectionLimit(limit),
|
PendingConnectionError::ConnectionLimit(limit) => DialError::ConnectionLimit(limit),
|
||||||
PendingConnectionError::Aborted => DialError::Aborted,
|
PendingConnectionError::Aborted => DialError::Aborted,
|
||||||
PendingConnectionError::WrongPeerId { obtained, endpoint } => {
|
PendingConnectionError::WrongPeerId { obtained, endpoint } => {
|
||||||
@ -1756,6 +1769,7 @@ impl From<PendingOutboundConnectionError> for DialError {
|
|||||||
impl fmt::Display for DialError {
|
impl fmt::Display for DialError {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
|
#[allow(deprecated)]
|
||||||
DialError::ConnectionLimit(err) => write!(f, "Dial error: {err}"),
|
DialError::ConnectionLimit(err) => write!(f, "Dial error: {err}"),
|
||||||
DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
|
DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
|
||||||
DialError::LocalPeerId { endpoint } => write!(
|
DialError::LocalPeerId { endpoint } => write!(
|
||||||
@ -1809,6 +1823,7 @@ fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::R
|
|||||||
impl error::Error for DialError {
|
impl error::Error for DialError {
|
||||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||||
match self {
|
match self {
|
||||||
|
#[allow(deprecated)]
|
||||||
DialError::ConnectionLimit(err) => Some(err),
|
DialError::ConnectionLimit(err) => Some(err),
|
||||||
DialError::LocalPeerId { .. } => None,
|
DialError::LocalPeerId { .. } => None,
|
||||||
DialError::NoAddresses => None,
|
DialError::NoAddresses => None,
|
||||||
@ -1828,6 +1843,11 @@ impl error::Error for DialError {
|
|||||||
pub enum ListenError {
|
pub enum ListenError {
|
||||||
/// The configured limit for simultaneous outgoing connections
|
/// The configured limit for simultaneous outgoing connections
|
||||||
/// has been reached.
|
/// has been reached.
|
||||||
|
#[deprecated(
|
||||||
|
note = "Use `libp2p::connection_limits` instead and handle `{Dial,Listen}Error::Denied::cause`.",
|
||||||
|
since = "0.42.1"
|
||||||
|
)]
|
||||||
|
#[allow(deprecated)]
|
||||||
ConnectionLimit(ConnectionLimit),
|
ConnectionLimit(ConnectionLimit),
|
||||||
/// Pending connection attempt has been aborted.
|
/// Pending connection attempt has been aborted.
|
||||||
Aborted,
|
Aborted,
|
||||||
@ -1851,6 +1871,7 @@ impl From<PendingInboundConnectionError> for ListenError {
|
|||||||
fn from(error: PendingInboundConnectionError) -> Self {
|
fn from(error: PendingInboundConnectionError) -> Self {
|
||||||
match error {
|
match error {
|
||||||
PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
|
PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
|
||||||
|
#[allow(deprecated)]
|
||||||
PendingInboundConnectionError::ConnectionLimit(inner) => {
|
PendingInboundConnectionError::ConnectionLimit(inner) => {
|
||||||
ListenError::ConnectionLimit(inner)
|
ListenError::ConnectionLimit(inner)
|
||||||
}
|
}
|
||||||
@ -1868,6 +1889,7 @@ impl From<PendingInboundConnectionError> for ListenError {
|
|||||||
impl fmt::Display for ListenError {
|
impl fmt::Display for ListenError {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
|
#[allow(deprecated)]
|
||||||
ListenError::ConnectionLimit(_) => write!(f, "Listen error"),
|
ListenError::ConnectionLimit(_) => write!(f, "Listen error"),
|
||||||
ListenError::Aborted => write!(
|
ListenError::Aborted => write!(
|
||||||
f,
|
f,
|
||||||
@ -1893,6 +1915,7 @@ impl fmt::Display for ListenError {
|
|||||||
impl error::Error for ListenError {
|
impl error::Error for ListenError {
|
||||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||||
match self {
|
match self {
|
||||||
|
#[allow(deprecated)]
|
||||||
ListenError::ConnectionLimit(err) => Some(err),
|
ListenError::ConnectionLimit(err) => Some(err),
|
||||||
ListenError::WrongPeerId { .. } => None,
|
ListenError::WrongPeerId { .. } => None,
|
||||||
ListenError::Transport(err) => Some(err),
|
ListenError::Transport(err) => Some(err),
|
||||||
@ -1914,6 +1937,19 @@ impl ConnectionDenied {
|
|||||||
inner: Box::new(cause),
|
inner: Box::new(cause),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attempt to downcast to a particular reason for why the connection was denied.
|
||||||
|
pub fn downcast<E>(self) -> Result<E, Self>
|
||||||
|
where
|
||||||
|
E: error::Error + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
let inner = self
|
||||||
|
.inner
|
||||||
|
.downcast::<E>()
|
||||||
|
.map_err(|inner| ConnectionDenied { inner })?;
|
||||||
|
|
||||||
|
Ok(*inner)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Display for ConnectionDenied {
|
impl fmt::Display for ConnectionDenied {
|
||||||
@ -2485,6 +2521,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
#[allow(deprecated)]
|
||||||
fn max_outgoing() {
|
fn max_outgoing() {
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
|
|
||||||
@ -2512,6 +2549,7 @@ mod tests {
|
|||||||
.dial(DialOpts::peer_id(target).addresses(vec![addr]).build())
|
.dial(DialOpts::peer_id(target).addresses(vec![addr]).build())
|
||||||
.expect_err("Unexpected dialing success.")
|
.expect_err("Unexpected dialing success.")
|
||||||
{
|
{
|
||||||
|
#[allow(deprecated)]
|
||||||
DialError::ConnectionLimit(limit) => {
|
DialError::ConnectionLimit(limit) => {
|
||||||
assert_eq!(limit.current, outgoing_limit);
|
assert_eq!(limit.current, outgoing_limit);
|
||||||
assert_eq!(limit.limit, outgoing_limit);
|
assert_eq!(limit.limit, outgoing_limit);
|
||||||
@ -2538,6 +2576,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(deprecated)]
|
||||||
fn limits(limit: u32) -> ConnectionLimits {
|
fn limits(limit: u32) -> ConnectionLimits {
|
||||||
ConnectionLimits::default().with_max_established_incoming(Some(limit))
|
ConnectionLimits::default().with_max_established_incoming(Some(limit))
|
||||||
}
|
}
|
||||||
@ -2545,9 +2584,11 @@ mod tests {
|
|||||||
fn prop(limit: Limit) {
|
fn prop(limit: Limit) {
|
||||||
let limit = limit.0;
|
let limit = limit.0;
|
||||||
|
|
||||||
|
#[allow(deprecated)]
|
||||||
let mut network1 = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler)
|
let mut network1 = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler)
|
||||||
.connection_limits(limits(limit))
|
.connection_limits(limits(limit))
|
||||||
.build();
|
.build();
|
||||||
|
#[allow(deprecated)]
|
||||||
let mut network2 = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler)
|
let mut network2 = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler)
|
||||||
.connection_limits(limits(limit))
|
.connection_limits(limits(limit))
|
||||||
.build();
|
.build();
|
||||||
@ -2580,6 +2621,7 @@ mod tests {
|
|||||||
Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => {
|
Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => {
|
||||||
network_1_established = true;
|
network_1_established = true;
|
||||||
}
|
}
|
||||||
|
#[allow(deprecated)]
|
||||||
Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
|
Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
|
||||||
error: ListenError::ConnectionLimit(err),
|
error: ListenError::ConnectionLimit(err),
|
||||||
..
|
..
|
||||||
|
Reference in New Issue
Block a user