{core,swarm}/: Allow configuring dial concurrency factor per dial (#2404)

Enable a `NetworkBehaviour` or a user via `Swarm::dial` to override the
dial concurrency factor per dial. This is especially relevant in the
case of libp2p-autonat where one wants to probe addresses in sequence to
reduce the amount of work a remote peer can force onto the local node.

To enable the above, this commit also:

- Introduces `libp2p_core::DialOpts` mirroring `libp2p_swarm::DialOpts`.
  Passed as an argument to `Network::dial`.
- Removes `Peer::dial` in favor of `Network::dial`.
- Simplifies `Swarm::dial_with_handler`.

The introduction of `libp2p_core::DialOpts` will be useful beyond this
feature, e.g. for https://github.com/libp2p/rust-libp2p/pull/2363.

In the long run I would like to move and merge `libp2p_core::Network`
and `libp2p_core::Pool` into `libp2p_swarm::Swarm` thus deduplicating
`libp2p_core::DialOpts` and `libp2p_swarm::DialOpts`.

Fixes #2385.
This commit is contained in:
Max Inden
2022-01-13 18:07:07 +01:00
committed by GitHub
parent 446fcaad3d
commit 74f31f1266
12 changed files with 267 additions and 155 deletions

View File

@ -11,12 +11,22 @@
- Add `ConnectedPoint::is_relayed` (see [PR 2392]). - Add `ConnectedPoint::is_relayed` (see [PR 2392]).
- Enable overriding _dial concurrency factor_ per dial via
`DialOpts::override_dial_concurrency_factor`.
- Introduces `libp2p_core::DialOpts` mirroring `libp2p_swarm::DialOpts`.
Passed as an argument to `Network::dial`.
- Removes `Peer::dial` in favor of `Network::dial`.
See [PR 2404].
- Implement `Serialize` and `Deserialize` for `PeerId` (see [PR 2408]) - Implement `Serialize` and `Deserialize` for `PeerId` (see [PR 2408])
[PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339
[PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350 [PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350
[PR 2352]: https://github.com/libp2p/rust-libp2p/pull/2352 [PR 2352]: https://github.com/libp2p/rust-libp2p/pull/2352
[PR 2392]: https://github.com/libp2p/rust-libp2p/pull/2392 [PR 2392]: https://github.com/libp2p/rust-libp2p/pull/2392
[PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404
[PR 2408]: https://github.com/libp2p/rust-libp2p/pull/2408 [PR 2408]: https://github.com/libp2p/rust-libp2p/pull/2408
# 0.30.1 [2021-11-16] # 0.30.1 [2021-11-16]

View File

@ -535,6 +535,7 @@ where
addresses: impl Iterator<Item = Multiaddr> + Send + 'static, addresses: impl Iterator<Item = Multiaddr> + Send + 'static,
peer: Option<PeerId>, peer: Option<PeerId>,
handler: THandler, handler: THandler,
dial_concurrency_factor_override: Option<NonZeroU8>,
) -> Result<ConnectionId, DialError<THandler>> ) -> Result<ConnectionId, DialError<THandler>>
where where
TTrans: Clone + Send, TTrans: Clone + Send,
@ -544,7 +545,12 @@ where
return Err(DialError::ConnectionLimit { limit, handler }); return Err(DialError::ConnectionLimit { limit, handler });
}; };
let dial = ConcurrentDial::new(transport, peer, addresses, self.dial_concurrency_factor); let dial = ConcurrentDial::new(
transport,
peer,
addresses,
dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor),
);
let connection_id = self.next_connection_id(); let connection_id = self.next_connection_id();

View File

@ -72,7 +72,7 @@ pub use identity::PublicKey;
pub use multiaddr::Multiaddr; pub use multiaddr::Multiaddr;
pub use multihash; pub use multihash;
pub use muxing::StreamMuxer; pub use muxing::StreamMuxer;
pub use network::Network; pub use network::{DialOpts, Network};
pub use peer_id::PeerId; pub use peer_id::PeerId;
pub use peer_record::PeerRecord; pub use peer_record::PeerRecord;
pub use signed_envelope::SignedEnvelope; pub use signed_envelope::SignedEnvelope;

View File

@ -36,6 +36,7 @@ use crate::{
transport::{Transport, TransportError}, transport::{Transport, TransportError},
Executor, Multiaddr, PeerId, Executor, Multiaddr, PeerId,
}; };
use either::Either;
use std::{ use std::{
convert::TryFrom as _, convert::TryFrom as _,
error, fmt, error, fmt,
@ -43,6 +44,7 @@ use std::{
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use thiserror::Error;
/// Implementation of `Stream` that handles the nodes. /// Implementation of `Stream` that handles the nodes.
pub struct Network<TTrans, THandler> pub struct Network<TTrans, THandler>
@ -185,16 +187,15 @@ where
&self.local_peer_id &self.local_peer_id
} }
/// Dials a [`Multiaddr`] that may or may not encapsulate a /// Dial a known or unknown peer.
/// specific expected remote peer ID.
/// ///
/// The given `handler` will be used to create the /// The given `handler` will be used to create the
/// [`Connection`](crate::connection::Connection) upon success and the /// [`Connection`](crate::connection::Connection) upon success and the
/// connection ID is returned. /// connection ID is returned.
pub fn dial( pub fn dial(
&mut self, &mut self,
address: &Multiaddr,
handler: THandler, handler: THandler,
opts: impl Into<DialOpts>,
) -> Result<ConnectionId, DialError<THandler>> ) -> Result<ConnectionId, DialError<THandler>>
where where
TTrans: Transport + Send, TTrans: Transport + Send,
@ -203,50 +204,54 @@ where
TTrans::Error: Send + 'static, TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static, TTrans::Dial: Send + 'static,
{ {
// If the address ultimately encapsulates an expected peer ID, dial that peer let opts = opts.into();
// such that any mismatch is detected. We do not "pop off" the `P2p` protocol
// from the address, because it may be used by the `Transport`, i.e. `P2p` let (peer_id, addresses, dial_concurrency_factor_override) = match opts.0 {
// is a protocol component that can influence any transport, like `libp2p-dns`. // Dial a known peer.
if let Some(multiaddr::Protocol::P2p(ma)) = address.iter().last() { Opts::WithPeerIdWithAddresses(WithPeerIdWithAddresses {
if let Ok(peer) = PeerId::try_from(ma) { peer_id,
return self.dial_peer(DialingOpts { addresses,
peer, dial_concurrency_factor_override,
addresses: std::iter::once(address.clone()), }) => (
handler, Some(peer_id),
}); Either::Left(addresses.into_iter()),
dial_concurrency_factor_override,
),
// Dial an unknown peer.
Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress { address }) => {
// If the address ultimately encapsulates an expected peer ID, dial that peer
// such that any mismatch is detected. We do not "pop off" the `P2p` protocol
// from the address, because it may be used by the `Transport`, i.e. `P2p`
// is a protocol component that can influence any transport, like `libp2p-dns`.
let peer_id = match address
.iter()
.last()
.and_then(|p| {
if let multiaddr::Protocol::P2p(ma) = p {
Some(PeerId::try_from(ma))
} else {
None
}
})
.transpose()
{
Ok(peer_id) => peer_id,
Err(_) => return Err(DialError::InvalidPeerId { handler }),
};
(peer_id, Either::Right(std::iter::once(address)), None)
} }
} };
self.pool.add_outgoing( self.pool.add_outgoing(
self.transport().clone(), self.transport().clone(),
std::iter::once(address.clone()), addresses,
None, peer_id,
handler, handler,
dial_concurrency_factor_override,
) )
} }
/// Initiates a connection attempt to a known peer.
fn dial_peer<I>(
&mut self,
opts: DialingOpts<THandler, I>,
) -> Result<ConnectionId, DialError<THandler>>
where
I: Iterator<Item = Multiaddr> + Send + 'static,
TTrans: Transport + Send,
TTrans::Output: Send + 'static,
TTrans::Dial: Send + 'static,
TTrans::Error: Send + 'static,
{
let id = self.pool.add_outgoing(
self.transport().clone(),
opts.addresses,
Some(opts.peer),
opts.handler,
)?;
Ok(id)
}
/// Returns information about the state of the `Network`. /// Returns information about the state of the `Network`.
pub fn info(&self) -> NetworkInfo { pub fn info(&self) -> NetworkInfo {
let num_peers = self.pool.num_peers(); let num_peers = self.pool.num_peers();
@ -463,14 +468,6 @@ where
} }
} }
/// Options for a dialing attempt (i.e. repeated connection attempt
/// via a list of address) to a peer.
struct DialingOpts<THandler, I> {
peer: PeerId,
handler: THandler,
addresses: I,
}
/// Information about the network obtained by [`Network::info()`]. /// Information about the network obtained by [`Network::info()`].
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct NetworkInfo { pub struct NetworkInfo {
@ -560,7 +557,7 @@ impl NetworkConfig {
} }
/// Possible (synchronous) errors when dialing a peer. /// Possible (synchronous) errors when dialing a peer.
#[derive(Clone)] #[derive(Debug, Clone, Error)]
pub enum DialError<THandler> { pub enum DialError<THandler> {
/// The dialing attempt is rejected because of a connection limit. /// The dialing attempt is rejected because of a connection limit.
ConnectionLimit { ConnectionLimit {
@ -568,23 +565,114 @@ pub enum DialError<THandler> {
handler: THandler, handler: THandler,
}, },
/// The dialing attempt is rejected because the peer being dialed is the local peer. /// The dialing attempt is rejected because the peer being dialed is the local peer.
LocalPeerId { handler: THandler }, LocalPeerId {
handler: THandler,
},
InvalidPeerId {
handler: THandler,
},
} }
impl<THandler> fmt::Debug for DialError<THandler> { /// Options to configure a dial to a known or unknown peer.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { ///
match self { /// Used in [`Network::dial`].
DialError::ConnectionLimit { limit, handler: _ } => f ///
.debug_struct("DialError::ConnectionLimit") /// To construct use either of:
.field("limit", limit) ///
.finish(), /// - [`DialOpts::peer_id`] dialing a known peer
DialError::LocalPeerId { handler: _ } => { ///
f.debug_struct("DialError::LocalPeerId").finish() /// - [`DialOpts::unknown_peer_id`] dialing an unknown peer
} #[derive(Debug, Clone, PartialEq)]
pub struct DialOpts(pub(super) Opts);
impl DialOpts {
/// Dial a known peer.
pub fn peer_id(peer_id: PeerId) -> WithPeerId {
WithPeerId { peer_id }
}
/// Dial an unknown peer.
pub fn unknown_peer_id() -> WithoutPeerId {
WithoutPeerId {}
}
}
impl From<Multiaddr> for DialOpts {
fn from(address: Multiaddr) -> Self {
DialOpts::unknown_peer_id().address(address).build()
}
}
/// Internal options type.
///
/// Not to be constructed manually. Use either of the below instead:
///
/// - [`DialOpts::peer_id`] dialing a known peer
/// - [`DialOpts::unknown_peer_id`] dialing an unknown peer
#[derive(Debug, Clone, PartialEq)]
pub(super) enum Opts {
WithPeerIdWithAddresses(WithPeerIdWithAddresses),
WithoutPeerIdWithAddress(WithoutPeerIdWithAddress),
}
#[derive(Debug, Clone, PartialEq)]
pub struct WithPeerId {
pub(crate) peer_id: PeerId,
}
impl WithPeerId {
/// Specify a set of addresses to be used to dial the known peer.
pub fn addresses(self, addresses: Vec<Multiaddr>) -> WithPeerIdWithAddresses {
WithPeerIdWithAddresses {
peer_id: self.peer_id,
addresses,
dial_concurrency_factor_override: Default::default(),
} }
} }
} }
#[derive(Debug, Clone, PartialEq)]
pub struct WithPeerIdWithAddresses {
pub(crate) peer_id: PeerId,
pub(crate) addresses: Vec<Multiaddr>,
pub(crate) dial_concurrency_factor_override: Option<NonZeroU8>,
}
impl WithPeerIdWithAddresses {
/// Override [`NetworkConfig::with_dial_concurrency_factor`].
pub fn override_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
self.dial_concurrency_factor_override = Some(factor);
self
}
/// Build the final [`DialOpts`].
pub fn build(self) -> DialOpts {
DialOpts(Opts::WithPeerIdWithAddresses(self))
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct WithoutPeerId {}
impl WithoutPeerId {
/// Specify a single address to dial the unknown peer.
pub fn address(self, address: Multiaddr) -> WithoutPeerIdWithAddress {
WithoutPeerIdWithAddress { address }
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct WithoutPeerIdWithAddress {
pub(crate) address: Multiaddr,
}
impl WithoutPeerIdWithAddress {
/// Build the final [`DialOpts`].
pub fn build(self) -> DialOpts {
DialOpts(Opts::WithoutPeerIdWithAddress(self))
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -18,13 +18,13 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use super::{DialError, DialingOpts, Network}; use super::Network;
use crate::{ use crate::{
connection::{ connection::{
handler::THandlerInEvent, pool::Pool, ConnectionHandler, ConnectionId, handler::THandlerInEvent, pool::Pool, ConnectionHandler, ConnectionId,
EstablishedConnection, EstablishedConnectionIter, IntoConnectionHandler, PendingConnection, EstablishedConnection, EstablishedConnectionIter, IntoConnectionHandler, PendingConnection,
}, },
Multiaddr, PeerId, Transport, PeerId, Transport,
}; };
use std::{collections::VecDeque, error, fmt}; use std::{collections::VecDeque, error, fmt};
@ -95,7 +95,10 @@ where
} }
fn disconnected(network: &'a mut Network<TTrans, THandler>, peer_id: PeerId) -> Self { fn disconnected(network: &'a mut Network<TTrans, THandler>, peer_id: PeerId) -> Self {
Peer::Disconnected(DisconnectedPeer { network, peer_id }) Peer::Disconnected(DisconnectedPeer {
_network: network,
peer_id,
})
} }
fn connected(network: &'a mut Network<TTrans, THandler>, peer_id: PeerId) -> Self { fn connected(network: &'a mut Network<TTrans, THandler>, peer_id: PeerId) -> Self {
@ -149,38 +152,6 @@ where
matches!(self, Peer::Disconnected(..)) matches!(self, Peer::Disconnected(..))
} }
/// Initiates a new dialing attempt to this peer using the given addresses.
///
/// The connection ID of the first connection attempt, i.e. to `address`,
/// is returned, together with a [`DialingPeer`] for further use. The
/// `remaining` addresses are tried in order in subsequent connection
/// attempts in the context of the same dialing attempt, if the connection
/// attempt to the first address fails.
pub fn dial<I>(
self,
addresses: I,
handler: THandler,
) -> Result<(ConnectionId, DialingPeer<'a, TTrans, THandler>), DialError<THandler>>
where
I: IntoIterator<Item = Multiaddr>,
I::IntoIter: Send + 'static,
{
let (peer_id, network) = match self {
Peer::Connected(p) => (p.peer_id, p.network),
Peer::Dialing(p) => (p.peer_id, p.network),
Peer::Disconnected(p) => (p.peer_id, p.network),
Peer::Local => return Err(DialError::LocalPeerId { handler }),
};
let id = network.dial_peer(DialingOpts {
peer: peer_id,
handler,
addresses: addresses.into_iter(),
})?;
Ok((id, DialingPeer { network, peer_id }))
}
/// Converts the peer into a `ConnectedPeer`, if an established connection exists. /// Converts the peer into a `ConnectedPeer`, if an established connection exists.
/// ///
/// Succeeds if the there is at least one established connection to the peer. /// Succeeds if the there is at least one established connection to the peer.
@ -294,7 +265,7 @@ where
pub fn disconnect(self) -> DisconnectedPeer<'a, TTrans, THandler> { pub fn disconnect(self) -> DisconnectedPeer<'a, TTrans, THandler> {
self.network.disconnect(&self.peer_id); self.network.disconnect(&self.peer_id);
DisconnectedPeer { DisconnectedPeer {
network: self.network, _network: self.network,
peer_id: self.peer_id, peer_id: self.peer_id,
} }
} }
@ -354,7 +325,7 @@ where
pub fn disconnect(self) -> DisconnectedPeer<'a, TTrans, THandler> { pub fn disconnect(self) -> DisconnectedPeer<'a, TTrans, THandler> {
self.network.disconnect(&self.peer_id); self.network.disconnect(&self.peer_id);
DisconnectedPeer { DisconnectedPeer {
network: self.network, _network: self.network,
peer_id: self.peer_id, peer_id: self.peer_id,
} }
} }
@ -432,7 +403,7 @@ where
THandler: IntoConnectionHandler, THandler: IntoConnectionHandler,
{ {
peer_id: PeerId, peer_id: PeerId,
network: &'a mut Network<TTrans, THandler>, _network: &'a mut Network<TTrans, THandler>,
} }
impl<'a, TTrans, THandler> fmt::Debug for DisconnectedPeer<'a, TTrans, THandler> impl<'a, TTrans, THandler> fmt::Debug for DisconnectedPeer<'a, TTrans, THandler>

View File

@ -26,7 +26,7 @@ use futures::ready;
use libp2p_core::{ use libp2p_core::{
multiaddr::Protocol, multiaddr::Protocol,
network::{NetworkConfig, NetworkEvent}, network::{NetworkConfig, NetworkEvent},
ConnectedPoint, ConnectedPoint, DialOpts,
}; };
use quickcheck::*; use quickcheck::*;
use rand07::Rng; use rand07::Rng;
@ -73,8 +73,12 @@ fn concurrent_dialing() {
// Have network 2 dial network 1 and wait for network 1 to receive the incoming // Have network 2 dial network 1 and wait for network 1 to receive the incoming
// connections. // connections.
network_2 network_2
.peer(*network_1.local_peer_id()) .dial(
.dial(network_1_listen_addresses.clone(), TestHandler()) TestHandler(),
DialOpts::peer_id(*network_1.local_peer_id())
.addresses(network_1_listen_addresses.clone().into())
.build(),
)
.unwrap(); .unwrap();
let mut network_1_incoming_connections = Vec::new(); let mut network_1_incoming_connections = Vec::new();
for i in 0..concurrency_factor.0.get() { for i in 0..concurrency_factor.0.get() {

View File

@ -25,7 +25,7 @@ use libp2p_core::multiaddr::{multiaddr, Multiaddr};
use libp2p_core::{ use libp2p_core::{
connection::PendingConnectionError, connection::PendingConnectionError,
network::{ConnectionLimits, DialError, NetworkConfig, NetworkEvent}, network::{ConnectionLimits, DialError, NetworkConfig, NetworkEvent},
PeerId, DialOpts, PeerId,
}; };
use quickcheck::*; use quickcheck::*;
use std::task::Poll; use std::task::Poll;
@ -46,15 +46,23 @@ fn max_outgoing() {
let target = PeerId::random(); let target = PeerId::random();
for _ in 0..outgoing_limit { for _ in 0..outgoing_limit {
network network
.peer(target.clone()) .dial(
.dial(vec![addr.clone()], TestHandler()) TestHandler(),
DialOpts::peer_id(target)
.addresses(vec![addr.clone()])
.build(),
)
.ok() .ok()
.expect("Unexpected connection limit."); .expect("Unexpected connection limit.");
} }
match network match network
.peer(target.clone()) .dial(
.dial(vec![addr.clone()], TestHandler()) TestHandler(),
DialOpts::peer_id(target)
.addresses(vec![addr.clone()])
.build(),
)
.expect_err("Unexpected dialing success.") .expect_err("Unexpected dialing success.")
{ {
DialError::ConnectionLimit { limit, handler: _ } => { DialError::ConnectionLimit { limit, handler: _ } => {
@ -122,7 +130,7 @@ fn max_established_incoming() {
// Spawn and block on the dialer. // Spawn and block on the dialer.
async_std::task::block_on({ async_std::task::block_on({
let mut n = 0; let mut n = 0;
let _ = network2.dial(&listen_addr, TestHandler()).unwrap(); let _ = network2.dial(TestHandler(), listen_addr.clone()).unwrap();
let mut expected_closed = false; let mut expected_closed = false;
let mut network_1_established = false; let mut network_1_established = false;
@ -188,7 +196,7 @@ fn max_established_incoming() {
if n <= limit { if n <= limit {
// Dial again until the limit is exceeded. // Dial again until the limit is exceeded.
n += 1; n += 1;
network2.dial(&listen_addr, TestHandler()).unwrap(); network2.dial(TestHandler(), listen_addr.clone()).unwrap();
if n == limit { if n == limit {
// The the next dialing attempt exceeds the limit, this // The the next dialing attempt exceeds the limit, this

View File

@ -22,6 +22,7 @@ mod util;
use futures::prelude::*; use futures::prelude::*;
use libp2p_core::multiaddr::multiaddr; use libp2p_core::multiaddr::multiaddr;
use libp2p_core::DialOpts;
use libp2p_core::{ use libp2p_core::{
connection::PendingConnectionError, connection::PendingConnectionError,
multiaddr::Protocol, multiaddr::Protocol,
@ -50,8 +51,12 @@ fn deny_incoming_connec() {
})); }));
swarm2 swarm2
.peer(swarm1.local_peer_id().clone()) .dial(
.dial(vec![address.clone()], TestHandler()) TestHandler(),
DialOpts::peer_id(*swarm1.local_peer_id())
.addresses(vec![address.clone()])
.build(),
)
.unwrap(); .unwrap();
async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> { async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
@ -106,7 +111,7 @@ fn dial_self() {
_ => panic!("Was expecting the listen address to be reported"), _ => panic!("Was expecting the listen address to be reported"),
})); }));
swarm.dial(&local_address, TestHandler()).unwrap(); swarm.dial(TestHandler(), local_address.clone()).unwrap();
let mut got_dial_err = false; let mut got_dial_err = false;
let mut got_inc_err = false; let mut got_inc_err = false;
@ -174,8 +179,12 @@ fn multiple_addresses_err() {
addresses.shuffle(&mut rand::thread_rng()); addresses.shuffle(&mut rand::thread_rng());
swarm swarm
.peer(target.clone()) .dial(
.dial(addresses.clone(), TestHandler()) TestHandler(),
DialOpts::peer_id(target.clone())
.addresses(addresses.clone())
.build(),
)
.unwrap(); .unwrap();
async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> { async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {

View File

@ -75,7 +75,7 @@ fn transport_upgrade() {
let client = async move { let client = async move {
let addr = addr_receiver.await.unwrap(); let addr = addr_receiver.await.unwrap();
dialer.dial(&addr, TestHandler()).unwrap(); dialer.dial(TestHandler(), addr).unwrap();
futures::future::poll_fn(move |cx| loop { futures::future::poll_fn(move |cx| loop {
match ready!(dialer.poll(cx)) { match ready!(dialer.poll(cx)) {
NetworkEvent::ConnectionEstablished { .. } => return Poll::Ready(()), NetworkEvent::ConnectionEstablished { .. } => return Poll::Ready(()),

View File

@ -14,12 +14,16 @@
- Implement `swarm::NetworkBehaviour` on `either::Either` (see [PR 2370]). - Implement `swarm::NetworkBehaviour` on `either::Either` (see [PR 2370]).
- Enable overriding _dial concurrency factor_ per dial via
`DialOpts::override_dial_concurrency_factor`. See [PR 2404].
[PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339
[PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350 [PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350
[PR 2362]: https://github.com/libp2p/rust-libp2p/pull/2362 [PR 2362]: https://github.com/libp2p/rust-libp2p/pull/2362
[PR 2370]: https://github.com/libp2p/rust-libp2p/pull/2370 [PR 2370]: https://github.com/libp2p/rust-libp2p/pull/2370
[PR 2375]: https://github.com/libp2p/rust-libp2p/pull/2375 [PR 2375]: https://github.com/libp2p/rust-libp2p/pull/2375
[PR 2378]: https://github.com/libp2p/rust-libp2p/pull/2378 [PR 2378]: https://github.com/libp2p/rust-libp2p/pull/2378
[PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404
# 0.32.0 [2021-11-16] # 0.32.0 [2021-11-16]

View File

@ -20,6 +20,7 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use libp2p_core::{Multiaddr, PeerId}; use libp2p_core::{Multiaddr, PeerId};
use std::num::NonZeroU8;
/// Options to configure a dial to a known or unknown peer. /// Options to configure a dial to a known or unknown peer.
/// ///
@ -50,6 +51,7 @@ impl DialOpts {
WithPeerId { WithPeerId {
peer_id, peer_id,
condition: Default::default(), condition: Default::default(),
dial_concurrency_factor_override: Default::default(),
} }
} }
@ -106,6 +108,7 @@ pub(super) enum Opts {
pub struct WithPeerId { pub struct WithPeerId {
pub(crate) peer_id: PeerId, pub(crate) peer_id: PeerId,
pub(crate) condition: PeerCondition, pub(crate) condition: PeerCondition,
pub(crate) dial_concurrency_factor_override: Option<NonZeroU8>,
} }
impl WithPeerId { impl WithPeerId {
@ -115,6 +118,13 @@ impl WithPeerId {
self self
} }
/// Override
/// [`NetworkConfig::with_dial_concurrency_factor`](libp2p_core::network::NetworkConfig::with_dial_concurrency_factor).
pub fn override_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
self.dial_concurrency_factor_override = Some(factor);
self
}
/// Specify a set of addresses to be used to dial the known peer. /// Specify a set of addresses to be used to dial the known peer.
pub fn addresses(self, addresses: Vec<Multiaddr>) -> WithPeerIdWithAddresses { pub fn addresses(self, addresses: Vec<Multiaddr>) -> WithPeerIdWithAddresses {
WithPeerIdWithAddresses { WithPeerIdWithAddresses {
@ -122,6 +132,7 @@ impl WithPeerId {
condition: self.condition, condition: self.condition,
addresses, addresses,
extend_addresses_through_behaviour: false, extend_addresses_through_behaviour: false,
dial_concurrency_factor_override: self.dial_concurrency_factor_override,
} }
} }
@ -140,6 +151,7 @@ pub struct WithPeerIdWithAddresses {
pub(crate) condition: PeerCondition, pub(crate) condition: PeerCondition,
pub(crate) addresses: Vec<Multiaddr>, pub(crate) addresses: Vec<Multiaddr>,
pub(crate) extend_addresses_through_behaviour: bool, pub(crate) extend_addresses_through_behaviour: bool,
pub(crate) dial_concurrency_factor_override: Option<NonZeroU8>,
} }
impl WithPeerIdWithAddresses { impl WithPeerIdWithAddresses {
@ -156,6 +168,13 @@ impl WithPeerIdWithAddresses {
self self
} }
/// Override
/// [`NetworkConfig::with_dial_concurrency_factor`](libp2p_core::network::NetworkConfig::with_dial_concurrency_factor).
pub fn override_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
self.dial_concurrency_factor_override = Some(factor);
self
}
/// Build the final [`DialOpts`]. /// Build the final [`DialOpts`].
pub fn build(self) -> DialOpts { pub fn build(self) -> DialOpts {
DialOpts(Opts::WithPeerIdWithAddresses(self)) DialOpts(Opts::WithPeerIdWithAddresses(self))

View File

@ -359,15 +359,20 @@ where
fn dial_with_handler( fn dial_with_handler(
&mut self, &mut self,
opts: DialOpts, swarm_dial_opts: DialOpts,
handler: <TBehaviour as NetworkBehaviour>::ProtocolsHandler, handler: <TBehaviour as NetworkBehaviour>::ProtocolsHandler,
) -> Result<(), DialError> { ) -> Result<(), DialError> {
match opts.0 { let core_dial_opts = match swarm_dial_opts.0 {
// Dial a known peer. // Dial a known peer.
dial_opts::Opts::WithPeerId(dial_opts::WithPeerId { peer_id, condition }) dial_opts::Opts::WithPeerId(dial_opts::WithPeerId {
peer_id,
condition,
dial_concurrency_factor_override,
})
| dial_opts::Opts::WithPeerIdWithAddresses(dial_opts::WithPeerIdWithAddresses { | dial_opts::Opts::WithPeerIdWithAddresses(dial_opts::WithPeerIdWithAddresses {
peer_id, peer_id,
condition, condition,
dial_concurrency_factor_override,
.. ..
}) => { }) => {
// Check [`PeerCondition`] if provided. // Check [`PeerCondition`] if provided.
@ -396,7 +401,7 @@ where
// Retrieve the addresses to dial. // Retrieve the addresses to dial.
let addresses = { let addresses = {
let mut addresses = match opts.0 { let mut addresses = match swarm_dial_opts.0 {
dial_opts::Opts::WithPeerId(dial_opts::WithPeerId { .. }) => { dial_opts::Opts::WithPeerId(dial_opts::WithPeerId { .. }) => {
self.behaviour.addresses_of_peer(&peer_id) self.behaviour.addresses_of_peer(&peer_id)
} }
@ -433,47 +438,33 @@ where
addresses addresses
}; };
let handler = handler let mut opts = libp2p_core::DialOpts::peer_id(peer_id).addresses(addresses);
.into_node_handler_builder()
.with_substream_upgrade_protocol_override(
self.substream_upgrade_protocol_override,
);
match self.network.peer(peer_id).dial(addresses, handler) { if let Some(f) = dial_concurrency_factor_override {
Ok(_connection_id) => Ok(()), opts = opts.override_dial_concurrency_factor(f);
Err(error) => {
let (error, handler) = DialError::from_network_dial_error(error);
self.behaviour.inject_dial_failure(
Some(peer_id),
handler.into_protocols_handler(),
&error,
);
return Err(error);
}
} }
opts.build()
} }
// Dial an unknown peer. // Dial an unknown peer.
dial_opts::Opts::WithoutPeerIdWithAddress(dial_opts::WithoutPeerIdWithAddress { dial_opts::Opts::WithoutPeerIdWithAddress(dial_opts::WithoutPeerIdWithAddress {
address, address,
}) => { }) => libp2p_core::DialOpts::unknown_peer_id()
let handler = handler .address(address)
.into_node_handler_builder() .build(),
.with_substream_upgrade_protocol_override( };
self.substream_upgrade_protocol_override,
);
match self.network.dial(&address, handler).map(|_id| ()) { let handler = handler
Ok(_connection_id) => Ok(()), .into_node_handler_builder()
Err(error) => { .with_substream_upgrade_protocol_override(self.substream_upgrade_protocol_override);
let (error, handler) = DialError::from_network_dial_error(error);
self.behaviour.inject_dial_failure( match self.network.dial(handler, core_dial_opts).map(|_id| ()) {
None, Ok(_connection_id) => Ok(()),
handler.into_protocols_handler(), Err(error) => {
&error, let (error, handler) = DialError::from_network_dial_error(error);
); self.behaviour
return Err(error); .inject_dial_failure(None, handler.into_protocols_handler(), &error);
} return Err(error);
}
} }
} }
} }
@ -1334,8 +1325,9 @@ pub enum DialError {
DialPeerConditionFalse(dial_opts::PeerCondition), DialPeerConditionFalse(dial_opts::PeerCondition),
/// Pending connection attempt has been aborted. /// Pending connection attempt has been aborted.
Aborted, Aborted,
/// The peer identity obtained on the connection did not /// The provided peer identity is invalid or the peer identity obtained on
/// match the one that was expected or is otherwise invalid. /// the connection did not match the one that was expected or is otherwise
/// invalid.
InvalidPeerId, InvalidPeerId,
/// An I/O error occurred on the connection. /// An I/O error occurred on the connection.
ConnectionIo(io::Error), ConnectionIo(io::Error),
@ -1350,6 +1342,7 @@ impl DialError {
(DialError::ConnectionLimit(limit), handler) (DialError::ConnectionLimit(limit), handler)
} }
network::DialError::LocalPeerId { handler } => (DialError::LocalPeerId, handler), network::DialError::LocalPeerId { handler } => (DialError::LocalPeerId, handler),
network::DialError::InvalidPeerId { handler } => (DialError::InvalidPeerId, handler),
} }
} }
} }