[core/swarm] Refactor and extend configurable connection limits. (#1848)

* Refactor and extend configurable connection limits.

To better track different connection counts, permit configurable
limits for these counts and make these available for
inspection efficiently, introduce dedicated connection counters
via a `ConnectionCounters` structure that is exposed on the
API via the `NetworkInfo`. All connection or connection
states that are counted in this way can also have effective
configurable limits.

* Cleanup

* Add missing file.

* Refine naming and config API.

* Update core/CHANGELOG.md

Co-authored-by: Max Inden <mail@max-inden.de>

* Update core/CHANGELOG.md

Co-authored-by: Max Inden <mail@max-inden.de>

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Roman Borschel 2020-11-23 17:22:15 +01:00 committed by GitHub
parent 1bd013c843
commit cef75ab7e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 501 additions and 270 deletions

View File

@ -1,5 +1,14 @@
# 0.25.0 [unreleased]
- The `NetworkConfig` API is now a builder that moves `self`.
[PR 1848](https://github.com/libp2p/rust-libp2p/pull/1848/).
- New configurable connection limits for established connections and
dedicated connection counters. Removed the connection limit dedicated
to outgoing pending connection _per peer_. Connection limits are now
represented by `u32` intead of `usize` types.
[PR 1848](https://github.com/libp2p/rust-libp2p/pull/1848/).
- Update `multihash`.
- Update `multistream-select`.

View File

@ -32,6 +32,7 @@ pub use listeners::{ListenerId, ListenersStream, ListenersEvent};
pub use manager::ConnectionId;
pub use substream::{Substream, SubstreamEndpoint, Close};
pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection};
pub use pool::{ConnectionLimits, ConnectionCounters};
use crate::muxing::StreamMuxer;
use crate::{Multiaddr, PeerId};
@ -326,9 +327,9 @@ impl<'a> OutgoingInfo<'a> {
#[derive(Debug, Clone)]
pub struct ConnectionLimit {
/// The maximum number of connections.
pub limit: usize,
pub limit: u32,
/// The current number of connections.
pub current: usize,
pub current: u32,
}
impl fmt::Display for ConnectionLimit {

View File

@ -48,8 +48,8 @@ use std::{convert::TryFrom as _, error, fmt, num::NonZeroU32, task::Context, tas
pub struct Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
local_id: PeerId,
/// The configuration of the pool.
limits: PoolLimits,
/// The connection counter(s).
counters: ConnectionCounters,
/// The connection manager that handles the connection I/O for both
/// established and pending connections.
@ -75,9 +75,8 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> fmt::Debug
for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
// TODO: More useful debug impl?
f.debug_struct("Pool")
.field("limits", &self.limits)
.field("counters", &self.counters)
.finish()
}
}
@ -183,13 +182,13 @@ where
},
PoolEvent::ConnectionEvent { ref connection, ref event } => {
f.debug_struct("PoolEvent::ConnectionEvent")
.field("conn_info", connection.info())
.field("peer", connection.peer_id())
.field("event", event)
.finish()
},
PoolEvent::AddressChange { ref connection, ref new_endpoint, ref old_endpoint } => {
f.debug_struct("PoolEvent::AddressChange")
.field("conn_info", connection.info())
.field("peer", connection.peer_id())
.field("new_endpoint", new_endpoint)
.field("old_endpoint", old_endpoint)
.finish()
@ -205,11 +204,11 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
pub fn new(
local_id: PeerId,
manager_config: ManagerConfig,
limits: PoolLimits
limits: ConnectionLimits
) -> Self {
Pool {
local_id,
limits,
counters: ConnectionCounters::new(limits),
manager: Manager::new(manager_config),
established: Default::default(),
pending: Default::default(),
@ -217,9 +216,9 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
}
}
/// Gets the configured connection limits of the pool.
pub fn limits(&self) -> &PoolLimits {
&self.limits
/// Gets the dedicated connection counters.
pub fn counters(&self) -> &ConnectionCounters {
&self.counters
}
/// Adds a pending incoming connection to the pool in the form of a
@ -252,8 +251,8 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
self.counters.check_max_pending_incoming()?;
let endpoint = info.to_connected_point();
self.limits.check_incoming(|| self.iter_pending_incoming().count())?;
Ok(self.add_pending(future, handler, endpoint, None))
}
@ -287,12 +286,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
self.limits.check_outgoing(|| self.iter_pending_outgoing().count())?;
if let Some(peer) = &info.peer_id {
self.limits.check_outgoing_per_peer(|| self.num_peer_outgoing(peer))?;
}
self.counters.check_max_pending_outgoing()?;
let endpoint = info.to_connected_point();
Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned()))
}
@ -350,6 +344,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
});
let id = self.manager.add_pending(future, handler);
self.counters.inc_pending(&endpoint);
self.pending.insert(id, (endpoint, peer));
id
}
@ -377,13 +372,10 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
if let Some(limit) = self.limits.max_established_per_peer {
let current = self.num_peer_established(&i.peer_id);
if limit >= current {
return Err(ConnectionLimit { limit, current })
}
}
self.counters.check_max_established(&i.endpoint)?;
self.counters.check_max_established_per_peer(self.num_peer_established(&i.peer_id))?;
let id = self.manager.add(c, i.clone());
self.counters.inc_established(&i.endpoint);
self.established.entry(i.peer_id.clone()).or_default().insert(id, i.endpoint);
Ok(id)
}
@ -403,6 +395,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
Some(PoolConnection::Pending(PendingConnection {
entry,
pending: &mut self.pending,
counters: &mut self.counters,
})),
None => None
}
@ -429,6 +422,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
Some(PendingConnection {
entry,
pending: &mut self.pending,
counters: &mut self.counters,
}),
_ => unreachable!("by consistency of `self.pending` with `self.manager`")
}
@ -445,7 +439,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
/// Returns the number of connected peers, i.e. those with at least one
/// established connection in the pool.
pub fn num_connected(&self) -> usize {
pub fn num_peers(&self) -> usize {
self.established.len()
}
@ -462,7 +456,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
if let Some(conns) = self.established.get(peer) {
// Count upwards because we push to / pop from the end. See also `Pool::poll`.
let mut num_established = 0;
for &id in conns.keys() {
for (&id, endpoint) in conns.iter() {
match self.manager.entry(id) {
Some(manager::Entry::Established(e)) => {
let connected = e.remove();
@ -473,6 +467,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
},
_ => {}
}
self.counters.dec_established(endpoint);
}
}
self.established.remove(peer);
@ -490,30 +485,15 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
}
}
for id in aborted {
self.pending.remove(&id);
if let Some((endpoint, _)) = self.pending.remove(&id) {
self.counters.dec_pending(&endpoint);
}
}
}
/// Counts the number of established connections in the pool.
pub fn num_established(&self) -> usize {
self.established.iter().fold(0, |n, (_, conns)| n + conns.len())
}
/// Counts the number of pending connections in the pool.
pub fn num_pending(&self) -> usize {
self.iter_pending_info().count()
}
/// Counts the number of established connections to the given peer.
pub fn num_peer_established(&self, peer: &PeerId) -> usize {
self.established.get(peer).map_or(0, |conns| conns.len())
}
/// Counts the number of pending outgoing connections to the given peer.
pub fn num_peer_outgoing(&self, peer: &PeerId) -> usize {
self.iter_pending_outgoing()
.filter(|info| info.peer_id == Some(peer))
.count()
pub fn num_peer_established(&self, peer: &PeerId) -> u32 {
num_peer_established(&self.established, peer)
}
/// Returns an iterator over all established connections of `peer`.
@ -620,6 +600,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
match item {
manager::Event::PendingConnectionError { id, error, handler } => {
if let Some((endpoint, peer)) = self.pending.remove(&id) {
self.counters.dec_pending(&endpoint);
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint,
@ -633,7 +614,9 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
manager::Event::ConnectionClosed { id, connected, error } => {
let num_established =
if let Some(conns) = self.established.get_mut(&connected.peer_id) {
conns.remove(&id);
if let Some(endpoint) = conns.remove(&id) {
self.counters.dec_established(&endpoint);
}
u32::try_from(conns.len()).unwrap()
} else {
0
@ -648,11 +631,10 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
manager::Event::ConnectionEstablished { entry } => {
let id = entry.id();
if let Some((endpoint, peer)) = self.pending.remove(&id) {
// Check connection limit.
let established = &self.established;
let current = || established.get(&entry.connected().peer_id)
.map_or(0, |conns| conns.len());
if let Err(e) = self.limits.check_established(current) {
self.counters.dec_pending(&endpoint);
// Check general established connection limit.
if let Err(e) = self.counters.check_max_established(&endpoint) {
let connected = entry.remove();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
@ -663,6 +645,21 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
pool: self
})
}
// Check per-peer established connection limit.
let current = num_peer_established(&self.established, &entry.connected().peer_id);
if let Err(e) = self.counters.check_max_established_per_peer(current) {
let connected = entry.remove();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint: connected.endpoint,
error: PendingConnectionError::ConnectionLimit(e),
handler: None,
peer,
pool: self
})
}
// Peer ID checks must already have happened. See `add_pending`.
if cfg!(debug_assertions) {
if self.local_id == entry.connected().peer_id {
@ -674,11 +671,13 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
}
}
}
// Add the connection to the pool.
let peer = entry.connected().peer_id.clone();
let conns = self.established.entry(peer).or_default();
let num_established = NonZeroU32::new(u32::try_from(conns.len() + 1).unwrap())
.expect("n + 1 is always non-zero; qed");
self.counters.inc_established(&endpoint);
conns.insert(id, endpoint);
match self.get(id) {
Some(PoolConnection::Established(connection)) =>
@ -736,6 +735,7 @@ pub enum PoolConnection<'a, TInEvent> {
pub struct PendingConnection<'a, TInEvent> {
entry: manager::PendingEntry<'a, TInEvent>,
pending: &'a mut FnvHashMap<ConnectionId, (ConnectedPoint, Option<PeerId>)>,
counters: &'a mut ConnectionCounters,
}
impl<TInEvent>
@ -758,7 +758,8 @@ impl<TInEvent>
/// Aborts the connection attempt, closing the connection.
pub fn abort(self) {
self.pending.remove(&self.entry.id());
let endpoint = self.pending.remove(&self.entry.id()).expect("`entry` is a pending entry").0;
self.counters.dec_pending(&endpoint);
self.entry.abort();
}
}
@ -790,24 +791,16 @@ impl<TInEvent> EstablishedConnection<'_, TInEvent> {
&self.entry.connected().endpoint
}
/// Returns connection information obtained from the transport.
pub fn info(&self) -> &PeerId {
/// Returns the identity of the connected peer.
pub fn peer_id(&self) -> &PeerId {
&self.entry.connected().peer_id
}
}
impl<'a, TInEvent> EstablishedConnection<'a, TInEvent>
{
/// Returns the local connection ID.
pub fn id(&self) -> ConnectionId {
self.entry.id()
}
/// Returns the identity of the connected peer.
pub fn peer_id(&self) -> &PeerId {
self.info()
}
/// (Asynchronously) sends an event to the connection handler.
///
/// If the handler is not ready to receive the event, either because
@ -894,62 +887,196 @@ where
}
}
/// The configurable limits of a connection [`Pool`].
#[derive(Debug, Clone, Default)]
pub struct PoolLimits {
pub max_outgoing: Option<usize>,
pub max_incoming: Option<usize>,
pub max_established_per_peer: Option<usize>,
pub max_outgoing_per_peer: Option<usize>,
/// Network connection information.
#[derive(Debug, Clone)]
pub struct ConnectionCounters {
/// The effective connection limits.
limits: ConnectionLimits,
/// The current number of incoming connections.
pending_incoming: u32,
/// The current number of outgoing connections.
pending_outgoing: u32,
/// The current number of established inbound connections.
established_incoming: u32,
/// The current number of established outbound connections.
established_outgoing: u32,
}
impl PoolLimits {
fn check_established<F>(&self, current: F) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
{
Self::check(current, self.max_established_per_peer)
impl ConnectionCounters {
fn new(limits: ConnectionLimits) -> Self {
Self {
limits,
pending_incoming: 0,
pending_outgoing: 0,
established_incoming: 0,
established_outgoing: 0,
}
}
fn check_outgoing<F>(&self, current: F) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
{
Self::check(current, self.max_outgoing)
/// The effective connection limits.
pub fn limits(&self) -> &ConnectionLimits {
&self.limits
}
fn check_incoming<F>(&self, current: F) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
{
Self::check(current, self.max_incoming)
/// The total number of connections, both pending and established.
pub fn num_connections(&self) -> u32 {
self.num_pending() + self.num_established()
}
fn check_outgoing_per_peer<F>(&self, current: F) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
{
Self::check(current, self.max_outgoing_per_peer)
/// The total number of pending connections, both incoming and outgoing.
pub fn num_pending(&self) -> u32 {
self.pending_incoming + self.pending_outgoing
}
fn check<F>(current: F, limit: Option<usize>) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
/// The number of incoming connections being established.
pub fn num_pending_incoming(&self) -> u32 {
self.pending_incoming
}
/// The number of outgoing connections being established.
pub fn num_pending_outgoing(&self) -> u32 {
self.pending_outgoing
}
/// The number of established incoming connections.
pub fn num_established_incoming(&self) -> u32 {
self.established_incoming
}
/// The number of established outgoing connections.
pub fn num_established_outgoing(&self) -> u32 {
self.established_outgoing
}
/// The total number of established connections.
pub fn num_established(&self) -> u32 {
self.established_outgoing + self.established_incoming
}
fn inc_pending(&mut self, endpoint: &ConnectedPoint) {
match endpoint {
ConnectedPoint::Dialer { .. } => { self.pending_outgoing += 1; }
ConnectedPoint::Listener { .. } => { self.pending_incoming += 1; }
}
}
fn dec_pending(&mut self, endpoint: &ConnectedPoint) {
match endpoint {
ConnectedPoint::Dialer { .. } => { self.pending_outgoing -= 1; }
ConnectedPoint::Listener { .. } => { self.pending_incoming -= 1; }
}
}
fn inc_established(&mut self, endpoint: &ConnectedPoint) {
match endpoint {
ConnectedPoint::Dialer { .. } => { self.established_outgoing += 1; }
ConnectedPoint::Listener { .. } => { self.established_incoming += 1; }
}
}
fn dec_established(&mut self, endpoint: &ConnectedPoint) {
match endpoint {
ConnectedPoint::Dialer { .. } => { self.established_outgoing -= 1; }
ConnectedPoint::Listener { .. } => { self.established_incoming -= 1; }
}
}
fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> {
Self::check(self.pending_outgoing, self.limits.max_pending_outgoing)
}
fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> {
Self::check(self.pending_incoming, self.limits.max_pending_incoming)
}
fn check_max_established(&self, endpoint: &ConnectedPoint)
-> Result<(), ConnectionLimit>
{
match endpoint {
ConnectedPoint::Dialer { .. } =>
Self::check(self.established_outgoing, self.limits.max_established_outgoing),
ConnectedPoint::Listener { .. } => {
Self::check(self.established_incoming, self.limits.max_established_incoming)
}
}
}
fn check_max_established_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> {
Self::check(current, self.limits.max_established_per_peer)
}
fn check(current: u32, limit: Option<u32>) -> Result<(), ConnectionLimit> {
if let Some(limit) = limit {
let current = current();
if current >= limit {
return Err(ConnectionLimit { limit, current })
}
}
Ok(())
}
}
/// Counts the number of established connections to the given peer.
fn num_peer_established(
established: &FnvHashMap<PeerId, FnvHashMap<ConnectionId, ConnectedPoint>>,
peer: &PeerId
) -> u32 {
established.get(peer).map_or(0, |conns|
u32::try_from(conns.len())
.expect("Unexpectedly large number of connections for a peer."))
}
/// The configurable connection limits.
///
/// By default no connection limits apply.
#[derive(Debug, Clone, Default)]
pub struct ConnectionLimits {
max_pending_incoming: Option<u32>,
max_pending_outgoing: Option<u32>,
max_established_incoming: Option<u32>,
max_established_outgoing: Option<u32>,
max_established_per_peer: Option<u32>,
}
impl ConnectionLimits {
/// Configures the maximum number of concurrently incoming connections being established.
pub fn with_max_pending_incoming(mut self, limit: Option<u32>) -> Self {
self.max_pending_incoming = limit;
self
}
/// Configures the maximum number of concurrently outgoing connections being established.
pub fn with_max_pending_outgoing(mut self, limit: Option<u32>) -> Self {
self.max_pending_outgoing = limit;
self
}
/// Configures the maximum number of concurrent established inbound connections.
pub fn with_max_established_incoming(mut self, limit: Option<u32>) -> Self {
self.max_established_incoming = limit;
self
}
/// Configures the maximum number of concurrent established outbound connections.
pub fn with_max_established_outgoing(mut self, limit: Option<u32>) -> Self {
self.max_established_outgoing = limit;
self
}
/// Configures the maximum number of concurrent established connections per peer,
/// regardless of direction (incoming or outgoing).
pub fn with_max_established_per_peer(mut self, limit: Option<u32>) -> Self {
self.max_established_per_peer = limit;
self
}
}
/// Information about a former established connection to a peer
/// that was dropped via [`Pool::disconnect`].
struct Disconnected {
/// The unique identifier of the dropped connection.
id: ConnectionId,
/// Information about the dropped connection.
connected: Connected,
/// The remaining number of established connections
/// to the same peer.

View File

@ -21,6 +21,7 @@
mod event;
pub mod peer;
pub use crate::connection::{ConnectionLimits, ConnectionCounters};
pub use event::{NetworkEvent, IncomingConnection};
pub use peer::Peer;
@ -43,7 +44,7 @@ use crate::{
PendingConnectionError,
Substream,
manager::ManagerConfig,
pool::{Pool, PoolEvent, PoolLimits},
pool::{Pool, PoolEvent},
},
muxing::StreamMuxer,
transport::{Transport, TransportError},
@ -134,9 +135,9 @@ where
TTrans: Transport + Clone,
TMuxer: StreamMuxer,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send,
{
/// Creates a new node events stream.
pub fn new(
@ -148,7 +149,7 @@ where
Network {
local_peer_id,
listeners: ListenersStream::new(transport),
pool: Pool::new(pool_local_id, config.manager_config, config.pool_limits),
pool: Pool::new(pool_local_id, config.manager_config, config.limits),
dialing: Default::default(),
}
}
@ -244,15 +245,11 @@ where
/// Returns information about the state of the `Network`.
pub fn info(&self) -> NetworkInfo {
let num_connections_established = self.pool.num_established();
let num_connections_pending = self.pool.num_pending();
let num_connections = num_connections_established + num_connections_pending;
let num_peers = self.pool.num_connected();
let num_peers = self.pool.num_peers();
let connection_counters = self.pool.counters().clone();
NetworkInfo {
num_peers,
num_connections,
num_connections_established,
num_connections_pending,
connection_counters,
}
}
@ -301,22 +298,6 @@ where
self.dialing.keys()
}
/// Gets the configured limit on pending incoming connections,
/// i.e. concurrent incoming connection attempts.
pub fn incoming_limit(&self) -> Option<usize> {
self.pool.limits().max_incoming
}
/// The total number of established connections in the `Network`.
pub fn num_connections_established(&self) -> usize {
self.pool.num_established()
}
/// The total number of pending connections in the `Network`.
pub fn num_connections_pending(&self) -> usize {
self.pool.num_pending()
}
/// Obtains a view of a [`Peer`] with the given ID in the network.
pub fn peer(&mut self, peer_id: PeerId)
-> Peer<'_, TTrans, TInEvent, TOutEvent, THandler>
@ -615,13 +596,22 @@ where
#[derive(Clone, Debug)]
pub struct NetworkInfo {
/// The total number of connected peers.
pub num_peers: usize,
/// The total number of connections, both established and pending.
pub num_connections: usize,
/// The total number of pending connections, both incoming and outgoing.
pub num_connections_pending: usize,
/// The total number of established connections.
pub num_connections_established: usize,
num_peers: usize,
/// Counters of ongoing network connections.
connection_counters: ConnectionCounters,
}
impl NetworkInfo {
/// The number of connected peers, i.e. peers with whom at least
/// one established connection exists.
pub fn num_peers(&self) -> usize {
self.num_peers
}
/// Gets counters for ongoing network connections.
pub fn connection_counters(&self) -> &ConnectionCounters {
&self.connection_counters
}
}
/// The (optional) configuration for a [`Network`].
@ -635,17 +625,25 @@ pub struct NetworkConfig {
/// one "free" slot per task. Thus the given total `notify_handler_buffer_size`
/// exposed for configuration on the `Network` is reduced by one.
manager_config: ManagerConfig,
pool_limits: PoolLimits,
/// The effective connection limits.
limits: ConnectionLimits,
}
impl NetworkConfig {
pub fn set_executor(&mut self, e: Box<dyn Executor + Send>) -> &mut Self {
/// Configures the executor to use for spawning connection background tasks.
pub fn with_executor(mut self, e: Box<dyn Executor + Send>) -> Self {
self.manager_config.executor = Some(e);
self
}
pub fn executor(&self) -> Option<&Box<dyn Executor + Send>> {
self.manager_config.executor.as_ref()
/// Configures the executor to use for spawning connection background tasks,
/// only if no executor has already been configured.
pub fn or_else_with_executor<F>(mut self, f: F) -> Self
where
F: FnOnce() -> Option<Box<dyn Executor + Send>>
{
self.manager_config.executor = self.manager_config.executor.or_else(f);
self
}
/// Sets the maximum number of events sent to a connection's background task
@ -655,7 +653,7 @@ impl NetworkConfig {
/// When the buffer for a particular connection is full, `notify_handler` will no
/// longer be able to deliver events to the associated `ConnectionHandler`,
/// thus exerting back-pressure on the connection and peer API.
pub fn set_notify_handler_buffer_size(&mut self, n: NonZeroUsize) -> &mut Self {
pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
self.manager_config.task_command_buffer_size = n.get() - 1;
self
}
@ -666,28 +664,14 @@ impl NetworkConfig {
/// When the buffer is full, the background tasks of all connections will stall.
/// In this way, the consumers of network events exert back-pressure on
/// the network connection I/O.
pub fn set_connection_event_buffer_size(&mut self, n: usize) -> &mut Self {
pub fn with_connection_event_buffer_size(mut self, n: usize) -> Self {
self.manager_config.task_event_buffer_size = n;
self
}
pub fn set_incoming_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_incoming = Some(n);
self
}
pub fn set_outgoing_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_outgoing = Some(n);
self
}
pub fn set_established_per_peer_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_established_per_peer = Some(n);
self
}
pub fn set_outgoing_per_peer_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_outgoing_per_peer = Some(n);
/// Sets the connection limits to enforce.
pub fn with_connection_limits(mut self, limits: ConnectionLimits) -> Self {
self.limits = limits;
self
}
}
@ -705,10 +689,9 @@ mod tests {
#[test]
fn set_executor() {
NetworkConfig::default()
.set_executor(Box::new(Dummy))
.set_executor(Box::new(|f| {
.with_executor(Box::new(Dummy))
.with_executor(Box::new(|f| {
async_std::task::spawn(f);
}));
}
}

View File

@ -310,7 +310,7 @@ where
}
/// The number of established connections to the peer.
pub fn num_connections(&self) -> usize {
pub fn num_connections(&self) -> u32 {
self.network.pool.num_peer_established(&self.peer_id)
}
@ -448,12 +448,6 @@ where
None
}
/// The number of ongoing dialing attempts, i.e. pending outgoing connections
/// to this peer.
pub fn num_attempts(&self) -> usize {
self.network.pool.num_peer_outgoing(&self.peer_id)
}
/// Gets an iterator over all dialing (i.e. pending outgoing) connections to the peer.
pub fn attempts<'b>(&'b mut self)
-> DialingAttemptIter<'b,
@ -672,6 +666,15 @@ impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
/// Obtains the next dialing connection, if any.
pub fn next<'b>(&'b mut self) -> Option<DialingAttempt<'b, TInEvent>> {
// If the number of elements reduced, the current `DialingAttempt` has been
// aborted and iteration needs to continue from the previous position to
// account for the removed element.
let end = self.dialing.get(self.peer_id).map_or(0, |conns| conns.len());
if self.end > end {
self.end = end;
self.pos -= 1;
}
if self.pos == self.end {
return None
}

View File

@ -0,0 +1,162 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
mod util;
use futures::{ready, future::poll_fn};
use libp2p_core::multiaddr::{multiaddr, Multiaddr};
use libp2p_core::{
PeerId,
connection::PendingConnectionError,
network::{NetworkEvent, NetworkConfig, ConnectionLimits},
};
use rand::Rng;
use std::task::Poll;
use util::{TestHandler, test_network};
#[test]
fn max_outgoing() {
let outgoing_limit = rand::thread_rng().gen_range(1, 10);
let limits = ConnectionLimits::default().with_max_pending_outgoing(Some(outgoing_limit));
let cfg = NetworkConfig::default().with_connection_limits(limits);
let mut network = test_network(cfg);
let target = PeerId::random();
for _ in 0 .. outgoing_limit {
network.peer(target.clone())
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
.ok()
.expect("Unexpected connection limit.");
}
let err = network.peer(target.clone())
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
.expect_err("Unexpected dialing success.");
assert_eq!(err.current, outgoing_limit);
assert_eq!(err.limit, outgoing_limit);
let info = network.info();
assert_eq!(info.num_peers(), 0);
assert_eq!(info.connection_counters().num_pending_outgoing(), outgoing_limit);
// Abort all dialing attempts.
let mut peer = network.peer(target.clone())
.into_dialing()
.expect("Unexpected peer state");
let mut attempts = peer.attempts();
while let Some(attempt) = attempts.next() {
attempt.abort();
}
assert_eq!(network.info().connection_counters().num_pending_outgoing(), 0);
}
#[test]
fn max_established_incoming() {
let limit = rand::thread_rng().gen_range(1, 10);
fn config(limit: u32) -> NetworkConfig {
let limits = ConnectionLimits::default().with_max_established_incoming(Some(limit));
NetworkConfig::default().with_connection_limits(limits)
}
let mut network1 = test_network(config(limit));
let mut network2 = test_network(config(limit));
let listen_addr = multiaddr![Ip4(std::net::Ipv4Addr::new(127,0,0,1)), Tcp(0u16)];
let _ = network1.listen_on(listen_addr.clone()).unwrap();
let (addr_sender, addr_receiver) = futures::channel::oneshot::channel();
let mut addr_sender = Some(addr_sender);
// Spawn the listener.
let listener = async_std::task::spawn(poll_fn(move |cx| {
loop {
match ready!(network1.poll(cx)) {
NetworkEvent::NewListenerAddress { listen_addr, .. } => {
addr_sender.take().unwrap().send(listen_addr).unwrap();
}
NetworkEvent::IncomingConnection { connection, .. } => {
network1.accept(connection, TestHandler()).unwrap();
}
NetworkEvent::ConnectionEstablished { .. } => {}
NetworkEvent::IncomingConnectionError {
error: PendingConnectionError::ConnectionLimit(err), ..
} => {
assert_eq!(err.limit, limit);
assert_eq!(err.limit, err.current);
let info = network1.info();
let counters = info.connection_counters();
assert_eq!(counters.num_established_incoming(), limit);
assert_eq!(counters.num_established(), limit);
return Poll::Ready(())
}
e => panic!("Unexpected network event: {:?}", e)
}
}
}));
// Spawn and block on the dialer.
async_std::task::block_on(async move {
let addr = addr_receiver.await.unwrap();
let mut n = 0;
let _ = network2.dial(&addr, TestHandler()).unwrap();
let mut expected_closed = None;
poll_fn(|cx| {
loop {
match ready!(network2.poll(cx)) {
NetworkEvent::ConnectionEstablished { connection, .. } => {
n += 1;
if n <= limit {
// Dial again until the limit is exceeded.
let id = network2.dial(&addr, TestHandler()).unwrap();
if n == limit {
// The the next dialing attempt exceeds the limit, this
// is the connection we expected to get closed.
expected_closed = Some(id);
}
} else {
// This connection exceeds the limit for the listener and
// is expected to close shortly. For the dialer, these connections
// will first appear established before the listener closes them as
// a result of the limit violation.
assert_eq!(Some(connection.id()), expected_closed);
}
}
NetworkEvent::ConnectionClosed { id, .. } => {
assert_eq!(Some(id), expected_closed);
let info = network2.info();
let counters = info.connection_counters();
assert_eq!(counters.num_established_outgoing(), limit);
assert_eq!(counters.num_established(), limit);
return Poll::Ready(())
}
e => panic!("Unexpected network event: {:?}", e)
}
}
}).await
});
// Wait for the listener to complete.
async_std::task::block_on(listener);
}

View File

@ -21,51 +21,22 @@
mod util;
use futures::prelude::*;
use libp2p_core::identity;
use libp2p_core::multiaddr::{multiaddr, Multiaddr};
use libp2p_core::multiaddr::multiaddr;
use libp2p_core::{
Network,
PeerId,
Transport,
connection::PendingConnectionError,
muxing::StreamMuxerBox,
network::{NetworkEvent, NetworkConfig},
transport,
upgrade,
};
use libp2p_noise as noise;
use rand::Rng;
use rand::seq::SliceRandom;
use std::{io, task::Poll};
use util::TestHandler;
type TestNetwork = Network<TestTransport, (), (), TestHandler>;
type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox)>;
fn new_network(cfg: NetworkConfig) -> TestNetwork {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&local_key).unwrap();
let transport: TestTransport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(libp2p_mplex::MplexConfig::new())
.boxed()
.and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
})
.boxed();
TestNetwork::new(transport, local_public_key.into(), cfg)
}
use util::{TestHandler, test_network};
#[test]
fn deny_incoming_connec() {
// Checks whether refusing an incoming connection on a swarm triggers the correct events.
let mut swarm1 = new_network(NetworkConfig::default());
let mut swarm2 = new_network(NetworkConfig::default());
let mut swarm1 = test_network(NetworkConfig::default());
let mut swarm2 = test_network(NetworkConfig::default());
swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
@ -121,7 +92,7 @@ fn dial_self() {
//
// The last two can happen in any order.
let mut swarm = new_network(NetworkConfig::default());
let mut swarm = test_network(NetworkConfig::default());
swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
let (local_address, mut swarm) = async_std::task::block_on(
@ -180,7 +151,7 @@ fn dial_self() {
fn dial_self_by_id() {
// Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first
// place.
let mut swarm = new_network(NetworkConfig::default());
let mut swarm = test_network(NetworkConfig::default());
let peer_id = swarm.local_peer_id().clone();
assert!(swarm.peer(peer_id).into_disconnected().is_none());
}
@ -189,7 +160,7 @@ fn dial_self_by_id() {
fn multiple_addresses_err() {
// Tries dialing multiple addresses, and makes sure there's one dialing error per address.
let mut swarm = new_network(NetworkConfig::default());
let mut swarm = test_network(NetworkConfig::default());
let mut addresses = Vec::new();
for _ in 0 .. 3 {
@ -233,44 +204,3 @@ fn multiple_addresses_err() {
}
})).unwrap();
}
#[test]
fn connection_limit() {
let outgoing_per_peer_limit = rand::thread_rng().gen_range(1, 10);
let outgoing_limit = 2 * outgoing_per_peer_limit;
let mut cfg = NetworkConfig::default();
cfg.set_outgoing_per_peer_limit(outgoing_per_peer_limit);
cfg.set_outgoing_limit(outgoing_limit);
let mut network = new_network(cfg);
let target = PeerId::random();
for _ in 0 .. outgoing_per_peer_limit {
network.peer(target.clone())
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
.ok()
.expect("Unexpected connection limit.");
}
let err = network.peer(target)
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
.expect_err("Unexpected dialing success.");
assert_eq!(err.current, outgoing_per_peer_limit);
assert_eq!(err.limit, outgoing_per_peer_limit);
let target2 = PeerId::random();
for _ in outgoing_per_peer_limit .. outgoing_limit {
network.peer(target2.clone())
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
.ok()
.expect("Unexpected connection limit.");
}
let err = network.peer(target2)
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
.expect_err("Unexpected dialing success.");
assert_eq!(err.current, outgoing_limit);
assert_eq!(err.limit, outgoing_limit);
}

View File

@ -4,16 +4,42 @@
use futures::prelude::*;
use libp2p_core::{
Multiaddr,
PeerId,
Transport,
connection::{
ConnectionHandler,
ConnectionHandlerEvent,
Substream,
SubstreamEndpoint,
},
identity,
muxing::{StreamMuxer, StreamMuxerBox},
network::{Network, NetworkConfig},
transport,
upgrade,
};
use libp2p_mplex as mplex;
use libp2p_noise as noise;
use libp2p_tcp as tcp;
use std::{io, pin::Pin, task::Context, task::Poll};
type TestNetwork = Network<TestTransport, (), (), TestHandler>;
type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox)>;
/// Creates a new `TestNetwork` with a TCP transport.
pub fn test_network(cfg: NetworkConfig) -> TestNetwork {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&local_key).unwrap();
let transport: TestTransport = tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(mplex::MplexConfig::new())
.boxed();
TestNetwork::new(transport, local_public_key.into(), cfg)
}
pub struct TestHandler();
impl ConnectionHandler for TestHandler {
@ -35,7 +61,7 @@ impl ConnectionHandler for TestHandler {
fn poll(&mut self, _: &mut Context<'_>)
-> Poll<Result<ConnectionHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>>
{
Poll::Ready(Ok(ConnectionHandlerEvent::Custom(())))
Poll::Pending
}
}

View File

@ -1,5 +1,8 @@
# 0.25.0 [unreleased]
- Changed parameters for connection limits from `usize` to `u32`.
Connection limits are now configured via `SwarmBuilder::connection_limits()`.
- Update `libp2p-core`.
- Expose configurable scores for external addresses, as well as

View File

@ -113,6 +113,7 @@ use libp2p_core::{
transport::{self, TransportError},
muxing::StreamMuxerBox,
network::{
ConnectionLimits,
Network,
NetworkInfo,
NetworkEvent,
@ -987,7 +988,7 @@ where TBehaviour: NetworkBehaviour,
/// By default, unless another executor has been configured,
/// [`SwarmBuilder::build`] will try to set up a `ThreadPool`.
pub fn executor(mut self, e: Box<dyn Executor + Send>) -> Self {
self.network_config.set_executor(e);
self.network_config = self.network_config.with_executor(e);
self
}
@ -1001,7 +1002,7 @@ where TBehaviour: NetworkBehaviour,
/// be sleeping more often than necessary. Increasing this value increases
/// the overall memory usage.
pub fn notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
self.network_config.set_notify_handler_buffer_size(n);
self.network_config = self.network_config.with_notify_handler_buffer_size(n);
self
}
@ -1029,28 +1030,13 @@ where TBehaviour: NetworkBehaviour,
/// event is emitted and the moment when it is received by the
/// [`NetworkBehaviour`].
pub fn connection_event_buffer_size(mut self, n: usize) -> Self {
self.network_config.set_connection_event_buffer_size(n);
self.network_config = self.network_config.with_connection_event_buffer_size(n);
self
}
/// Configures a limit for the number of simultaneous incoming
/// connection attempts.
pub fn incoming_connection_limit(mut self, n: usize) -> Self {
self.network_config.set_incoming_limit(n);
self
}
/// Configures a limit for the number of simultaneous outgoing
/// connection attempts.
pub fn outgoing_connection_limit(mut self, n: usize) -> Self {
self.network_config.set_outgoing_limit(n);
self
}
/// Configures a limit for the number of simultaneous
/// established connections per peer.
pub fn peer_connection_limit(mut self, n: usize) -> Self {
self.network_config.set_established_per_peer_limit(n);
/// Configures the connection limits.
pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self {
self.network_config = self.network_config.with_connection_limits(limits);
self
}
@ -1064,20 +1050,21 @@ where TBehaviour: NetworkBehaviour,
.map(|info| info.protocol_name().to_vec())
.collect();
let mut network_cfg = self.network_config;
// If no executor has been explicitly configured, try to set up a thread pool.
if network_cfg.executor().is_none() {
let network_cfg = self.network_config.or_else_with_executor(|| {
match ThreadPoolBuilder::new()
.name_prefix("libp2p-swarm-task-")
.create()
{
Ok(tp) => {
network_cfg.set_executor(Box::new(move |f| tp.spawn_ok(f)));
Some(Box::new(move |f| tp.spawn_ok(f)))
},
Err(err) => log::warn!("Failed to create executor thread pool: {:?}", err)
Err(err) => {
log::warn!("Failed to create executor thread pool: {:?}", err);
None
}
}
}
});
let network = Network::new(self.transport, self.local_peer_id, network_cfg);