feat(swarm): remove deprecated connection limits

Users are encouraged to use `libp2p::connection_limit::Behaviour` which is a one-to-one replacement for this functionality but built as a `NetworkBehaviour`.

Related: #3647.

Pull-Request: #3885.
This commit is contained in:
Thomas Eizinger
2023-05-08 12:54:53 +02:00
committed by GitHub
parent f08055e59b
commit b8a7684153
7 changed files with 48 additions and 510 deletions

View File

@ -214,10 +214,6 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
}; };
} }
} }
#[allow(deprecated)]
libp2p_swarm::DialError::ConnectionLimit(_) => {
record(OutgoingConnectionError::ConnectionLimit)
}
libp2p_swarm::DialError::LocalPeerId { .. } => { libp2p_swarm::DialError::LocalPeerId { .. } => {
record(OutgoingConnectionError::LocalPeerId) record(OutgoingConnectionError::LocalPeerId)
} }
@ -320,7 +316,6 @@ enum PeerStatus {
#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)] #[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
enum OutgoingConnectionError { enum OutgoingConnectionError {
ConnectionLimit,
LocalPeerId, LocalPeerId,
NoAddresses, NoAddresses,
DialPeerConditionFalse, DialPeerConditionFalse,
@ -345,7 +340,6 @@ enum IncomingConnectionError {
TransportErrorMultiaddrNotSupported, TransportErrorMultiaddrNotSupported,
TransportErrorOther, TransportErrorOther,
Aborted, Aborted,
ConnectionLimit,
Denied, Denied,
} }
@ -353,10 +347,6 @@ impl From<&libp2p_swarm::ListenError> for IncomingConnectionError {
fn from(error: &libp2p_swarm::ListenError) -> Self { fn from(error: &libp2p_swarm::ListenError) -> Self {
match error { match error {
libp2p_swarm::ListenError::WrongPeerId { .. } => IncomingConnectionError::WrongPeerId, libp2p_swarm::ListenError::WrongPeerId { .. } => IncomingConnectionError::WrongPeerId,
#[allow(deprecated)]
libp2p_swarm::ListenError::ConnectionLimit(_) => {
IncomingConnectionError::ConnectionLimit
}
libp2p_swarm::ListenError::LocalPeerId { .. } => IncomingConnectionError::LocalPeerId, libp2p_swarm::ListenError::LocalPeerId { .. } => IncomingConnectionError::LocalPeerId,
libp2p_swarm::ListenError::Transport( libp2p_swarm::ListenError::Transport(
libp2p_core::transport::TransportError::MultiaddrNotSupported(_), libp2p_core::transport::TransportError::MultiaddrNotSupported(_),

View File

@ -1931,8 +1931,6 @@ where
DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => { DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse."); unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse.");
} }
#[allow(deprecated)]
DialError::ConnectionLimit(_) => {}
} }
} }

View File

@ -30,12 +30,17 @@
- Flatten `ConnectionHandlerUpgrErr` and rename to `StreamUpgradeError`. - Flatten `ConnectionHandlerUpgrErr` and rename to `StreamUpgradeError`.
See [PR 3882]. See [PR 3882].
- Remove deprecated `ConnectionLimits`.
Users should migrate to `libp2p::connection_limits::Behaviour`.
See [PR 3885].
[PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605 [PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3746]: https://github.com/libp2p/rust-libp2p/pull/3746 [PR 3746]: https://github.com/libp2p/rust-libp2p/pull/3746
[PR 3865]: https://github.com/libp2p/rust-libp2p/pull/3865 [PR 3865]: https://github.com/libp2p/rust-libp2p/pull/3865
[PR 3882]: https://github.com/libp2p/rust-libp2p/pull/3882 [PR 3882]: https://github.com/libp2p/rust-libp2p/pull/3882
[PR 3884]: https://github.com/libp2p/rust-libp2p/pull/3884 [PR 3884]: https://github.com/libp2p/rust-libp2p/pull/3884
[PR 3885]: https://github.com/libp2p/rust-libp2p/pull/3885
[PR 3886]: https://github.com/libp2p/rust-libp2p/pull/3886 [PR 3886]: https://github.com/libp2p/rust-libp2p/pull/3886
## 0.42.2 ## 0.42.2

View File

@ -400,31 +400,6 @@ impl<'a> IncomingInfo<'a> {
} }
} }
/// Information about a connection limit.
#[deprecated(note = "Use `libp2p::connection_limits` instead.", since = "0.42.1")]
#[derive(Debug, Clone, Copy)]
pub struct ConnectionLimit {
/// The maximum number of connections.
pub limit: u32,
/// The current number of connections.
pub current: u32,
}
#[allow(deprecated)]
impl fmt::Display for ConnectionLimit {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"connection limit exceeded ({}/{})",
self.current, self.limit
)
}
}
/// A `ConnectionLimit` can represent an error if it has been exceeded.
#[allow(deprecated)]
impl std::error::Error for ConnectionLimit {}
struct SubstreamUpgrade<UserData, Upgrade> { struct SubstreamUpgrade<UserData, Upgrade> {
user_data: Option<UserData>, user_data: Option<UserData>,
timeout: Delay, timeout: Delay,

View File

@ -18,8 +18,6 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
#[allow(deprecated)]
use crate::connection::ConnectionLimit;
use crate::transport::TransportError; use crate::transport::TransportError;
use crate::Multiaddr; use crate::Multiaddr;
use crate::{ConnectedPoint, PeerId}; use crate::{ConnectedPoint, PeerId};
@ -90,15 +88,6 @@ pub enum PendingConnectionError<TTransErr> {
/// An error occurred while negotiating the transport protocol(s) on a connection. /// An error occurred while negotiating the transport protocol(s) on a connection.
Transport(TTransErr), Transport(TTransErr),
/// The connection was dropped because the connection limit
/// for a peer has been reached.
#[deprecated(
note = "Use `libp2p::connection_limits` instead and handle `{Dial,Listen}Error::Denied::cause`.",
since = "0.42.1"
)]
#[allow(deprecated)]
ConnectionLimit(ConnectionLimit),
/// Pending connection attempt has been aborted. /// Pending connection attempt has been aborted.
Aborted, Aborted,
@ -117,10 +106,6 @@ impl<T> PendingConnectionError<T> {
pub fn map<U>(self, f: impl FnOnce(T) -> U) -> PendingConnectionError<U> { pub fn map<U>(self, f: impl FnOnce(T) -> U) -> PendingConnectionError<U> {
match self { match self {
PendingConnectionError::Transport(t) => PendingConnectionError::Transport(f(t)), PendingConnectionError::Transport(t) => PendingConnectionError::Transport(f(t)),
#[allow(deprecated)]
PendingConnectionError::ConnectionLimit(l) => {
PendingConnectionError::ConnectionLimit(l)
}
PendingConnectionError::Aborted => PendingConnectionError::Aborted, PendingConnectionError::Aborted => PendingConnectionError::Aborted,
PendingConnectionError::WrongPeerId { obtained, endpoint } => { PendingConnectionError::WrongPeerId { obtained, endpoint } => {
PendingConnectionError::WrongPeerId { obtained, endpoint } PendingConnectionError::WrongPeerId { obtained, endpoint }
@ -145,10 +130,6 @@ where
"Pending connection: Transport error on connection: {err}" "Pending connection: Transport error on connection: {err}"
) )
} }
#[allow(deprecated)]
PendingConnectionError::ConnectionLimit(l) => {
write!(f, "Connection error: Connection limit: {l}.")
}
PendingConnectionError::WrongPeerId { obtained, endpoint } => { PendingConnectionError::WrongPeerId { obtained, endpoint } => {
write!( write!(
f, f,
@ -172,8 +153,6 @@ where
PendingConnectionError::WrongPeerId { .. } => None, PendingConnectionError::WrongPeerId { .. } => None,
PendingConnectionError::LocalPeerId { .. } => None, PendingConnectionError::LocalPeerId { .. } => None,
PendingConnectionError::Aborted => None, PendingConnectionError::Aborted => None,
#[allow(deprecated)]
PendingConnectionError::ConnectionLimit(..) => None,
} }
} }
} }

View File

@ -18,8 +18,7 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
#[allow(deprecated)] use crate::connection::{Connection, ConnectionId, PendingPoint};
use crate::connection::{Connection, ConnectionId, ConnectionLimit, PendingPoint};
use crate::{ use crate::{
connection::{ connection::{
Connected, ConnectionError, IncomingInfo, PendingConnectionError, Connected, ConnectionError, IncomingInfo, PendingConnectionError,
@ -44,7 +43,6 @@ use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt};
use std::task::Waker; use std::task::Waker;
use std::{ use std::{
collections::{hash_map, HashMap}, collections::{hash_map, HashMap},
convert::TryFrom as _,
fmt, fmt,
num::{NonZeroU8, NonZeroUsize}, num::{NonZeroU8, NonZeroUsize},
pin::Pin, pin::Pin,
@ -306,8 +304,7 @@ where
THandler: ConnectionHandler, THandler: ConnectionHandler,
{ {
/// Creates a new empty `Pool`. /// Creates a new empty `Pool`.
#[allow(deprecated)] pub(crate) fn new(local_id: PeerId, config: PoolConfig) -> Self {
pub(crate) fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self {
let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0); let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0);
let executor = match config.executor { let executor = match config.executor {
Some(exec) => ExecSwitch::Executor(exec), Some(exec) => ExecSwitch::Executor(exec),
@ -315,7 +312,7 @@ where
}; };
Pool { Pool {
local_id, local_id,
counters: ConnectionCounters::new(limits), counters: ConnectionCounters::new(),
established: Default::default(), established: Default::default(),
pending: Default::default(), pending: Default::default(),
task_command_buffer_size: config.task_command_buffer_size, task_command_buffer_size: config.task_command_buffer_size,
@ -407,9 +404,6 @@ where
/// Adds a pending outgoing connection to the pool in the form of a `Future` /// Adds a pending outgoing connection to the pool in the form of a `Future`
/// that establishes and negotiates the connection. /// that establishes and negotiates the connection.
///
/// Returns an error if the limit of pending outgoing connections
/// has been reached.
#[allow(deprecated)] #[allow(deprecated)]
pub(crate) fn add_outgoing( pub(crate) fn add_outgoing(
&mut self, &mut self,
@ -426,9 +420,7 @@ where
role_override: Endpoint, role_override: Endpoint,
dial_concurrency_factor_override: Option<NonZeroU8>, dial_concurrency_factor_override: Option<NonZeroU8>,
connection_id: ConnectionId, connection_id: ConnectionId,
) -> Result<(), ConnectionLimit> { ) {
self.counters.check_max_pending_outgoing()?;
let dial = ConcurrentDial::new( let dial = ConcurrentDial::new(
dials, dials,
dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor), dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor),
@ -456,29 +448,20 @@ where
accepted_at: Instant::now(), accepted_at: Instant::now(),
}, },
); );
Ok(())
} }
/// Adds a pending incoming connection to the pool in the form of a /// Adds a pending incoming connection to the pool in the form of a
/// `Future` that establishes and negotiates the connection. /// `Future` that establishes and negotiates the connection.
///
/// Returns an error if the limit of pending incoming connections
/// has been reached.
#[allow(deprecated)]
pub(crate) fn add_incoming<TFut>( pub(crate) fn add_incoming<TFut>(
&mut self, &mut self,
future: TFut, future: TFut,
info: IncomingInfo<'_>, info: IncomingInfo<'_>,
connection_id: ConnectionId, connection_id: ConnectionId,
) -> Result<(), ConnectionLimit> ) where
where
TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static, TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
{ {
let endpoint = info.create_connected_point(); let endpoint = info.create_connected_point();
self.counters.check_max_pending_incoming()?;
let (abort_notifier, abort_receiver) = oneshot::channel(); let (abort_notifier, abort_receiver) = oneshot::channel();
self.executor self.executor
@ -499,8 +482,6 @@ where
accepted_at: Instant::now(), accepted_at: Instant::now(),
}, },
); );
Ok(())
} }
#[allow(deprecated)] #[allow(deprecated)]
@ -684,49 +665,26 @@ where
), ),
}; };
#[allow(deprecated)] let check_peer_id = || {
// Remove once `PendingConnectionError::ConnectionLimit` is gone. if let Some(peer) = expected_peer_id {
let error = self if peer != obtained_peer_id {
.counters return Err(PendingConnectionError::WrongPeerId {
// Check general established connection limit. obtained: obtained_peer_id,
.check_max_established(&endpoint)
.map_err(PendingConnectionError::ConnectionLimit)
// Check per-peer established connection limit.
.and_then(|()| {
self.counters
.check_max_established_per_peer(num_peer_established(
&self.established,
obtained_peer_id,
))
.map_err(PendingConnectionError::ConnectionLimit)
})
// Check expected peer id matches.
.and_then(|()| {
if let Some(peer) = expected_peer_id {
if peer != obtained_peer_id {
Err(PendingConnectionError::WrongPeerId {
obtained: obtained_peer_id,
endpoint: endpoint.clone(),
})
} else {
Ok(())
}
} else {
Ok(())
}
})
// Check peer is not local peer.
.and_then(|()| {
if self.local_id == obtained_peer_id {
Err(PendingConnectionError::LocalPeerId {
endpoint: endpoint.clone(), endpoint: endpoint.clone(),
}) });
} else {
Ok(())
} }
}); }
if let Err(error) = error { if self.local_id == obtained_peer_id {
return Err(PendingConnectionError::LocalPeerId {
endpoint: endpoint.clone(),
});
}
Ok(())
};
if let Err(error) = check_peer_id() {
self.executor.spawn(poll_fn(move |cx| { self.executor.spawn(poll_fn(move |cx| {
if let Err(e) = ready!(muxer.poll_close_unpin(cx)) { if let Err(e) = ready!(muxer.poll_close_unpin(cx)) {
log::debug!( log::debug!(
@ -871,9 +829,6 @@ impl Drop for NewConnection {
/// Network connection information. /// Network connection information.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ConnectionCounters { pub struct ConnectionCounters {
/// The effective connection limits.
#[allow(deprecated)]
limits: ConnectionLimits,
/// The current number of incoming connections. /// The current number of incoming connections.
pending_incoming: u32, pending_incoming: u32,
/// The current number of outgoing connections. /// The current number of outgoing connections.
@ -885,10 +840,8 @@ pub struct ConnectionCounters {
} }
impl ConnectionCounters { impl ConnectionCounters {
#[allow(deprecated)] fn new() -> Self {
fn new(limits: ConnectionLimits) -> Self {
Self { Self {
limits,
pending_incoming: 0, pending_incoming: 0,
pending_outgoing: 0, pending_outgoing: 0,
established_incoming: 0, established_incoming: 0,
@ -896,13 +849,6 @@ impl ConnectionCounters {
} }
} }
/// The effective connection limits.
#[deprecated(note = "Use the `libp2p::connection_limits` instead.")]
#[allow(deprecated)]
pub fn limits(&self) -> &ConnectionLimits {
&self.limits
}
/// The total number of connections, both pending and established. /// The total number of connections, both pending and established.
pub fn num_connections(&self) -> u32 { pub fn num_connections(&self) -> u32 {
self.num_pending() + self.num_established() self.num_pending() + self.num_established()
@ -985,117 +931,6 @@ impl ConnectionCounters {
} }
} }
} }
#[allow(deprecated)]
fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> {
Self::check(self.pending_outgoing, self.limits.max_pending_outgoing)
}
#[allow(deprecated)]
fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> {
Self::check(self.pending_incoming, self.limits.max_pending_incoming)
}
#[allow(deprecated)]
fn check_max_established(&self, endpoint: &ConnectedPoint) -> Result<(), ConnectionLimit> {
// Check total connection limit.
Self::check(self.num_established(), self.limits.max_established_total)?;
// Check incoming/outgoing connection limits
match endpoint {
ConnectedPoint::Dialer { .. } => Self::check(
self.established_outgoing,
self.limits.max_established_outgoing,
),
ConnectedPoint::Listener { .. } => Self::check(
self.established_incoming,
self.limits.max_established_incoming,
),
}
}
#[allow(deprecated)]
fn check_max_established_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> {
Self::check(current, self.limits.max_established_per_peer)
}
#[allow(deprecated)]
fn check(current: u32, limit: Option<u32>) -> Result<(), ConnectionLimit> {
if let Some(limit) = limit {
if current >= limit {
return Err(ConnectionLimit { limit, current });
}
}
Ok(())
}
}
/// Counts the number of established connections to the given peer.
fn num_peer_established<TInEvent>(
established: &FnvHashMap<PeerId, FnvHashMap<ConnectionId, EstablishedConnection<TInEvent>>>,
peer: PeerId,
) -> u32 {
established.get(&peer).map_or(0, |conns| {
u32::try_from(conns.len()).expect("Unexpectedly large number of connections for a peer.")
})
}
/// The configurable connection limits.
///
/// By default no connection limits apply.
#[derive(Debug, Clone, Default)]
#[deprecated(note = "Use `libp2p::connection_limits` instead.", since = "0.42.1")]
pub struct ConnectionLimits {
max_pending_incoming: Option<u32>,
max_pending_outgoing: Option<u32>,
max_established_incoming: Option<u32>,
max_established_outgoing: Option<u32>,
max_established_per_peer: Option<u32>,
max_established_total: Option<u32>,
}
#[allow(deprecated)]
impl ConnectionLimits {
/// Configures the maximum number of concurrently incoming connections being established.
pub fn with_max_pending_incoming(mut self, limit: Option<u32>) -> Self {
self.max_pending_incoming = limit;
self
}
/// Configures the maximum number of concurrently outgoing connections being established.
pub fn with_max_pending_outgoing(mut self, limit: Option<u32>) -> Self {
self.max_pending_outgoing = limit;
self
}
/// Configures the maximum number of concurrent established inbound connections.
pub fn with_max_established_incoming(mut self, limit: Option<u32>) -> Self {
self.max_established_incoming = limit;
self
}
/// Configures the maximum number of concurrent established outbound connections.
pub fn with_max_established_outgoing(mut self, limit: Option<u32>) -> Self {
self.max_established_outgoing = limit;
self
}
/// Configures the maximum number of concurrent established connections (both
/// inbound and outbound).
///
/// Note: This should be used in conjunction with
/// [`ConnectionLimits::with_max_established_incoming`] to prevent possible
/// eclipse attacks (all connections being inbound).
pub fn with_max_established(mut self, limit: Option<u32>) -> Self {
self.max_established_total = limit;
self
}
/// Configures the maximum number of concurrent established connections per peer,
/// regardless of direction (incoming or outgoing).
pub fn with_max_established_per_peer(mut self, limit: Option<u32>) -> Self {
self.max_established_per_peer = limit;
self
}
} }
/// Configuration options when creating a [`Pool`]. /// Configuration options when creating a [`Pool`].

View File

@ -107,8 +107,6 @@ pub mod derive_prelude {
pub use libp2p_identity::PeerId; pub use libp2p_identity::PeerId;
} }
#[allow(deprecated)]
pub use crate::connection::ConnectionLimit;
#[allow(deprecated)] #[allow(deprecated)]
pub use behaviour::NetworkBehaviourAction; pub use behaviour::NetworkBehaviourAction;
pub use behaviour::{ pub use behaviour::{
@ -117,8 +115,7 @@ pub use behaviour::{
ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddr, NewListenAddr, NotifyHandler, ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddr, NewListenAddr, NotifyHandler,
PollParameters, ToSwarm, PollParameters, ToSwarm,
}; };
#[allow(deprecated)] pub use connection::pool::ConnectionCounters;
pub use connection::pool::{ConnectionCounters, ConnectionLimits};
pub use connection::{ConnectionError, ConnectionId}; pub use connection::{ConnectionError, ConnectionId};
pub use executor::Executor; pub use executor::Executor;
pub use handler::{ pub use handler::{
@ -621,27 +618,15 @@ where
}) })
.collect(); .collect();
match self.pool.add_outgoing( self.pool.add_outgoing(
dials, dials,
peer_id, peer_id,
dial_opts.role_override(), dial_opts.role_override(),
dial_opts.dial_concurrency_override(), dial_opts.dial_concurrency_override(),
connection_id, connection_id,
) { );
Ok(()) => Ok(()),
Err(connection_limit) => {
#[allow(deprecated)]
let error = DialError::ConnectionLimit(connection_limit);
self.behaviour
.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
error: &error,
connection_id,
}));
Err(error) Ok(())
}
}
} }
/// Returns an iterator that produces the list of addresses we're listening on. /// Returns an iterator that produces the list of addresses we're listening on.
@ -1031,33 +1016,19 @@ where
} }
} }
match self.pool.add_incoming( self.pool.add_incoming(
upgrade, upgrade,
IncomingInfo { IncomingInfo {
local_addr: &local_addr, local_addr: &local_addr,
send_back_addr: &send_back_addr, send_back_addr: &send_back_addr,
}, },
connection_id, connection_id,
) { );
Ok(()) => {
return Some(SwarmEvent::IncomingConnection { Some(SwarmEvent::IncomingConnection {
local_addr, local_addr,
send_back_addr, send_back_addr,
}); })
}
Err(connection_limit) => {
#[allow(deprecated)]
let error = ListenError::ConnectionLimit(connection_limit);
self.behaviour
.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr: &local_addr,
send_back_addr: &send_back_addr,
error: &error,
connection_id,
}));
log::debug!("Incoming connection rejected: {:?}", connection_limit);
}
};
} }
TransportEvent::NewAddress { TransportEvent::NewAddress {
listener_id, listener_id,
@ -1073,10 +1044,10 @@ where
listener_id, listener_id,
addr: &listen_addr, addr: &listen_addr,
})); }));
return Some(SwarmEvent::NewListenAddr { Some(SwarmEvent::NewListenAddr {
listener_id, listener_id,
address: listen_addr, address: listen_addr,
}); })
} }
TransportEvent::AddressExpired { TransportEvent::AddressExpired {
listener_id, listener_id,
@ -1095,10 +1066,10 @@ where
listener_id, listener_id,
addr: &listen_addr, addr: &listen_addr,
})); }));
return Some(SwarmEvent::ExpiredListenAddr { Some(SwarmEvent::ExpiredListenAddr {
listener_id, listener_id,
address: listen_addr, address: listen_addr,
}); })
} }
TransportEvent::ListenerClosed { TransportEvent::ListenerClosed {
listener_id, listener_id,
@ -1116,11 +1087,11 @@ where
listener_id, listener_id,
reason: reason.as_ref().copied(), reason: reason.as_ref().copied(),
})); }));
return Some(SwarmEvent::ListenerClosed { Some(SwarmEvent::ListenerClosed {
listener_id, listener_id,
addresses: addrs.to_vec(), addresses: addrs.to_vec(),
reason, reason,
}); })
} }
TransportEvent::ListenerError { listener_id, error } => { TransportEvent::ListenerError { listener_id, error } => {
self.behaviour self.behaviour
@ -1128,10 +1099,9 @@ where
listener_id, listener_id,
err: &error, err: &error,
})); }));
return Some(SwarmEvent::ListenerError { listener_id, error }); Some(SwarmEvent::ListenerError { listener_id, error })
} }
} }
None
} }
fn handle_behaviour_event( fn handle_behaviour_event(
@ -1467,8 +1437,6 @@ pub struct SwarmBuilder<TBehaviour> {
transport: transport::Boxed<(PeerId, StreamMuxerBox)>, transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour, behaviour: TBehaviour,
pool_config: PoolConfig, pool_config: PoolConfig,
#[allow(deprecated)]
connection_limits: ConnectionLimits,
} }
impl<TBehaviour> SwarmBuilder<TBehaviour> impl<TBehaviour> SwarmBuilder<TBehaviour>
@ -1489,7 +1457,6 @@ where
transport, transport,
behaviour, behaviour,
pool_config: PoolConfig::new(Some(Box::new(executor))), pool_config: PoolConfig::new(Some(Box::new(executor))),
connection_limits: Default::default(),
} }
} }
@ -1571,7 +1538,6 @@ where
transport, transport,
behaviour, behaviour,
pool_config: PoolConfig::new(None), pool_config: PoolConfig::new(None),
connection_limits: Default::default(),
} }
} }
@ -1611,13 +1577,6 @@ where
self self
} }
/// Configures the connection limits.
#[allow(deprecated)]
pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self {
self.connection_limits = limits;
self
}
/// Configures an override for the substream upgrade protocol to use. /// Configures an override for the substream upgrade protocol to use.
/// ///
/// The subtream upgrade protocol is the multistream-select protocol /// The subtream upgrade protocol is the multistream-select protocol
@ -1652,7 +1611,7 @@ where
Swarm { Swarm {
local_peer_id: self.local_peer_id, local_peer_id: self.local_peer_id,
transport: self.transport, transport: self.transport,
pool: Pool::new(self.local_peer_id, self.pool_config, self.connection_limits), pool: Pool::new(self.local_peer_id, self.pool_config),
behaviour: self.behaviour, behaviour: self.behaviour,
supported_protocols: Default::default(), supported_protocols: Default::default(),
listened_addrs: HashMap::new(), listened_addrs: HashMap::new(),
@ -1665,14 +1624,6 @@ where
/// Possible errors when trying to establish or upgrade an outbound connection. /// Possible errors when trying to establish or upgrade an outbound connection.
#[derive(Debug)] #[derive(Debug)]
pub enum DialError { pub enum DialError {
/// The configured limit for simultaneous outgoing connections
/// has been reached.
#[deprecated(
note = "Use `libp2p::connection_limits` instead and handle `{Dial,Listen}Error::Denied::cause`.",
since = "0.42.1"
)]
#[allow(deprecated)]
ConnectionLimit(ConnectionLimit),
/// The peer identity obtained on the connection matches the local peer. /// The peer identity obtained on the connection matches the local peer.
LocalPeerId { LocalPeerId {
endpoint: ConnectedPoint, endpoint: ConnectedPoint,
@ -1701,8 +1652,6 @@ pub enum DialError {
impl From<PendingOutboundConnectionError> for DialError { impl From<PendingOutboundConnectionError> for DialError {
fn from(error: PendingOutboundConnectionError) -> Self { fn from(error: PendingOutboundConnectionError) -> Self {
match error { match error {
#[allow(deprecated)]
PendingConnectionError::ConnectionLimit(limit) => DialError::ConnectionLimit(limit),
PendingConnectionError::Aborted => DialError::Aborted, PendingConnectionError::Aborted => DialError::Aborted,
PendingConnectionError::WrongPeerId { obtained, endpoint } => { PendingConnectionError::WrongPeerId { obtained, endpoint } => {
DialError::WrongPeerId { obtained, endpoint } DialError::WrongPeerId { obtained, endpoint }
@ -1716,8 +1665,6 @@ impl From<PendingOutboundConnectionError> for DialError {
impl fmt::Display for DialError { impl fmt::Display for DialError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
#[allow(deprecated)]
DialError::ConnectionLimit(err) => write!(f, "Dial error: {err}"),
DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."), DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
DialError::LocalPeerId { endpoint } => write!( DialError::LocalPeerId { endpoint } => write!(
f, f,
@ -1769,8 +1716,6 @@ fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::R
impl error::Error for DialError { impl error::Error for DialError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> { fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self { match self {
#[allow(deprecated)]
DialError::ConnectionLimit(err) => Some(err),
DialError::LocalPeerId { .. } => None, DialError::LocalPeerId { .. } => None,
DialError::NoAddresses => None, DialError::NoAddresses => None,
DialError::DialPeerConditionFalse(_) => None, DialError::DialPeerConditionFalse(_) => None,
@ -1786,14 +1731,6 @@ impl error::Error for DialError {
/// Possible errors when upgrading an inbound connection. /// Possible errors when upgrading an inbound connection.
#[derive(Debug)] #[derive(Debug)]
pub enum ListenError { pub enum ListenError {
/// The configured limit for simultaneous outgoing connections
/// has been reached.
#[deprecated(
note = "Use `libp2p::connection_limits` instead and handle `{Dial,Listen}Error::Denied::cause`.",
since = "0.42.1"
)]
#[allow(deprecated)]
ConnectionLimit(ConnectionLimit),
/// Pending connection attempt has been aborted. /// Pending connection attempt has been aborted.
Aborted, Aborted,
/// The peer identity obtained on the connection did not match the one that was expected. /// The peer identity obtained on the connection did not match the one that was expected.
@ -1816,10 +1753,6 @@ impl From<PendingInboundConnectionError> for ListenError {
fn from(error: PendingInboundConnectionError) -> Self { fn from(error: PendingInboundConnectionError) -> Self {
match error { match error {
PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner), PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
#[allow(deprecated)]
PendingInboundConnectionError::ConnectionLimit(inner) => {
ListenError::ConnectionLimit(inner)
}
PendingInboundConnectionError::Aborted => ListenError::Aborted, PendingInboundConnectionError::Aborted => ListenError::Aborted,
PendingInboundConnectionError::WrongPeerId { obtained, endpoint } => { PendingInboundConnectionError::WrongPeerId { obtained, endpoint } => {
ListenError::WrongPeerId { obtained, endpoint } ListenError::WrongPeerId { obtained, endpoint }
@ -1834,8 +1767,6 @@ impl From<PendingInboundConnectionError> for ListenError {
impl fmt::Display for ListenError { impl fmt::Display for ListenError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
#[allow(deprecated)]
ListenError::ConnectionLimit(_) => write!(f, "Listen error"),
ListenError::Aborted => write!( ListenError::Aborted => write!(
f, f,
"Listen error: Pending connection attempt has been aborted." "Listen error: Pending connection attempt has been aborted."
@ -1860,8 +1791,6 @@ impl fmt::Display for ListenError {
impl error::Error for ListenError { impl error::Error for ListenError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> { fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self { match self {
#[allow(deprecated)]
ListenError::ConnectionLimit(err) => Some(err),
ListenError::WrongPeerId { .. } => None, ListenError::WrongPeerId { .. } => None,
ListenError::Transport(err) => Some(err), ListenError::Transport(err) => Some(err),
ListenError::Aborted => None, ListenError::Aborted => None,
@ -1967,8 +1896,7 @@ mod tests {
use either::Either; use either::Either;
use futures::executor::block_on; use futures::executor::block_on;
use futures::executor::ThreadPool; use futures::executor::ThreadPool;
use futures::future::poll_fn; use futures::{executor, future};
use futures::{executor, future, ready};
use libp2p_core::multiaddr::multiaddr; use libp2p_core::multiaddr::multiaddr;
use libp2p_core::transport::memory::MemoryTransportError; use libp2p_core::transport::memory::MemoryTransportError;
use libp2p_core::transport::TransportEvent; use libp2p_core::transport::TransportEvent;
@ -2341,178 +2269,6 @@ mod tests {
QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _); QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
} }
#[test]
#[allow(deprecated)]
fn max_outgoing() {
use rand::Rng;
let outgoing_limit = rand::thread_rng().gen_range(1..10);
let limits = ConnectionLimits::default().with_max_pending_outgoing(Some(outgoing_limit));
let mut network = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler)
.connection_limits(limits)
.build();
let addr: Multiaddr = "/memory/1234".parse().unwrap();
let target = PeerId::random();
for _ in 0..outgoing_limit {
network
.dial(
DialOpts::peer_id(target)
.addresses(vec![addr.clone()])
.build(),
)
.expect("Unexpected connection limit.");
}
match network
.dial(DialOpts::peer_id(target).addresses(vec![addr]).build())
.expect_err("Unexpected dialing success.")
{
#[allow(deprecated)]
DialError::ConnectionLimit(limit) => {
assert_eq!(limit.current, outgoing_limit);
assert_eq!(limit.limit, outgoing_limit);
}
e => panic!("Unexpected error: {e:?}"),
}
let info = network.network_info();
assert_eq!(info.num_peers(), 0);
assert_eq!(
info.connection_counters().num_pending_outgoing(),
outgoing_limit
);
}
#[test]
fn max_established_incoming() {
#[derive(Debug, Clone)]
struct Limit(u32);
impl Arbitrary for Limit {
fn arbitrary(g: &mut Gen) -> Self {
Self(g.gen_range(1..10))
}
}
#[allow(deprecated)]
fn limits(limit: u32) -> ConnectionLimits {
ConnectionLimits::default().with_max_established_incoming(Some(limit))
}
fn prop(limit: Limit) {
let limit = limit.0;
#[allow(deprecated)]
let mut network1 = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler)
.connection_limits(limits(limit))
.build();
#[allow(deprecated)]
let mut network2 = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler)
.connection_limits(limits(limit))
.build();
let _ = network1.listen_on(multiaddr![Memory(0u64)]).unwrap();
let listen_addr = async_std::task::block_on(poll_fn(|cx| {
match ready!(network1.poll_next_unpin(cx)).unwrap() {
SwarmEvent::NewListenAddr { address, .. } => Poll::Ready(address),
e => panic!("Unexpected network event: {e:?}"),
}
}));
// Spawn and block on the dialer.
async_std::task::block_on({
let mut n = 0;
network2.dial(listen_addr.clone()).unwrap();
let mut expected_closed = false;
let mut network_1_established = false;
let mut network_2_established = false;
let mut network_1_limit_reached = false;
let mut network_2_limit_reached = false;
poll_fn(move |cx| {
loop {
let mut network_1_pending = false;
let mut network_2_pending = false;
match network1.poll_next_unpin(cx) {
Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) => {}
Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => {
network_1_established = true;
}
#[allow(deprecated)]
Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
error: ListenError::ConnectionLimit(err),
..
})) => {
assert_eq!(err.limit, limit);
assert_eq!(err.limit, err.current);
let info = network1.network_info();
let counters = info.connection_counters();
assert_eq!(counters.num_established_incoming(), limit);
assert_eq!(counters.num_established(), limit);
network_1_limit_reached = true;
}
Poll::Pending => {
network_1_pending = true;
}
e => panic!("Unexpected network event: {e:?}"),
}
match network2.poll_next_unpin(cx) {
Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => {
network_2_established = true;
}
Poll::Ready(Some(SwarmEvent::ConnectionClosed { .. })) => {
assert!(expected_closed);
let info = network2.network_info();
let counters = info.connection_counters();
assert_eq!(counters.num_established_outgoing(), limit);
assert_eq!(counters.num_established(), limit);
network_2_limit_reached = true;
}
Poll::Pending => {
network_2_pending = true;
}
e => panic!("Unexpected network event: {e:?}"),
}
if network_1_pending && network_2_pending {
return Poll::Pending;
}
if network_1_established && network_2_established {
network_1_established = false;
network_2_established = false;
if n <= limit {
// Dial again until the limit is exceeded.
n += 1;
network2.dial(listen_addr.clone()).unwrap();
if n == limit {
// The the next dialing attempt exceeds the limit, this
// is the connection we expected to get closed.
expected_closed = true;
}
} else {
panic!("Expect networks not to establish connections beyond the limit.")
}
}
if network_1_limit_reached && network_2_limit_reached {
return Poll::Ready(());
}
}
})
});
}
quickcheck(prop as fn(_));
}
#[test] #[test]
fn invalid_peer_id() { fn invalid_peer_id() {
// Checks whether dialing an address containing the wrong peer id raises an error // Checks whether dialing an address containing the wrong peer id raises an error