mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-29 01:31:33 +00:00
swarm/: Enable advanced dialing requests (#2317)
Enable advanced dialing requests both on `Swarm` and via `NetworkBehaviourAction`. Users can now trigger a dial with a specific set of addresses, optionally extended via `NetworkBehaviour::addresses_of_peer`. In addition the whole process is now modelled in a type safe way via the builder pattern. Example of a `NetworkBehaviour` requesting a dial to a specific peer with a set of addresses additionally extended through `NetworkBehaviour::addresses_of_peer`: ```rust NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(peer_id) .condition(PeerCondition::Always) .addresses(addresses) .extend_addresses_through_behaviour() .build(), handler, } ``` Example of a user requesting a dial to an unknown peer with a single address via `Swarm`: ```rust swarm1.dial( DialOpts::unknown_peer_id() .address(addr2.clone()) .build() ) ```
This commit is contained in:
@ -18,6 +18,7 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::dial_opts::DialOpts;
|
||||
use crate::protocols_handler::{IntoProtocolsHandler, ProtocolsHandler};
|
||||
use crate::{AddressRecord, AddressScore, DialError};
|
||||
use libp2p_core::{
|
||||
@ -265,31 +266,7 @@ pub enum NetworkBehaviourAction<
|
||||
/// Instructs the `Swarm` to return an event when it is being polled.
|
||||
GenerateEvent(TOutEvent),
|
||||
|
||||
/// Instructs the swarm to dial the given multiaddress optionally including a [`PeerId`].
|
||||
///
|
||||
/// On success, [`NetworkBehaviour::inject_connection_established`] is invoked.
|
||||
/// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked.
|
||||
///
|
||||
/// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure
|
||||
/// and connection closing. Thus it can be used to carry state, which otherwise would have to be
|
||||
/// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected peer
|
||||
/// can be included in the handler, and thus directly send on connection success or extracted by
|
||||
/// the [`NetworkBehaviour`] on connection failure. See [`NetworkBehaviourAction::DialPeer`] for
|
||||
/// example.
|
||||
DialAddress {
|
||||
/// The address to dial.
|
||||
address: Multiaddr,
|
||||
/// The handler to be used to handle the connection to the peer.
|
||||
handler: THandler,
|
||||
},
|
||||
|
||||
/// Instructs the swarm to dial a known `PeerId`.
|
||||
///
|
||||
/// The [`NetworkBehaviour::addresses_of_peer`] method is called to determine which addresses to
|
||||
/// attempt to reach.
|
||||
///
|
||||
/// If we were already trying to dial this node, the addresses that are not yet in the queue of
|
||||
/// addresses to try are added back to this queue.
|
||||
/// Instructs the swarm to start a dial.
|
||||
///
|
||||
/// On success, [`NetworkBehaviour::inject_connection_established`] is invoked.
|
||||
/// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked.
|
||||
@ -300,7 +277,7 @@ pub enum NetworkBehaviourAction<
|
||||
/// can be included in the handler, and thus directly send on connection success or extracted by
|
||||
/// the [`NetworkBehaviour`] on connection failure.
|
||||
///
|
||||
/// # Example
|
||||
/// # Example carrying state in the handler
|
||||
///
|
||||
/// ```rust
|
||||
/// # use futures::executor::block_on;
|
||||
@ -312,10 +289,11 @@ pub enum NetworkBehaviourAction<
|
||||
/// # use libp2p::core::PeerId;
|
||||
/// # use libp2p::plaintext::PlainText2Config;
|
||||
/// # use libp2p::swarm::{
|
||||
/// # DialError, DialPeerCondition, IntoProtocolsHandler, KeepAlive, NegotiatedSubstream,
|
||||
/// # DialError, IntoProtocolsHandler, KeepAlive, NegotiatedSubstream,
|
||||
/// # NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler,
|
||||
/// # ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, Swarm, SwarmEvent,
|
||||
/// # };
|
||||
/// # use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
|
||||
/// # use libp2p::yamux;
|
||||
/// # use std::collections::VecDeque;
|
||||
/// # use std::task::{Context, Poll};
|
||||
@ -350,21 +328,22 @@ pub enum NetworkBehaviourAction<
|
||||
/// );
|
||||
/// });
|
||||
///
|
||||
/// # #[derive(Default)]
|
||||
/// # struct MyBehaviour {
|
||||
/// # outbox_to_swarm: VecDeque<NetworkBehaviourAction<PreciousMessage, MyHandler>>,
|
||||
/// # }
|
||||
/// #
|
||||
/// # impl MyBehaviour {
|
||||
/// # fn send(&mut self, peer_id: PeerId, msg: PreciousMessage) {
|
||||
/// # self.outbox_to_swarm
|
||||
/// # .push_back(NetworkBehaviourAction::DialPeer {
|
||||
/// # peer_id,
|
||||
/// # condition: DialPeerCondition::Always,
|
||||
/// # handler: MyHandler { message: Some(msg) },
|
||||
/// # });
|
||||
/// # }
|
||||
/// # }
|
||||
/// #[derive(Default)]
|
||||
/// struct MyBehaviour {
|
||||
/// outbox_to_swarm: VecDeque<NetworkBehaviourAction<PreciousMessage, MyHandler>>,
|
||||
/// }
|
||||
///
|
||||
/// impl MyBehaviour {
|
||||
/// fn send(&mut self, peer_id: PeerId, msg: PreciousMessage) {
|
||||
/// self.outbox_to_swarm
|
||||
/// .push_back(NetworkBehaviourAction::Dial {
|
||||
/// opts: DialOpts::peer_id(peer_id)
|
||||
/// .condition(PeerCondition::Always)
|
||||
/// .build(),
|
||||
/// handler: MyHandler { message: Some(msg) },
|
||||
/// });
|
||||
/// }
|
||||
/// }
|
||||
/// #
|
||||
/// impl NetworkBehaviour for MyBehaviour {
|
||||
/// # type ProtocolsHandler = MyHandler;
|
||||
@ -472,14 +451,7 @@ pub enum NetworkBehaviourAction<
|
||||
/// # #[derive(Debug, PartialEq, Eq)]
|
||||
/// # struct PreciousMessage(String);
|
||||
/// ```
|
||||
DialPeer {
|
||||
/// The peer to try reach.
|
||||
peer_id: PeerId,
|
||||
/// The condition for initiating a new dialing attempt.
|
||||
condition: DialPeerCondition,
|
||||
/// The handler to be used to handle the connection to the peer.
|
||||
handler: THandler,
|
||||
},
|
||||
Dial { opts: DialOpts, handler: THandler },
|
||||
|
||||
/// Instructs the `Swarm` to send an event to the handler dedicated to a
|
||||
/// connection with a peer.
|
||||
@ -549,18 +521,9 @@ impl<TOutEvent, THandler: IntoProtocolsHandler, TInEventOld>
|
||||
) -> NetworkBehaviourAction<TOutEvent, THandler, TInEventNew> {
|
||||
match self {
|
||||
NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(e),
|
||||
NetworkBehaviourAction::DialAddress { address, handler } => {
|
||||
NetworkBehaviourAction::DialAddress { address, handler }
|
||||
NetworkBehaviourAction::Dial { opts, handler } => {
|
||||
NetworkBehaviourAction::Dial { opts, handler }
|
||||
}
|
||||
NetworkBehaviourAction::DialPeer {
|
||||
peer_id,
|
||||
condition,
|
||||
handler,
|
||||
} => NetworkBehaviourAction::DialPeer {
|
||||
peer_id,
|
||||
condition,
|
||||
handler,
|
||||
},
|
||||
NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
handler,
|
||||
@ -589,18 +552,9 @@ impl<TOutEvent, THandler: IntoProtocolsHandler> NetworkBehaviourAction<TOutEvent
|
||||
pub fn map_out<E>(self, f: impl FnOnce(TOutEvent) -> E) -> NetworkBehaviourAction<E, THandler> {
|
||||
match self {
|
||||
NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(f(e)),
|
||||
NetworkBehaviourAction::DialAddress { address, handler } => {
|
||||
NetworkBehaviourAction::DialAddress { address, handler }
|
||||
NetworkBehaviourAction::Dial { opts, handler } => {
|
||||
NetworkBehaviourAction::Dial { opts, handler }
|
||||
}
|
||||
NetworkBehaviourAction::DialPeer {
|
||||
peer_id,
|
||||
condition,
|
||||
handler,
|
||||
} => NetworkBehaviourAction::DialPeer {
|
||||
peer_id,
|
||||
condition,
|
||||
handler,
|
||||
},
|
||||
NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
handler,
|
||||
@ -640,19 +594,8 @@ where
|
||||
{
|
||||
match self {
|
||||
NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(e),
|
||||
NetworkBehaviourAction::DialAddress { address, handler } => {
|
||||
NetworkBehaviourAction::DialAddress {
|
||||
address,
|
||||
handler: f(handler),
|
||||
}
|
||||
}
|
||||
NetworkBehaviourAction::DialPeer {
|
||||
peer_id,
|
||||
condition,
|
||||
handler,
|
||||
} => NetworkBehaviourAction::DialPeer {
|
||||
peer_id,
|
||||
condition,
|
||||
NetworkBehaviourAction::Dial { opts, handler } => NetworkBehaviourAction::Dial {
|
||||
opts,
|
||||
handler: f(handler),
|
||||
},
|
||||
NetworkBehaviourAction::NotifyHandler {
|
||||
@ -687,29 +630,6 @@ pub enum NotifyHandler {
|
||||
Any,
|
||||
}
|
||||
|
||||
/// The available conditions under which a new dialing attempt to
|
||||
/// a peer is initiated when requested by [`NetworkBehaviourAction::DialPeer`].
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub enum DialPeerCondition {
|
||||
/// A new dialing attempt is initiated _only if_ the peer is currently
|
||||
/// considered disconnected, i.e. there is no established connection
|
||||
/// and no ongoing dialing attempt.
|
||||
Disconnected,
|
||||
/// A new dialing attempt is initiated _only if_ there is currently
|
||||
/// no ongoing dialing attempt, i.e. the peer is either considered
|
||||
/// disconnected or connected but without an ongoing dialing attempt.
|
||||
NotDialing,
|
||||
/// A new dialing attempt is always initiated, only subject to the
|
||||
/// configured connection limits.
|
||||
Always,
|
||||
}
|
||||
|
||||
impl Default for DialPeerCondition {
|
||||
fn default() -> Self {
|
||||
DialPeerCondition::Disconnected
|
||||
}
|
||||
}
|
||||
|
||||
/// The options which connections to close.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum CloseConnection {
|
||||
|
217
swarm/src/dial_opts.rs
Normal file
217
swarm/src/dial_opts.rs
Normal file
@ -0,0 +1,217 @@
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
// Copyright 2021 Protocol Labs.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use libp2p_core::{Multiaddr, PeerId};
|
||||
|
||||
/// Options to configure a dial to a known or unknown peer.
|
||||
///
|
||||
/// Used in [`Swarm::dial`](crate::Swarm::dial) and
|
||||
/// [`NetworkBehaviourAction::Dial`](crate::behaviour::NetworkBehaviourAction::Dial).
|
||||
///
|
||||
/// To construct use either of:
|
||||
///
|
||||
/// - [`DialOpts::peer_id`] dialing a known peer
|
||||
///
|
||||
/// - [`DialOpts::unknown_peer_id`] dialing an unknown peer
|
||||
#[derive(Debug)]
|
||||
pub struct DialOpts(pub(super) Opts);
|
||||
|
||||
impl DialOpts {
|
||||
/// Dial a known peer.
|
||||
///
|
||||
/// ```
|
||||
/// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};
|
||||
/// # use libp2p_core::PeerId;
|
||||
/// DialOpts::peer_id(PeerId::random())
|
||||
/// .condition(PeerCondition::Disconnected)
|
||||
/// .addresses(vec!["/ip6/::1/tcp/12345".parse().unwrap()])
|
||||
/// .extend_addresses_through_behaviour()
|
||||
/// .build();
|
||||
/// ```
|
||||
pub fn peer_id(peer_id: PeerId) -> WithPeerId {
|
||||
WithPeerId {
|
||||
peer_id,
|
||||
condition: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Dial an unknown peer.
|
||||
///
|
||||
/// ```
|
||||
/// # use libp2p_swarm::dial_opts::DialOpts;
|
||||
/// DialOpts::unknown_peer_id()
|
||||
/// .address("/ip6/::1/tcp/12345".parse().unwrap())
|
||||
/// .build();
|
||||
/// ```
|
||||
pub fn unknown_peer_id() -> WithoutPeerId {
|
||||
WithoutPeerId {}
|
||||
}
|
||||
|
||||
/// Get the [`PeerId`] specified in a [`DialOpts`] if any.
|
||||
pub fn get_peer_id(&self) -> Option<PeerId> {
|
||||
match self {
|
||||
DialOpts(Opts::WithPeerId(WithPeerId { peer_id, .. })) => Some(*peer_id),
|
||||
DialOpts(Opts::WithPeerIdWithAddresses(WithPeerIdWithAddresses {
|
||||
peer_id, ..
|
||||
})) => Some(*peer_id),
|
||||
DialOpts(Opts::WithoutPeerIdWithAddress(_)) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Multiaddr> for DialOpts {
|
||||
fn from(address: Multiaddr) -> Self {
|
||||
DialOpts::unknown_peer_id().address(address).build()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PeerId> for DialOpts {
|
||||
fn from(peer_id: PeerId) -> Self {
|
||||
DialOpts::peer_id(peer_id).build()
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal options type.
|
||||
///
|
||||
/// Not to be constructed manually. Use either of the below instead:
|
||||
///
|
||||
/// - [`DialOpts::peer_id`] dialing a known peer
|
||||
/// - [`DialOpts::unknown_peer_id`] dialing an unknown peer
|
||||
#[derive(Debug)]
|
||||
pub(super) enum Opts {
|
||||
WithPeerId(WithPeerId),
|
||||
WithPeerIdWithAddresses(WithPeerIdWithAddresses),
|
||||
WithoutPeerIdWithAddress(WithoutPeerIdWithAddress),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WithPeerId {
|
||||
pub(crate) peer_id: PeerId,
|
||||
pub(crate) condition: PeerCondition,
|
||||
}
|
||||
|
||||
impl WithPeerId {
|
||||
/// Specify a [`PeerCondition`] for the dial.
|
||||
pub fn condition(mut self, condition: PeerCondition) -> Self {
|
||||
self.condition = condition;
|
||||
self
|
||||
}
|
||||
|
||||
/// Specify a set of addresses to be used to dial the known peer.
|
||||
pub fn addresses(self, addresses: Vec<Multiaddr>) -> WithPeerIdWithAddresses {
|
||||
WithPeerIdWithAddresses {
|
||||
peer_id: self.peer_id,
|
||||
condition: self.condition,
|
||||
addresses,
|
||||
extend_addresses_through_behaviour: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Build the final [`DialOpts`].
|
||||
///
|
||||
/// Addresses to dial the peer are retrieved via
|
||||
/// [`NetworkBehaviour::addresses_of_peer`](crate::behaviour::NetworkBehaviour::addresses_of_peer).
|
||||
pub fn build(self) -> DialOpts {
|
||||
DialOpts(Opts::WithPeerId(self))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WithPeerIdWithAddresses {
|
||||
pub(crate) peer_id: PeerId,
|
||||
pub(crate) condition: PeerCondition,
|
||||
pub(crate) addresses: Vec<Multiaddr>,
|
||||
pub(crate) extend_addresses_through_behaviour: bool,
|
||||
}
|
||||
|
||||
impl WithPeerIdWithAddresses {
|
||||
/// Specify a [`PeerCondition`] for the dial.
|
||||
pub fn condition(mut self, condition: PeerCondition) -> Self {
|
||||
self.condition = condition;
|
||||
self
|
||||
}
|
||||
|
||||
/// In addition to the provided addresses, extend the set via
|
||||
/// [`NetworkBehaviour::addresses_of_peer`](crate::behaviour::NetworkBehaviour::addresses_of_peer).
|
||||
pub fn extend_addresses_through_behaviour(mut self) -> Self {
|
||||
self.extend_addresses_through_behaviour = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Build the final [`DialOpts`].
|
||||
pub fn build(self) -> DialOpts {
|
||||
DialOpts(Opts::WithPeerIdWithAddresses(self))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WithoutPeerId {}
|
||||
|
||||
impl WithoutPeerId {
|
||||
/// Specify a single address to dial the unknown peer.
|
||||
pub fn address(self, address: Multiaddr) -> WithoutPeerIdWithAddress {
|
||||
WithoutPeerIdWithAddress { address }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WithoutPeerIdWithAddress {
|
||||
pub(crate) address: Multiaddr,
|
||||
}
|
||||
|
||||
impl WithoutPeerIdWithAddress {
|
||||
/// Build the final [`DialOpts`].
|
||||
pub fn build(self) -> DialOpts {
|
||||
DialOpts(Opts::WithoutPeerIdWithAddress(self))
|
||||
}
|
||||
}
|
||||
|
||||
/// The available conditions under which a new dialing attempt to
|
||||
/// a known peer is initiated.
|
||||
///
|
||||
/// ```
|
||||
/// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};
|
||||
/// # use libp2p_core::PeerId;
|
||||
/// #
|
||||
/// DialOpts::peer_id(PeerId::random())
|
||||
/// .condition(PeerCondition::Disconnected)
|
||||
/// .build();
|
||||
/// ```
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub enum PeerCondition {
|
||||
/// A new dialing attempt is initiated _only if_ the peer is currently
|
||||
/// considered disconnected, i.e. there is no established connection
|
||||
/// and no ongoing dialing attempt.
|
||||
Disconnected,
|
||||
/// A new dialing attempt is initiated _only if_ there is currently
|
||||
/// no ongoing dialing attempt, i.e. the peer is either considered
|
||||
/// disconnected or connected but without an ongoing dialing attempt.
|
||||
NotDialing,
|
||||
/// A new dialing attempt is always initiated, only subject to the
|
||||
/// configured connection limits.
|
||||
Always,
|
||||
}
|
||||
|
||||
impl Default for PeerCondition {
|
||||
fn default() -> Self {
|
||||
PeerCondition::Disconnected
|
||||
}
|
||||
}
|
247
swarm/src/lib.rs
247
swarm/src/lib.rs
@ -59,12 +59,13 @@ mod registry;
|
||||
mod test;
|
||||
mod upgrade;
|
||||
|
||||
pub mod dial_opts;
|
||||
pub mod protocols_handler;
|
||||
pub mod toggle;
|
||||
|
||||
pub use behaviour::{
|
||||
CloseConnection, DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction,
|
||||
NetworkBehaviourEventProcess, NotifyHandler, PollParameters,
|
||||
CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess,
|
||||
NotifyHandler, PollParameters,
|
||||
};
|
||||
pub use protocols_handler::{
|
||||
IntoProtocolsHandler, IntoProtocolsHandlerSelect, KeepAlive, OneShotHandler,
|
||||
@ -73,6 +74,7 @@ pub use protocols_handler::{
|
||||
};
|
||||
pub use registry::{AddAddressResult, AddressRecord, AddressScore};
|
||||
|
||||
use dial_opts::{DialOpts, PeerCondition};
|
||||
use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream};
|
||||
use libp2p_core::{
|
||||
connection::{
|
||||
@ -321,72 +323,151 @@ where
|
||||
self.network.remove_listener(id)
|
||||
}
|
||||
|
||||
/// Initiates a new dialing attempt to the given address.
|
||||
pub fn dial_addr(&mut self, addr: Multiaddr) -> Result<(), DialError> {
|
||||
/// Dial a known or unknown peer.
|
||||
///
|
||||
/// See also [`DialOpts`].
|
||||
///
|
||||
/// ```
|
||||
/// # use libp2p_swarm::Swarm;
|
||||
/// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};
|
||||
/// # use libp2p_core::{Multiaddr, PeerId, Transport};
|
||||
/// # use libp2p_core::transport::dummy::DummyTransport;
|
||||
/// # use libp2p_swarm::DummyBehaviour;
|
||||
/// #
|
||||
/// let mut swarm = Swarm::new(
|
||||
/// DummyTransport::new().boxed(),
|
||||
/// DummyBehaviour::default(),
|
||||
/// PeerId::random(),
|
||||
/// );
|
||||
///
|
||||
/// // Dial a known peer.
|
||||
/// swarm.dial(PeerId::random());
|
||||
///
|
||||
/// // Dial an unknown peer.
|
||||
/// swarm.dial("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap());
|
||||
/// ```
|
||||
pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
|
||||
let handler = self.behaviour.new_handler();
|
||||
self.dial_addr_with_handler(addr, handler)
|
||||
.map_err(DialError::from_network_dial_error)
|
||||
.map_err(|(e, _)| e)
|
||||
}
|
||||
|
||||
fn dial_addr_with_handler(
|
||||
&mut self,
|
||||
addr: Multiaddr,
|
||||
handler: <TBehaviour as NetworkBehaviour>::ProtocolsHandler,
|
||||
) -> Result<(), network::DialError<NodeHandlerWrapperBuilder<THandler<TBehaviour>>>> {
|
||||
let handler = handler
|
||||
.into_node_handler_builder()
|
||||
.with_substream_upgrade_protocol_override(self.substream_upgrade_protocol_override);
|
||||
|
||||
self.network.dial(&addr, handler).map(|_id| ())
|
||||
}
|
||||
|
||||
/// Initiates a new dialing attempt to the given peer.
|
||||
pub fn dial(&mut self, peer_id: &PeerId) -> Result<(), DialError> {
|
||||
let handler = self.behaviour.new_handler();
|
||||
self.dial_with_handler(peer_id, handler)
|
||||
self.dial_with_handler(opts.into(), handler)
|
||||
}
|
||||
|
||||
fn dial_with_handler(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
opts: DialOpts,
|
||||
handler: <TBehaviour as NetworkBehaviour>::ProtocolsHandler,
|
||||
) -> Result<(), DialError> {
|
||||
if self.banned_peers.contains(peer_id) {
|
||||
let error = DialError::Banned;
|
||||
self.behaviour
|
||||
.inject_dial_failure(Some(*peer_id), handler, &error);
|
||||
return Err(error);
|
||||
}
|
||||
match opts.0 {
|
||||
// Dial a known peer.
|
||||
dial_opts::Opts::WithPeerId(dial_opts::WithPeerId { peer_id, condition })
|
||||
| dial_opts::Opts::WithPeerIdWithAddresses(dial_opts::WithPeerIdWithAddresses {
|
||||
peer_id,
|
||||
condition,
|
||||
..
|
||||
}) => {
|
||||
// Check [`PeerCondition`] if provided.
|
||||
let condition_matched = match condition {
|
||||
PeerCondition::Disconnected => self.network.is_disconnected(&peer_id),
|
||||
PeerCondition::NotDialing => !self.network.is_dialing(&peer_id),
|
||||
PeerCondition::Always => true,
|
||||
};
|
||||
if !condition_matched {
|
||||
self.behaviour.inject_dial_failure(
|
||||
Some(peer_id),
|
||||
handler,
|
||||
&DialError::DialPeerConditionFalse(condition),
|
||||
);
|
||||
|
||||
let self_listening = self.listened_addrs.clone();
|
||||
let mut addrs = self
|
||||
.behaviour
|
||||
.addresses_of_peer(peer_id)
|
||||
.into_iter()
|
||||
.filter(move |a| !self_listening.contains(a))
|
||||
.peekable();
|
||||
return Err(DialError::DialPeerConditionFalse(condition));
|
||||
}
|
||||
|
||||
if addrs.peek().is_none() {
|
||||
let error = DialError::NoAddresses;
|
||||
self.behaviour
|
||||
.inject_dial_failure(Some(*peer_id), handler, &error);
|
||||
return Err(error);
|
||||
};
|
||||
// Check if peer is banned.
|
||||
if self.banned_peers.contains(&peer_id) {
|
||||
let error = DialError::Banned;
|
||||
self.behaviour
|
||||
.inject_dial_failure(Some(peer_id), handler, &error);
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
let handler = handler
|
||||
.into_node_handler_builder()
|
||||
.with_substream_upgrade_protocol_override(self.substream_upgrade_protocol_override);
|
||||
match self.network.peer(*peer_id).dial(addrs, handler) {
|
||||
Ok(_connection_id) => Ok(()),
|
||||
Err(error) => {
|
||||
let (error, handler) = DialError::from_network_dial_error(error);
|
||||
self.behaviour.inject_dial_failure(
|
||||
Some(*peer_id),
|
||||
handler.into_protocols_handler(),
|
||||
&error,
|
||||
);
|
||||
Err(error)
|
||||
// Retrieve the addresses to dial.
|
||||
let addresses = {
|
||||
let mut addresses = match opts.0 {
|
||||
dial_opts::Opts::WithPeerId(dial_opts::WithPeerId { .. }) => {
|
||||
self.behaviour.addresses_of_peer(&peer_id)
|
||||
}
|
||||
dial_opts::Opts::WithPeerIdWithAddresses(
|
||||
dial_opts::WithPeerIdWithAddresses {
|
||||
peer_id,
|
||||
mut addresses,
|
||||
extend_addresses_through_behaviour,
|
||||
..
|
||||
},
|
||||
) => {
|
||||
if extend_addresses_through_behaviour {
|
||||
addresses.extend(self.behaviour.addresses_of_peer(&peer_id))
|
||||
}
|
||||
addresses
|
||||
}
|
||||
dial_opts::Opts::WithoutPeerIdWithAddress { .. } => {
|
||||
unreachable!("Due to outer match.")
|
||||
}
|
||||
};
|
||||
|
||||
let mut unique_addresses = HashSet::new();
|
||||
addresses.retain(|a| {
|
||||
!self.listened_addrs.contains(a) && unique_addresses.insert(a.clone())
|
||||
});
|
||||
|
||||
if addresses.is_empty() {
|
||||
let error = DialError::NoAddresses;
|
||||
self.behaviour
|
||||
.inject_dial_failure(Some(peer_id), handler, &error);
|
||||
return Err(error);
|
||||
};
|
||||
|
||||
addresses
|
||||
};
|
||||
|
||||
let handler = handler
|
||||
.into_node_handler_builder()
|
||||
.with_substream_upgrade_protocol_override(
|
||||
self.substream_upgrade_protocol_override,
|
||||
);
|
||||
|
||||
match self.network.peer(peer_id).dial(addresses, handler) {
|
||||
Ok(_connection_id) => Ok(()),
|
||||
Err(error) => {
|
||||
let (error, handler) = DialError::from_network_dial_error(error);
|
||||
self.behaviour.inject_dial_failure(
|
||||
Some(peer_id),
|
||||
handler.into_protocols_handler(),
|
||||
&error,
|
||||
);
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Dial an unknown peer.
|
||||
dial_opts::Opts::WithoutPeerIdWithAddress(dial_opts::WithoutPeerIdWithAddress {
|
||||
address,
|
||||
}) => {
|
||||
let handler = handler
|
||||
.into_node_handler_builder()
|
||||
.with_substream_upgrade_protocol_override(
|
||||
self.substream_upgrade_protocol_override,
|
||||
);
|
||||
|
||||
match self.network.dial(&address, handler).map(|_id| ()) {
|
||||
Ok(_connection_id) => Ok(()),
|
||||
Err(error) => {
|
||||
let (error, handler) = DialError::from_network_dial_error(error);
|
||||
self.behaviour.inject_dial_failure(
|
||||
None,
|
||||
handler.into_protocols_handler(),
|
||||
&error,
|
||||
);
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -808,35 +889,12 @@ where
|
||||
Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
|
||||
return Poll::Ready(SwarmEvent::Behaviour(event))
|
||||
}
|
||||
Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler }) => {
|
||||
let _ = Swarm::dial_addr_with_handler(&mut *this, address, handler);
|
||||
}
|
||||
Poll::Ready(NetworkBehaviourAction::DialPeer {
|
||||
peer_id,
|
||||
condition,
|
||||
handler,
|
||||
}) => {
|
||||
let condition_matched = match condition {
|
||||
DialPeerCondition::Disconnected => this.network.is_disconnected(&peer_id),
|
||||
DialPeerCondition::NotDialing => !this.network.is_dialing(&peer_id),
|
||||
DialPeerCondition::Always => true,
|
||||
};
|
||||
if condition_matched {
|
||||
if Swarm::dial_with_handler(this, &peer_id, handler).is_ok() {
|
||||
Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) => {
|
||||
let peer_id = opts.get_peer_id();
|
||||
if let Ok(()) = this.dial_with_handler(opts, handler) {
|
||||
if let Some(peer_id) = peer_id {
|
||||
return Poll::Ready(SwarmEvent::Dialing(peer_id));
|
||||
}
|
||||
} else {
|
||||
log::trace!(
|
||||
"Condition for new dialing attempt to {:?} not met: {:?}",
|
||||
peer_id,
|
||||
condition
|
||||
);
|
||||
|
||||
this.behaviour.inject_dial_failure(
|
||||
Some(peer_id),
|
||||
handler,
|
||||
&DialError::DialPeerConditionFalse(condition),
|
||||
);
|
||||
}
|
||||
}
|
||||
Poll::Ready(NetworkBehaviourAction::NotifyHandler {
|
||||
@ -1214,8 +1272,9 @@ pub enum DialError {
|
||||
/// [`NetworkBehaviour::addresses_of_peer`] returned no addresses
|
||||
/// for the peer to dial.
|
||||
NoAddresses,
|
||||
/// The provided [`DialPeerCondition`] evaluated to false and thus the dial was aborted.
|
||||
DialPeerConditionFalse(DialPeerCondition),
|
||||
/// The provided [`dial_opts::PeerCondition`] evaluated to false and thus
|
||||
/// the dial was aborted.
|
||||
DialPeerConditionFalse(dial_opts::PeerCondition),
|
||||
/// Pending connection attempt has been aborted.
|
||||
Aborted,
|
||||
/// The peer identity obtained on the connection did not
|
||||
@ -1457,7 +1516,7 @@ mod tests {
|
||||
let num_connections = 10;
|
||||
|
||||
for _ in 0..num_connections {
|
||||
swarm1.dial_addr(addr2.clone()).unwrap();
|
||||
swarm1.dial(addr2.clone()).unwrap();
|
||||
}
|
||||
let mut state = State::Connecting;
|
||||
|
||||
@ -1489,7 +1548,7 @@ mod tests {
|
||||
swarm2.behaviour.reset();
|
||||
unbanned = true;
|
||||
for _ in 0..num_connections {
|
||||
swarm2.dial_addr(addr1.clone()).unwrap();
|
||||
swarm2.dial(addr1.clone()).unwrap();
|
||||
}
|
||||
state = State::Connecting;
|
||||
}
|
||||
@ -1532,7 +1591,7 @@ mod tests {
|
||||
let num_connections = 10;
|
||||
|
||||
for _ in 0..num_connections {
|
||||
swarm1.dial_addr(addr2.clone()).unwrap();
|
||||
swarm1.dial(addr2.clone()).unwrap();
|
||||
}
|
||||
let mut state = State::Connecting;
|
||||
|
||||
@ -1562,7 +1621,7 @@ mod tests {
|
||||
swarm1.behaviour.reset();
|
||||
swarm2.behaviour.reset();
|
||||
for _ in 0..num_connections {
|
||||
swarm2.dial_addr(addr1.clone()).unwrap();
|
||||
swarm2.dial(addr1.clone()).unwrap();
|
||||
}
|
||||
state = State::Connecting;
|
||||
}
|
||||
@ -1605,7 +1664,7 @@ mod tests {
|
||||
let num_connections = 10;
|
||||
|
||||
for _ in 0..num_connections {
|
||||
swarm1.dial_addr(addr2.clone()).unwrap();
|
||||
swarm1.dial(addr2.clone()).unwrap();
|
||||
}
|
||||
let mut state = State::Connecting;
|
||||
|
||||
@ -1638,7 +1697,7 @@ mod tests {
|
||||
swarm1.behaviour.reset();
|
||||
swarm2.behaviour.reset();
|
||||
for _ in 0..num_connections {
|
||||
swarm2.dial_addr(addr1.clone()).unwrap();
|
||||
swarm2.dial(addr1.clone()).unwrap();
|
||||
}
|
||||
state = State::Connecting;
|
||||
}
|
||||
@ -1680,7 +1739,7 @@ mod tests {
|
||||
let num_connections = 10;
|
||||
|
||||
for _ in 0..num_connections {
|
||||
swarm1.dial_addr(addr2.clone()).unwrap();
|
||||
swarm1.dial(addr2.clone()).unwrap();
|
||||
}
|
||||
let mut state = State::Connecting;
|
||||
let mut disconnected_conn_id = None;
|
||||
|
Reference in New Issue
Block a user