diff --git a/CHANGELOG.md b/CHANGELOG.md index 20fe3de7..81da8bb3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,11 @@ # 0.51.0 [unreleased] +- Enable `NetworkBehaviour`s to manage connections. + This deprecates `NetworkBehaviour::new_handler` and `NetworkBehaviour::addresses_of_peer`. + Due to limitations in the Rust compiler, these deprecations may not show up for you, nevertheless they will be removed in a future release. + See [`libp2p-swarm`'s CHANGELOG](swarm/CHANGELOG.md#0420) for details. + - Count bandwidth at the application level. Previously `BandwidthLogging` would implement `Transport` and now implements `StreamMuxer` ([PR 3180](https://github.com/libp2p/rust-libp2p/pull/3180)). - `BandwidthLogging::new` now requires a 2nd argument: `Arc` - Remove `BandwidthFuture` diff --git a/misc/metrics/src/swarm.rs b/misc/metrics/src/swarm.rs index d2fa0f24..ff19c650 100644 --- a/misc/metrics/src/swarm.rs +++ b/misc/metrics/src/swarm.rs @@ -244,6 +244,9 @@ impl super::Recorder { record(OutgoingConnectionError::WrongPeerId) } + libp2p_swarm::DialError::Denied { .. } => { + record(OutgoingConnectionError::Denied) + } }; } libp2p_swarm::SwarmEvent::BannedPeer { endpoint, .. } => { @@ -344,6 +347,7 @@ enum OutgoingConnectionError { WrongPeerId, TransportMultiaddrNotSupported, TransportOther, + Denied, } #[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)] @@ -360,6 +364,7 @@ enum IncomingConnectionError { TransportErrorOther, Aborted, ConnectionLimit, + Denied, } impl From<&libp2p_swarm::ListenError> for IncomingConnectionError { @@ -377,6 +382,7 @@ impl From<&libp2p_swarm::ListenError> for IncomingConnectionError { libp2p_core::transport::TransportError::Other(_), ) => IncomingConnectionError::TransportErrorOther, libp2p_swarm::ListenError::Aborted => IncomingConnectionError::Aborted, + libp2p_swarm::ListenError::Denied { .. } => IncomingConnectionError::Denied, } } } diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index 9ffbaefd..7e83992a 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -37,8 +37,8 @@ use libp2p_swarm::{ AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredExternalAddr, ExpiredListenAddr, FromSwarm, }, - ConnectionId, ExternalAddresses, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, - PollParameters, THandlerInEvent, THandlerOutEvent, + ConnectionDenied, ConnectionId, ExternalAddresses, ListenAddresses, NetworkBehaviour, + NetworkBehaviourAction, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, }; use std::{ collections::{HashMap, VecDeque}, @@ -485,12 +485,55 @@ impl NetworkBehaviour for Behaviour { } } - fn new_handler(&mut self) -> Self::ConnectionHandler { - self.inner.new_handler() + fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + self.inner + .handle_pending_inbound_connection(connection_id, local_addr, remote_addr) } - fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { - self.inner.addresses_of_peer(peer) + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + self.inner.handle_established_inbound_connection( + _connection_id, + peer, + local_addr, + remote_addr, + ) + } + + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + self.inner.handle_pending_outbound_connection( + _connection_id, + maybe_peer, + _addresses, + _effective_role, + ) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + self.inner + .handle_established_outbound_connection(_connection_id, peer, addr, role_override) } fn on_swarm_event(&mut self, event: FromSwarm) { diff --git a/protocols/dcutr/src/behaviour_impl.rs b/protocols/dcutr/src/behaviour_impl.rs index a494b411..f8edc593 100644 --- a/protocols/dcutr/src/behaviour_impl.rs +++ b/protocols/dcutr/src/behaviour_impl.rs @@ -24,14 +24,14 @@ use crate::handler; use either::Either; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Protocol; -use libp2p_core::{Multiaddr, PeerId}; +use libp2p_core::{Endpoint, Multiaddr, PeerId}; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::dial_opts::{self, DialOpts}; +use libp2p_swarm::{dummy, ConnectionDenied, ConnectionId, THandler, THandlerOutEvent}; use libp2p_swarm::{ ConnectionHandlerUpgrErr, ExternalAddresses, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, THandlerInEvent, }; -use libp2p_swarm::{ConnectionId, THandlerOutEvent}; use std::collections::{HashMap, HashSet, VecDeque}; use std::task::{Context, Poll}; use thiserror::Error; @@ -64,12 +64,14 @@ pub enum Error { #[error("Failed to dial peer.")] Dial, #[error("Failed to establish substream: {0}.")] - Handler(ConnectionHandlerUpgrErr), + Handler(ConnectionHandlerUpgrErr), } pub struct Behaviour { /// Queue of actions to return when polled. - queued_events: VecDeque>>, + queued_events: VecDeque< + NetworkBehaviourAction>>, + >, /// All direct (non-relayed) connections. direct_connections: HashMap>, @@ -237,11 +239,82 @@ impl Behaviour { } impl NetworkBehaviour for Behaviour { - type ConnectionHandler = handler::Prototype; + type ConnectionHandler = Either< + handler::relayed::Handler, + Either, + >; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - handler::Prototype + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + match self + .outgoing_direct_connection_attempts + .remove(&(connection_id, peer)) + { + None => { + let handler = if is_relayed(local_addr) { + Either::Left(handler::relayed::Handler::new(ConnectedPoint::Listener { + local_addr: local_addr.clone(), + send_back_addr: remote_addr.clone(), + })) // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound. + } else { + Either::Right(Either::Right(dummy::ConnectionHandler)) + }; + + Ok(handler) + } + Some(_) => { + assert!( + !is_relayed(local_addr), + "`Prototype::DirectConnection` is never created for relayed connection." + ); + + Ok(Either::Right(Either::Left( + handler::direct::Handler::default(), + ))) + } + } + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + match self + .outgoing_direct_connection_attempts + .remove(&(connection_id, peer)) + { + None => { + let handler = if is_relayed(addr) { + Either::Left(handler::relayed::Handler::new(ConnectedPoint::Dialer { + address: addr.clone(), + role_override, + })) // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound. + } else { + Either::Right(Either::Right(dummy::ConnectionHandler)) + }; + + Ok(handler) + } + Some(_) => { + assert!( + !is_relayed(addr), + "`Prototype::DirectConnection` is never created for relayed connection." + ); + + Ok(Either::Right(Either::Left( + handler::direct::Handler::default(), + ))) + } + } } fn on_connection_handler_event( @@ -332,7 +405,7 @@ impl NetworkBehaviour for Behaviour { self.queued_events .push_back(NetworkBehaviourAction::Dial { opts }); } - Either::Right(handler::direct::Event::DirectConnectionEstablished) => { + Either::Right(Either::Left(handler::direct::Event::DirectConnectionEstablished)) => { self.queued_events.extend([ NetworkBehaviourAction::NotifyHandler { peer_id: event_source, @@ -348,6 +421,7 @@ impl NetworkBehaviour for Behaviour { ), ]); } + Either::Right(Either::Right(never)) => void::unreachable(never), }; } @@ -386,3 +460,7 @@ impl NetworkBehaviour for Behaviour { } } } + +fn is_relayed(addr: &Multiaddr) -> bool { + addr.iter().any(|p| p == Protocol::P2pCircuit) +} diff --git a/protocols/dcutr/src/handler.rs b/protocols/dcutr/src/handler.rs index 01062415..cc59e3ab 100644 --- a/protocols/dcutr/src/handler.rs +++ b/protocols/dcutr/src/handler.rs @@ -18,29 +18,5 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol; -use either::Either; -use libp2p_core::{ConnectedPoint, PeerId}; -use libp2p_swarm::handler::SendWrapper; -use libp2p_swarm::{ConnectionHandler, IntoConnectionHandler}; - pub mod direct; pub mod relayed; - -pub struct Prototype; - -impl IntoConnectionHandler for Prototype { - type Handler = Either; - - fn into_handler(self, _remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { - if endpoint.is_relayed() { - Either::Left(relayed::Handler::new(endpoint.clone())) - } else { - Either::Right(direct::Handler::default()) // This is a direct connection. What we don't know is whether it is the one we created or another one that happened accidentally. - } - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - Either::Left(SendWrapper(Either::Left(protocol::inbound::Upgrade {}))) - } -} diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index d0d77b5a..769bf7df 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -26,11 +26,11 @@ use crate::topic::Topic; use crate::FloodsubConfig; use cuckoofilter::{CuckooError, CuckooFilter}; use fnv::FnvHashSet; -use libp2p_core::PeerId; +use libp2p_core::{Endpoint, Multiaddr, PeerId}; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}; use libp2p_swarm::{ - dial_opts::DialOpts, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, - OneShotHandler, PollParameters, THandlerInEvent, THandlerOutEvent, + dial_opts::DialOpts, ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, + NotifyHandler, OneShotHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, }; use log::warn; use smallvec::SmallVec; @@ -334,8 +334,24 @@ impl NetworkBehaviour for Floodsub { type ConnectionHandler = OneShotHandler; type OutEvent = FloodsubEvent; - fn new_handler(&mut self) -> Self::ConnectionHandler { - Default::default() + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(Default::default()) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(Default::default()) } fn on_connection_handler_event( diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index d709e142..8346e231 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -36,13 +36,14 @@ use prost::Message as _; use rand::{seq::SliceRandom, thread_rng}; use libp2p_core::{ - identity::Keypair, multiaddr::Protocol::Ip4, multiaddr::Protocol::Ip6, Multiaddr, PeerId, + identity::Keypair, multiaddr::Protocol::Ip4, multiaddr::Protocol::Ip6, Endpoint, Multiaddr, + PeerId, }; use libp2p_swarm::{ behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm}, dial_opts::DialOpts, - ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, - THandlerInEvent, THandlerOutEvent, + ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, + PollParameters, THandler, THandlerInEvent, THandlerOutEvent, }; use wasm_timer::Instant; @@ -3289,11 +3290,30 @@ where type ConnectionHandler = Handler; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - Handler::new( + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(Handler::new( ProtocolConfig::new(&self.config), self.config.idle_timeout(), - ) + )) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(Handler::new( + ProtocolConfig::new(&self.config), + self.config.idle_timeout(), + )) } fn on_connection_handler_event( diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 01cf0c8b..22e50d1d 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -18,16 +18,16 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::handler::{self, InEvent, Proto}; +use crate::handler::{self, Handler, InEvent}; use crate::protocol::{Info, Protocol, UpgradeError}; -use libp2p_core::{multiaddr, ConnectedPoint, Multiaddr, PeerId, PublicKey}; +use libp2p_core::{multiaddr, ConnectedPoint, Endpoint, Multiaddr, PeerId, PublicKey}; +use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::{ - behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}, - dial_opts::DialOpts, - AddressScore, ConnectionHandlerUpgrErr, ConnectionId, DialError, ExternalAddresses, - ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, - THandlerInEvent, THandlerOutEvent, + dial_opts::DialOpts, AddressScore, ConnectionDenied, ConnectionHandlerUpgrErr, DialError, + ExternalAddresses, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, + PollParameters, THandlerInEvent, }; +use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent}; use lru::LruCache; use std::num::NonZeroUsize; use std::{ @@ -234,17 +234,43 @@ impl Behaviour { } impl NetworkBehaviour for Behaviour { - type ConnectionHandler = Proto; + type ConnectionHandler = Handler; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - Proto::new( + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + peer: PeerId, + _: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(Handler::new( self.config.initial_delay, self.config.interval, + peer, self.config.local_public_key.clone(), self.config.protocol_version.clone(), self.config.agent_version.clone(), - ) + remote_addr.clone(), + )) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(Handler::new( + self.config.initial_delay, + self.config.interval, + peer, + self.config.local_public_key.clone(), + self.config.protocol_version.clone(), + self.config.agent_version.clone(), + addr.clone(), // TODO: This is weird? That is the public address we dialed, shouldn't need to tell the other party? + )) } fn on_connection_handler_event( @@ -352,8 +378,19 @@ impl NetworkBehaviour for Behaviour { } } - fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { - self.discovered_peers.get(peer) + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + let peer = match maybe_peer { + None => return Ok(vec![]), + Some(peer) => peer, + }; + + Ok(self.discovered_peers.get(&peer)) } fn on_swarm_event(&mut self, event: FromSwarm) { diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 7b5f4b58..2aa8cf34 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -27,70 +27,19 @@ use futures::prelude::*; use futures::stream::FuturesUnordered; use futures_timer::Delay; use libp2p_core::upgrade::SelectUpgrade; -use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, PublicKey}; +use libp2p_core::{Multiaddr, PeerId, PublicKey}; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, - KeepAlive, NegotiatedSubstream, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + NegotiatedSubstream, SubstreamProtocol, }; use log::warn; use smallvec::SmallVec; use std::collections::VecDeque; use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; -pub struct Proto { - initial_delay: Duration, - interval: Duration, - public_key: PublicKey, - protocol_version: String, - agent_version: String, -} - -impl Proto { - pub fn new( - initial_delay: Duration, - interval: Duration, - public_key: PublicKey, - protocol_version: String, - agent_version: String, - ) -> Self { - Proto { - initial_delay, - interval, - public_key, - protocol_version, - agent_version, - } - } -} - -impl IntoConnectionHandler for Proto { - type Handler = Handler; - - fn into_handler(self, remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { - let observed_addr = match endpoint { - ConnectedPoint::Dialer { address, .. } => address, - ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, - }; - - Handler::new( - self.initial_delay, - self.interval, - *remote_peer_id, - self.public_key, - self.protocol_version, - self.agent_version, - observed_addr.clone(), - ) - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - SelectUpgrade::new(Identify, Push::inbound()) - } -} - /// Protocol handler for sending and receiving identification requests. /// /// Outbound requests are sent periodically. The handler performs expects diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 43fd0fde..9d3d9014 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -24,7 +24,7 @@ mod test; use crate::addresses::Addresses; use crate::handler::{ - KademliaHandlerConfig, KademliaHandlerEvent, KademliaHandlerIn, KademliaHandlerProto, + KademliaHandler, KademliaHandlerConfig, KademliaHandlerEvent, KademliaHandlerIn, KademliaRequestId, }; use crate::jobs::*; @@ -39,14 +39,15 @@ use crate::record::{ use crate::K_VALUE; use fnv::{FnvHashMap, FnvHashSet}; use instant::Instant; -use libp2p_core::{ConnectedPoint, Multiaddr, PeerId}; +use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr, PeerId}; use libp2p_swarm::behaviour::{ AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm, }; use libp2p_swarm::{ dial_opts::{self, DialOpts}, - ConnectionId, DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour, - NetworkBehaviourAction, NotifyHandler, PollParameters, THandlerInEvent, THandlerOutEvent, + ConnectionDenied, ConnectionId, DialError, ExternalAddresses, ListenAddresses, + NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, THandler, + THandlerInEvent, THandlerOutEvent, }; use log::{debug, info, warn}; use smallvec::SmallVec; @@ -1927,6 +1928,7 @@ where | DialError::InvalidPeerId { .. } | DialError::WrongPeerId { .. } | DialError::Aborted + | DialError::Denied { .. } | DialError::Transport(_) | DialError::NoAddresses => { if let DialError::Transport(addresses) = error { @@ -1978,21 +1980,66 @@ impl NetworkBehaviour for Kademlia where TStore: RecordStore + Send + 'static, { - type ConnectionHandler = KademliaHandlerProto; + type ConnectionHandler = KademliaHandler; type OutEvent = KademliaEvent; - fn new_handler(&mut self) -> Self::ConnectionHandler { - KademliaHandlerProto::new(KademliaHandlerConfig { - protocol_config: self.protocol_config.clone(), - allow_listening: true, - idle_timeout: self.connection_idle_timeout, - }) + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(KademliaHandler::new( + KademliaHandlerConfig { + protocol_config: self.protocol_config.clone(), + allow_listening: true, + idle_timeout: self.connection_idle_timeout, + }, + ConnectedPoint::Listener { + local_addr: local_addr.clone(), + send_back_addr: remote_addr.clone(), + }, + peer, + )) } - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(KademliaHandler::new( + KademliaHandlerConfig { + protocol_config: self.protocol_config.clone(), + allow_listening: true, + idle_timeout: self.connection_idle_timeout, + }, + ConnectedPoint::Dialer { + address: addr.clone(), + role_override, + }, + peer, + )) + } + + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + let peer_id = match maybe_peer { + None => return Ok(vec![]), + Some(peer) => peer, + }; + // We should order addresses from decreasing likelyhood of connectivity, so start with // the addresses of that peer in the k-buckets. - let key = kbucket::Key::from(*peer_id); + let key = kbucket::Key::from(peer_id); let mut peer_addrs = if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) { let addrs = entry.value().iter().cloned().collect::>(); @@ -2004,12 +2051,12 @@ where // We add to that a temporary list of addresses from the ongoing queries. for query in self.queries.iter() { - if let Some(addrs) = query.inner.addresses.get(peer_id) { + if let Some(addrs) = query.inner.addresses.get(&peer_id) { peer_addrs.extend(addrs.iter().cloned()) } } - peer_addrs + Ok(peer_addrs) } fn on_connection_handler_event( diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index ba65902e..2f51eaef 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -1318,7 +1318,15 @@ fn network_behaviour_on_address_change() { // At this point the remote is not yet known to support the // configured protocol name, so the peer is not yet in the // local routing table and hence no addresses are known. - assert!(kademlia.addresses_of_peer(&remote_peer_id).is_empty()); + assert!(kademlia + .handle_pending_outbound_connection( + connection_id, + Some(remote_peer_id), + &[], + Endpoint::Dialer + ) + .unwrap() + .is_empty()); // Mimick the connection handler confirming the protocol for // the test connection, so that the peer is added to the routing table. @@ -1330,7 +1338,14 @@ fn network_behaviour_on_address_change() { assert_eq!( vec![old_address.clone()], - kademlia.addresses_of_peer(&remote_peer_id), + kademlia + .handle_pending_outbound_connection( + connection_id, + Some(remote_peer_id), + &[], + Endpoint::Dialer + ) + .unwrap(), ); kademlia.on_swarm_event(FromSwarm::AddressChange(AddressChange { @@ -1348,7 +1363,14 @@ fn network_behaviour_on_address_change() { assert_eq!( vec![new_address], - kademlia.addresses_of_peer(&remote_peer_id), + kademlia + .handle_pending_outbound_connection( + connection_id, + Some(remote_peer_id), + &[], + Endpoint::Dialer + ) + .unwrap(), ); } diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index fadd29d9..27938c1f 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -32,8 +32,8 @@ use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, - KeepAlive, NegotiatedSubstream, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + NegotiatedSubstream, SubstreamProtocol, }; use log::trace; use std::collections::VecDeque; @@ -44,39 +44,6 @@ use std::{ const MAX_NUM_SUBSTREAMS: usize = 32; -/// A prototype from which [`KademliaHandler`]s can be constructed. -pub struct KademliaHandlerProto { - config: KademliaHandlerConfig, - _type: PhantomData, -} - -impl KademliaHandlerProto { - pub fn new(config: KademliaHandlerConfig) -> Self { - KademliaHandlerProto { - config, - _type: PhantomData, - } - } -} - -impl IntoConnectionHandler - for KademliaHandlerProto -{ - type Handler = KademliaHandler; - - fn into_handler(self, remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { - KademliaHandler::new(self.config, endpoint.clone(), *remote_peer_id) - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - if self.config.allow_listening { - Either::Left(self.config.protocol_config.clone()) - } else { - Either::Right(upgrade::DeniedUpgrade) - } - } -} - /// Protocol handler that manages substreams for the Kademlia protocol /// on a single connection with a peer. /// diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 472e1101..f110373f 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -27,11 +27,11 @@ use crate::behaviour::{socket::AsyncSocket, timer::Builder}; use crate::Config; use futures::Stream; use if_watch::IfEvent; -use libp2p_core::{Multiaddr, PeerId}; +use libp2p_core::{Endpoint, Multiaddr, PeerId}; use libp2p_swarm::behaviour::FromSwarm; use libp2p_swarm::{ - dummy, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, PollParameters, - THandlerInEvent, THandlerOutEvent, + dummy, ConnectionDenied, ConnectionId, ListenAddresses, NetworkBehaviour, + NetworkBehaviourAction, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, }; use smallvec::SmallVec; use std::collections::hash_map::{Entry, HashMap}; @@ -174,22 +174,50 @@ where type ConnectionHandler = dummy::ConnectionHandler; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - dummy::ConnectionHandler + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(dummy::ConnectionHandler) } - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - self.discovered_nodes + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + let peer_id = match maybe_peer { + None => return Ok(vec![]), + Some(peer) => peer, + }; + + Ok(self + .discovered_nodes .iter() - .filter(|(peer, _, _)| peer == peer_id) + .filter(|(peer, _, _)| peer == &peer_id) .map(|(_, addr, _)| addr.clone()) - .collect() + .collect()) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(dummy::ConnectionHandler) } fn on_connection_handler_event( &mut self, _: PeerId, - _: libp2p_swarm::ConnectionId, + _: ConnectionId, ev: THandlerOutEvent, ) { void::unreachable(ev) diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index 9da00949..b9016f34 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -47,10 +47,10 @@ mod protocol; use handler::Handler; pub use handler::{Config, Failure, Success}; -use libp2p_core::PeerId; +use libp2p_core::{Endpoint, Multiaddr, PeerId}; use libp2p_swarm::{ - behaviour::FromSwarm, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, PollParameters, - THandlerInEvent, THandlerOutEvent, + behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, + PollParameters, THandler, THandlerInEvent, THandlerOutEvent, }; use std::{ collections::VecDeque, @@ -120,8 +120,24 @@ impl NetworkBehaviour for Behaviour { type ConnectionHandler = Handler; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - Handler::new(self.config.clone()) + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> std::result::Result, ConnectionDenied> { + Ok(Handler::new(self.config.clone())) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> std::result::Result, ConnectionDenied> { + Ok(Handler::new(self.config.clone())) } fn on_connection_handler_event( diff --git a/protocols/relay/src/behaviour.rs b/protocols/relay/src/behaviour.rs index c94274e7..96c99697 100644 --- a/protocols/relay/src/behaviour.rs +++ b/protocols/relay/src/behaviour.rs @@ -23,16 +23,19 @@ mod handler; pub mod rate_limiter; +use crate::behaviour::handler::Handler; use crate::message_proto; +use crate::multiaddr_ext::MultiaddrExt; use crate::protocol::{inbound_hop, outbound_stop}; use either::Either; use instant::Instant; use libp2p_core::multiaddr::Protocol; -use libp2p_core::PeerId; +use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr, PeerId}; use libp2p_swarm::behaviour::{ConnectionClosed, FromSwarm}; use libp2p_swarm::{ - ConnectionHandlerUpgrErr, ConnectionId, ExternalAddresses, NetworkBehaviour, - NetworkBehaviourAction, NotifyHandler, PollParameters, THandlerInEvent, THandlerOutEvent, + dummy, ConnectionDenied, ConnectionHandlerUpgrErr, ConnectionId, ExternalAddresses, + NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, THandler, + THandlerInEvent, THandlerOutEvent, }; use std::collections::{hash_map, HashMap, HashSet, VecDeque}; use std::num::NonZeroU32; @@ -250,17 +253,57 @@ impl Behaviour { } impl NetworkBehaviour for Behaviour { - type ConnectionHandler = handler::Prototype; + type ConnectionHandler = Either; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - handler::Prototype { - config: handler::Config { + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + if local_addr.is_relayed() { + // Deny all substreams on relayed connection. + return Ok(Either::Right(dummy::ConnectionHandler)); + } + + Ok(Either::Left(Handler::new( + handler::Config { reservation_duration: self.config.reservation_duration, max_circuit_duration: self.config.max_circuit_duration, max_circuit_bytes: self.config.max_circuit_bytes, }, + ConnectedPoint::Listener { + local_addr: local_addr.clone(), + send_back_addr: remote_addr.clone(), + }, + ))) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + if addr.is_relayed() { + // Deny all substreams on relayed connection. + return Ok(Either::Right(dummy::ConnectionHandler)); } + + Ok(Either::Left(Handler::new( + handler::Config { + reservation_duration: self.config.reservation_duration, + max_circuit_duration: self.config.max_circuit_duration, + max_circuit_bytes: self.config.max_circuit_bytes, + }, + ConnectedPoint::Dialer { + address: addr.clone(), + role_override, + }, + ))) } fn on_swarm_event(&mut self, event: FromSwarm) { diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index a1aaf0fa..4ed521d5 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -33,11 +33,11 @@ use instant::Instant; use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, SendWrapper, + ListenUpgradeError, }; use libp2p_swarm::{ - dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, ConnectionId, - IntoConnectionHandler, KeepAlive, NegotiatedSubstream, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, ConnectionId, KeepAlive, + NegotiatedSubstream, SubstreamProtocol, }; use std::collections::VecDeque; use std::fmt; @@ -337,31 +337,6 @@ impl fmt::Debug for Event { } } -pub struct Prototype { - pub config: Config, -} - -impl IntoConnectionHandler for Prototype { - type Handler = Either; - - fn into_handler(self, _remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { - if endpoint.is_relayed() { - // Deny all substreams on relayed connection. - Either::Right(dummy::ConnectionHandler) - } else { - Either::Left(Handler::new(self.config, endpoint.clone())) - } - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - Either::Left(SendWrapper(inbound_hop::Upgrade { - reservation_duration: self.config.reservation_duration, - max_circuit_duration: self.config.max_circuit_duration, - max_circuit_bytes: self.config.max_circuit_bytes, - })) - } -} - /// [`ConnectionHandler`] that manages substreams for a relay on a single /// connection with a peer. pub struct Handler { @@ -418,7 +393,7 @@ pub struct Handler { } impl Handler { - fn new(config: Config, endpoint: ConnectedPoint) -> Handler { + pub fn new(config: Config, endpoint: ConnectedPoint) -> Handler { Handler { endpoint, config, diff --git a/protocols/relay/src/lib.rs b/protocols/relay/src/lib.rs index aa5a8204..87f3cee0 100644 --- a/protocols/relay/src/lib.rs +++ b/protocols/relay/src/lib.rs @@ -25,6 +25,7 @@ mod behaviour; mod copy_future; +mod multiaddr_ext; mod priv_client; mod protocol; pub mod v2; diff --git a/protocols/relay/src/multiaddr_ext.rs b/protocols/relay/src/multiaddr_ext.rs new file mode 100644 index 00000000..6991a8b9 --- /dev/null +++ b/protocols/relay/src/multiaddr_ext.rs @@ -0,0 +1,12 @@ +use libp2p_core::multiaddr::Protocol; +use libp2p_core::Multiaddr; + +pub(crate) trait MultiaddrExt { + fn is_relayed(&self) -> bool; +} + +impl MultiaddrExt for Multiaddr { + fn is_relayed(&self) -> bool { + self.iter().any(|p| p == Protocol::P2pCircuit) + } +} diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index 7d3f3de8..f9869832 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -23,6 +23,8 @@ mod handler; pub(crate) mod transport; +use crate::multiaddr_ext::MultiaddrExt; +use crate::priv_client::handler::Handler; use crate::protocol::{self, inbound_stop, outbound_hop}; use bytes::Bytes; use either::Either; @@ -32,12 +34,13 @@ use futures::future::{BoxFuture, FutureExt}; use futures::io::{AsyncRead, AsyncWrite}; use futures::ready; use futures::stream::StreamExt; -use libp2p_core::PeerId; +use libp2p_core::{Endpoint, Multiaddr, PeerId}; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}; use libp2p_swarm::dial_opts::DialOpts; use libp2p_swarm::{ - ConnectionHandlerUpgrErr, ConnectionId, DialFailure, NegotiatedSubstream, NetworkBehaviour, - NetworkBehaviourAction, NotifyHandler, PollParameters, THandlerInEvent, THandlerOutEvent, + dummy, ConnectionDenied, ConnectionHandler, ConnectionHandlerUpgrErr, ConnectionId, + DialFailure, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, + PollParameters, THandler, THandlerInEvent, THandlerOutEvent, }; use std::collections::{hash_map, HashMap, VecDeque}; use std::io::{Error, ErrorKind, IoSlice}; @@ -156,11 +159,47 @@ impl Behaviour { } impl NetworkBehaviour for Behaviour { - type ConnectionHandler = handler::Prototype; + type ConnectionHandler = Either; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - handler::Prototype::new(self.local_peer_id, None) + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + if local_addr.is_relayed() { + return Ok(Either::Right(dummy::ConnectionHandler)); + } + + let mut handler = Handler::new(self.local_peer_id, peer, remote_addr.clone()); + + if let Some(event) = self.pending_handler_commands.remove(&connection_id) { + handler.on_behaviour_event(event) + } + + Ok(Either::Left(handler)) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + if addr.is_relayed() { + return Ok(Either::Right(dummy::ConnectionHandler)); + } + + let mut handler = Handler::new(self.local_peer_id, peer, addr.clone()); + + if let Some(event) = self.pending_handler_commands.remove(&connection_id) { + handler.on_behaviour_event(event) + } + + Ok(Either::Left(handler)) } fn on_swarm_event(&mut self, event: FromSwarm) { diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index 9bf09e3d..cae7db78 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -29,14 +29,14 @@ use futures::stream::{FuturesUnordered, StreamExt}; use futures_timer::Delay; use instant::Instant; use libp2p_core::multiaddr::Protocol; -use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; +use libp2p_core::{upgrade, Multiaddr, PeerId}; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, SendWrapper, + ListenUpgradeError, }; use libp2p_swarm::{ - dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - IntoConnectionHandler, KeepAlive, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + SubstreamProtocol, }; use log::debug; use std::collections::{HashMap, VecDeque}; @@ -109,56 +109,6 @@ pub enum Event { }, } -pub struct Prototype { - local_peer_id: PeerId, - /// Initial [`In`] event from [`super::Behaviour`] provided at creation time. - initial_in: Option, -} - -impl Prototype { - pub(crate) fn new(local_peer_id: PeerId, initial_in: Option) -> Self { - Self { - local_peer_id, - initial_in, - } - } -} - -impl IntoConnectionHandler for Prototype { - type Handler = Either; - - fn into_handler(self, remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { - if endpoint.is_relayed() { - if let Some(event) = self.initial_in { - debug!( - "Established relayed instead of direct connection to {:?}, \ - dropping initial in event {:?}.", - remote_peer_id, event - ); - } - - // Deny all substreams on relayed connection. - Either::Right(dummy::ConnectionHandler) - } else { - let mut handler = Handler::new( - self.local_peer_id, - *remote_peer_id, - endpoint.get_remote_address().clone(), - ); - - if let Some(event) = self.initial_in { - handler.on_behaviour_event(event) - } - - Either::Left(handler) - } - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - Either::Left(SendWrapper(inbound_stop::Upgrade {})) - } -} - pub struct Handler { local_peer_id: PeerId, remote_peer_id: PeerId, @@ -205,7 +155,7 @@ pub struct Handler { } impl Handler { - fn new(local_peer_id: PeerId, remote_peer_id: PeerId, remote_addr: Multiaddr) -> Self { + pub fn new(local_peer_id: PeerId, remote_peer_id: PeerId, remote_addr: Multiaddr) -> Self { Self { local_peer_id, remote_peer_id, diff --git a/protocols/relay/src/priv_client/transport.rs b/protocols/relay/src/priv_client/transport.rs index 264eb3ab..6dfc4671 100644 --- a/protocols/relay/src/priv_client/transport.rs +++ b/protocols/relay/src/priv_client/transport.rs @@ -19,6 +19,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::multiaddr_ext::MultiaddrExt; use crate::priv_client::Connection; use crate::RequestId; use futures::channel::mpsc; @@ -247,7 +248,7 @@ struct RelayedMultiaddr { /// Parse a [`Multiaddr`] containing a [`Protocol::P2pCircuit`]. fn parse_relayed_multiaddr(addr: Multiaddr) -> Result> { - if !addr.iter().any(|p| matches!(p, Protocol::P2pCircuit)) { + if !addr.is_relayed() { return Err(TransportError::MultiaddrNotSupported(addr)); } diff --git a/protocols/rendezvous/examples/discover.rs b/protocols/rendezvous/examples/discover.rs index 274432bc..d12dd5ce 100644 --- a/protocols/rendezvous/examples/discover.rs +++ b/protocols/rendezvous/examples/discover.rs @@ -95,7 +95,7 @@ async fn main() { address.clone() }; - swarm.dial(address_with_p2p).unwrap() + swarm.dial(address_with_p2p).unwrap(); } } } diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs index ea722e7a..8cb91a71 100644 --- a/protocols/rendezvous/src/client.rs +++ b/protocols/rendezvous/src/client.rs @@ -30,11 +30,12 @@ use futures::stream::StreamExt; use instant::Duration; use libp2p_core::identity::error::SigningError; use libp2p_core::identity::Keypair; -use libp2p_core::{Multiaddr, PeerId, PeerRecord}; +use libp2p_core::{Endpoint, Multiaddr, PeerId, PeerRecord}; use libp2p_swarm::behaviour::FromSwarm; use libp2p_swarm::{ - CloseConnection, ConnectionId, ExternalAddresses, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, THandlerInEvent, THandlerOutEvent, + CloseConnection, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour, + NetworkBehaviourAction, NotifyHandler, PollParameters, THandler, THandlerInEvent, + THandlerOutEvent, }; use std::collections::{HashMap, VecDeque}; use std::iter::FromIterator; @@ -168,19 +169,51 @@ impl NetworkBehaviour for Behaviour { SubstreamConnectionHandler; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - let initial_keep_alive = Duration::from_secs(30); - - SubstreamConnectionHandler::new_outbound_only(initial_keep_alive) + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(SubstreamConnectionHandler::new_outbound_only( + Duration::from_secs(30), + )) } - fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { - self.discovered_peers + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + let peer = match maybe_peer { + None => return Ok(vec![]), + Some(peer) => peer, + }; + + let addresses = self + .discovered_peers .iter() - .filter_map(|((candidate, _), addresses)| (candidate == peer).then_some(addresses)) + .filter_map(|((candidate, _), addresses)| (candidate == &peer).then_some(addresses)) .flatten() .cloned() - .collect() + .collect(); + + Ok(addresses) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(SubstreamConnectionHandler::new_outbound_only( + Duration::from_secs(30), + )) } fn on_connection_handler_event( diff --git a/protocols/rendezvous/src/server.rs b/protocols/rendezvous/src/server.rs index c38b8e76..2d38d201 100644 --- a/protocols/rendezvous/src/server.rs +++ b/protocols/rendezvous/src/server.rs @@ -27,11 +27,11 @@ use futures::future::BoxFuture; use futures::ready; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; -use libp2p_core::PeerId; +use libp2p_core::{Endpoint, Multiaddr, PeerId}; use libp2p_swarm::behaviour::FromSwarm; use libp2p_swarm::{ - CloseConnection, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, - PollParameters, THandlerInEvent, THandlerOutEvent, + CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, + NotifyHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, }; use std::collections::{HashMap, HashSet, VecDeque}; use std::iter::FromIterator; @@ -111,10 +111,28 @@ impl NetworkBehaviour for Behaviour { type ConnectionHandler = SubstreamConnectionHandler; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - let initial_keep_alive = Duration::from_secs(30); + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(SubstreamConnectionHandler::new_inbound_only( + Duration::from_secs(30), + )) + } - SubstreamConnectionHandler::new_inbound_only(initial_keep_alive) + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(SubstreamConnectionHandler::new_inbound_only( + Duration::from_secs(30), + )) } fn on_connection_handler_event( diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 2986bf51..008a6aa3 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -70,12 +70,12 @@ pub use handler::ProtocolSupport; use futures::channel::oneshot; use handler::{Handler, RequestProtocol}; -use libp2p_core::{ConnectedPoint, Multiaddr, PeerId}; +use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr, PeerId}; use libp2p_swarm::{ behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}, dial_opts::DialOpts, - ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, - THandlerInEvent, THandlerOutEvent, + ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, + PollParameters, THandler, THandlerInEvent, THandlerOutEvent, }; use smallvec::SmallVec; use std::{ @@ -726,25 +726,59 @@ where type ConnectionHandler = Handler; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - Handler::new( + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(Handler::new( self.inbound_protocols.clone(), self.codec.clone(), self.config.connection_keep_alive, self.config.request_timeout, self.next_inbound_id.clone(), - ) + )) } - fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + let peer = match maybe_peer { + None => return Ok(vec![]), + Some(peer) => peer, + }; + let mut addresses = Vec::new(); - if let Some(connections) = self.connected.get(peer) { + if let Some(connections) = self.connected.get(&peer) { addresses.extend(connections.iter().filter_map(|c| c.address.clone())) } - if let Some(more) = self.addresses.get(peer) { + if let Some(more) = self.addresses.get(&peer) { addresses.extend(more.into_iter().cloned()); } - addresses + + Ok(addresses) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(Handler::new( + self.inbound_protocols.clone(), + self.codec.clone(), + self.config.connection_keep_alive, + self.config.request_timeout, + self.next_inbound_id.clone(), + )) } fn on_swarm_event(&mut self, event: FromSwarm) { diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index 5c31aff1..2ebc4aec 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -55,9 +55,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let trait_to_impl = quote! { #prelude_path::NetworkBehaviour }; let either_ident = quote! { #prelude_path::Either }; let network_behaviour_action = quote! { #prelude_path::NetworkBehaviourAction }; - let into_connection_handler = quote! { #prelude_path::IntoConnectionHandler }; let connection_handler = quote! { #prelude_path::ConnectionHandler }; - let into_proto_select_ident = quote! { #prelude_path::IntoConnectionHandlerSelect }; + let proto_select_ident = quote! { #prelude_path::ConnectionHandlerSelect }; let peer_id = quote! { #prelude_path::PeerId }; let connection_id = quote! { #prelude_path::ConnectionId }; let poll_parameters = quote! { #prelude_path::PollParameters }; @@ -74,7 +73,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let expired_external_addr = quote! { #prelude_path::ExpiredExternalAddr }; let listener_error = quote! { #prelude_path::ListenerError }; let listener_closed = quote! { #prelude_path::ListenerClosed }; + let t_handler = quote! { #prelude_path::THandler }; let t_handler_in_event = quote! { #prelude_path::THandlerInEvent }; + let t_handler_out_event = quote! { #prelude_path::THandlerOutEvent }; + let endpoint = quote! { #prelude_path::Endpoint }; + let connection_denied = quote! { #prelude_path::ConnectionDenied }; // Build the generics. let impl_generics = { @@ -209,18 +212,6 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { } }; - // Build the list of statements to put in the body of `addresses_of_peer()`. - let addresses_of_peer_stmts = { - data_struct - .fields - .iter() - .enumerate() - .map(move |(field_n, field)| match field.ident { - Some(ref i) => quote! { out.extend(self.#i.addresses_of_peer(peer_id)); }, - None => quote! { out.extend(self.#field_n.addresses_of_peer(peer_id)); }, - }) - }; - // Build the list of statements to put in the body of `on_swarm_event()` // for the `FromSwarm::ConnectionEstablished` variant. let on_connection_established_stmts = { @@ -561,9 +552,9 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let mut ph_ty = None; for field in data_struct.fields.iter() { let ty = &field.ty; - let field_info = quote! { <#ty as #trait_to_impl>::ConnectionHandler }; + let field_info = quote! { #t_handler<#ty> }; match ph_ty { - Some(ev) => ph_ty = Some(quote! { #into_proto_select_ident<#ev, #field_info> }), + Some(ev) => ph_ty = Some(quote! { #proto_select_ident<#ev, #field_info> }), ref mut ev @ None => *ev = Some(field_info), } } @@ -571,9 +562,25 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { ph_ty.unwrap_or(quote! {()}) // TODO: `!` instead }; - // The content of `new_handler()`. - // Example output: `self.field1.select(self.field2.select(self.field3))`. - let new_handler = { + // The content of `handle_pending_inbound_connection`. + let handle_pending_inbound_connection_stmts = + data_struct + .fields + .iter() + .enumerate() + .map(|(field_n, field)| { + match field.ident { + Some(ref i) => quote! { + #trait_to_impl::handle_pending_inbound_connection(&mut self.#i, connection_id, local_addr, remote_addr)?; + }, + None => quote! { + #trait_to_impl::handle_pending_inbound_connection(&mut self.#field_n, connection_id, local_addr, remote_addr)?; + } + } + }); + + // The content of `handle_established_inbound_connection`. + let handle_established_inbound_connection = { let mut out_handler = None; for (field_n, field) in data_struct.fields.iter().enumerate() { @@ -583,13 +590,61 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { }; let builder = quote! { - #field_name.new_handler() + #field_name.handle_established_inbound_connection(connection_id, peer, local_addr, remote_addr)? }; match out_handler { - Some(h) => { - out_handler = Some(quote! { #into_connection_handler::select(#h, #builder) }) - } + Some(h) => out_handler = Some(quote! { #connection_handler::select(#h, #builder) }), + ref mut h @ None => *h = Some(builder), + } + } + + out_handler.unwrap_or(quote! {()}) // TODO: See test `empty`. + }; + + // The content of `handle_pending_outbound_connection`. + let handle_pending_outbound_connection = { + let extend_stmts = + data_struct + .fields + .iter() + .enumerate() + .map(|(field_n, field)| { + match field.ident { + Some(ref i) => quote! { + combined_addresses.extend(#trait_to_impl::handle_pending_outbound_connection(&mut self.#i, connection_id, maybe_peer, addresses, effective_role)?); + }, + None => quote! { + combined_addresses.extend(#trait_to_impl::handle_pending_outbound_connection(&mut self.#field_n, connection_id, maybe_peer, addresses, effective_role)?); + } + } + }); + + quote! { + let mut combined_addresses = vec![]; + + #(#extend_stmts)* + + Ok(combined_addresses) + } + }; + + // The content of `handle_established_outbound_connection`. + let handle_established_outbound_connection = { + let mut out_handler = None; + + for (field_n, field) in data_struct.fields.iter().enumerate() { + let field_name = match field.ident { + Some(ref i) => quote! { self.#i }, + None => quote! { self.#field_n }, + }; + + let builder = quote! { + #field_name.handle_established_outbound_connection(connection_id, peer, addr, role_override)? + }; + + match out_handler { + Some(h) => out_handler = Some(quote! { #connection_handler::select(#h, #builder) }), ref mut h @ None => *h = Some(builder), } } @@ -678,22 +733,56 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { type ConnectionHandler = #connection_handler_ty; type OutEvent = #out_event_reference; - fn new_handler(&mut self) -> Self::ConnectionHandler { - use #into_connection_handler; - #new_handler + #[allow(clippy::needless_question_mark)] + fn handle_pending_inbound_connection( + &mut self, + connection_id: #connection_id, + local_addr: &#multiaddr, + remote_addr: &#multiaddr, + ) -> Result<(), #connection_denied> { + #(#handle_pending_inbound_connection_stmts)* + + Ok(()) } - fn addresses_of_peer(&mut self, peer_id: &#peer_id) -> Vec<#multiaddr> { - let mut out = Vec::new(); - #(#addresses_of_peer_stmts);* - out + #[allow(clippy::needless_question_mark)] + fn handle_established_inbound_connection( + &mut self, + connection_id: #connection_id, + peer: #peer_id, + local_addr: &#multiaddr, + remote_addr: &#multiaddr, + ) -> Result<#t_handler, #connection_denied> { + Ok(#handle_established_inbound_connection) + } + + #[allow(clippy::needless_question_mark)] + fn handle_pending_outbound_connection( + &mut self, + connection_id: #connection_id, + maybe_peer: Option<#peer_id>, + addresses: &[#multiaddr], + effective_role: #endpoint, + ) -> Result<::std::vec::Vec<#multiaddr>, #connection_denied> { + #handle_pending_outbound_connection + } + + #[allow(clippy::needless_question_mark)] + fn handle_established_outbound_connection( + &mut self, + connection_id: #connection_id, + peer: #peer_id, + addr: &#multiaddr, + role_override: #endpoint, + ) -> Result<#t_handler, #connection_denied> { + Ok(#handle_established_outbound_connection) } fn on_connection_handler_event( &mut self, peer_id: #peer_id, connection_id: #connection_id, - event: <::Handler as #connection_handler>::OutEvent + event: #t_handler_out_event ) { match event { #(#on_node_event_stmts),* diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 4760b8cd..32c3a8c0 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,5 +1,26 @@ # 0.42.0 [unreleased] +- Allow `NetworkBehaviour`s to manage connections. + We deprecate `NetworkBehaviour::new_handler` and `NetworkBehaviour::addresses_of_peer` in favor of four new callbacks: + + - `NetworkBehaviour::handle_pending_inbound_connection` + - `NetworkBehaviour::handle_pending_outbound_connection` + - `NetworkBehaviour::handle_established_inbound_connection` + - `NetworkBehaviour::handle_established_outbound_connection` + + Please note that due to [limitations](https://github.com/rust-lang/rust/issues/98990) in the Rust compiler, _implementations_ of `new_handler` and `addresses_of_peer` are not flagged as deprecated. + Nevertheless, they will be removed in the future. + + All four are fallible and returning an error from any of them will abort the given connection. + This allows you to create dedicated `NetworkBehaviour`s that only concern themselves with managing connections. + For example: + - checking the `PeerId` of a newly established connection against an allow/block list + - only allowing X connection upgrades at any one time + - denying incoming or outgoing connections from a certain IP range + - only allowing N connections to or from the same peer + + See [PR 3254]. + - Remove `handler` field from `NetworkBehaviourAction::Dial`. Instead of constructing the handler early, you can now access the `ConnectionId` of the future connection on `DialOpts`. `ConnectionId`s are `Copy` and will be used throughout the entire lifetime of the connection to report events. @@ -79,6 +100,7 @@ [PR 3373]: https://github.com/libp2p/rust-libp2p/pull/3373 [PR 3374]: https://github.com/libp2p/rust-libp2p/pull/3374 [PR 3375]: https://github.com/libp2p/rust-libp2p/pull/3375 +[PR 3254]: https://github.com/libp2p/rust-libp2p/pull/3254 [PR 3497]: https://github.com/libp2p/rust-libp2p/pull/3497 # 0.41.1 diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 9200ed73..cefb6470 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -28,11 +28,13 @@ pub use listen_addresses::ListenAddresses; use crate::connection::ConnectionId; use crate::dial_opts::DialOpts; +#[allow(deprecated)] use crate::handler::IntoConnectionHandler; use crate::{ - AddressRecord, AddressScore, DialError, ListenError, THandlerInEvent, THandlerOutEvent, + AddressRecord, AddressScore, ConnectionDenied, DialError, ListenError, THandler, + THandlerInEvent, THandlerOutEvent, }; -use libp2p_core::{transport::ListenerId, ConnectedPoint, Multiaddr, PeerId}; +use libp2p_core::{transport::ListenerId, ConnectedPoint, Endpoint, Multiaddr, PeerId}; use std::{task::Context, task::Poll}; /// A [`NetworkBehaviour`] defines the behaviour of the local node on the network. @@ -119,6 +121,7 @@ use std::{task::Context, task::Poll}; /// ``` pub trait NetworkBehaviour: 'static { /// Handler for all the protocols the network behaviour supports. + #[allow(deprecated)] type ConnectionHandler: IntoConnectionHandler; /// Event generated by the `NetworkBehaviour` and that the swarm will report back. @@ -141,7 +144,102 @@ pub trait NetworkBehaviour: 'static { /// /// Note that the handler is returned to the [`NetworkBehaviour`] on connection failure and /// connection closing. - fn new_handler(&mut self) -> Self::ConnectionHandler; + #[deprecated( + since = "0.42.0", + note = "Use one or more of `NetworkBehaviour::{handle_pending_inbound_connection,handle_established_inbound_connection,handle_pending_outbound_connection,handle_established_outbound_connection}` instead." + )] + fn new_handler(&mut self) -> Self::ConnectionHandler { + panic!("You must implement `handle_established_inbound_connection` and `handle_established_outbound_connection`.") + } + + /// Callback that is invoked for every new inbound connection. + /// + /// At this point in the connection lifecycle, only the remote's and our local address are known. + /// We have also already allocated a [`ConnectionId`]. + /// + /// Any error returned from this function will immediately abort the dial attempt. + fn handle_pending_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + Ok(()) + } + + /// Callback that is invoked for every established inbound connection. + /// + /// This is invoked once another peer has successfully dialed us. + /// + /// At this point, we have verified their [`PeerId`] and we know, which particular [`Multiaddr`] succeeded in the dial. + /// In order to actually use this connection, this function must return a [`ConnectionHandler`](crate::ConnectionHandler). + /// Returning an error will immediately close the connection. + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + #[allow(deprecated)] + Ok(self.new_handler().into_handler( + &peer, + &ConnectedPoint::Listener { + local_addr: local_addr.clone(), + send_back_addr: remote_addr.clone(), + }, + )) + } + + /// Callback that is invoked for every outbound connection attempt. + /// + /// We have access to: + /// + /// - The [`PeerId`], if known. Remember that we can dial without a [`PeerId`]. + /// - All addresses passed to [`DialOpts`] are passed in here too. + /// - The effective [`Role`](Endpoint) of this peer in the dial attempt. Typically, this is set to [`Endpoint::Dialer`] except if we are attempting a hole-punch. + /// - The [`ConnectionId`] identifying the future connection resulting from this dial, if successful. + /// + /// Note that the addresses returned from this function are only used for dialing if [`WithPeerIdWithAddresses::extend_addresses_through_behaviour`](crate::dial_opts::WithPeerIdWithAddresses::extend_addresses_through_behaviour) is set. + /// + /// Any error returned from this function will immediately abort the dial attempt. + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + #[allow(deprecated)] + if let Some(peer_id) = maybe_peer { + Ok(self.addresses_of_peer(&peer_id)) + } else { + Ok(vec![]) + } + } + + /// Callback that is invoked for every established outbound connection. + /// + /// This is invoked once we have successfully dialed a peer. + /// At this point, we have verified their [`PeerId`] and we know, which particular [`Multiaddr`] succeeded in the dial. + /// In order to actually use this connection, this function must return a [`ConnectionHandler`](crate::ConnectionHandler). + /// Returning an error will immediately close the connection. + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + #[allow(deprecated)] + Ok(self.new_handler().into_handler( + &peer, + &ConnectedPoint::Dialer { + address: addr.clone(), + role_override, + }, + )) + } /// Addresses that this behaviour is aware of for this specific peer, and that may allow /// reaching the peer. @@ -149,6 +247,7 @@ pub trait NetworkBehaviour: 'static { /// The addresses will be tried in the order returned by this function, which means that they /// should be ordered by decreasing likelihood of reachability. In other words, the first /// address should be the most likely to be reachable. + #[deprecated(note = "Use `NetworkBehaviour::handle_pending_outbound_connection` instead.")] fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { vec![] } @@ -380,6 +479,7 @@ pub enum CloseConnection { /// Enumeration with the list of the possible events /// to pass to [`on_swarm_event`](NetworkBehaviour::on_swarm_event). +#[allow(deprecated)] pub enum FromSwarm<'a, Handler: IntoConnectionHandler> { /// Informs the behaviour about a newly established connection to a peer. ConnectionEstablished(ConnectionEstablished<'a>), @@ -434,6 +534,7 @@ pub struct ConnectionEstablished<'a> { /// This event is always paired with an earlier /// [`FromSwarm::ConnectionEstablished`] with the same peer ID, connection ID /// and endpoint. +#[allow(deprecated)] pub struct ConnectionClosed<'a, Handler: IntoConnectionHandler> { pub peer_id: PeerId, pub connection_id: ConnectionId, @@ -524,6 +625,7 @@ pub struct ExpiredExternalAddr<'a> { pub addr: &'a Multiaddr, } +#[allow(deprecated)] impl<'a, Handler: IntoConnectionHandler> FromSwarm<'a, Handler> { fn map_handler( self, diff --git a/swarm/src/behaviour/either.rs b/swarm/src/behaviour/either.rs index f77a3e02..ef62b51f 100644 --- a/swarm/src/behaviour/either.rs +++ b/swarm/src/behaviour/either.rs @@ -20,11 +20,9 @@ use crate::behaviour::{self, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use crate::connection::ConnectionId; -use crate::handler::either::IntoEitherHandler; -use crate::THandlerInEvent; -use crate::THandlerOutEvent; +use crate::{ConnectionDenied, THandler, THandlerInEvent, THandlerOutEvent}; use either::Either; -use libp2p_core::{Multiaddr, PeerId}; +use libp2p_core::{Endpoint, Multiaddr, PeerId}; use std::{task::Context, task::Poll}; /// Implementation of [`NetworkBehaviour`] that can be either of two implementations. @@ -33,21 +31,94 @@ where L: NetworkBehaviour, R: NetworkBehaviour, { - type ConnectionHandler = IntoEitherHandler; + type ConnectionHandler = Either, THandler>; type OutEvent = Either; - fn new_handler(&mut self) -> Self::ConnectionHandler { + fn handle_pending_inbound_connection( + &mut self, + id: ConnectionId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { match self { - Either::Left(a) => IntoEitherHandler::Left(a.new_handler()), - Either::Right(b) => IntoEitherHandler::Right(b.new_handler()), + Either::Left(a) => a.handle_pending_inbound_connection(id, local_addr, remote_addr), + Either::Right(b) => b.handle_pending_inbound_connection(id, local_addr, remote_addr), } } - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - match self { - Either::Left(a) => a.addresses_of_peer(peer_id), - Either::Right(b) => b.addresses_of_peer(peer_id), - } + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + let handler = match self { + Either::Left(inner) => Either::Left(inner.handle_established_inbound_connection( + _connection_id, + peer, + local_addr, + remote_addr, + )?), + Either::Right(inner) => Either::Right(inner.handle_established_inbound_connection( + _connection_id, + peer, + local_addr, + remote_addr, + )?), + }; + + Ok(handler) + } + + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + let addresses = match self { + Either::Left(inner) => inner.handle_pending_outbound_connection( + _connection_id, + maybe_peer, + _addresses, + _effective_role, + )?, + Either::Right(inner) => inner.handle_pending_outbound_connection( + _connection_id, + maybe_peer, + _addresses, + _effective_role, + )?, + }; + + Ok(addresses) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + let handler = match self { + Either::Left(inner) => Either::Left(inner.handle_established_outbound_connection( + _connection_id, + peer, + addr, + role_override, + )?), + Either::Right(inner) => Either::Right(inner.handle_established_outbound_connection( + _connection_id, + peer, + addr, + role_override, + )?), + }; + + Ok(handler) } fn on_swarm_event(&mut self, event: behaviour::FromSwarm) { diff --git a/swarm/src/behaviour/external_addresses.rs b/swarm/src/behaviour/external_addresses.rs index 48de038e..2090d4b3 100644 --- a/swarm/src/behaviour/external_addresses.rs +++ b/swarm/src/behaviour/external_addresses.rs @@ -1,4 +1,5 @@ use crate::behaviour::{ExpiredExternalAddr, FromSwarm, NewExternalAddr}; +#[allow(deprecated)] use crate::IntoConnectionHandler; use libp2p_core::Multiaddr; use std::collections::HashSet; @@ -31,6 +32,7 @@ impl ExternalAddresses { } /// Feed a [`FromSwarm`] event to this struct. + #[allow(deprecated)] pub fn on_swarm_event(&mut self, event: &FromSwarm) where THandler: IntoConnectionHandler, diff --git a/swarm/src/behaviour/listen_addresses.rs b/swarm/src/behaviour/listen_addresses.rs index 0259ebd0..07bd003b 100644 --- a/swarm/src/behaviour/listen_addresses.rs +++ b/swarm/src/behaviour/listen_addresses.rs @@ -1,4 +1,5 @@ use crate::behaviour::{ExpiredListenAddr, FromSwarm, NewListenAddr}; +#[allow(deprecated)] use crate::IntoConnectionHandler; use libp2p_core::Multiaddr; use std::collections::HashSet; @@ -16,6 +17,7 @@ impl ListenAddresses { } /// Feed a [`FromSwarm`] event to this struct. + #[allow(deprecated)] pub fn on_swarm_event(&mut self, event: &FromSwarm) where THandler: IntoConnectionHandler, diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 1546a16b..0566eafb 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -23,15 +23,16 @@ use crate::connection::ConnectionId; use crate::handler::{ AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - IntoConnectionHandler, KeepAlive, ListenUpgradeError, SubstreamProtocol, + KeepAlive, ListenUpgradeError, SubstreamProtocol, }; use crate::upgrade::SendWrapper; use crate::{ - NetworkBehaviour, NetworkBehaviourAction, PollParameters, THandlerInEvent, THandlerOutEvent, + ConnectionDenied, NetworkBehaviour, NetworkBehaviourAction, PollParameters, THandler, + THandlerInEvent, THandlerOutEvent, }; use either::Either; use futures::future; -use libp2p_core::{upgrade::DeniedUpgrade, ConnectedPoint, Multiaddr, PeerId}; +use libp2p_core::{upgrade::DeniedUpgrade, Endpoint, Multiaddr, PeerId}; use std::{task::Context, task::Poll}; /// Implementation of `NetworkBehaviour` that can be either in the disabled or enabled state. @@ -68,20 +69,93 @@ impl NetworkBehaviour for Toggle where TBehaviour: NetworkBehaviour, { - type ConnectionHandler = ToggleIntoConnectionHandler; + type ConnectionHandler = ToggleConnectionHandler>; type OutEvent = TBehaviour::OutEvent; - fn new_handler(&mut self) -> Self::ConnectionHandler { - ToggleIntoConnectionHandler { - inner: self.inner.as_mut().map(|i| i.new_handler()), - } + fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + let inner = match self.inner.as_mut() { + None => return Ok(()), + Some(inner) => inner, + }; + + inner.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?; + + Ok(()) } - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - self.inner - .as_mut() - .map(|b| b.addresses_of_peer(peer_id)) - .unwrap_or_else(Vec::new) + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + let inner = match self.inner.as_mut() { + None => return Ok(ToggleConnectionHandler { inner: None }), + Some(inner) => inner, + }; + + let handler = inner.handle_established_inbound_connection( + _connection_id, + peer, + local_addr, + remote_addr, + )?; + + Ok(ToggleConnectionHandler { + inner: Some(handler), + }) + } + + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + let inner = match self.inner.as_mut() { + None => return Ok(vec![]), + Some(inner) => inner, + }; + + let addresses = inner.handle_pending_outbound_connection( + _connection_id, + maybe_peer, + _addresses, + _effective_role, + )?; + + Ok(addresses) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + let inner = match self.inner.as_mut() { + None => return Ok(ToggleConnectionHandler { inner: None }), + Some(inner) => inner, + }; + + let handler = inner.handle_established_outbound_connection( + _connection_id, + peer, + addr, + role_override, + )?; + + Ok(ToggleConnectionHandler { + inner: Some(handler), + }) } fn on_swarm_event(&mut self, event: FromSwarm) { @@ -116,38 +190,6 @@ where } } -/// Implementation of `IntoConnectionHandler` that can be in the disabled state. -pub struct ToggleIntoConnectionHandler { - inner: Option, -} - -impl IntoConnectionHandler for ToggleIntoConnectionHandler -where - TInner: IntoConnectionHandler, -{ - type Handler = ToggleConnectionHandler; - - fn into_handler( - self, - remote_peer_id: &PeerId, - connected_point: &ConnectedPoint, - ) -> Self::Handler { - ToggleConnectionHandler { - inner: self - .inner - .map(|h| h.into_handler(remote_peer_id, connected_point)), - } - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - if let Some(inner) = self.inner.as_ref() { - Either::Left(SendWrapper(inner.inbound_protocol())) - } else { - Either::Right(SendWrapper(DeniedUpgrade)) - } - } -} - /// Implementation of [`ConnectionHandler`] that can be in the disabled state. pub struct ToggleConnectionHandler { inner: Option, diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index ed132acc..8d80a8f7 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -18,15 +18,16 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. - use crate::connection::{Connection, ConnectionId, PendingPoint}; +#[allow(deprecated)] +use crate::IntoConnectionHandler; use crate::{ connection::{ Connected, ConnectionError, ConnectionLimit, IncomingInfo, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }, transport::TransportError, - ConnectedPoint, ConnectionHandler, Executor, IntoConnectionHandler, Multiaddr, PeerId, + ConnectedPoint, ConnectionHandler, Executor, Multiaddr, PeerId, }; use concurrent_dial::ConcurrentDial; use fnv::FnvHashMap; @@ -84,7 +85,7 @@ impl ExecSwitch { /// A connection `Pool` manages a set of connections for each peer. pub struct Pool where - THandler: IntoConnectionHandler, + THandler: ConnectionHandler, { local_id: PeerId, @@ -92,13 +93,8 @@ where counters: ConnectionCounters, /// The managed connections of each peer that are currently considered established. - established: FnvHashMap< - PeerId, - FnvHashMap< - ConnectionId, - EstablishedConnection<::InEvent>, - >, - >, + established: + FnvHashMap>>, /// The pending connections that are currently being negotiated. pending: HashMap, @@ -136,7 +132,10 @@ where /// Receivers for events reported from established connections. established_connection_events: - SelectAll>>, + SelectAll>>, + + /// Receivers for [`NewConnection`] objects that are dropped. + new_connection_dropped_listeners: FuturesUnordered>, } #[derive(Debug)] @@ -211,7 +210,7 @@ impl PendingConnection { } } -impl fmt::Debug for Pool { +impl fmt::Debug for Pool { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("Pool") .field("counters", &self.counters) @@ -221,13 +220,13 @@ impl fmt::Debug for Pool { /// Event that can happen on the `Pool`. #[derive(Debug)] -pub enum PoolEvent { +pub enum PoolEvent { /// A new connection has been established. ConnectionEstablished { id: ConnectionId, peer_id: PeerId, endpoint: ConnectedPoint, - connection: StreamMuxerBox, + connection: NewConnection, /// [`Some`] when the new connection is an outgoing connection. /// Addresses are dialed in parallel. Contains the addresses and errors /// of dial attempts that failed before the one successful dial. @@ -253,10 +252,10 @@ pub enum PoolEvent { connected: Connected, /// The error that occurred, if any. If `None`, the connection /// was closed by the local peer. - error: Option::Error>>, + error: Option>, /// The remaining established connections to the same peer. remaining_established_connection_ids: Vec, - handler: THandler::Handler, + handler: THandler, }, /// An outbound connection attempt failed. @@ -286,7 +285,7 @@ pub enum PoolEvent { id: ConnectionId, peer_id: PeerId, /// The produced event. - event: <::Handler as ConnectionHandler>::OutEvent, + event: THandler::OutEvent, }, /// The connection to a node has changed its address. @@ -302,7 +301,7 @@ pub enum PoolEvent { impl Pool where - THandler: IntoConnectionHandler, + THandler: ConnectionHandler, { /// Creates a new empty `Pool`. pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self { @@ -326,6 +325,7 @@ where pending_connection_events_rx, no_established_connections_waker: None, established_connection_events: Default::default(), + new_connection_dropped_listeners: Default::default(), } } @@ -338,11 +338,7 @@ where pub fn get_established( &mut self, id: ConnectionId, - ) -> Option< - &mut EstablishedConnection< - <::Handler as ConnectionHandler>::InEvent, - >, - > { + ) -> Option<&mut EstablishedConnection> { self.established .values_mut() .find_map(|connections| connections.get_mut(&id)) @@ -498,17 +494,21 @@ where accepted_at: Instant::now(), }, ); + Ok(()) } + #[allow(deprecated)] pub fn spawn_connection( &mut self, id: ConnectionId, obtained_peer_id: PeerId, endpoint: &ConnectedPoint, - muxer: StreamMuxerBox, + connection: NewConnection, handler: ::Handler, ) { + let connection = connection.extract(); + let conns = self.established.entry(obtained_peer_id).or_default(); self.counters.inc_established(endpoint); @@ -528,7 +528,7 @@ where } let connection = Connection::new( - muxer, + connection, handler, self.substream_upgrade_protocol_override, self.max_negotiating_inbound_streams, @@ -543,18 +543,11 @@ where )) } - pub fn close_connection(&mut self, muxer: StreamMuxerBox) { - self.executor.spawn(async move { - let _ = muxer.close().await; - }); - } - /// Polls the connection pool for events. pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll> where - THandler: IntoConnectionHandler + 'static, - THandler::Handler: ConnectionHandler + Send, - ::OutboundOpenInfo: Send, + THandler: ConnectionHandler + 'static, + ::OutboundOpenInfo: Send, { // Poll for events of established connections. // @@ -622,6 +615,17 @@ where // Poll for events of pending connections. loop { + if let Poll::Ready(Some(result)) = + self.new_connection_dropped_listeners.poll_next_unpin(cx) + { + if let Ok(dropped_connection) = result { + self.executor.spawn(async move { + let _ = dropped_connection.close().await; + }); + } + continue; + } + let event = match self.pending_connection_events_rx.poll_next_unpin(cx) { Poll::Ready(Some(event)) => event, Poll::Pending => break, @@ -753,11 +757,14 @@ where let established_in = accepted_at.elapsed(); + let (connection, drop_listener) = NewConnection::new(muxer); + self.new_connection_dropped_listeners.push(drop_listener); + return Poll::Ready(PoolEvent::ConnectionEstablished { peer_id: obtained_peer_id, endpoint, id, - connection: muxer, + connection, concurrent_dial_errors, established_in, }); @@ -812,6 +819,48 @@ where } } +/// Opaque type for a new connection. +/// +/// This connection has just been established but isn't part of the [`Pool`] yet. +/// It either needs to be spawned via [`Pool::spawn_connection`] or dropped if undesired. +/// +/// On drop, this type send the connection back to the [`Pool`] where it will be gracefully closed. +#[derive(Debug)] +pub struct NewConnection { + connection: Option, + drop_sender: Option>, +} + +impl NewConnection { + fn new(conn: StreamMuxerBox) -> (Self, oneshot::Receiver) { + let (sender, receiver) = oneshot::channel(); + + ( + Self { + connection: Some(conn), + drop_sender: Some(sender), + }, + receiver, + ) + } + + fn extract(mut self) -> StreamMuxerBox { + self.connection.take().unwrap() + } +} + +impl Drop for NewConnection { + fn drop(&mut self) { + if let Some(connection) = self.connection.take() { + let _ = self + .drop_sender + .take() + .expect("`drop_sender` to always be `Some`") + .send(connection); + } + } +} + /// Network connection information. #[derive(Debug, Clone)] pub struct ConnectionCounters { diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 2a7c1a94..8d9fa74c 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -4,12 +4,12 @@ use crate::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use crate::{ - ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, SubstreamProtocol, - THandlerInEvent, THandlerOutEvent, + ConnectionDenied, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + SubstreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, }; use libp2p_core::upgrade::DeniedUpgrade; -use libp2p_core::PeerId; -use libp2p_core::UpgradeError; +use libp2p_core::{Endpoint, PeerId}; +use libp2p_core::{Multiaddr, UpgradeError}; use std::task::{Context, Poll}; use void::Void; @@ -20,8 +20,24 @@ impl NetworkBehaviour for Behaviour { type ConnectionHandler = ConnectionHandler; type OutEvent = Void; - fn new_handler(&mut self) -> Self::ConnectionHandler { - ConnectionHandler + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(ConnectionHandler) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(ConnectionHandler) } fn on_connection_handler_event( diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index f60e94e2..cf8b9c1f 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -486,6 +486,9 @@ where } /// Prototype for a [`ConnectionHandler`]. +#[deprecated( + note = "Implement `ConnectionHandler` directly and use `NetworkBehaviour::{handle_pending_inbound_connection,handle_pending_outbound_connection}` to handle pending connections." +)] pub trait IntoConnectionHandler: Send + 'static { /// The protocols handler. type Handler: ConnectionHandler; @@ -512,6 +515,7 @@ pub trait IntoConnectionHandler: Send + 'static { } } +#[allow(deprecated)] impl IntoConnectionHandler for T where T: ConnectionHandler, diff --git a/swarm/src/handler/either.rs b/swarm/src/handler/either.rs index e2c72bb3..a10dbc03 100644 --- a/swarm/src/handler/either.rs +++ b/swarm/src/handler/either.rs @@ -18,16 +18,17 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +#[allow(deprecated)] +use crate::handler::IntoConnectionHandler; use crate::handler::{ - ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - FullyNegotiatedInbound, InboundUpgradeSend, IntoConnectionHandler, KeepAlive, - ListenUpgradeError, SubstreamProtocol, + ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, FullyNegotiatedInbound, + InboundUpgradeSend, KeepAlive, ListenUpgradeError, SubstreamProtocol, }; use crate::upgrade::SendWrapper; +use crate::ConnectionHandlerUpgrErr; use either::Either; use futures::future; -use libp2p_core::upgrade::UpgradeError; -use libp2p_core::{ConnectedPoint, PeerId}; +use libp2p_core::{ConnectedPoint, PeerId, UpgradeError}; use std::task::{Context, Poll}; /// Auxiliary type to allow implementing [`IntoConnectionHandler`]. As [`IntoConnectionHandler`] is @@ -39,6 +40,7 @@ pub enum IntoEitherHandler { /// Implementation of a [`IntoConnectionHandler`] that represents either of two [`IntoConnectionHandler`] /// implementations. +#[allow(deprecated)] impl IntoConnectionHandler for IntoEitherHandler where L: IntoConnectionHandler, @@ -96,7 +98,7 @@ where RIP: InboundUpgradeSend, LIP: InboundUpgradeSend, { - fn transpose( + pub(crate) fn transpose( self, ) -> Either, FullyNegotiatedInbound> { match self { diff --git a/swarm/src/handler/multi.rs b/swarm/src/handler/multi.rs index 35f0ebd7..47a095d9 100644 --- a/swarm/src/handler/multi.rs +++ b/swarm/src/handler/multi.rs @@ -21,10 +21,12 @@ //! A [`ConnectionHandler`] implementation that combines multiple other [`ConnectionHandler`]s //! indexed by some key. +#[allow(deprecated)] +use crate::handler::IntoConnectionHandler; use crate::handler::{ AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - IntoConnectionHandler, KeepAlive, ListenUpgradeError, SubstreamProtocol, + KeepAlive, ListenUpgradeError, SubstreamProtocol, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, UpgradeInfoSend}; use crate::NegotiatedSubstream; @@ -388,10 +390,12 @@ impl IntoIterator for MultiHandler { /// A [`IntoConnectionHandler`] for multiple other `IntoConnectionHandler`s. #[derive(Clone)] +#[deprecated(note = "Use `MultiHandler` directly.")] pub struct IntoMultiHandler { handlers: HashMap, } +#[allow(deprecated)] impl fmt::Debug for IntoMultiHandler where K: fmt::Debug + Eq + Hash, @@ -404,6 +408,7 @@ where } } +#[allow(deprecated)] impl IntoMultiHandler where K: Hash + Eq, @@ -424,6 +429,7 @@ where } } +#[allow(deprecated)] impl IntoConnectionHandler for IntoMultiHandler where K: Debug + Clone + Eq + Hash + Send + 'static, diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index 8c38ffe7..a1dd844c 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -18,14 +18,14 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +#[allow(deprecated)] +use crate::handler::IntoConnectionHandler; use crate::handler::{ AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - InboundUpgradeSend, IntoConnectionHandler, KeepAlive, ListenUpgradeError, OutboundUpgradeSend, - SubstreamProtocol, + InboundUpgradeSend, KeepAlive, ListenUpgradeError, OutboundUpgradeSend, SubstreamProtocol, }; use crate::upgrade::SendWrapper; - use either::Either; use futures::future; use libp2p_core::{ @@ -54,6 +54,7 @@ impl IntoConnectionHandlerSelect { } } +#[allow(deprecated)] impl IntoConnectionHandler for IntoConnectionHandlerSelect where TProto1: IntoConnectionHandler, diff --git a/swarm/src/keep_alive.rs b/swarm/src/keep_alive.rs index f6452249..cbc778cf 100644 --- a/swarm/src/keep_alive.rs +++ b/swarm/src/keep_alive.rs @@ -4,10 +4,9 @@ use crate::handler::{ ConnectionEvent, ConnectionHandlerEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol, }; -use crate::THandlerInEvent; -use crate::THandlerOutEvent; +use crate::{ConnectionDenied, THandler, THandlerInEvent, THandlerOutEvent}; use libp2p_core::upgrade::DeniedUpgrade; -use libp2p_core::PeerId; +use libp2p_core::{Endpoint, Multiaddr, PeerId}; use std::task::{Context, Poll}; use void::Void; @@ -24,8 +23,24 @@ impl NetworkBehaviour for Behaviour { type ConnectionHandler = ConnectionHandler; type OutEvent = Void; - fn new_handler(&mut self) -> Self::ConnectionHandler { - ConnectionHandler + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(ConnectionHandler) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(ConnectionHandler) } fn on_connection_handler_event( diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 90baa287..271246b7 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -85,18 +85,21 @@ pub mod derive_prelude { pub use crate::behaviour::NewListenAddr; pub use crate::behaviour::NewListener; pub use crate::connection::ConnectionId; + pub use crate::ConnectionDenied; pub use crate::ConnectionHandler; + pub use crate::ConnectionHandlerSelect; pub use crate::DialError; - pub use crate::IntoConnectionHandler; - pub use crate::IntoConnectionHandlerSelect; pub use crate::NetworkBehaviour; pub use crate::NetworkBehaviourAction; pub use crate::PollParameters; + pub use crate::THandler; pub use crate::THandlerInEvent; + pub use crate::THandlerOutEvent; pub use either::Either; pub use futures::prelude as futures; pub use libp2p_core::transport::ListenerId; pub use libp2p_core::ConnectedPoint; + pub use libp2p_core::Endpoint; pub use libp2p_core::Multiaddr; pub use libp2p_core::PeerId; } @@ -110,15 +113,18 @@ pub use behaviour::{ pub use connection::pool::{ConnectionCounters, ConnectionLimits}; pub use connection::{ConnectionError, ConnectionId, ConnectionLimit}; pub use executor::Executor; +#[allow(deprecated)] +pub use handler::IntoConnectionHandler; pub use handler::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr, - IntoConnectionHandler, IntoConnectionHandlerSelect, KeepAlive, OneShotHandler, - OneShotHandlerConfig, SubstreamProtocol, + IntoConnectionHandlerSelect, KeepAlive, OneShotHandler, OneShotHandlerConfig, + SubstreamProtocol, }; #[cfg(feature = "macros")] pub use libp2p_swarm_derive::NetworkBehaviour; pub use registry::{AddAddressResult, AddressRecord, AddressScore}; +use crate::handler::UpgradeInfoSend; use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent}; use connection::IncomingInfo; use connection::{ @@ -133,8 +139,7 @@ use libp2p_core::{ multihash::Multihash, muxing::StreamMuxerBox, transport::{self, ListenerId, TransportError, TransportEvent}, - upgrade::ProtocolName, - Endpoint, Multiaddr, Negotiated, PeerId, Transport, + Endpoint, Multiaddr, Negotiated, PeerId, ProtocolName, Transport, }; use registry::{AddressIntoIter, Addresses}; use smallvec::SmallVec; @@ -146,7 +151,6 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use upgrade::UpgradeInfoSend as _; /// Substream for which a protocol has been chosen. /// @@ -159,20 +163,19 @@ type TBehaviourOutEvent = ::OutEvent /// [`ConnectionHandler`] of the [`NetworkBehaviour`] for all the protocols the [`NetworkBehaviour`] /// supports. -type THandler = ::ConnectionHandler; +#[allow(deprecated)] +pub type THandler = + <::ConnectionHandler as IntoConnectionHandler>::Handler; /// Custom event that can be received by the [`ConnectionHandler`] of the /// [`NetworkBehaviour`]. -pub type THandlerInEvent = - < as IntoConnectionHandler>::Handler as ConnectionHandler>::InEvent; +pub type THandlerInEvent = as ConnectionHandler>::InEvent; /// Custom event that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`]. -pub type THandlerOutEvent = - < as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent; +pub type THandlerOutEvent = as ConnectionHandler>::OutEvent; /// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`]. -type THandlerErr = - < as IntoConnectionHandler>::Handler as ConnectionHandler>::Error; +pub type THandlerErr = as ConnectionHandler>::Error; /// Event generated by the `Swarm`. #[derive(Debug)] @@ -542,21 +545,46 @@ where } let addresses = { - let mut addresses = dial_opts.get_addresses(); + let mut addresses_from_opts = dial_opts.get_addresses(); - if let Some(peer_id) = peer_id { - if dial_opts.extend_addresses_through_behaviour() { - addresses.extend(self.behaviour.addresses_of_peer(&peer_id)); + match self.behaviour.handle_pending_outbound_connection( + connection_id, + peer_id, + addresses_from_opts.as_slice(), + dial_opts.role_override(), + ) { + Ok(addresses) => { + if dial_opts.extend_addresses_through_behaviour() { + addresses_from_opts.extend(addresses) + } else { + let num_addresses = addresses.len(); + + if num_addresses > 0 { + log::debug!("discarding {num_addresses} addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection {connection_id:?}") + } + } + } + Err(cause) => { + let error = DialError::Denied { cause }; + + self.behaviour + .on_swarm_event(FromSwarm::DialFailure(DialFailure { + peer_id, + error: &error, + connection_id, + })); + + return Err(error); } } let mut unique_addresses = HashSet::new(); - addresses.retain(|addr| { + addresses_from_opts.retain(|addr| { !self.listened_addrs.values().flatten().any(|a| a == addr) && unique_addresses.insert(addr.clone()) }); - if addresses.is_empty() { + if addresses_from_opts.is_empty() { let error = DialError::NoAddresses; self.behaviour .on_swarm_event(FromSwarm::DialFailure(DialFailure { @@ -567,7 +595,7 @@ where return Err(error); }; - addresses + addresses_from_opts }; let dials = addresses @@ -756,15 +784,76 @@ where established_in, } => { if self.banned_peers.contains(&peer_id) { - self.pool.close_connection(connection); return Some(SwarmEvent::BannedPeer { peer_id, endpoint }); } - let handler = self - .behaviour - .new_handler() - .into_handler(&peer_id, &endpoint); + let handler = match endpoint.clone() { + ConnectedPoint::Dialer { + address, + role_override, + } => { + match self.behaviour.handle_established_outbound_connection( + id, + peer_id, + &address, + role_override, + ) { + Ok(handler) => handler, + Err(cause) => { + let dial_error = DialError::Denied { cause }; + self.behaviour.on_swarm_event(FromSwarm::DialFailure( + DialFailure { + connection_id: id, + error: &dial_error, + peer_id: Some(peer_id), + }, + )); + return Some(SwarmEvent::OutgoingConnectionError { + peer_id: Some(peer_id), + error: dial_error, + }); + } + } + } + ConnectedPoint::Listener { + local_addr, + send_back_addr, + } => { + match self.behaviour.handle_established_inbound_connection( + id, + peer_id, + &local_addr, + &send_back_addr, + ) { + Ok(handler) => handler, + Err(cause) => { + let listen_error = ListenError::Denied { cause }; + self.behaviour.on_swarm_event(FromSwarm::ListenFailure( + ListenFailure { + local_addr: &local_addr, + send_back_addr: &send_back_addr, + error: &listen_error, + connection_id: id, + }, + )); + + return Some(SwarmEvent::IncomingConnectionError { + send_back_addr, + local_addr, + error: listen_error, + }); + } + } + } + }; + + let supported_protocols = handler + .listen_protocol() + .upgrade() + .protocol_info() + .map(|p| p.protocol_name().to_owned()) + .collect(); let other_established_connection_ids = self .pool .iter_established_connections_of_peer(&peer_id) @@ -802,6 +891,7 @@ where other_established: other_established_connection_ids.len(), }, )); + self.supported_protocols = supported_protocols; return Some(SwarmEvent::ConnectionEstablished { peer_id, num_established, @@ -938,6 +1028,31 @@ where } => { let connection_id = ConnectionId::next(); + match self.behaviour.handle_pending_inbound_connection( + connection_id, + &local_addr, + &send_back_addr, + ) { + Ok(()) => {} + Err(cause) => { + let listen_error = ListenError::Denied { cause }; + + self.behaviour + .on_swarm_event(FromSwarm::ListenFailure(ListenFailure { + local_addr: &local_addr, + send_back_addr: &send_back_addr, + error: &listen_error, + connection_id, + })); + + return Some(SwarmEvent::IncomingConnectionError { + local_addr, + send_back_addr, + error: listen_error, + }); + } + } + match self.pool.add_incoming( upgrade, IncomingInfo { @@ -1277,8 +1392,7 @@ fn notify_any( ) -> Option<(THandlerInEvent, SmallVec<[ConnectionId; 10]>)> where TBehaviour: NetworkBehaviour, - THandler: IntoConnectionHandler, - THandler::Handler: ConnectionHandler< + THandler: ConnectionHandler< InEvent = THandlerInEvent, OutEvent = THandlerOutEvent, >, @@ -1532,21 +1646,13 @@ where } /// Builds a `Swarm` with the current configuration. - pub fn build(mut self) -> Swarm { - let supported_protocols = self - .behaviour - .new_handler() - .inbound_protocol() - .protocol_info() - .map(|info| info.protocol_name().to_vec()) - .collect(); - + pub fn build(self) -> Swarm { Swarm { local_peer_id: self.local_peer_id, transport: self.transport, pool: Pool::new(self.local_peer_id, self.pool_config, self.connection_limits), behaviour: self.behaviour, - supported_protocols, + supported_protocols: Default::default(), listened_addrs: HashMap::new(), external_addrs: Addresses::default(), banned_peers: HashSet::new(), @@ -1564,7 +1670,9 @@ pub enum DialError { /// has been reached. ConnectionLimit(ConnectionLimit), /// The peer identity obtained on the connection matches the local peer. - LocalPeerId { endpoint: ConnectedPoint }, + LocalPeerId { + endpoint: ConnectedPoint, + }, /// [`NetworkBehaviour::addresses_of_peer`] returned no addresses /// for the peer to dial. NoAddresses, @@ -1580,6 +1688,9 @@ pub enum DialError { obtained: PeerId, endpoint: ConnectedPoint, }, + Denied { + cause: ConnectionDenied, + }, /// An error occurred while negotiating the transport protocol(s) on a connection. Transport(Vec<(Multiaddr, TransportError)>), } @@ -1634,6 +1745,9 @@ impl fmt::Display for DialError { Ok(()) } + DialError::Denied { .. } => { + write!(f, "Dial error") + } } } } @@ -1660,6 +1774,7 @@ impl error::Error for DialError { DialError::InvalidPeerId { .. } => None, DialError::WrongPeerId { .. } => None, DialError::Transport(_) => None, + DialError::Denied { cause } => Some(cause), } } } @@ -1677,8 +1792,13 @@ pub enum ListenError { obtained: PeerId, endpoint: ConnectedPoint, }, - /// The peer identity obtained on the connection did not match the one that was expected. - LocalPeerId { endpoint: ConnectedPoint }, + /// The connection was dropped because it resolved to our own [`PeerId`]. + LocalPeerId { + endpoint: ConnectedPoint, + }, + Denied { + cause: ConnectionDenied, + }, /// An error occurred while negotiating the transport protocol(s) on a connection. Transport(TransportError), } @@ -1716,11 +1836,11 @@ impl fmt::Display for ListenError { ListenError::Transport(_) => { write!(f, "Listen error: Failed to negotiate transport protocol(s)") } + ListenError::Denied { .. } => { + write!(f, "Listen error") + } ListenError::LocalPeerId { endpoint } => { - write!( - f, - "Listen error: Pending connection: Local peer ID at {endpoint:?}." - ) + write!(f, "Listen error: Local peer ID at {endpoint:?}.") } } } @@ -1733,11 +1853,37 @@ impl error::Error for ListenError { ListenError::WrongPeerId { .. } => None, ListenError::Transport(err) => Some(err), ListenError::Aborted => None, + ListenError::Denied { cause } => Some(cause), ListenError::LocalPeerId { .. } => None, } } } +#[derive(Debug)] +pub struct ConnectionDenied { + inner: Box, +} + +impl ConnectionDenied { + pub fn new(cause: impl error::Error + Send + Sync + 'static) -> Self { + Self { + inner: Box::new(cause), + } + } +} + +impl fmt::Display for ConnectionDenied { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "connection denied") + } +} + +impl error::Error for ConnectionDenied { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + Some(self.inner.as_ref()) + } +} + /// Information about the connections obtained by [`Swarm::network_info()`]. #[derive(Clone, Debug)] pub struct NetworkInfo { @@ -1845,7 +1991,7 @@ mod tests { ) -> bool where TBehaviour: NetworkBehaviour, - <::Handler as ConnectionHandler>::OutEvent: Clone, + THandlerOutEvent: Clone, { swarm1 .behaviour() @@ -1865,7 +2011,7 @@ mod tests { ) -> bool where TBehaviour: NetworkBehaviour, - <::Handler as ConnectionHandler>::OutEvent: Clone + THandlerOutEvent: Clone, { swarm1 .behaviour() @@ -1963,10 +2109,10 @@ mod tests { // connection. Check that it was not reported to the behaviour of the // banning swarm. assert_eq!( - swarm2.behaviour.on_connection_established.len(), - s2_expected_conns, - "No additional closed connections should be reported for the banned peer" - ); + swarm2.behaviour.on_connection_established.len(), + s2_expected_conns, + "No additional closed connections should be reported for the banned peer" + ); // Setup to test that the banned connection is not reported upon closing // even if the peer is unbanned. diff --git a/swarm/src/test.rs b/swarm/src/test.rs index f8f3daaa..aac5b600 100644 --- a/swarm/src/test.rs +++ b/swarm/src/test.rs @@ -23,10 +23,10 @@ use crate::behaviour::{ FromSwarm, ListenerClosed, ListenerError, NewExternalAddr, NewListenAddr, NewListener, }; use crate::{ - ConnectionHandler, ConnectionId, IntoConnectionHandler, NetworkBehaviour, - NetworkBehaviourAction, PollParameters, THandlerInEvent, THandlerOutEvent, + ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, + PollParameters, THandler, THandlerInEvent, THandlerOutEvent, }; -use libp2p_core::{multiaddr::Multiaddr, transport::ListenerId, ConnectedPoint, PeerId}; +use libp2p_core::{multiaddr::Multiaddr, transport::ListenerId, ConnectedPoint, Endpoint, PeerId}; use std::collections::HashMap; use std::task::{Context, Poll}; @@ -35,7 +35,9 @@ use std::task::{Context, Poll}; /// any further state. pub struct MockBehaviour where - THandler: ConnectionHandler, + THandler: ConnectionHandler + Clone, + THandler::OutEvent: Clone, + TOutEvent: Send + 'static, { /// The prototype protocols handler that is cloned for every /// invocation of `new_handler`. @@ -50,7 +52,9 @@ where impl MockBehaviour where - THandler: ConnectionHandler, + THandler: ConnectionHandler + Clone, + THandler::OutEvent: Clone, + TOutEvent: Send + 'static, { pub fn new(handler_proto: THandler) -> Self { MockBehaviour { @@ -70,12 +74,39 @@ where type ConnectionHandler = THandler; type OutEvent = TOutEvent; - fn new_handler(&mut self) -> Self::ConnectionHandler { - self.handler_proto.clone() + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result { + Ok(self.handler_proto.clone()) } - fn addresses_of_peer(&mut self, p: &PeerId) -> Vec { - self.addresses.get(p).map_or(Vec::new(), |v| v.clone()) + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result { + Ok(self.handler_proto.clone()) + } + + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + let p = match maybe_peer { + None => return Ok(vec![]), + Some(peer) => peer, + }; + + Ok(self.addresses.get(&p).map_or(Vec::new(), |v| v.clone())) } fn poll( @@ -121,14 +152,14 @@ where { inner: TInner, - pub addresses_of_peer: Vec, + pub handle_pending_inbound_connection: Vec<(ConnectionId, Multiaddr, Multiaddr)>, + pub handle_pending_outbound_connection: + Vec<(Option, Vec, Endpoint, ConnectionId)>, + pub handle_established_inbound_connection: Vec<(PeerId, ConnectionId, Multiaddr, Multiaddr)>, + pub handle_established_outbound_connection: Vec<(PeerId, Multiaddr, Endpoint, ConnectionId)>, pub on_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>, pub on_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>, - pub on_connection_handler_event: Vec<( - PeerId, - ConnectionId, - <::Handler as ConnectionHandler>::OutEvent, - )>, + pub on_connection_handler_event: Vec<(PeerId, ConnectionId, THandlerOutEvent)>, pub on_dial_failure: Vec>, pub on_new_listener: Vec, pub on_new_listen_addr: Vec<(ListenerId, Multiaddr)>, @@ -143,13 +174,15 @@ where impl CallTraceBehaviour where TInner: NetworkBehaviour, - <::Handler as ConnectionHandler>::OutEvent: - Clone, + THandlerOutEvent: Clone, { pub fn new(inner: TInner) -> Self { Self { inner, - addresses_of_peer: Vec::new(), + handle_pending_inbound_connection: Vec::new(), + handle_pending_outbound_connection: Vec::new(), + handle_established_inbound_connection: Vec::new(), + handle_established_outbound_connection: Vec::new(), on_connection_established: Vec::new(), on_connection_closed: Vec::new(), on_connection_handler_event: Vec::new(), @@ -167,7 +200,10 @@ where #[allow(dead_code)] pub fn reset(&mut self) { - self.addresses_of_peer = Vec::new(); + self.handle_pending_inbound_connection = Vec::new(); + self.handle_pending_outbound_connection = Vec::new(); + self.handle_established_inbound_connection = Vec::new(); + self.handle_established_outbound_connection = Vec::new(); self.on_connection_established = Vec::new(); self.on_connection_closed = Vec::new(); self.on_connection_handler_event = Vec::new(); @@ -232,8 +268,8 @@ where assert_eq!( self.on_connection_established .iter() - .filter(|(.., reported_additional_connections)| { - *reported_additional_connections == 0 + .filter(|(.., reported_aditional_connections)| { + *reported_aditional_connections == 0 }) .count(), expected_connections @@ -362,19 +398,83 @@ where impl NetworkBehaviour for CallTraceBehaviour where TInner: NetworkBehaviour, - <::Handler as ConnectionHandler>::OutEvent: - Clone, + THandlerOutEvent: Clone, { type ConnectionHandler = TInner::ConnectionHandler; type OutEvent = TInner::OutEvent; - fn new_handler(&mut self) -> Self::ConnectionHandler { - self.inner.new_handler() + fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + self.handle_pending_inbound_connection.push(( + connection_id, + local_addr.clone(), + remote_addr.clone(), + )); + self.inner + .handle_pending_inbound_connection(connection_id, local_addr, remote_addr) } - fn addresses_of_peer(&mut self, p: &PeerId) -> Vec { - self.addresses_of_peer.push(*p); - self.inner.addresses_of_peer(p) + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + self.handle_established_inbound_connection.push(( + peer, + _connection_id, + local_addr.clone(), + remote_addr.clone(), + )); + self.inner.handle_established_inbound_connection( + _connection_id, + peer, + local_addr, + remote_addr, + ) + } + + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + self.handle_pending_outbound_connection.push(( + maybe_peer, + _addresses.to_vec(), + _effective_role, + _connection_id, + )); + self.inner.handle_pending_outbound_connection( + _connection_id, + maybe_peer, + _addresses, + _effective_role, + ) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + self.handle_established_outbound_connection.push(( + peer, + addr.clone(), + role_override, + _connection_id, + )); + self.inner + .handle_established_outbound_connection(_connection_id, peer, addr, role_override) } fn on_swarm_event(&mut self, event: FromSwarm) { diff --git a/swarm/tests/swarm_derive.rs b/swarm/tests/swarm_derive.rs index 87e9ce76..a9b868e4 100644 --- a/swarm/tests/swarm_derive.rs +++ b/swarm/tests/swarm_derive.rs @@ -19,10 +19,12 @@ // DEALINGS IN THE SOFTWARE. use futures::StreamExt; +use libp2p_core::{Endpoint, Multiaddr}; use libp2p_identify as identify; use libp2p_ping as ping; use libp2p_swarm::{ - behaviour::FromSwarm, dummy, NetworkBehaviour, SwarmEvent, THandlerInEvent, THandlerOutEvent, + behaviour::FromSwarm, dummy, ConnectionDenied, NetworkBehaviour, SwarmEvent, THandler, + THandlerInEvent, THandlerOutEvent, }; use std::fmt::Debug; @@ -421,8 +423,24 @@ fn custom_out_event_no_type_parameters() { type ConnectionHandler = dummy::ConnectionHandler; type OutEvent = void::Void; - fn new_handler(&mut self) -> Self::ConnectionHandler { - dummy::ConnectionHandler + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(dummy::ConnectionHandler) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(dummy::ConnectionHandler) } fn on_connection_handler_event(