Permit concurrent dialing attempts per peer. (#1506)

* Permit concurrent dialing attempts per peer.

This is a follow-up to https://github.com/libp2p/rust-libp2p/pull/1440
and relates to https://github.com/libp2p/rust-libp2p/issues/925.
This change permits multiple dialing attempts per peer.
Note though that `libp2p-swarm` does not yet make use of this ability,
retaining the current behaviour. The essence of the changes are that the
`Peer` API now provides `Peer::dial()`, i.e. regardless of the state in
which the peer is. A dialing attempt is always made up of one or more
addresses tried sequentially, as before, but now there can be multiple
dialing attempts per peer. A configurable per-peer limit for outgoing
connections and thus concurrent dialing attempts is also included.

* Introduce `DialError` in `libp2p-swarm`.

For a cleaner API and to treat the case of no addresses
for a peer as an error, such that a `NetworkBehaviourAction::DialPeer`
request is always matched up with either `inject_connection_established`
or `inject_dial_error`.

* Fix rustdoc link.

* Add `DialPeerCondition::Always`.

* Adapt to master.

* Update changelog.
This commit is contained in:
Roman Borschel 2020-05-12 13:10:18 +02:00 committed by GitHub
parent 44c0c76981
commit 5ba7c4831b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 473 additions and 337 deletions

View File

@ -1,5 +1,9 @@
# Version ???
- `libp2p-core`, `libp2p-swarm`: Added support for multiple dialing
attempts per peer, with a configurable limit.
[PR 1506](https://github.com/libp2p/rust-libp2p/pull/1506)
- `libp2p-noise`: Added the `X25519Spec` protocol suite which uses
libp2p-noise-spec compliant signatures on static keys as well as the
`/noise` protocol upgrade, hence providing a libp2p-noise-spec compliant

View File

@ -225,12 +225,7 @@ where
TPeerId: Clone + Send + 'static,
{
let endpoint = info.to_connected_point();
if let Some(limit) = self.limits.max_incoming {
let current = self.iter_pending_incoming().count();
if current >= limit {
return Err(ConnectionLimit { limit, current })
}
}
self.limits.check_incoming(|| self.iter_pending_incoming().count())?;
Ok(self.add_pending(future, handler, endpoint, None))
}
@ -267,6 +262,11 @@ where
TPeerId: Clone + Send + 'static,
{
self.limits.check_outgoing(|| self.iter_pending_outgoing().count())?;
if let Some(peer) = &info.peer_id {
self.limits.check_outgoing_per_peer(|| self.num_peer_outgoing(peer))?;
}
let endpoint = info.to_connected_point();
Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned()))
}
@ -465,6 +465,13 @@ where
self.established.get(peer).map_or(0, |conns| conns.len())
}
/// Counts the number of pending outgoing connections to the given peer.
pub fn num_peer_outgoing(&self, peer: &TPeerId) -> usize {
self.iter_pending_outgoing()
.filter(|info| info.peer_id == Some(peer))
.count()
}
/// Returns an iterator over all established connections of `peer`.
pub fn iter_peer_established<'a>(&'a mut self, peer: &TPeerId)
-> EstablishedConnectionIter<'a,
@ -837,6 +844,7 @@ pub struct PoolLimits {
pub max_outgoing: Option<usize>,
pub max_incoming: Option<usize>,
pub max_established_per_peer: Option<usize>,
pub max_outgoing_per_peer: Option<usize>,
}
impl PoolLimits {
@ -854,6 +862,20 @@ impl PoolLimits {
Self::check(current, self.max_outgoing)
}
fn check_incoming<F>(&self, current: F) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
{
Self::check(current, self.max_incoming)
}
fn check_outgoing_per_peer<F>(&self, current: F) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
{
Self::check(current, self.max_outgoing_per_peer)
}
fn check<F>(current: F, limit: Option<usize>) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize

View File

@ -50,6 +50,7 @@ use crate::{
};
use fnv::{FnvHashMap};
use futures::{prelude::*, future};
use smallvec::SmallVec;
use std::{
collections::hash_map,
convert::TryFrom as _,
@ -78,21 +79,17 @@ where
/// The ongoing dialing attempts.
///
/// The `Network` enforces a single ongoing dialing attempt per peer,
/// even if multiple (established) connections per peer are allowed.
/// However, a single dialing attempt operates on a list of addresses
/// to connect to, which can be extended with new addresses while
/// the connection attempt is still in progress. Thereby each
/// dialing attempt is associated with a new connection and hence a new
/// connection ID.
/// There may be multiple ongoing dialing attempts to the same peer.
/// Each dialing attempt is associated with a new connection and hence
/// a new connection ID.
///
/// > **Note**: `dialing` must be consistent with the pending outgoing
/// > connections in `pool`. That is, for every entry in `dialing`
/// > there must exist a pending outgoing connection in `pool` with
/// > the same connection ID. This is ensured by the implementation of
/// > `Network` (see `dial_peer_impl` and `on_connection_failed`)
/// > together with the implementation of `DialingConnection::abort`.
dialing: FnvHashMap<TPeerId, peer::DialingAttempt>,
/// > together with the implementation of `DialingAttempt::abort`.
dialing: FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
}
impl<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug for
@ -381,8 +378,11 @@ where
Poll::Pending => return Poll::Pending,
Poll::Ready(PoolEvent::ConnectionEstablished { connection, num_established }) => {
match self.dialing.entry(connection.peer_id().clone()) {
hash_map::Entry::Occupied(e) if e.get().id == connection.id() => {
e.remove();
hash_map::Entry::Occupied(mut e) => {
e.get_mut().retain(|s| s.current.0 != connection.id());
if e.get().is_empty() {
e.remove();
}
},
_ => {}
}
@ -453,7 +453,7 @@ fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans, TConnInfo, TPee
transport: TTrans,
pool: &mut Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error, TConnInfo, TPeerId>,
dialing: &mut FnvHashMap<TPeerId, peer::DialingAttempt>,
dialing: &mut FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
opts: DialingOpts<TPeerId, THandler>
) -> Result<ConnectionId, ConnectionLimit>
where
@ -489,14 +489,12 @@ where
};
if let Ok(id) = &result {
let former = dialing.insert(opts.peer,
peer::DialingAttempt {
id: *id,
current: opts.address,
next: opts.remaining,
dialing.entry(opts.peer).or_default().push(
peer::DialingState {
current: (*id, opts.address),
remaining: opts.remaining,
},
);
debug_assert!(former.is_none());
}
result
@ -508,7 +506,7 @@ where
/// If the failed connection attempt was a dialing attempt and there
/// are more addresses to try, new `DialingOpts` are returned.
fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>(
dialing: &mut FnvHashMap<TPeerId, peer::DialingAttempt>,
dialing: &mut FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
id: ConnectionId,
endpoint: ConnectedPoint,
error: PendingConnectionError<TTrans::Error>,
@ -521,27 +519,34 @@ where
TPeerId: Eq + Hash + Clone,
{
// Check if the failed connection is associated with a dialing attempt.
// TODO: could be more optimal than iterating over everything
let dialing_peer = dialing.iter() // (1)
.find(|(_, a)| a.id == id)
.map(|(p, _)| p.clone());
let dialing_failed = dialing.iter_mut()
.find_map(|(peer, attempts)| {
if let Some(pos) = attempts.iter().position(|s| s.current.0 == id) {
let attempt = attempts.remove(pos);
let last = attempts.is_empty();
Some((peer.clone(), attempt, last))
} else {
None
}
});
if let Some(peer_id) = dialing_peer {
// A pending outgoing connection to a known peer failed.
let mut attempt = dialing.remove(&peer_id).expect("by (1)");
if let Some((peer_id, mut attempt, last)) = dialing_failed {
if last {
dialing.remove(&peer_id);
}
let num_remain = u32::try_from(attempt.next.len()).unwrap();
let failed_addr = attempt.current.clone();
let num_remain = u32::try_from(attempt.remaining.len()).unwrap();
let failed_addr = attempt.current.1.clone();
let (opts, attempts_remaining) =
if num_remain > 0 {
if let Some(handler) = handler {
let next_attempt = attempt.next.remove(0);
let next_attempt = attempt.remaining.remove(0);
let opts = DialingOpts {
peer: peer_id.clone(),
handler,
address: next_attempt,
remaining: attempt.next
remaining: attempt.remaining
};
(Some(opts), num_remain)
} else {
@ -581,9 +586,13 @@ where
/// Information about the network obtained by [`Network::info()`].
#[derive(Clone, Debug)]
pub struct NetworkInfo {
/// The total number of connected peers.
pub num_peers: usize,
/// The total number of connections, both established and pending.
pub num_connections: usize,
/// The total number of pending connections, both incoming and outgoing.
pub num_connections_pending: usize,
/// The total number of established connections.
pub num_connections_established: usize,
}
@ -633,4 +642,9 @@ impl NetworkConfig {
self.pool_limits.max_established_per_peer = Some(n);
self
}
pub fn set_outgoing_per_peer_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_outgoing_per_peer = Some(n);
self
}
}

View File

@ -35,8 +35,11 @@ use crate::{
IntoConnectionHandler,
PendingConnection,
Substream,
pool::Pool,
},
};
use fnv::FnvHashMap;
use smallvec::SmallVec;
use std::{
collections::hash_map,
error,
@ -47,6 +50,10 @@ use super::{Network, DialingOpts};
/// The possible representations of a peer in a [`Network`], as
/// seen by the local node.
///
/// > **Note**: In any state there may always be a pending incoming
/// > connection attempt from the peer, however, the remote identity
/// > of a peer is only known once a connection is fully established.
pub enum Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
TTrans: Transport,
@ -63,10 +70,6 @@ where
/// There exists no established connection to the peer and there is
/// currently no ongoing dialing (i.e. outgoing connection) attempt
/// in progress.
///
/// > **Note**: In this state there may always be a pending incoming
/// > connection attempt from the peer, however, the remote identity
/// > of a peer is only known once a connection is fully established.
Disconnected(DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>),
/// The peer represents the local node.
@ -82,20 +85,20 @@ where
TPeerId: fmt::Debug + Eq + Hash,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match *self {
Peer::Connected(ConnectedPeer { ref peer_id, .. }) => {
match self {
Peer::Connected(p) => {
f.debug_struct("Connected")
.field("peer_id", peer_id)
.field("peer", &p)
.finish()
}
Peer::Dialing(DialingPeer { ref peer_id, .. } ) => {
f.debug_struct("DialingPeer")
.field("peer_id", peer_id)
Peer::Dialing(p) => {
f.debug_struct("Dialing")
.field("peer", &p)
.finish()
}
Peer::Disconnected(DisconnectedPeer { ref peer_id, .. }) => {
Peer::Disconnected(p) => {
f.debug_struct("Disconnected")
.field("peer_id", peer_id)
.field("peer", &p)
.finish()
}
Peer::Local => {
@ -164,12 +167,11 @@ where
TTrans::Dial: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TPeerId: Eq + Hash + Clone + Send + 'static,
@ -208,7 +210,41 @@ where
}
}
/// Converts the peer into a `ConnectedPeer`, if there an established connection exists.
/// Initiates a new dialing attempt to this peer using the given addresses.
///
/// The connection ID of the first connection attempt, i.e. to `address`,
/// is returned, together with a [`DialingPeer`] for further use. The
/// `remaining` addresses are tried in order in subsequent connection
/// attempts in the context of the same dialing attempt, if the connection
/// attempt to the first address fails.
pub fn dial<I>(self, address: Multiaddr, remaining: I, handler: THandler)
-> Result<
(ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>),
ConnectionLimit
>
where
I: IntoIterator<Item = Multiaddr>,
{
let (peer_id, network) = match self {
Peer::Connected(p) => (p.peer_id, p.network),
Peer::Dialing(p) => (p.peer_id, p.network),
Peer::Disconnected(p) => (p.peer_id, p.network),
Peer::Local => return Err(ConnectionLimit { current: 0, limit: 0 })
};
let id = network.dial_peer(DialingOpts {
peer: peer_id.clone(),
handler,
address,
remaining: remaining.into_iter().collect(),
})?;
Ok((id, DialingPeer { network, peer_id }))
}
/// Converts the peer into a `ConnectedPeer`, if an established connection exists.
///
/// Succeeds if the there is at least one established connection to the peer.
pub fn into_connected(self) -> Option<
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
> {
@ -221,6 +257,8 @@ where
}
/// Converts the peer into a `DialingPeer`, if a dialing attempt exists.
///
/// Succeeds if the there is at least one pending outgoing connection to the peer.
pub fn into_dialing(self) -> Option<
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
> {
@ -245,7 +283,8 @@ where
}
/// The representation of a peer in a [`Network`] to whom at least
/// one established connection exists.
/// one established connection exists. There may also be additional ongoing
/// dialing attempts to the peer.
pub struct ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
TTrans: Transport,
@ -267,57 +306,12 @@ where
&self.peer_id
}
/// Attempts to establish a new connection to this peer using the given addresses,
/// if there is currently no ongoing dialing attempt.
///
/// Existing established connections are not affected.
///
/// > **Note**: If there is an ongoing dialing attempt, a `DialingPeer`
/// > is returned with the given addresses and handler being ignored.
/// > You may want to check [`ConnectedPeer::is_dialing`] first.
pub fn connect<I, TMuxer>(self, address: Multiaddr, remaining: I, handler: THandler)
-> Result<DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
ConnectionLimit>
where
I: IntoIterator<Item = Multiaddr>,
THandler: Send + 'static,
THandler::Handler: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
TTrans: Transport<Output = (TConnInfo, TMuxer)> + Clone,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
TConnInfo: fmt::Debug + Send + 'static,
TPeerId: Eq + Hash + Clone + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
if self.network.dialing.contains_key(&self.peer_id) {
let peer = DialingPeer {
network: self.network,
peer_id: self.peer_id
};
Ok(peer)
} else {
self.network.dial_peer(DialingOpts {
peer: self.peer_id.clone(),
handler,
address,
remaining: remaining.into_iter().collect(),
})?;
Ok(DialingPeer {
network: self.network,
peer_id: self.peer_id,
})
}
/// Returns the `ConnectedPeer` into a `Peer`.
pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> {
Peer::Connected(self)
}
/// Obtains an existing connection to the peer.
/// Obtains an established connection to the peer by ID.
pub fn connection<'b>(&'b mut self, id: ConnectionId)
-> Option<EstablishedConnection<'b, TInEvent, TConnInfo, TPeerId>>
{
@ -348,7 +342,7 @@ where
}
}
/// Gets an iterator over all established connections of the peer.
/// Gets an iterator over all established connections to the peer.
pub fn connections<'b>(&'b mut self) ->
EstablishedConnectionIter<'b,
impl Iterator<Item = ConnectionId>,
@ -386,11 +380,13 @@ impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug f
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
TPeerId: fmt::Debug,
TPeerId: Eq + Hash + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("ConnectedPeer")
.field("peer_id", &self.peer_id)
.field("established", &self.network.pool.iter_peer_established_info(&self.peer_id))
.field("attempts", &self.network.dialing.get(&self.peer_id))
.finish()
}
}
@ -419,8 +415,16 @@ where
&self.peer_id
}
/// Disconnects from this peer, closing all pending connections.
pub fn disconnect(self) -> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> {
/// Returns the `DialingPeer` into a `Peer`.
pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> {
Peer::Dialing(self)
}
/// Disconnects from this peer, closing all established connections and
/// aborting all dialing attempts.
pub fn disconnect(self)
-> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
{
self.network.disconnect(&self.peer_id);
DisconnectedPeer { network: self.network, peer_id: self.peer_id }
}
@ -443,20 +447,50 @@ where
}
}
/// Obtains the connection that is currently being established.
pub fn connection<'b>(&'b mut self) -> DialingConnection<'b, TInEvent, TConnInfo, TPeerId> {
let attempt = match self.network.dialing.entry(self.peer_id.clone()) {
hash_map::Entry::Occupied(e) => e,
_ => unreachable!("By `Peer::new` and the definition of `DialingPeer`.")
};
let inner = self.network.pool
.get_outgoing(attempt.get().id)
.expect("By consistency of `network.pool` with `network.dialing`.");
DialingConnection {
inner, dialing: attempt, peer_id: &self.peer_id
/// Obtains a dialing attempt to the peer by connection ID of
/// the current connection attempt.
pub fn attempt<'b>(&'b mut self, id: ConnectionId)
-> Option<DialingAttempt<'b, TInEvent, TConnInfo, TPeerId>>
{
if let hash_map::Entry::Occupied(attempts) = self.network.dialing.entry(self.peer_id.clone()) {
if let Some(pos) = attempts.get().iter().position(|s| s.current.0 == id) {
if let Some(inner) = self.network.pool.get_outgoing(id) {
return Some(DialingAttempt { pos, inner, attempts })
}
}
}
None
}
/// The number of ongoing dialing attempts, i.e. pending outgoing connections
/// to this peer.
pub fn num_attempts(&self) -> usize {
self.network.pool.num_peer_outgoing(&self.peer_id)
}
/// Gets an iterator over all dialing (i.e. pending outgoing) connections to the peer.
pub fn attempts<'b>(&'b mut self)
-> DialingAttemptIter<'b,
TInEvent,
TOutEvent,
THandler,
TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error,
TConnInfo,
TPeerId>
{
DialingAttemptIter::new(&self.peer_id, &mut self.network.pool, &mut self.network.dialing)
}
/// Obtains some dialing connection to the peer.
///
/// At least one dialing connection is guaranteed to exist on a `DialingPeer`.
pub fn some_attempt<'b>(&'b mut self)
-> DialingAttempt<'b, TInEvent, TConnInfo, TPeerId>
{
self.attempts()
.into_first()
.expect("By `Peer::new` and the definition of `DialingPeer`.")
}
}
@ -465,11 +499,13 @@ impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug f
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
TPeerId: fmt::Debug,
TPeerId: Eq + Hash + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("DialingPeer")
.field("peer_id", &self.peer_id)
.field("established", &self.network.pool.iter_peer_established_info(&self.peer_id))
.field("attempts", &self.network.dialing.get(&self.peer_id))
.finish()
}
}
@ -500,46 +536,19 @@ where
}
}
impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, TConnInfo, TPeerId>
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
TTrans: Transport<Output = (TConnInfo, TMuxer)> + Clone,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
{
pub fn id(&self) -> &TPeerId {
&self.peer_id
}
/// Attempts to connect to this peer using the given addresses.
pub fn connect<TIter>(self, first: Multiaddr, rest: TIter, handler: THandler)
-> Result<DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
ConnectionLimit>
where
TIter: IntoIterator<Item = Multiaddr>,
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TPeerId: Eq + Hash + Clone + Send + 'static,
{
self.network.dial_peer(DialingOpts {
peer: self.peer_id.clone(),
handler,
address: first,
remaining: rest.into_iter().collect(),
})?;
Ok(DialingPeer {
network: self.network,
peer_id: self.peer_id,
})
/// Returns the `DisconnectedPeer` into a `Peer`.
pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> {
Peer::Disconnected(self)
}
/// Moves the peer into a connected state by supplying an existing
@ -550,8 +559,7 @@ where
/// # Panics
///
/// Panics if `connected.peer_id()` does not identify the current peer.
///
pub fn set_connected(
pub fn set_connected<TMuxer>(
self,
connected: Connected<TConnInfo>,
connection: Connection<TMuxer, THandler::Handler>,
@ -559,8 +567,17 @@ where
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
ConnectionLimit
> where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: Send + 'static,
TTrans::Error: Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Clone + Send + 'static,
TPeerId: Eq + Hash + Clone + fmt::Debug,
TPeerId: Eq + Hash + Clone + Send + fmt::Debug + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
{
if connected.peer_id() != &self.peer_id {
panic!("Invalid peer ID given: {:?}. Expected: {:?}", connected.peer_id(), self.peer_id)
@ -574,71 +591,142 @@ where
}
}
/// Attempt to reach a peer.
/// The (internal) state of a `DialingAttempt`, tracking the
/// current connection attempt as well as remaining addresses.
#[derive(Debug, Clone)]
pub(super) struct DialingAttempt {
/// Identifier for the reach attempt.
pub(super) id: ConnectionId,
/// Multiaddr currently being attempted.
pub(super) current: Multiaddr,
pub(super) struct DialingState {
/// The ID and (remote) address of the current connection attempt.
pub(super) current: (ConnectionId, Multiaddr),
/// Multiaddresses to attempt if the current one fails.
pub(super) next: Vec<Multiaddr>,
pub(super) remaining: Vec<Multiaddr>,
}
/// A `DialingConnection` is a [`PendingConnection`] where the local peer
/// has the role of the dialer (i.e. initiator) and the (expected) remote
/// peer ID is known.
pub struct DialingConnection<'a, TInEvent, TConnInfo, TPeerId> {
peer_id: &'a TPeerId,
/// A `DialingAttempt` is an ongoing outgoing connection attempt to
/// a known / expected remote peer ID and a list of alternative addresses
/// to connect to, if the current connection attempt fails.
pub struct DialingAttempt<'a, TInEvent, TConnInfo, TPeerId> {
/// The underlying pending connection in the `Pool`.
inner: PendingConnection<'a, TInEvent, TConnInfo, TPeerId>,
dialing: hash_map::OccupiedEntry<'a, TPeerId, DialingAttempt>,
/// All current dialing attempts of the peer.
attempts: hash_map::OccupiedEntry<'a, TPeerId, SmallVec<[DialingState; 10]>>,
/// The position of the current `DialingState` of this connection in the `attempts`.
pos: usize,
}
impl<'a, TInEvent, TConnInfo, TPeerId>
DialingConnection<'a, TInEvent, TConnInfo, TPeerId>
DialingAttempt<'a, TInEvent, TConnInfo, TPeerId>
{
/// Returns the local connection ID.
/// Returns the ID of the current connection attempt.
pub fn id(&self) -> ConnectionId {
self.inner.id()
}
/// Returns the (expected) peer ID of the ongoing connection attempt.
/// Returns the (expected) peer ID of the dialing attempt.
pub fn peer_id(&self) -> &TPeerId {
self.peer_id
self.attempts.key()
}
/// Returns information about this endpoint of the connection attempt.
pub fn endpoint(&self) -> &ConnectedPoint {
self.inner.endpoint()
}
/// Aborts the connection attempt.
pub fn abort(self)
where
TPeerId: Eq + Hash + Clone,
{
self.dialing.remove();
self.inner.abort();
}
/// Adds new candidate addresses to the end of the addresses used
/// in the ongoing dialing process.
///
/// Duplicates are ignored.
pub fn add_addresses(&mut self, addrs: impl IntoIterator<Item = Multiaddr>) {
for addr in addrs {
self.add_address(addr);
/// Returns the remote address of the current connection attempt.
pub fn address(&self) -> &Multiaddr {
match self.inner.endpoint() {
ConnectedPoint::Dialer { address } => address,
ConnectedPoint::Listener { .. } => unreachable!("by definition of a `DialingAttempt`.")
}
}
/// Adds an address to the end of the addresses used in the ongoing
/// dialing process.
/// Aborts the dialing attempt.
///
/// Duplicates are ignored.
/// Aborting a dialing attempt involves aborting the current connection
/// attempt and dropping any remaining addresses given to [`Peer::dial()`]
/// that have not yet been tried.
pub fn abort(mut self) {
self.attempts.get_mut().remove(self.pos);
if self.attempts.get().is_empty() {
self.attempts.remove();
}
self.inner.abort();
}
/// Adds an address to the end of the remaining addresses
/// for this dialing attempt. Duplicates are ignored.
pub fn add_address(&mut self, addr: Multiaddr) {
if self.dialing.get().next.iter().all(|a| a != &addr) {
self.dialing.get_mut().next.push(addr);
let remaining = &mut self.attempts.get_mut()[self.pos].remaining;
if remaining.iter().all(|a| a != &addr) {
remaining.push(addr);
}
}
}
/// An iterator over the ongoing dialing attempts to a peer.
pub struct DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> {
/// The peer whose dialing attempts are being iterated.
peer_id: &'a TPeerId,
/// The underlying connection `Pool` of the `Network`.
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>,
/// The state of all current dialing attempts known to the `Network`.
///
/// Ownership of the `OccupiedEntry` for `peer_id` containing all attempts must be
/// borrowed to each `DialingAttempt` in order for it to remove the entry if the
/// last dialing attempt is aborted.
dialing: &'a mut FnvHashMap<TPeerId, SmallVec<[DialingState; 10]>>,
/// The current position of the iterator in `dialing[peer_id]`.
pos: usize,
/// The total number of elements in `dialing[peer_id]` to iterate over.
end: usize,
}
// Note: Ideally this would be an implementation of `Iterator`, but that
// requires GATs (cf. https://github.com/rust-lang/rust/issues/44265) and
// a different definition of `Iterator`.
impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
where
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
TPeerId: Eq + Hash + Clone,
{
fn new(
peer_id: &'a TPeerId,
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>,
dialing: &'a mut FnvHashMap<TPeerId, SmallVec<[DialingState; 10]>>,
) -> Self {
let end = dialing.get(peer_id).map_or(0, |conns| conns.len());
Self { pos: 0, end, pool, dialing, peer_id }
}
/// Obtains the next dialing connection, if any.
pub fn next<'b>(&'b mut self) -> Option<DialingAttempt<'b, TInEvent, TConnInfo, TPeerId>> {
if self.pos == self.end {
return None
}
if let hash_map::Entry::Occupied(attempts) = self.dialing.entry(self.peer_id.clone()) {
let id = attempts.get()[self.pos].current.0;
if let Some(inner) = self.pool.get_outgoing(id) {
let conn = DialingAttempt { pos: self.pos, inner, attempts };
self.pos += 1;
return Some(conn)
}
}
None
}
/// Returns the first connection, if any, consuming the iterator.
pub fn into_first<'b>(self)
-> Option<DialingAttempt<'b, TInEvent, TConnInfo, TPeerId>>
where 'a: 'b
{
if self.pos == self.end {
return None
}
if let hash_map::Entry::Occupied(attempts) = self.dialing.entry(self.peer_id.clone()) {
let id = attempts.get()[self.pos].current.0;
if let Some(inner) = self.pool.get_outgoing(id) {
return Some(DialingAttempt { pos: self.pos, inner, attempts })
}
}
None
}
}

View File

@ -22,47 +22,60 @@ mod util;
use futures::prelude::*;
use libp2p_core::identity;
use libp2p_core::multiaddr::multiaddr;
use libp2p_core::multiaddr::{multiaddr, Multiaddr};
use libp2p_core::{
Network,
PeerId,
Transport,
connection::PendingConnectionError,
muxing::StreamMuxerBox,
network::NetworkEvent,
network::{NetworkEvent, NetworkConfig},
transport,
upgrade,
};
use rand::Rng;
use rand::seq::SliceRandom;
use std::{io, task::Poll};
use std::{io, error::Error, fmt, task::Poll};
use util::TestHandler;
type TestNetwork<TTrans> = Network<TTrans, (), (), TestHandler>;
type TestNetwork = Network<TestTransport, (), (), TestHandler>;
type TestTransport = transport::boxed::Boxed<(PeerId, StreamMuxerBox), BoxError>;
#[derive(Debug)]
struct BoxError(Box<dyn Error + Send + 'static>);
impl Error for BoxError {}
impl fmt::Display for BoxError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Transport error: {}", self.0)
}
}
fn new_network(cfg: NetworkConfig) -> TestNetwork {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport: TestTransport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)))
.and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
})
.map_err(|e| BoxError(Box::new(e)))
.boxed();
TestNetwork::new(transport, local_public_key.into(), cfg)
}
#[test]
fn deny_incoming_connec() {
// Checks whether refusing an incoming connection on a swarm triggers the correct events.
let mut swarm1 = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
TestNetwork::new(transport, local_public_key.into(), Default::default())
};
let mut swarm2 = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
TestNetwork::new(transport, local_public_key.into(), Default::default())
};
let mut swarm1 = new_network(NetworkConfig::default());
let mut swarm2 = new_network(NetworkConfig::default());
swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
@ -76,8 +89,7 @@ fn deny_incoming_connec() {
swarm2
.peer(swarm1.local_peer_id().clone())
.into_disconnected().unwrap()
.connect(address.clone(), Vec::new(), TestHandler())
.dial(address.clone(), Vec::new(), TestHandler())
.unwrap();
async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
@ -119,22 +131,7 @@ fn dial_self() {
//
// The last two can happen in any order.
let mut swarm = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
})
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
TestNetwork::new(transport, local_public_key.into(), Default::default())
};
let mut swarm = new_network(NetworkConfig::default());
swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
let (local_address, mut swarm) = async_std::task::block_on(
@ -193,36 +190,16 @@ fn dial_self() {
fn dial_self_by_id() {
// Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first
// place.
let mut swarm = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
TestNetwork::new(transport, local_public_key.into(), Default::default())
};
let mut swarm = new_network(NetworkConfig::default());
let peer_id = swarm.local_peer_id().clone();
assert!(swarm.peer(peer_id).into_disconnected().is_none());
}
#[test]
fn multiple_addresses_err() {
// Tries dialing multiple addresses, and makes sure there's one dialing error per addresses.
// Tries dialing multiple addresses, and makes sure there's one dialing error per address.
let mut swarm = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
TestNetwork::new(transport, local_public_key.into(), Default::default())
};
let mut swarm = new_network(NetworkConfig::default());
let mut addresses = Vec::new();
for _ in 0 .. 3 {
@ -238,8 +215,7 @@ fn multiple_addresses_err() {
let target = PeerId::random();
swarm.peer(target.clone())
.into_disconnected().unwrap()
.connect(first, rest, TestHandler())
.dial(first, rest, TestHandler())
.unwrap();
async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
@ -267,3 +243,44 @@ fn multiple_addresses_err() {
}
})).unwrap();
}
#[test]
fn connection_limit() {
let outgoing_per_peer_limit = rand::thread_rng().gen_range(1, 10);
let outgoing_limit = 2 * outgoing_per_peer_limit;
let mut cfg = NetworkConfig::default();
cfg.set_outgoing_per_peer_limit(outgoing_per_peer_limit);
cfg.set_outgoing_limit(outgoing_limit);
let mut network = new_network(cfg);
let target = PeerId::random();
for _ in 0 .. outgoing_per_peer_limit {
network.peer(target.clone())
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
.ok()
.expect("Unexpected connection limit.");
}
let err = network.peer(target)
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
.expect_err("Unexpected dialing success.");
assert_eq!(err.current, outgoing_per_peer_limit);
assert_eq!(err.limit, outgoing_per_peer_limit);
let target2 = PeerId::random();
for _ in outgoing_per_peer_limit .. outgoing_limit {
network.peer(target2.clone())
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
.ok()
.expect("Unexpected connection limit.");
}
let err = network.peer(target2)
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
.expect_err("Unexpected dialing success.");
assert_eq!(err.current, outgoing_limit);
assert_eq!(err.limit, outgoing_limit);
}

View File

@ -291,12 +291,10 @@ pub enum DialPeerCondition {
/// If there is an ongoing dialing attempt, the addresses reported by
/// [`NetworkBehaviour::addresses_of_peer`] are added to the ongoing
/// dialing attempt, ignoring duplicates.
///
/// This condition implies [`DialPeerCondition::Disconnected`].
NotDialing,
// TODO: Once multiple dialing attempts per peer are permitted.
// See https://github.com/libp2p/rust-libp2p/pull/1506.
// Always,
/// A new dialing attempt is always initiated, only subject to the
/// configured connection limits.
Always,
}
impl Default for DialPeerCondition {

View File

@ -115,7 +115,6 @@ use libp2p_core::{
NetworkInfo,
NetworkEvent,
NetworkConfig,
Peer,
peer::ConnectedPeer,
},
upgrade::ProtocolName,
@ -379,70 +378,31 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
///
/// If a new dialing attempt has been initiated, `Ok(true)` is returned.
///
/// If there is an ongoing dialing attempt, the current addresses of the
/// peer, as reported by [`NetworkBehaviour::addresses_of_peer`] are added
/// to the ongoing dialing attempt, ignoring duplicates. In this case no
/// new dialing attempt is initiated.
///
/// If no new dialing attempt has been initiated, meaning there is an ongoing
/// dialing attempt or `addresses_of_peer` reports no addresses, `Ok(false)`
/// is returned.
pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result<bool, ConnectionLimit> {
pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result<(), DialError> {
let mut addrs = me.behaviour.addresses_of_peer(peer_id).into_iter();
match me.network.peer(peer_id.clone()) {
Peer::Disconnected(peer) => {
if let Some(first) = addrs.next() {
let handler = me.behaviour.new_handler().into_node_handler_builder();
match peer.connect(first, addrs, handler) {
Ok(_) => return Ok(true),
Err(error) => {
log::debug!(
"New dialing attempt to disconnected peer {:?} failed: {:?}.",
peer_id, error);
me.behaviour.inject_dial_failure(&peer_id);
return Err(error)
}
}
} else {
log::debug!(
"New dialing attempt to disconnected peer {:?} failed: no address.",
peer_id
);
me.behaviour.inject_dial_failure(&peer_id);
}
Ok(false)
},
Peer::Connected(peer) => {
if let Some(first) = addrs.next() {
let handler = me.behaviour.new_handler().into_node_handler_builder();
match peer.connect(first, addrs, handler) {
Ok(_) => return Ok(true),
Err(error) => {
log::debug!(
"New dialing attempt to connected peer {:?} failed: {:?}.",
peer_id, error);
me.behaviour.inject_dial_failure(&peer_id);
return Err(error)
}
}
} else {
log::debug!(
"New dialing attempt to disconnected peer {:?} failed: no address.",
peer_id
);
me.behaviour.inject_dial_failure(&peer_id);
}
Ok(false)
}
Peer::Dialing(mut peer) => {
peer.connection().add_addresses(addrs);
Ok(false)
},
Peer::Local => {
me.behaviour.inject_dial_failure(&peer_id);
Err(ConnectionLimit { current: 0, limit: 0 })
}
let peer = me.network.peer(peer_id.clone());
let result =
if let Some(first) = addrs.next() {
let handler = me.behaviour.new_handler().into_node_handler_builder();
peer.dial(first, addrs, handler)
.map(|_| ())
.map_err(DialError::ConnectionLimit)
} else {
Err(DialError::NoAddresses)
};
if let Err(error) = &result {
log::debug!(
"New dialing attempt to peer {:?} failed: {:?}.",
peer_id, error);
me.behaviour.inject_dial_failure(&peer_id);
}
result
}
/// Returns an iterator that produces the list of addresses we're listening on.
@ -721,18 +681,22 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
if !this.network.is_dialing(&peer_id) => true,
_ => false
};
if condition_matched {
if let Ok(true) = ExpandedSwarm::dial(this, &peer_id) {
return Poll::Ready(SwarmEvent::Dialing(peer_id));
if ExpandedSwarm::dial(this, &peer_id).is_ok() {
return Poll::Ready(SwarmEvent::Dialing(peer_id))
}
} else {
// Even if the condition for a _new_ dialing attempt is not met,
// we always add any potentially new addresses of the peer to an
// ongoing dialing attempt, if there is one.
log::trace!("Condition for new dialing attempt to {:?} not met: {:?}",
peer_id, condition);
if let Some(mut peer) = this.network.peer(peer_id.clone()).into_dialing() {
let addrs = this.behaviour.addresses_of_peer(peer.id());
peer.connection().add_addresses(addrs);
let mut attempt = peer.some_attempt();
for addr in addrs {
attempt.add_address(addr);
}
}
}
}
@ -1104,6 +1068,35 @@ where TBehaviour: NetworkBehaviour,
}
}
/// The possible failures of [`ExpandedSwarm::dial`].
#[derive(Debug)]
pub enum DialError {
/// The configured limit for simultaneous outgoing connections
/// has been reached.
ConnectionLimit(ConnectionLimit),
/// [`NetworkBehaviour::addresses_of_peer`] returned no addresses
/// for the peer to dial.
NoAddresses
}
impl fmt::Display for DialError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err),
DialError::NoAddresses => write!(f, "Dial error: no addresses for peer.")
}
}
}
impl error::Error for DialError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
DialError::ConnectionLimit(err) => Some(err),
DialError::NoAddresses => None
}
}
}
/// Dummy implementation of [`NetworkBehaviour`] that doesn't do anything.
#[derive(Clone, Default)]
pub struct DummyBehaviour {