swarm/: Report aborted connections (#2517)

Disconnect pending connections with `Swarm::disconnect` and eport aborted
connections via `SwarmEvent::OutgoingConnectionError`.

Co-authored-by: Jack Maloney <git@jmmaloney4.xyz>
Co-authored-by: Marco Munizaga <git@marcopolo.io>
This commit is contained in:
Max Inden 2022-02-15 10:19:55 +01:00 committed by GitHub
parent e66f04f41b
commit 146ed5f45e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 69 additions and 33 deletions

View File

@ -4,7 +4,12 @@
- Update to `libp2p-core` `v0.32.0`. - Update to `libp2p-core` `v0.32.0`.
- Disconnect pending connections with `Swarm::disconnect`. See [PR 2517].
- Report aborted connections via `SwarmEvent::OutgoingConnectionError`. See [PR 2517].
[PR 2492]: https://github.com/libp2p/rust-libp2p/pull/2492 [PR 2492]: https://github.com/libp2p/rust-libp2p/pull/2492
[PR 2517]: https://github.com/libp2p/rust-libp2p/pull/2517
# 0.33.0 [2022-01-27] # 0.33.0 [2022-01-27]

View File

@ -26,6 +26,7 @@ void = "1"
[dev-dependencies] [dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] } async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.9"
libp2p = { path = "../", default-features = false, features = ["identify", "ping", "plaintext", "yamux"] } libp2p = { path = "../", default-features = false, features = ["identify", "ping", "plaintext", "yamux"] }
libp2p-mplex = { path = "../muxers/mplex" } libp2p-mplex = { path = "../muxers/mplex" }
libp2p-noise = { path = "../transports/noise" } libp2p-noise = { path = "../transports/noise" }

View File

@ -136,7 +136,7 @@ struct PendingConnectionInfo<THandler> {
handler: THandler, handler: THandler,
endpoint: PendingPoint, endpoint: PendingPoint,
/// When dropped, notifies the task which then knows to terminate. /// When dropped, notifies the task which then knows to terminate.
_drop_notifier: oneshot::Sender<Void>, abort_notifier: Option<oneshot::Sender<Void>>,
} }
impl<THandler: IntoConnectionHandler, TTrans: Transport> fmt::Debug for Pool<THandler, TTrans> { impl<THandler: IntoConnectionHandler, TTrans: Transport> fmt::Debug for Pool<THandler, TTrans> {
@ -340,10 +340,7 @@ where
/// Returns `None` if the pool has no connection with the given ID. /// Returns `None` if the pool has no connection with the given ID.
pub fn get(&mut self, id: ConnectionId) -> Option<PoolConnection<'_, THandler>> { pub fn get(&mut self, id: ConnectionId) -> Option<PoolConnection<'_, THandler>> {
if let hash_map::Entry::Occupied(entry) = self.pending.entry(id) { if let hash_map::Entry::Occupied(entry) = self.pending.entry(id) {
Some(PoolConnection::Pending(PendingConnection { Some(PoolConnection::Pending(PendingConnection { entry }))
entry,
counters: &mut self.counters,
}))
} else { } else {
self.established self.established
.iter_mut() .iter_mut()
@ -406,11 +403,7 @@ where
.entry(pending_connection) .entry(pending_connection)
.expect_occupied("Iterating pending connections"); .expect_occupied("Iterating pending connections");
PendingConnection { PendingConnection { entry }.abort();
entry,
counters: &mut self.counters,
}
.abort();
} }
} }
@ -501,13 +494,13 @@ where
let connection_id = self.next_connection_id(); let connection_id = self.next_connection_id();
let (drop_notifier, drop_receiver) = oneshot::channel(); let (abort_notifier, abort_receiver) = oneshot::channel();
self.spawn( self.spawn(
task::new_for_pending_outgoing_connection( task::new_for_pending_outgoing_connection(
connection_id, connection_id,
dial, dial,
drop_receiver, abort_receiver,
self.pending_connection_events_tx.clone(), self.pending_connection_events_tx.clone(),
) )
.boxed(), .boxed(),
@ -521,8 +514,8 @@ where
PendingConnectionInfo { PendingConnectionInfo {
peer_id: peer, peer_id: peer,
handler, handler,
endpoint: endpoint, endpoint,
_drop_notifier: drop_notifier, abort_notifier: Some(abort_notifier),
}, },
); );
Ok(connection_id) Ok(connection_id)
@ -550,13 +543,13 @@ where
let connection_id = self.next_connection_id(); let connection_id = self.next_connection_id();
let (drop_notifier, drop_receiver) = oneshot::channel(); let (abort_notifier, abort_receiver) = oneshot::channel();
self.spawn( self.spawn(
task::new_for_pending_incoming_connection( task::new_for_pending_incoming_connection(
connection_id, connection_id,
future, future,
drop_receiver, abort_receiver,
self.pending_connection_events_tx.clone(), self.pending_connection_events_tx.clone(),
) )
.boxed(), .boxed(),
@ -569,7 +562,7 @@ where
peer_id: None, peer_id: None,
handler, handler,
endpoint: endpoint.into(), endpoint: endpoint.into(),
_drop_notifier: drop_notifier, abort_notifier: Some(abort_notifier),
}, },
); );
Ok(connection_id) Ok(connection_id)
@ -685,7 +678,7 @@ where
peer_id: expected_peer_id, peer_id: expected_peer_id,
handler, handler,
endpoint, endpoint,
_drop_notifier, abort_notifier: _,
} = self } = self
.pending .pending
.remove(&id) .remove(&id)
@ -854,7 +847,7 @@ where
peer_id, peer_id,
handler, handler,
endpoint, endpoint,
_drop_notifier, abort_notifier: _,
}) = self.pending.remove(&id) }) = self.pending.remove(&id)
{ {
self.counters.dec_pending(&endpoint); self.counters.dec_pending(&endpoint);
@ -911,14 +904,14 @@ pub enum PoolConnection<'a, THandler: IntoConnectionHandler> {
/// A pending connection in a pool. /// A pending connection in a pool.
pub struct PendingConnection<'a, THandler: IntoConnectionHandler> { pub struct PendingConnection<'a, THandler: IntoConnectionHandler> {
entry: hash_map::OccupiedEntry<'a, ConnectionId, PendingConnectionInfo<THandler>>, entry: hash_map::OccupiedEntry<'a, ConnectionId, PendingConnectionInfo<THandler>>,
counters: &'a mut ConnectionCounters,
} }
impl<THandler: IntoConnectionHandler> PendingConnection<'_, THandler> { impl<THandler: IntoConnectionHandler> PendingConnection<'_, THandler> {
/// Aborts the connection attempt, closing the connection. /// Aborts the connection attempt, closing the connection.
pub fn abort(self) { pub fn abort(mut self) {
self.counters.dec_pending(&self.entry.get().endpoint); if let Some(notifier) = self.entry.get_mut().abort_notifier.take() {
self.entry.remove(); drop(notifier);
}
} }
} }

View File

@ -42,7 +42,7 @@ use libp2p_core::muxing::StreamMuxer;
use std::pin::Pin; use std::pin::Pin;
use void::Void; use void::Void;
/// Commands that can be sent to a task. /// Commands that can be sent to a task driving an established connection.
#[derive(Debug)] #[derive(Debug)]
pub enum Command<T> { pub enum Command<T> {
/// Notify the connection handler of an event. /// Notify the connection handler of an event.
@ -104,12 +104,12 @@ pub enum EstablishedConnectionEvent<THandler: IntoConnectionHandler> {
pub async fn new_for_pending_outgoing_connection<TTrans>( pub async fn new_for_pending_outgoing_connection<TTrans>(
connection_id: ConnectionId, connection_id: ConnectionId,
dial: ConcurrentDial<TTrans>, dial: ConcurrentDial<TTrans>,
drop_receiver: oneshot::Receiver<Void>, abort_receiver: oneshot::Receiver<Void>,
mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>, mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>,
) where ) where
TTrans: Transport, TTrans: Transport,
{ {
match futures::future::select(drop_receiver, Box::pin(dial)).await { match futures::future::select(abort_receiver, Box::pin(dial)).await {
Either::Left((Err(oneshot::Canceled), _)) => { Either::Left((Err(oneshot::Canceled), _)) => {
let _ = events let _ = events
.send(PendingConnectionEvent::PendingFailed { .send(PendingConnectionEvent::PendingFailed {
@ -142,13 +142,13 @@ pub async fn new_for_pending_outgoing_connection<TTrans>(
pub async fn new_for_pending_incoming_connection<TFut, TTrans>( pub async fn new_for_pending_incoming_connection<TFut, TTrans>(
connection_id: ConnectionId, connection_id: ConnectionId,
future: TFut, future: TFut,
drop_receiver: oneshot::Receiver<Void>, abort_receiver: oneshot::Receiver<Void>,
mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>, mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>,
) where ) where
TTrans: Transport, TTrans: Transport,
TFut: Future<Output = Result<TTrans::Output, TTrans::Error>> + Send + 'static, TFut: Future<Output = Result<TTrans::Output, TTrans::Error>> + Send + 'static,
{ {
match futures::future::select(drop_receiver, Box::pin(future)).await { match futures::future::select(abort_receiver, Box::pin(future)).await {
Either::Left((Err(oneshot::Canceled), _)) => { Either::Left((Err(oneshot::Canceled), _)) => {
let _ = events let _ = events
.send(PendingConnectionEvent::PendingFailed { .send(PendingConnectionEvent::PendingFailed {

View File

@ -624,12 +624,14 @@ where
/// with [`ProtocolsHandler::connection_keep_alive`] or directly with /// with [`ProtocolsHandler::connection_keep_alive`] or directly with
/// [`ProtocolsHandlerEvent::Close`]. /// [`ProtocolsHandlerEvent::Close`].
pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> { pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
if self.pool.is_connected(peer_id) { let was_connected = self.pool.is_connected(peer_id);
self.pool.disconnect(peer_id); self.pool.disconnect(peer_id);
return Ok(());
}
Err(()) if was_connected {
Ok(())
} else {
Err(())
}
} }
/// Checks whether there is an established connection to a peer. /// Checks whether there is an established connection to a peer.
@ -2422,4 +2424,39 @@ mod tests {
})) }))
.unwrap(); .unwrap();
} }
#[test]
fn aborting_pending_connection_surfaces_error() {
let _ = env_logger::try_init();
let mut dialer = new_test_swarm::<_, ()>(DummyProtocolsHandler::default()).build();
let mut listener = new_test_swarm::<_, ()>(DummyProtocolsHandler::default()).build();
let listener_peer_id = *listener.local_peer_id();
listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
let listener_address = match block_on(listener.next()).unwrap() {
SwarmEvent::NewListenAddr { address, .. } => address,
e => panic!("Unexpected network event: {:?}", e),
};
dialer
.dial(
DialOpts::peer_id(listener_peer_id)
.addresses(vec![listener_address])
.build(),
)
.unwrap();
dialer
.disconnect_peer_id(listener_peer_id)
.expect_err("Expect peer to not yet be connected.");
match block_on(dialer.next()).unwrap() {
SwarmEvent::OutgoingConnectionError {
error: DialError::Aborted,
..
} => {}
e => panic!("Unexpected swarm event {:?}.", e),
}
}
} }