swarm/pool: Misc refactoring (#3073)

* Remove unreachable error case

Instead of taking the connection out of the map again, construct
the event to be returned with the data we already have available.

* Remove `Pool::get` and `PoolConnection`

These are effectively not used.

* Replace `iter_pending_info` with its only usage: `is_dialing`

* Add `is_for_same_remote_as` convenience function

* Remove `PendingConnection`

* Rename `PendingConnectionInfo` to `PendingConnection`

With the latter being gone, the name is now free.

* Merge `EstablishedConnectionInfo` and `EstablishedConnection`

This is a leftover from when `Pool` was still in `libp2p-core` and
one of them was a public API and the other one wasn't.

All of this is private to `libp2p-swarm` so we no longer need to
differentiate.

* Don't `pub use` out of `pub(crate)` modules
This commit is contained in:
Thomas Eizinger
2022-11-03 05:47:00 +11:00
committed by GitHub
parent 64e38bd6b4
commit f4ce1fe9ae
3 changed files with 85 additions and 189 deletions

View File

@ -26,8 +26,6 @@ pub use error::{
ConnectionError, PendingConnectionError, PendingInboundConnectionError,
PendingOutboundConnectionError,
};
pub use pool::{ConnectionCounters, ConnectionLimits};
pub use pool::{EstablishedConnection, PendingConnection};
use crate::handler::ConnectionHandler;
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper};

View File

@ -69,12 +69,12 @@ where
PeerId,
FnvHashMap<
ConnectionId,
EstablishedConnectionInfo<<THandler::Handler as ConnectionHandler>::InEvent>,
EstablishedConnection<<THandler::Handler as ConnectionHandler>::InEvent>,
>,
>,
/// The pending connections that are currently being negotiated.
pending: HashMap<ConnectionId, PendingConnectionInfo<THandler>>,
pending: HashMap<ConnectionId, PendingConnection<THandler>>,
/// Next available identifier for a new connection / task.
next_connection_id: ConnectionId,
@ -120,15 +120,41 @@ where
}
#[derive(Debug)]
struct EstablishedConnectionInfo<TInEvent> {
/// [`PeerId`] of the remote peer.
peer_id: PeerId,
pub struct EstablishedConnection<TInEvent> {
endpoint: ConnectedPoint,
/// Channel endpoint to send commands to the task.
sender: mpsc::Sender<task::Command<TInEvent>>,
}
impl<TInEvent> EstablishedConnectionInfo<TInEvent> {
impl<TInEvent> EstablishedConnection<TInEvent> {
/// (Asynchronously) sends an event to the connection handler.
///
/// If the handler is not ready to receive the event, either because
/// it is busy or the connection is about to close, the given event
/// is returned with an `Err`.
///
/// If execution of this method is preceded by successful execution of
/// `poll_ready_notify_handler` without another intervening execution
/// of `notify_handler`, it only fails if the connection is now about
/// to close.
pub fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
let cmd = task::Command::NotifyHandler(event);
self.sender.try_send(cmd).map_err(|e| match e.into_inner() {
task::Command::NotifyHandler(event) => event,
_ => unreachable!("Expect failed send to return initial event."),
})
}
/// Checks if `notify_handler` is ready to accept an event.
///
/// Returns `Ok(())` if the handler is ready to receive an event via `notify_handler`.
///
/// Returns `Err(())` if the background task associated with the connection
/// is terminating and the connection is about to close.
pub fn poll_ready_notify_handler(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
self.sender.poll_ready(cx).map_err(|_| ())
}
/// Initiates a graceful close of the connection.
///
/// Has no effect if the connection is already closing.
@ -142,7 +168,7 @@ impl<TInEvent> EstablishedConnectionInfo<TInEvent> {
}
}
struct PendingConnectionInfo<THandler> {
struct PendingConnection<THandler> {
/// [`PeerId`] of the remote peer.
peer_id: Option<PeerId>,
/// Handler to handle connection once no longer pending but established.
@ -152,6 +178,19 @@ struct PendingConnectionInfo<THandler> {
abort_notifier: Option<oneshot::Sender<Void>>,
}
impl<THandler> PendingConnection<THandler> {
fn is_for_same_remote_as(&self, other: PeerId) -> bool {
self.peer_id.map_or(false, |peer| peer == other)
}
/// Aborts the connection attempt, closing the connection.
fn abort(&mut self) {
if let Some(notifier) = self.abort_notifier.take() {
drop(notifier);
}
}
}
impl<THandler: IntoConnectionHandler, TTrans: Transport> fmt::Debug for Pool<THandler, TTrans> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("Pool")
@ -284,33 +323,14 @@ where
&self.counters
}
/// Gets an entry representing a connection in the pool.
///
/// Returns `None` if the pool has no connection with the given ID.
pub fn get(&mut self, id: ConnectionId) -> Option<PoolConnection<'_, THandler>> {
if let hash_map::Entry::Occupied(entry) = self.pending.entry(id) {
Some(PoolConnection::Pending(PendingConnection { entry }))
} else {
self.established
.iter_mut()
.find_map(|(_, cs)| match cs.entry(id) {
hash_map::Entry::Occupied(entry) => {
Some(PoolConnection::Established(EstablishedConnection { entry }))
}
hash_map::Entry::Vacant(_) => None,
})
}
}
/// Gets an established connection from the pool by ID.
pub fn get_established(
&mut self,
id: ConnectionId,
) -> Option<EstablishedConnection<'_, THandlerInEvent<THandler>>> {
match self.get(id) {
Some(PoolConnection::Established(c)) => Some(c),
_ => None,
}
) -> Option<&mut EstablishedConnection<THandlerInEvent<THandler>>> {
self.established
.values_mut()
.find_map(|connections| connections.get_mut(&id))
}
/// Returns true if we are connected to the given peer.
@ -338,21 +358,12 @@ where
}
}
#[allow(clippy::needless_collect)]
let pending_connections = self
for connection in self
.pending
.iter()
.filter(|(_, PendingConnectionInfo { peer_id, .. })| peer_id.as_ref() == Some(&peer))
.map(|(id, _)| *id)
.collect::<Vec<_>>();
for pending_connection in pending_connections {
let entry = self
.pending
.entry(pending_connection)
.expect_occupied("Iterating pending connections");
PendingConnection { entry }.abort();
.iter_mut()
.filter_map(|(_, info)| info.is_for_same_remote_as(peer).then_some(info))
{
connection.abort()
}
}
@ -367,19 +378,11 @@ where
}
}
/// Returns an iterator over all pending connection IDs together
/// with associated endpoints and expected peer IDs in the pool.
pub fn iter_pending_info(
&self,
) -> impl Iterator<Item = (&ConnectionId, &PendingPoint, &Option<PeerId>)> + '_ {
self.pending.iter().map(
|(
id,
PendingConnectionInfo {
peer_id, endpoint, ..
},
)| (id, endpoint, peer_id),
)
/// Checks whether we are currently dialing the given peer.
pub fn is_dialing(&self, peer: PeerId) -> bool {
self.pending.iter().any(|(_, info)| {
matches!(info.endpoint, PendingPoint::Dialer { .. }) && info.is_for_same_remote_as(peer)
})
}
/// Returns an iterator over all connected peers, i.e. those that have
@ -467,7 +470,7 @@ where
self.counters.inc_pending(&endpoint);
self.pending.insert(
connection_id,
PendingConnectionInfo {
PendingConnection {
peer_id: peer,
handler,
endpoint,
@ -514,7 +517,7 @@ where
self.counters.inc_pending_incoming();
self.pending.insert(
connection_id,
PendingConnectionInfo {
PendingConnection {
peer_id: None,
handler,
endpoint: endpoint.into(),
@ -576,7 +579,7 @@ where
.established
.get_mut(&peer_id)
.expect("`Closed` event for established connection");
let EstablishedConnectionInfo { endpoint, .. } =
let EstablishedConnection { endpoint, .. } =
connections.remove(&id).expect("Connection to be present");
self.counters.dec_established(&endpoint);
let remaining_established_connection_ids: Vec<ConnectionId> =
@ -608,7 +611,7 @@ where
output: (obtained_peer_id, mut muxer),
outgoing,
} => {
let PendingConnectionInfo {
let PendingConnection {
peer_id: expected_peer_id,
handler,
endpoint,
@ -740,8 +743,7 @@ where
mpsc::channel(self.task_command_buffer_size);
conns.insert(
id,
EstablishedConnectionInfo {
peer_id: obtained_peer_id,
EstablishedConnection {
endpoint: endpoint.clone(),
sender: command_sender,
},
@ -764,21 +766,16 @@ where
.boxed(),
);
match self.get(id) {
Some(PoolConnection::Established(connection)) => {
return Poll::Ready(PoolEvent::ConnectionEstablished {
peer_id: connection.peer_id(),
endpoint: connection.endpoint().clone(),
id: connection.id(),
peer_id: obtained_peer_id,
endpoint,
id,
other_established_connection_ids,
concurrent_dial_errors,
})
}
_ => unreachable!("since `entry` is an `EstablishedEntry`."),
}
});
}
task::PendingConnectionEvent::PendingFailed { id, error } => {
if let Some(PendingConnectionInfo {
if let Some(PendingConnection {
peer_id,
handler,
endpoint,
@ -830,98 +827,6 @@ where
}
}
/// A connection in a [`Pool`].
pub enum PoolConnection<'a, THandler: IntoConnectionHandler> {
Pending(PendingConnection<'a, THandler>),
Established(EstablishedConnection<'a, THandlerInEvent<THandler>>),
}
/// A pending connection in a pool.
pub struct PendingConnection<'a, THandler: IntoConnectionHandler> {
entry: hash_map::OccupiedEntry<'a, ConnectionId, PendingConnectionInfo<THandler>>,
}
impl<THandler: IntoConnectionHandler> PendingConnection<'_, THandler> {
/// Aborts the connection attempt, closing the connection.
pub fn abort(mut self) {
if let Some(notifier) = self.entry.get_mut().abort_notifier.take() {
drop(notifier);
}
}
}
/// An established connection in a pool.
pub struct EstablishedConnection<'a, TInEvent> {
entry: hash_map::OccupiedEntry<'a, ConnectionId, EstablishedConnectionInfo<TInEvent>>,
}
impl<TInEvent> fmt::Debug for EstablishedConnection<'_, TInEvent>
where
TInEvent: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("EstablishedConnection")
.field("entry", &self.entry)
.finish()
}
}
impl<TInEvent> EstablishedConnection<'_, TInEvent> {
/// Returns information about the connected endpoint.
pub fn endpoint(&self) -> &ConnectedPoint {
&self.entry.get().endpoint
}
/// Returns the identity of the connected peer.
pub fn peer_id(&self) -> PeerId {
self.entry.get().peer_id
}
/// Returns the local connection ID.
pub fn id(&self) -> ConnectionId {
*self.entry.key()
}
/// (Asynchronously) sends an event to the connection handler.
///
/// If the handler is not ready to receive the event, either because
/// it is busy or the connection is about to close, the given event
/// is returned with an `Err`.
///
/// If execution of this method is preceded by successful execution of
/// `poll_ready_notify_handler` without another intervening execution
/// of `notify_handler`, it only fails if the connection is now about
/// to close.
pub fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
let cmd = task::Command::NotifyHandler(event);
self.entry
.get_mut()
.sender
.try_send(cmd)
.map_err(|e| match e.into_inner() {
task::Command::NotifyHandler(event) => event,
_ => unreachable!("Expect failed send to return initial event."),
})
}
/// Checks if `notify_handler` is ready to accept an event.
///
/// Returns `Ok(())` if the handler is ready to receive an event via `notify_handler`.
///
/// Returns `Err(())` if the background task associated with the connection
/// is terminating and the connection is about to close.
pub fn poll_ready_notify_handler(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
self.entry.get_mut().sender.poll_ready(cx).map_err(|_| ())
}
/// Initiates a graceful close of the connection.
///
/// Has no effect if the connection is already closing.
pub fn start_close(mut self) {
self.entry.get_mut().start_close()
}
}
/// Network connection information.
#[derive(Debug, Clone)]
pub struct ConnectionCounters {
@ -1076,7 +981,7 @@ impl ConnectionCounters {
/// Counts the number of established connections to the given peer.
fn num_peer_established<TInEvent>(
established: &FnvHashMap<PeerId, FnvHashMap<ConnectionId, EstablishedConnectionInfo<TInEvent>>>,
established: &FnvHashMap<PeerId, FnvHashMap<ConnectionId, EstablishedConnection<TInEvent>>>,
peer: PeerId,
) -> u32 {
established.get(&peer).map_or(0, |conns| {

View File

@ -70,9 +70,10 @@ pub mod keep_alive;
pub use behaviour::{
CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
};
pub use connection::pool::{ConnectionCounters, ConnectionLimits};
pub use connection::{
ConnectionCounters, ConnectionError, ConnectionLimit, ConnectionLimits, PendingConnectionError,
PendingInboundConnectionError, PendingOutboundConnectionError,
ConnectionError, ConnectionLimit, PendingConnectionError, PendingInboundConnectionError,
PendingOutboundConnectionError,
};
pub use handler::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr,
@ -81,12 +82,12 @@ pub use handler::{
};
pub use registry::{AddAddressResult, AddressRecord, AddressScore};
use connection::pool::{Pool, PoolConfig, PoolEvent};
use connection::{EstablishedConnection, IncomingInfo};
use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent};
use connection::IncomingInfo;
use dial_opts::{DialOpts, PeerCondition};
use either::Either;
use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream};
use libp2p_core::connection::{ConnectionId, PendingPoint};
use libp2p_core::connection::ConnectionId;
use libp2p_core::muxing::SubstreamBox;
use libp2p_core::{
connection::ConnectedPoint,
@ -395,15 +396,7 @@ where
// Check [`PeerCondition`] if provided.
let condition_matched = match condition {
PeerCondition::Disconnected => !self.is_connected(&peer_id),
PeerCondition::NotDialing => {
!self
.pool
.iter_pending_info()
.any(move |(_, endpoint, peer)| {
matches!(endpoint, PendingPoint::Dialer { .. })
&& peer.as_ref() == Some(&peer_id)
})
}
PeerCondition::NotDialing => !self.pool.is_dialing(peer_id),
PeerCondition::Always => true,
};
if !condition_matched {
@ -1042,7 +1035,7 @@ where
Some((peer_id, handler, event)) => match handler {
PendingNotifyHandler::One(conn_id) => {
match this.pool.get_established(conn_id) {
Some(mut conn) => match notify_one(&mut conn, event, cx) {
Some(conn) => match notify_one(conn, event, cx) {
None => continue,
Some(event) => {
this.pending_event = Some((peer_id, handler, event));
@ -1135,8 +1128,8 @@ enum PendingNotifyHandler {
///
/// Returns `None` if the connection is closing or the event has been
/// successfully sent, in either case the event is consumed.
fn notify_one<'a, THandlerInEvent>(
conn: &mut EstablishedConnection<'a, THandlerInEvent>,
fn notify_one<THandlerInEvent>(
conn: &mut EstablishedConnection<THandlerInEvent>,
event: THandlerInEvent,
cx: &mut Context<'_>,
) -> Option<THandlerInEvent> {
@ -1180,7 +1173,7 @@ where
let mut pending = SmallVec::new();
let mut event = Some(event); // (1)
for id in ids.into_iter() {
if let Some(mut conn) = pool.get_established(id) {
if let Some(conn) = pool.get_established(id) {
match conn.poll_ready_notify_handler(cx) {
Poll::Pending => pending.push(id),
Poll::Ready(Err(())) => {} // connection is closing