diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 1ef42bfd..4db3342a 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -26,8 +26,6 @@ pub use error::{ ConnectionError, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }; -pub use pool::{ConnectionCounters, ConnectionLimits}; -pub use pool::{EstablishedConnection, PendingConnection}; use crate::handler::ConnectionHandler; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper}; diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index c43d5efb..cc6a9bbd 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -69,12 +69,12 @@ where PeerId, FnvHashMap< ConnectionId, - EstablishedConnectionInfo<::InEvent>, + EstablishedConnection<::InEvent>, >, >, /// The pending connections that are currently being negotiated. - pending: HashMap>, + pending: HashMap>, /// Next available identifier for a new connection / task. next_connection_id: ConnectionId, @@ -120,15 +120,41 @@ where } #[derive(Debug)] -struct EstablishedConnectionInfo { - /// [`PeerId`] of the remote peer. - peer_id: PeerId, +pub struct EstablishedConnection { endpoint: ConnectedPoint, /// Channel endpoint to send commands to the task. sender: mpsc::Sender>, } -impl EstablishedConnectionInfo { +impl EstablishedConnection { + /// (Asynchronously) sends an event to the connection handler. + /// + /// If the handler is not ready to receive the event, either because + /// it is busy or the connection is about to close, the given event + /// is returned with an `Err`. + /// + /// If execution of this method is preceded by successful execution of + /// `poll_ready_notify_handler` without another intervening execution + /// of `notify_handler`, it only fails if the connection is now about + /// to close. + pub fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> { + let cmd = task::Command::NotifyHandler(event); + self.sender.try_send(cmd).map_err(|e| match e.into_inner() { + task::Command::NotifyHandler(event) => event, + _ => unreachable!("Expect failed send to return initial event."), + }) + } + + /// Checks if `notify_handler` is ready to accept an event. + /// + /// Returns `Ok(())` if the handler is ready to receive an event via `notify_handler`. + /// + /// Returns `Err(())` if the background task associated with the connection + /// is terminating and the connection is about to close. + pub fn poll_ready_notify_handler(&mut self, cx: &mut Context<'_>) -> Poll> { + self.sender.poll_ready(cx).map_err(|_| ()) + } + /// Initiates a graceful close of the connection. /// /// Has no effect if the connection is already closing. @@ -142,7 +168,7 @@ impl EstablishedConnectionInfo { } } -struct PendingConnectionInfo { +struct PendingConnection { /// [`PeerId`] of the remote peer. peer_id: Option, /// Handler to handle connection once no longer pending but established. @@ -152,6 +178,19 @@ struct PendingConnectionInfo { abort_notifier: Option>, } +impl PendingConnection { + fn is_for_same_remote_as(&self, other: PeerId) -> bool { + self.peer_id.map_or(false, |peer| peer == other) + } + + /// Aborts the connection attempt, closing the connection. + fn abort(&mut self) { + if let Some(notifier) = self.abort_notifier.take() { + drop(notifier); + } + } +} + impl fmt::Debug for Pool { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("Pool") @@ -284,33 +323,14 @@ where &self.counters } - /// Gets an entry representing a connection in the pool. - /// - /// Returns `None` if the pool has no connection with the given ID. - pub fn get(&mut self, id: ConnectionId) -> Option> { - if let hash_map::Entry::Occupied(entry) = self.pending.entry(id) { - Some(PoolConnection::Pending(PendingConnection { entry })) - } else { - self.established - .iter_mut() - .find_map(|(_, cs)| match cs.entry(id) { - hash_map::Entry::Occupied(entry) => { - Some(PoolConnection::Established(EstablishedConnection { entry })) - } - hash_map::Entry::Vacant(_) => None, - }) - } - } - /// Gets an established connection from the pool by ID. pub fn get_established( &mut self, id: ConnectionId, - ) -> Option>> { - match self.get(id) { - Some(PoolConnection::Established(c)) => Some(c), - _ => None, - } + ) -> Option<&mut EstablishedConnection>> { + self.established + .values_mut() + .find_map(|connections| connections.get_mut(&id)) } /// Returns true if we are connected to the given peer. @@ -338,21 +358,12 @@ where } } - #[allow(clippy::needless_collect)] - let pending_connections = self + for connection in self .pending - .iter() - .filter(|(_, PendingConnectionInfo { peer_id, .. })| peer_id.as_ref() == Some(&peer)) - .map(|(id, _)| *id) - .collect::>(); - - for pending_connection in pending_connections { - let entry = self - .pending - .entry(pending_connection) - .expect_occupied("Iterating pending connections"); - - PendingConnection { entry }.abort(); + .iter_mut() + .filter_map(|(_, info)| info.is_for_same_remote_as(peer).then_some(info)) + { + connection.abort() } } @@ -367,19 +378,11 @@ where } } - /// Returns an iterator over all pending connection IDs together - /// with associated endpoints and expected peer IDs in the pool. - pub fn iter_pending_info( - &self, - ) -> impl Iterator)> + '_ { - self.pending.iter().map( - |( - id, - PendingConnectionInfo { - peer_id, endpoint, .. - }, - )| (id, endpoint, peer_id), - ) + /// Checks whether we are currently dialing the given peer. + pub fn is_dialing(&self, peer: PeerId) -> bool { + self.pending.iter().any(|(_, info)| { + matches!(info.endpoint, PendingPoint::Dialer { .. }) && info.is_for_same_remote_as(peer) + }) } /// Returns an iterator over all connected peers, i.e. those that have @@ -467,7 +470,7 @@ where self.counters.inc_pending(&endpoint); self.pending.insert( connection_id, - PendingConnectionInfo { + PendingConnection { peer_id: peer, handler, endpoint, @@ -514,7 +517,7 @@ where self.counters.inc_pending_incoming(); self.pending.insert( connection_id, - PendingConnectionInfo { + PendingConnection { peer_id: None, handler, endpoint: endpoint.into(), @@ -576,7 +579,7 @@ where .established .get_mut(&peer_id) .expect("`Closed` event for established connection"); - let EstablishedConnectionInfo { endpoint, .. } = + let EstablishedConnection { endpoint, .. } = connections.remove(&id).expect("Connection to be present"); self.counters.dec_established(&endpoint); let remaining_established_connection_ids: Vec = @@ -608,7 +611,7 @@ where output: (obtained_peer_id, mut muxer), outgoing, } => { - let PendingConnectionInfo { + let PendingConnection { peer_id: expected_peer_id, handler, endpoint, @@ -740,8 +743,7 @@ where mpsc::channel(self.task_command_buffer_size); conns.insert( id, - EstablishedConnectionInfo { - peer_id: obtained_peer_id, + EstablishedConnection { endpoint: endpoint.clone(), sender: command_sender, }, @@ -764,21 +766,16 @@ where .boxed(), ); - match self.get(id) { - Some(PoolConnection::Established(connection)) => { - return Poll::Ready(PoolEvent::ConnectionEstablished { - peer_id: connection.peer_id(), - endpoint: connection.endpoint().clone(), - id: connection.id(), - other_established_connection_ids, - concurrent_dial_errors, - }) - } - _ => unreachable!("since `entry` is an `EstablishedEntry`."), - } + return Poll::Ready(PoolEvent::ConnectionEstablished { + peer_id: obtained_peer_id, + endpoint, + id, + other_established_connection_ids, + concurrent_dial_errors, + }); } task::PendingConnectionEvent::PendingFailed { id, error } => { - if let Some(PendingConnectionInfo { + if let Some(PendingConnection { peer_id, handler, endpoint, @@ -830,98 +827,6 @@ where } } -/// A connection in a [`Pool`]. -pub enum PoolConnection<'a, THandler: IntoConnectionHandler> { - Pending(PendingConnection<'a, THandler>), - Established(EstablishedConnection<'a, THandlerInEvent>), -} - -/// A pending connection in a pool. -pub struct PendingConnection<'a, THandler: IntoConnectionHandler> { - entry: hash_map::OccupiedEntry<'a, ConnectionId, PendingConnectionInfo>, -} - -impl PendingConnection<'_, THandler> { - /// Aborts the connection attempt, closing the connection. - pub fn abort(mut self) { - if let Some(notifier) = self.entry.get_mut().abort_notifier.take() { - drop(notifier); - } - } -} - -/// An established connection in a pool. -pub struct EstablishedConnection<'a, TInEvent> { - entry: hash_map::OccupiedEntry<'a, ConnectionId, EstablishedConnectionInfo>, -} - -impl fmt::Debug for EstablishedConnection<'_, TInEvent> -where - TInEvent: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("EstablishedConnection") - .field("entry", &self.entry) - .finish() - } -} - -impl EstablishedConnection<'_, TInEvent> { - /// Returns information about the connected endpoint. - pub fn endpoint(&self) -> &ConnectedPoint { - &self.entry.get().endpoint - } - - /// Returns the identity of the connected peer. - pub fn peer_id(&self) -> PeerId { - self.entry.get().peer_id - } - - /// Returns the local connection ID. - pub fn id(&self) -> ConnectionId { - *self.entry.key() - } - - /// (Asynchronously) sends an event to the connection handler. - /// - /// If the handler is not ready to receive the event, either because - /// it is busy or the connection is about to close, the given event - /// is returned with an `Err`. - /// - /// If execution of this method is preceded by successful execution of - /// `poll_ready_notify_handler` without another intervening execution - /// of `notify_handler`, it only fails if the connection is now about - /// to close. - pub fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> { - let cmd = task::Command::NotifyHandler(event); - self.entry - .get_mut() - .sender - .try_send(cmd) - .map_err(|e| match e.into_inner() { - task::Command::NotifyHandler(event) => event, - _ => unreachable!("Expect failed send to return initial event."), - }) - } - - /// Checks if `notify_handler` is ready to accept an event. - /// - /// Returns `Ok(())` if the handler is ready to receive an event via `notify_handler`. - /// - /// Returns `Err(())` if the background task associated with the connection - /// is terminating and the connection is about to close. - pub fn poll_ready_notify_handler(&mut self, cx: &mut Context<'_>) -> Poll> { - self.entry.get_mut().sender.poll_ready(cx).map_err(|_| ()) - } - - /// Initiates a graceful close of the connection. - /// - /// Has no effect if the connection is already closing. - pub fn start_close(mut self) { - self.entry.get_mut().start_close() - } -} - /// Network connection information. #[derive(Debug, Clone)] pub struct ConnectionCounters { @@ -1076,7 +981,7 @@ impl ConnectionCounters { /// Counts the number of established connections to the given peer. fn num_peer_established( - established: &FnvHashMap>>, + established: &FnvHashMap>>, peer: PeerId, ) -> u32 { established.get(&peer).map_or(0, |conns| { diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 36f314c8..6fbff5db 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -70,9 +70,10 @@ pub mod keep_alive; pub use behaviour::{ CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; +pub use connection::pool::{ConnectionCounters, ConnectionLimits}; pub use connection::{ - ConnectionCounters, ConnectionError, ConnectionLimit, ConnectionLimits, PendingConnectionError, - PendingInboundConnectionError, PendingOutboundConnectionError, + ConnectionError, ConnectionLimit, PendingConnectionError, PendingInboundConnectionError, + PendingOutboundConnectionError, }; pub use handler::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr, @@ -81,12 +82,12 @@ pub use handler::{ }; pub use registry::{AddAddressResult, AddressRecord, AddressScore}; -use connection::pool::{Pool, PoolConfig, PoolEvent}; -use connection::{EstablishedConnection, IncomingInfo}; +use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent}; +use connection::IncomingInfo; use dial_opts::{DialOpts, PeerCondition}; use either::Either; use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream}; -use libp2p_core::connection::{ConnectionId, PendingPoint}; +use libp2p_core::connection::ConnectionId; use libp2p_core::muxing::SubstreamBox; use libp2p_core::{ connection::ConnectedPoint, @@ -395,15 +396,7 @@ where // Check [`PeerCondition`] if provided. let condition_matched = match condition { PeerCondition::Disconnected => !self.is_connected(&peer_id), - PeerCondition::NotDialing => { - !self - .pool - .iter_pending_info() - .any(move |(_, endpoint, peer)| { - matches!(endpoint, PendingPoint::Dialer { .. }) - && peer.as_ref() == Some(&peer_id) - }) - } + PeerCondition::NotDialing => !self.pool.is_dialing(peer_id), PeerCondition::Always => true, }; if !condition_matched { @@ -1042,7 +1035,7 @@ where Some((peer_id, handler, event)) => match handler { PendingNotifyHandler::One(conn_id) => { match this.pool.get_established(conn_id) { - Some(mut conn) => match notify_one(&mut conn, event, cx) { + Some(conn) => match notify_one(conn, event, cx) { None => continue, Some(event) => { this.pending_event = Some((peer_id, handler, event)); @@ -1135,8 +1128,8 @@ enum PendingNotifyHandler { /// /// Returns `None` if the connection is closing or the event has been /// successfully sent, in either case the event is consumed. -fn notify_one<'a, THandlerInEvent>( - conn: &mut EstablishedConnection<'a, THandlerInEvent>, +fn notify_one( + conn: &mut EstablishedConnection, event: THandlerInEvent, cx: &mut Context<'_>, ) -> Option { @@ -1180,7 +1173,7 @@ where let mut pending = SmallVec::new(); let mut event = Some(event); // (1) for id in ids.into_iter() { - if let Some(mut conn) = pool.get_established(id) { + if let Some(conn) = pool.get_established(id) { match conn.poll_ready_notify_handler(cx) { Poll::Pending => pending.push(id), Poll::Ready(Err(())) => {} // connection is closing