diff --git a/CHANGELOG.md b/CHANGELOG.md index b585911a..fcd438ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Version ??? +- `libp2p-core`, `libp2p-swarm`: Added support for multiple dialing + attempts per peer, with a configurable limit. + [PR 1506](https://github.com/libp2p/rust-libp2p/pull/1506) + - `libp2p-noise`: Added the `X25519Spec` protocol suite which uses libp2p-noise-spec compliant signatures on static keys as well as the `/noise` protocol upgrade, hence providing a libp2p-noise-spec compliant diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index f8c9e1a7..7da7efa8 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -225,12 +225,7 @@ where TPeerId: Clone + Send + 'static, { let endpoint = info.to_connected_point(); - if let Some(limit) = self.limits.max_incoming { - let current = self.iter_pending_incoming().count(); - if current >= limit { - return Err(ConnectionLimit { limit, current }) - } - } + self.limits.check_incoming(|| self.iter_pending_incoming().count())?; Ok(self.add_pending(future, handler, endpoint, None)) } @@ -267,6 +262,11 @@ where TPeerId: Clone + Send + 'static, { self.limits.check_outgoing(|| self.iter_pending_outgoing().count())?; + + if let Some(peer) = &info.peer_id { + self.limits.check_outgoing_per_peer(|| self.num_peer_outgoing(peer))?; + } + let endpoint = info.to_connected_point(); Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned())) } @@ -465,6 +465,13 @@ where self.established.get(peer).map_or(0, |conns| conns.len()) } + /// Counts the number of pending outgoing connections to the given peer. + pub fn num_peer_outgoing(&self, peer: &TPeerId) -> usize { + self.iter_pending_outgoing() + .filter(|info| info.peer_id == Some(peer)) + .count() + } + /// Returns an iterator over all established connections of `peer`. pub fn iter_peer_established<'a>(&'a mut self, peer: &TPeerId) -> EstablishedConnectionIter<'a, @@ -837,6 +844,7 @@ pub struct PoolLimits { pub max_outgoing: Option, pub max_incoming: Option, pub max_established_per_peer: Option, + pub max_outgoing_per_peer: Option, } impl PoolLimits { @@ -854,6 +862,20 @@ impl PoolLimits { Self::check(current, self.max_outgoing) } + fn check_incoming(&self, current: F) -> Result<(), ConnectionLimit> + where + F: FnOnce() -> usize + { + Self::check(current, self.max_incoming) + } + + fn check_outgoing_per_peer(&self, current: F) -> Result<(), ConnectionLimit> + where + F: FnOnce() -> usize + { + Self::check(current, self.max_outgoing_per_peer) + } + fn check(current: F, limit: Option) -> Result<(), ConnectionLimit> where F: FnOnce() -> usize diff --git a/core/src/network.rs b/core/src/network.rs index ccb21e84..73240abd 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -50,6 +50,7 @@ use crate::{ }; use fnv::{FnvHashMap}; use futures::{prelude::*, future}; +use smallvec::SmallVec; use std::{ collections::hash_map, convert::TryFrom as _, @@ -78,21 +79,17 @@ where /// The ongoing dialing attempts. /// - /// The `Network` enforces a single ongoing dialing attempt per peer, - /// even if multiple (established) connections per peer are allowed. - /// However, a single dialing attempt operates on a list of addresses - /// to connect to, which can be extended with new addresses while - /// the connection attempt is still in progress. Thereby each - /// dialing attempt is associated with a new connection and hence a new - /// connection ID. + /// There may be multiple ongoing dialing attempts to the same peer. + /// Each dialing attempt is associated with a new connection and hence + /// a new connection ID. /// /// > **Note**: `dialing` must be consistent with the pending outgoing /// > connections in `pool`. That is, for every entry in `dialing` /// > there must exist a pending outgoing connection in `pool` with /// > the same connection ID. This is ensured by the implementation of /// > `Network` (see `dial_peer_impl` and `on_connection_failed`) - /// > together with the implementation of `DialingConnection::abort`. - dialing: FnvHashMap, + /// > together with the implementation of `DialingAttempt::abort`. + dialing: FnvHashMap>, } impl fmt::Debug for @@ -381,8 +378,11 @@ where Poll::Pending => return Poll::Pending, Poll::Ready(PoolEvent::ConnectionEstablished { connection, num_established }) => { match self.dialing.entry(connection.peer_id().clone()) { - hash_map::Entry::Occupied(e) if e.get().id == connection.id() => { - e.remove(); + hash_map::Entry::Occupied(mut e) => { + e.get_mut().retain(|s| s.current.0 != connection.id()); + if e.get().is_empty() { + e.remove(); + } }, _ => {} } @@ -453,7 +453,7 @@ fn dial_peer_impl::Error, TConnInfo, TPeerId>, - dialing: &mut FnvHashMap, + dialing: &mut FnvHashMap>, opts: DialingOpts ) -> Result where @@ -489,14 +489,12 @@ where }; if let Ok(id) = &result { - let former = dialing.insert(opts.peer, - peer::DialingAttempt { - id: *id, - current: opts.address, - next: opts.remaining, + dialing.entry(opts.peer).or_default().push( + peer::DialingState { + current: (*id, opts.address), + remaining: opts.remaining, }, ); - debug_assert!(former.is_none()); } result @@ -508,7 +506,7 @@ where /// If the failed connection attempt was a dialing attempt and there /// are more addresses to try, new `DialingOpts` are returned. fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>( - dialing: &mut FnvHashMap, + dialing: &mut FnvHashMap>, id: ConnectionId, endpoint: ConnectedPoint, error: PendingConnectionError, @@ -521,27 +519,34 @@ where TPeerId: Eq + Hash + Clone, { // Check if the failed connection is associated with a dialing attempt. - // TODO: could be more optimal than iterating over everything - let dialing_peer = dialing.iter() // (1) - .find(|(_, a)| a.id == id) - .map(|(p, _)| p.clone()); + let dialing_failed = dialing.iter_mut() + .find_map(|(peer, attempts)| { + if let Some(pos) = attempts.iter().position(|s| s.current.0 == id) { + let attempt = attempts.remove(pos); + let last = attempts.is_empty(); + Some((peer.clone(), attempt, last)) + } else { + None + } + }); - if let Some(peer_id) = dialing_peer { - // A pending outgoing connection to a known peer failed. - let mut attempt = dialing.remove(&peer_id).expect("by (1)"); + if let Some((peer_id, mut attempt, last)) = dialing_failed { + if last { + dialing.remove(&peer_id); + } - let num_remain = u32::try_from(attempt.next.len()).unwrap(); - let failed_addr = attempt.current.clone(); + let num_remain = u32::try_from(attempt.remaining.len()).unwrap(); + let failed_addr = attempt.current.1.clone(); let (opts, attempts_remaining) = if num_remain > 0 { if let Some(handler) = handler { - let next_attempt = attempt.next.remove(0); + let next_attempt = attempt.remaining.remove(0); let opts = DialingOpts { peer: peer_id.clone(), handler, address: next_attempt, - remaining: attempt.next + remaining: attempt.remaining }; (Some(opts), num_remain) } else { @@ -581,9 +586,13 @@ where /// Information about the network obtained by [`Network::info()`]. #[derive(Clone, Debug)] pub struct NetworkInfo { + /// The total number of connected peers. pub num_peers: usize, + /// The total number of connections, both established and pending. pub num_connections: usize, + /// The total number of pending connections, both incoming and outgoing. pub num_connections_pending: usize, + /// The total number of established connections. pub num_connections_established: usize, } @@ -633,4 +642,9 @@ impl NetworkConfig { self.pool_limits.max_established_per_peer = Some(n); self } + + pub fn set_outgoing_per_peer_limit(&mut self, n: usize) -> &mut Self { + self.pool_limits.max_outgoing_per_peer = Some(n); + self + } } diff --git a/core/src/network/peer.rs b/core/src/network/peer.rs index b06be772..8f9dd099 100644 --- a/core/src/network/peer.rs +++ b/core/src/network/peer.rs @@ -35,8 +35,11 @@ use crate::{ IntoConnectionHandler, PendingConnection, Substream, + pool::Pool, }, }; +use fnv::FnvHashMap; +use smallvec::SmallVec; use std::{ collections::hash_map, error, @@ -47,6 +50,10 @@ use super::{Network, DialingOpts}; /// The possible representations of a peer in a [`Network`], as /// seen by the local node. +/// +/// > **Note**: In any state there may always be a pending incoming +/// > connection attempt from the peer, however, the remote identity +/// > of a peer is only known once a connection is fully established. pub enum Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> where TTrans: Transport, @@ -63,10 +70,6 @@ where /// There exists no established connection to the peer and there is /// currently no ongoing dialing (i.e. outgoing connection) attempt /// in progress. - /// - /// > **Note**: In this state there may always be a pending incoming - /// > connection attempt from the peer, however, the remote identity - /// > of a peer is only known once a connection is fully established. Disconnected(DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>), /// The peer represents the local node. @@ -82,20 +85,20 @@ where TPeerId: fmt::Debug + Eq + Hash, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - match *self { - Peer::Connected(ConnectedPeer { ref peer_id, .. }) => { + match self { + Peer::Connected(p) => { f.debug_struct("Connected") - .field("peer_id", peer_id) + .field("peer", &p) .finish() } - Peer::Dialing(DialingPeer { ref peer_id, .. } ) => { - f.debug_struct("DialingPeer") - .field("peer_id", peer_id) + Peer::Dialing(p) => { + f.debug_struct("Dialing") + .field("peer", &p) .finish() } - Peer::Disconnected(DisconnectedPeer { ref peer_id, .. }) => { + Peer::Disconnected(p) => { f.debug_struct("Disconnected") - .field("peer_id", peer_id) + .field("peer", &p) .finish() } Peer::Local => { @@ -164,12 +167,11 @@ where TTrans::Dial: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send, - TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, THandler: IntoConnectionHandler + Send + 'static, - THandler::Handler: ConnectionHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static, - ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary + THandler::Handler: ConnectionHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send, + ::OutboundOpenInfo: Send, ::Error: error::Error + Send + 'static, TConnInfo: fmt::Debug + ConnectionInfo + Send + 'static, TPeerId: Eq + Hash + Clone + Send + 'static, @@ -208,7 +210,41 @@ where } } - /// Converts the peer into a `ConnectedPeer`, if there an established connection exists. + /// Initiates a new dialing attempt to this peer using the given addresses. + /// + /// The connection ID of the first connection attempt, i.e. to `address`, + /// is returned, together with a [`DialingPeer`] for further use. The + /// `remaining` addresses are tried in order in subsequent connection + /// attempts in the context of the same dialing attempt, if the connection + /// attempt to the first address fails. + pub fn dial(self, address: Multiaddr, remaining: I, handler: THandler) + -> Result< + (ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>), + ConnectionLimit + > + where + I: IntoIterator, + { + let (peer_id, network) = match self { + Peer::Connected(p) => (p.peer_id, p.network), + Peer::Dialing(p) => (p.peer_id, p.network), + Peer::Disconnected(p) => (p.peer_id, p.network), + Peer::Local => return Err(ConnectionLimit { current: 0, limit: 0 }) + }; + + let id = network.dial_peer(DialingOpts { + peer: peer_id.clone(), + handler, + address, + remaining: remaining.into_iter().collect(), + })?; + + Ok((id, DialingPeer { network, peer_id })) + } + + /// Converts the peer into a `ConnectedPeer`, if an established connection exists. + /// + /// Succeeds if the there is at least one established connection to the peer. pub fn into_connected(self) -> Option< ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> > { @@ -221,6 +257,8 @@ where } /// Converts the peer into a `DialingPeer`, if a dialing attempt exists. + /// + /// Succeeds if the there is at least one pending outgoing connection to the peer. pub fn into_dialing(self) -> Option< DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> > { @@ -245,7 +283,8 @@ where } /// The representation of a peer in a [`Network`] to whom at least -/// one established connection exists. +/// one established connection exists. There may also be additional ongoing +/// dialing attempts to the peer. pub struct ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> where TTrans: Transport, @@ -267,57 +306,12 @@ where &self.peer_id } - /// Attempts to establish a new connection to this peer using the given addresses, - /// if there is currently no ongoing dialing attempt. - /// - /// Existing established connections are not affected. - /// - /// > **Note**: If there is an ongoing dialing attempt, a `DialingPeer` - /// > is returned with the given addresses and handler being ignored. - /// > You may want to check [`ConnectedPeer::is_dialing`] first. - pub fn connect(self, address: Multiaddr, remaining: I, handler: THandler) - -> Result, - ConnectionLimit> - where - I: IntoIterator, - THandler: Send + 'static, - THandler::Handler: Send, - ::Error: error::Error + Send, - ::OutboundOpenInfo: Send, - THandler::Handler: ConnectionHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send, - TTrans: Transport + Clone, - TTrans::Error: Send + 'static, - TTrans::Dial: Send + 'static, - TMuxer: StreamMuxer + Send + Sync + 'static, - TMuxer::OutboundSubstream: Send, - TMuxer::Substream: Send, - TConnInfo: fmt::Debug + Send + 'static, - TPeerId: Eq + Hash + Clone + Send + 'static, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, - - { - if self.network.dialing.contains_key(&self.peer_id) { - let peer = DialingPeer { - network: self.network, - peer_id: self.peer_id - }; - Ok(peer) - } else { - self.network.dial_peer(DialingOpts { - peer: self.peer_id.clone(), - handler, - address, - remaining: remaining.into_iter().collect(), - })?; - Ok(DialingPeer { - network: self.network, - peer_id: self.peer_id, - }) - } + /// Returns the `ConnectedPeer` into a `Peer`. + pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> { + Peer::Connected(self) } - /// Obtains an existing connection to the peer. + /// Obtains an established connection to the peer by ID. pub fn connection<'b>(&'b mut self, id: ConnectionId) -> Option> { @@ -348,7 +342,7 @@ where } } - /// Gets an iterator over all established connections of the peer. + /// Gets an iterator over all established connections to the peer. pub fn connections<'b>(&'b mut self) -> EstablishedConnectionIter<'b, impl Iterator, @@ -386,11 +380,13 @@ impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug f where TTrans: Transport, THandler: IntoConnectionHandler, - TPeerId: fmt::Debug, + TPeerId: Eq + Hash + fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("ConnectedPeer") .field("peer_id", &self.peer_id) + .field("established", &self.network.pool.iter_peer_established_info(&self.peer_id)) + .field("attempts", &self.network.dialing.get(&self.peer_id)) .finish() } } @@ -419,8 +415,16 @@ where &self.peer_id } - /// Disconnects from this peer, closing all pending connections. - pub fn disconnect(self) -> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> { + /// Returns the `DialingPeer` into a `Peer`. + pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> { + Peer::Dialing(self) + } + + /// Disconnects from this peer, closing all established connections and + /// aborting all dialing attempts. + pub fn disconnect(self) + -> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> + { self.network.disconnect(&self.peer_id); DisconnectedPeer { network: self.network, peer_id: self.peer_id } } @@ -443,20 +447,50 @@ where } } - /// Obtains the connection that is currently being established. - pub fn connection<'b>(&'b mut self) -> DialingConnection<'b, TInEvent, TConnInfo, TPeerId> { - let attempt = match self.network.dialing.entry(self.peer_id.clone()) { - hash_map::Entry::Occupied(e) => e, - _ => unreachable!("By `Peer::new` and the definition of `DialingPeer`.") - }; - - let inner = self.network.pool - .get_outgoing(attempt.get().id) - .expect("By consistency of `network.pool` with `network.dialing`."); - - DialingConnection { - inner, dialing: attempt, peer_id: &self.peer_id + /// Obtains a dialing attempt to the peer by connection ID of + /// the current connection attempt. + pub fn attempt<'b>(&'b mut self, id: ConnectionId) + -> Option> + { + if let hash_map::Entry::Occupied(attempts) = self.network.dialing.entry(self.peer_id.clone()) { + if let Some(pos) = attempts.get().iter().position(|s| s.current.0 == id) { + if let Some(inner) = self.network.pool.get_outgoing(id) { + return Some(DialingAttempt { pos, inner, attempts }) + } + } } + None + } + + /// The number of ongoing dialing attempts, i.e. pending outgoing connections + /// to this peer. + pub fn num_attempts(&self) -> usize { + self.network.pool.num_peer_outgoing(&self.peer_id) + } + + /// Gets an iterator over all dialing (i.e. pending outgoing) connections to the peer. + pub fn attempts<'b>(&'b mut self) + -> DialingAttemptIter<'b, + TInEvent, + TOutEvent, + THandler, + TTrans::Error, + ::Error, + TConnInfo, + TPeerId> + { + DialingAttemptIter::new(&self.peer_id, &mut self.network.pool, &mut self.network.dialing) + } + + /// Obtains some dialing connection to the peer. + /// + /// At least one dialing connection is guaranteed to exist on a `DialingPeer`. + pub fn some_attempt<'b>(&'b mut self) + -> DialingAttempt<'b, TInEvent, TConnInfo, TPeerId> + { + self.attempts() + .into_first() + .expect("By `Peer::new` and the definition of `DialingPeer`.") } } @@ -465,11 +499,13 @@ impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug f where TTrans: Transport, THandler: IntoConnectionHandler, - TPeerId: fmt::Debug, + TPeerId: Eq + Hash + fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("DialingPeer") .field("peer_id", &self.peer_id) + .field("established", &self.network.pool.iter_peer_established_info(&self.peer_id)) + .field("attempts", &self.network.dialing.get(&self.peer_id)) .finish() } } @@ -500,46 +536,19 @@ where } } -impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, TConnInfo, TPeerId> +impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> where - TTrans: Transport + Clone, - TTrans::Error: Send + 'static, - TTrans::Dial: Send + 'static, - TMuxer: StreamMuxer + Send + Sync + 'static, - TMuxer::OutboundSubstream: Send, - TMuxer::Substream: Send, - THandler: IntoConnectionHandler + Send + 'static, - THandler::Handler: ConnectionHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send, - ::OutboundOpenInfo: Send, - ::Error: error::Error + Send, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, + TTrans: Transport, + THandler: IntoConnectionHandler, { pub fn id(&self) -> &TPeerId { &self.peer_id } - /// Attempts to connect to this peer using the given addresses. - pub fn connect(self, first: Multiaddr, rest: TIter, handler: THandler) - -> Result, - ConnectionLimit> - where - TIter: IntoIterator, - TConnInfo: fmt::Debug + ConnectionInfo + Send + 'static, - TPeerId: Eq + Hash + Clone + Send + 'static, - { - self.network.dial_peer(DialingOpts { - peer: self.peer_id.clone(), - handler, - address: first, - remaining: rest.into_iter().collect(), - })?; - Ok(DialingPeer { - network: self.network, - peer_id: self.peer_id, - }) - + /// Returns the `DisconnectedPeer` into a `Peer`. + pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> { + Peer::Disconnected(self) } /// Moves the peer into a connected state by supplying an existing @@ -550,8 +559,7 @@ where /// # Panics /// /// Panics if `connected.peer_id()` does not identify the current peer. - /// - pub fn set_connected( + pub fn set_connected( self, connected: Connected, connection: Connection, @@ -559,8 +567,17 @@ where ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>, ConnectionLimit > where + TInEvent: Send + 'static, + TOutEvent: Send + 'static, + THandler: Send + 'static, + TTrans::Error: Send + 'static, + THandler::Handler: ConnectionHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send, + ::OutboundOpenInfo: Send, + ::Error: error::Error + Send + 'static, TConnInfo: fmt::Debug + ConnectionInfo + Clone + Send + 'static, - TPeerId: Eq + Hash + Clone + fmt::Debug, + TPeerId: Eq + Hash + Clone + Send + fmt::Debug + 'static, + TMuxer: StreamMuxer + Send + Sync + 'static, + TMuxer::OutboundSubstream: Send, { if connected.peer_id() != &self.peer_id { panic!("Invalid peer ID given: {:?}. Expected: {:?}", connected.peer_id(), self.peer_id) @@ -574,71 +591,142 @@ where } } -/// Attempt to reach a peer. +/// The (internal) state of a `DialingAttempt`, tracking the +/// current connection attempt as well as remaining addresses. #[derive(Debug, Clone)] -pub(super) struct DialingAttempt { - /// Identifier for the reach attempt. - pub(super) id: ConnectionId, - /// Multiaddr currently being attempted. - pub(super) current: Multiaddr, +pub(super) struct DialingState { + /// The ID and (remote) address of the current connection attempt. + pub(super) current: (ConnectionId, Multiaddr), /// Multiaddresses to attempt if the current one fails. - pub(super) next: Vec, + pub(super) remaining: Vec, } -/// A `DialingConnection` is a [`PendingConnection`] where the local peer -/// has the role of the dialer (i.e. initiator) and the (expected) remote -/// peer ID is known. -pub struct DialingConnection<'a, TInEvent, TConnInfo, TPeerId> { - peer_id: &'a TPeerId, +/// A `DialingAttempt` is an ongoing outgoing connection attempt to +/// a known / expected remote peer ID and a list of alternative addresses +/// to connect to, if the current connection attempt fails. +pub struct DialingAttempt<'a, TInEvent, TConnInfo, TPeerId> { + /// The underlying pending connection in the `Pool`. inner: PendingConnection<'a, TInEvent, TConnInfo, TPeerId>, - dialing: hash_map::OccupiedEntry<'a, TPeerId, DialingAttempt>, + /// All current dialing attempts of the peer. + attempts: hash_map::OccupiedEntry<'a, TPeerId, SmallVec<[DialingState; 10]>>, + /// The position of the current `DialingState` of this connection in the `attempts`. + pos: usize, } impl<'a, TInEvent, TConnInfo, TPeerId> - DialingConnection<'a, TInEvent, TConnInfo, TPeerId> + DialingAttempt<'a, TInEvent, TConnInfo, TPeerId> { - /// Returns the local connection ID. + /// Returns the ID of the current connection attempt. pub fn id(&self) -> ConnectionId { self.inner.id() } - /// Returns the (expected) peer ID of the ongoing connection attempt. + /// Returns the (expected) peer ID of the dialing attempt. pub fn peer_id(&self) -> &TPeerId { - self.peer_id + self.attempts.key() } - /// Returns information about this endpoint of the connection attempt. - pub fn endpoint(&self) -> &ConnectedPoint { - self.inner.endpoint() - } - - /// Aborts the connection attempt. - pub fn abort(self) - where - TPeerId: Eq + Hash + Clone, - { - self.dialing.remove(); - self.inner.abort(); - } - - /// Adds new candidate addresses to the end of the addresses used - /// in the ongoing dialing process. - /// - /// Duplicates are ignored. - pub fn add_addresses(&mut self, addrs: impl IntoIterator) { - for addr in addrs { - self.add_address(addr); + /// Returns the remote address of the current connection attempt. + pub fn address(&self) -> &Multiaddr { + match self.inner.endpoint() { + ConnectedPoint::Dialer { address } => address, + ConnectedPoint::Listener { .. } => unreachable!("by definition of a `DialingAttempt`.") } } - /// Adds an address to the end of the addresses used in the ongoing - /// dialing process. + /// Aborts the dialing attempt. /// - /// Duplicates are ignored. + /// Aborting a dialing attempt involves aborting the current connection + /// attempt and dropping any remaining addresses given to [`Peer::dial()`] + /// that have not yet been tried. + pub fn abort(mut self) { + self.attempts.get_mut().remove(self.pos); + if self.attempts.get().is_empty() { + self.attempts.remove(); + } + self.inner.abort(); + } + + /// Adds an address to the end of the remaining addresses + /// for this dialing attempt. Duplicates are ignored. pub fn add_address(&mut self, addr: Multiaddr) { - if self.dialing.get().next.iter().all(|a| a != &addr) { - self.dialing.get_mut().next.push(addr); + let remaining = &mut self.attempts.get_mut()[self.pos].remaining; + if remaining.iter().all(|a| a != &addr) { + remaining.push(addr); } } } +/// An iterator over the ongoing dialing attempts to a peer. +pub struct DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> { + /// The peer whose dialing attempts are being iterated. + peer_id: &'a TPeerId, + /// The underlying connection `Pool` of the `Network`. + pool: &'a mut Pool, + /// The state of all current dialing attempts known to the `Network`. + /// + /// Ownership of the `OccupiedEntry` for `peer_id` containing all attempts must be + /// borrowed to each `DialingAttempt` in order for it to remove the entry if the + /// last dialing attempt is aborted. + dialing: &'a mut FnvHashMap>, + /// The current position of the iterator in `dialing[peer_id]`. + pos: usize, + /// The total number of elements in `dialing[peer_id]` to iterate over. + end: usize, +} + +// Note: Ideally this would be an implementation of `Iterator`, but that +// requires GATs (cf. https://github.com/rust-lang/rust/issues/44265) and +// a different definition of `Iterator`. +impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> + DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> +where + TConnInfo: ConnectionInfo, + TPeerId: Eq + Hash + Clone, +{ + fn new( + peer_id: &'a TPeerId, + pool: &'a mut Pool, + dialing: &'a mut FnvHashMap>, + ) -> Self { + let end = dialing.get(peer_id).map_or(0, |conns| conns.len()); + Self { pos: 0, end, pool, dialing, peer_id } + } + + /// Obtains the next dialing connection, if any. + pub fn next<'b>(&'b mut self) -> Option> { + if self.pos == self.end { + return None + } + + if let hash_map::Entry::Occupied(attempts) = self.dialing.entry(self.peer_id.clone()) { + let id = attempts.get()[self.pos].current.0; + if let Some(inner) = self.pool.get_outgoing(id) { + let conn = DialingAttempt { pos: self.pos, inner, attempts }; + self.pos += 1; + return Some(conn) + } + } + + None + } + + /// Returns the first connection, if any, consuming the iterator. + pub fn into_first<'b>(self) + -> Option> + where 'a: 'b + { + if self.pos == self.end { + return None + } + + if let hash_map::Entry::Occupied(attempts) = self.dialing.entry(self.peer_id.clone()) { + let id = attempts.get()[self.pos].current.0; + if let Some(inner) = self.pool.get_outgoing(id) { + return Some(DialingAttempt { pos: self.pos, inner, attempts }) + } + } + + None + } +} diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index c01cebdd..630eccc0 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -22,47 +22,60 @@ mod util; use futures::prelude::*; use libp2p_core::identity; -use libp2p_core::multiaddr::multiaddr; +use libp2p_core::multiaddr::{multiaddr, Multiaddr}; use libp2p_core::{ Network, PeerId, Transport, connection::PendingConnectionError, muxing::StreamMuxerBox, - network::NetworkEvent, + network::{NetworkEvent, NetworkConfig}, + transport, upgrade, }; +use rand::Rng; use rand::seq::SliceRandom; -use std::{io, task::Poll}; +use std::{io, error::Error, fmt, task::Poll}; use util::TestHandler; -type TestNetwork = Network; +type TestNetwork = Network; +type TestTransport = transport::boxed::Boxed<(PeerId, StreamMuxerBox), BoxError>; + +#[derive(Debug)] +struct BoxError(Box); + +impl Error for BoxError {} + +impl fmt::Display for BoxError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Transport error: {}", self.0) + } +} + +fn new_network(cfg: NetworkConfig) -> TestNetwork { + let local_key = identity::Keypair::generate_ed25519(); + let local_public_key = local_key.public(); + let transport: TestTransport = libp2p_tcp::TcpConfig::new() + .upgrade(upgrade::Version::V1) + .authenticate(libp2p_secio::SecioConfig::new(local_key)) + .multiplex(libp2p_mplex::MplexConfig::new()) + .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer))) + .and_then(|(peer, mplex), _| { + // Gracefully close the connection to allow protocol + // negotiation to complete. + util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) + }) + .map_err(|e| BoxError(Box::new(e))) + .boxed(); + TestNetwork::new(transport, local_public_key.into(), cfg) +} #[test] fn deny_incoming_connec() { // Checks whether refusing an incoming connection on a swarm triggers the correct events. - let mut swarm1 = { - let local_key = identity::Keypair::generate_ed25519(); - let local_public_key = local_key.public(); - let transport = libp2p_tcp::TcpConfig::new() - .upgrade(upgrade::Version::V1) - .authenticate(libp2p_secio::SecioConfig::new(local_key)) - .multiplex(libp2p_mplex::MplexConfig::new()) - .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer))); - TestNetwork::new(transport, local_public_key.into(), Default::default()) - }; - - let mut swarm2 = { - let local_key = identity::Keypair::generate_ed25519(); - let local_public_key = local_key.public(); - let transport = libp2p_tcp::TcpConfig::new() - .upgrade(upgrade::Version::V1) - .authenticate(libp2p_secio::SecioConfig::new(local_key)) - .multiplex(libp2p_mplex::MplexConfig::new()) - .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer))); - TestNetwork::new(transport, local_public_key.into(), Default::default()) - }; + let mut swarm1 = new_network(NetworkConfig::default()); + let mut swarm2 = new_network(NetworkConfig::default()); swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); @@ -76,8 +89,7 @@ fn deny_incoming_connec() { swarm2 .peer(swarm1.local_peer_id().clone()) - .into_disconnected().unwrap() - .connect(address.clone(), Vec::new(), TestHandler()) + .dial(address.clone(), Vec::new(), TestHandler()) .unwrap(); async_std::task::block_on(future::poll_fn(|cx| -> Poll> { @@ -119,22 +131,7 @@ fn dial_self() { // // The last two can happen in any order. - let mut swarm = { - let local_key = identity::Keypair::generate_ed25519(); - let local_public_key = local_key.public(); - let transport = libp2p_tcp::TcpConfig::new() - .upgrade(upgrade::Version::V1) - .authenticate(libp2p_secio::SecioConfig::new(local_key)) - .multiplex(libp2p_mplex::MplexConfig::new()) - .and_then(|(peer, mplex), _| { - // Gracefully close the connection to allow protocol - // negotiation to complete. - util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) - }) - .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer))); - TestNetwork::new(transport, local_public_key.into(), Default::default()) - }; - + let mut swarm = new_network(NetworkConfig::default()); swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); let (local_address, mut swarm) = async_std::task::block_on( @@ -193,36 +190,16 @@ fn dial_self() { fn dial_self_by_id() { // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first // place. - - let mut swarm = { - let local_key = identity::Keypair::generate_ed25519(); - let local_public_key = local_key.public(); - let transport = libp2p_tcp::TcpConfig::new() - .upgrade(upgrade::Version::V1) - .authenticate(libp2p_secio::SecioConfig::new(local_key)) - .multiplex(libp2p_mplex::MplexConfig::new()) - .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer))); - TestNetwork::new(transport, local_public_key.into(), Default::default()) - }; - + let mut swarm = new_network(NetworkConfig::default()); let peer_id = swarm.local_peer_id().clone(); assert!(swarm.peer(peer_id).into_disconnected().is_none()); } #[test] fn multiple_addresses_err() { - // Tries dialing multiple addresses, and makes sure there's one dialing error per addresses. + // Tries dialing multiple addresses, and makes sure there's one dialing error per address. - let mut swarm = { - let local_key = identity::Keypair::generate_ed25519(); - let local_public_key = local_key.public(); - let transport = libp2p_tcp::TcpConfig::new() - .upgrade(upgrade::Version::V1) - .authenticate(libp2p_secio::SecioConfig::new(local_key)) - .multiplex(libp2p_mplex::MplexConfig::new()) - .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer))); - TestNetwork::new(transport, local_public_key.into(), Default::default()) - }; + let mut swarm = new_network(NetworkConfig::default()); let mut addresses = Vec::new(); for _ in 0 .. 3 { @@ -238,8 +215,7 @@ fn multiple_addresses_err() { let target = PeerId::random(); swarm.peer(target.clone()) - .into_disconnected().unwrap() - .connect(first, rest, TestHandler()) + .dial(first, rest, TestHandler()) .unwrap(); async_std::task::block_on(future::poll_fn(|cx| -> Poll> { @@ -267,3 +243,44 @@ fn multiple_addresses_err() { } })).unwrap(); } + +#[test] +fn connection_limit() { + let outgoing_per_peer_limit = rand::thread_rng().gen_range(1, 10); + let outgoing_limit = 2 * outgoing_per_peer_limit; + + let mut cfg = NetworkConfig::default(); + cfg.set_outgoing_per_peer_limit(outgoing_per_peer_limit); + cfg.set_outgoing_limit(outgoing_limit); + let mut network = new_network(cfg); + + let target = PeerId::random(); + for _ in 0 .. outgoing_per_peer_limit { + network.peer(target.clone()) + .dial(Multiaddr::empty(), Vec::new(), TestHandler()) + .ok() + .expect("Unexpected connection limit."); + } + + let err = network.peer(target) + .dial(Multiaddr::empty(), Vec::new(), TestHandler()) + .expect_err("Unexpected dialing success."); + + assert_eq!(err.current, outgoing_per_peer_limit); + assert_eq!(err.limit, outgoing_per_peer_limit); + + let target2 = PeerId::random(); + for _ in outgoing_per_peer_limit .. outgoing_limit { + network.peer(target2.clone()) + .dial(Multiaddr::empty(), Vec::new(), TestHandler()) + .ok() + .expect("Unexpected connection limit."); + } + + let err = network.peer(target2) + .dial(Multiaddr::empty(), Vec::new(), TestHandler()) + .expect_err("Unexpected dialing success."); + + assert_eq!(err.current, outgoing_limit); + assert_eq!(err.limit, outgoing_limit); +} diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index fdddc6a0..16e804f9 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -291,12 +291,10 @@ pub enum DialPeerCondition { /// If there is an ongoing dialing attempt, the addresses reported by /// [`NetworkBehaviour::addresses_of_peer`] are added to the ongoing /// dialing attempt, ignoring duplicates. - /// - /// This condition implies [`DialPeerCondition::Disconnected`]. NotDialing, - // TODO: Once multiple dialing attempts per peer are permitted. - // See https://github.com/libp2p/rust-libp2p/pull/1506. - // Always, + /// A new dialing attempt is always initiated, only subject to the + /// configured connection limits. + Always, } impl Default for DialPeerCondition { diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 1c62d8d9..d1d3ea6a 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -115,7 +115,6 @@ use libp2p_core::{ NetworkInfo, NetworkEvent, NetworkConfig, - Peer, peer::ConnectedPeer, }, upgrade::ProtocolName, @@ -379,70 +378,31 @@ where TBehaviour: NetworkBehaviour, /// /// If a new dialing attempt has been initiated, `Ok(true)` is returned. /// - /// If there is an ongoing dialing attempt, the current addresses of the - /// peer, as reported by [`NetworkBehaviour::addresses_of_peer`] are added - /// to the ongoing dialing attempt, ignoring duplicates. In this case no - /// new dialing attempt is initiated. - /// /// If no new dialing attempt has been initiated, meaning there is an ongoing /// dialing attempt or `addresses_of_peer` reports no addresses, `Ok(false)` /// is returned. - pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result { + pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result<(), DialError> { let mut addrs = me.behaviour.addresses_of_peer(peer_id).into_iter(); - match me.network.peer(peer_id.clone()) { - Peer::Disconnected(peer) => { - if let Some(first) = addrs.next() { - let handler = me.behaviour.new_handler().into_node_handler_builder(); - match peer.connect(first, addrs, handler) { - Ok(_) => return Ok(true), - Err(error) => { - log::debug!( - "New dialing attempt to disconnected peer {:?} failed: {:?}.", - peer_id, error); - me.behaviour.inject_dial_failure(&peer_id); - return Err(error) - } - } - } else { - log::debug!( - "New dialing attempt to disconnected peer {:?} failed: no address.", - peer_id - ); - me.behaviour.inject_dial_failure(&peer_id); - } - Ok(false) - }, - Peer::Connected(peer) => { - if let Some(first) = addrs.next() { - let handler = me.behaviour.new_handler().into_node_handler_builder(); - match peer.connect(first, addrs, handler) { - Ok(_) => return Ok(true), - Err(error) => { - log::debug!( - "New dialing attempt to connected peer {:?} failed: {:?}.", - peer_id, error); - me.behaviour.inject_dial_failure(&peer_id); - return Err(error) - } - } - } else { - log::debug!( - "New dialing attempt to disconnected peer {:?} failed: no address.", - peer_id - ); - me.behaviour.inject_dial_failure(&peer_id); - } - Ok(false) - } - Peer::Dialing(mut peer) => { - peer.connection().add_addresses(addrs); - Ok(false) - }, - Peer::Local => { - me.behaviour.inject_dial_failure(&peer_id); - Err(ConnectionLimit { current: 0, limit: 0 }) - } + let peer = me.network.peer(peer_id.clone()); + + let result = + if let Some(first) = addrs.next() { + let handler = me.behaviour.new_handler().into_node_handler_builder(); + peer.dial(first, addrs, handler) + .map(|_| ()) + .map_err(DialError::ConnectionLimit) + } else { + Err(DialError::NoAddresses) + }; + + if let Err(error) = &result { + log::debug!( + "New dialing attempt to peer {:?} failed: {:?}.", + peer_id, error); + me.behaviour.inject_dial_failure(&peer_id); } + + result } /// Returns an iterator that produces the list of addresses we're listening on. @@ -721,18 +681,22 @@ where TBehaviour: NetworkBehaviour, if !this.network.is_dialing(&peer_id) => true, _ => false }; - if condition_matched { - if let Ok(true) = ExpandedSwarm::dial(this, &peer_id) { - return Poll::Ready(SwarmEvent::Dialing(peer_id)); + if ExpandedSwarm::dial(this, &peer_id).is_ok() { + return Poll::Ready(SwarmEvent::Dialing(peer_id)) } - } else { + // Even if the condition for a _new_ dialing attempt is not met, + // we always add any potentially new addresses of the peer to an + // ongoing dialing attempt, if there is one. log::trace!("Condition for new dialing attempt to {:?} not met: {:?}", peer_id, condition); if let Some(mut peer) = this.network.peer(peer_id.clone()).into_dialing() { let addrs = this.behaviour.addresses_of_peer(peer.id()); - peer.connection().add_addresses(addrs); + let mut attempt = peer.some_attempt(); + for addr in addrs { + attempt.add_address(addr); + } } } } @@ -1104,6 +1068,35 @@ where TBehaviour: NetworkBehaviour, } } +/// The possible failures of [`ExpandedSwarm::dial`]. +#[derive(Debug)] +pub enum DialError { + /// The configured limit for simultaneous outgoing connections + /// has been reached. + ConnectionLimit(ConnectionLimit), + /// [`NetworkBehaviour::addresses_of_peer`] returned no addresses + /// for the peer to dial. + NoAddresses +} + +impl fmt::Display for DialError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err), + DialError::NoAddresses => write!(f, "Dial error: no addresses for peer.") + } + } +} + +impl error::Error for DialError { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match self { + DialError::ConnectionLimit(err) => Some(err), + DialError::NoAddresses => None + } + } +} + /// Dummy implementation of [`NetworkBehaviour`] that doesn't do anything. #[derive(Clone, Default)] pub struct DummyBehaviour {