core/connection/listeners: Create event on remove_listener (#2261)

Create a `ListenersEvent::Closed` when a listener is removed via
`Swarm::remove_listener`. This makes it more consistent with `Swarm::listen_on`,
and also informs the Swarm about the associated expired addresses.

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Elena Frank
2021-10-11 22:38:55 +02:00
committed by GitHub
parent 3eb0344832
commit 7718d1de38
5 changed files with 77 additions and 14 deletions

View File

@ -27,7 +27,7 @@ use crate::{
use futures::{prelude::*, task::Context, task::Poll};
use log::debug;
use smallvec::SmallVec;
use std::{collections::VecDeque, fmt, pin::Pin};
use std::{collections::VecDeque, fmt, mem, pin::Pin};
/// Implementation of `futures::Stream` that allows listening on multiaddresses.
///
@ -90,6 +90,8 @@ where
listeners: VecDeque<Pin<Box<Listener<TTrans>>>>,
/// The next listener ID to assign.
next_id: ListenerId,
/// Pending listeners events to return from [`ListenersStream::poll`].
pending_events: VecDeque<ListenersEvent<TTrans>>,
}
/// The ID of a single listener.
@ -177,6 +179,7 @@ where
transport,
listeners: VecDeque::new(),
next_id: ListenerId(1),
pending_events: VecDeque::new(),
}
}
@ -187,6 +190,7 @@ where
transport,
listeners: VecDeque::with_capacity(capacity),
next_id: ListenerId(1),
pending_events: VecDeque::new(),
}
}
@ -213,13 +217,24 @@ where
/// Remove the listener matching the given `ListenerId`.
///
/// Return `Ok(())` if a listener with this ID was in the list.
pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> {
/// Returns `true` if there was a listener with this ID, `false`
/// otherwise.
pub fn remove_listener(&mut self, id: ListenerId) -> bool {
if let Some(i) = self.listeners.iter().position(|l| l.id == id) {
self.listeners.remove(i);
Ok(())
let mut listener = self
.listeners
.remove(i)
.expect("Index can not be out of bounds.");
let listener_project = listener.as_mut().project();
let addresses = mem::take(listener_project.addresses).into_vec();
self.pending_events.push_back(ListenersEvent::Closed {
listener_id: *listener_project.id,
addresses,
reason: Ok(()),
});
true
} else {
Err(())
false
}
}
@ -235,6 +250,10 @@ where
/// Provides an API similar to `Stream`, except that it cannot end.
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ListenersEvent<TTrans>> {
// Return pending events from closed listeners.
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(event);
}
// We remove each element from `listeners` one by one and add them back.
let mut remaining = self.listeners.len();
while let Some(mut listener) = self.listeners.pop_back() {
@ -292,18 +311,20 @@ where
});
}
Poll::Ready(None) => {
let addresses = mem::take(listener_project.addresses).into_vec();
return Poll::Ready(ListenersEvent::Closed {
listener_id: *listener_project.id,
addresses: listener_project.addresses.drain(..).collect(),
addresses,
reason: Ok(()),
})
});
}
Poll::Ready(Some(Err(err))) => {
let addresses = mem::take(listener_project.addresses).into_vec();
return Poll::Ready(ListenersEvent::Closed {
listener_id: *listener_project.id,
addresses: listener_project.addresses.drain(..).collect(),
addresses,
reason: Err(err),
})
});
}
}
}
@ -537,4 +558,36 @@ mod tests {
}
});
}
#[test]
fn listener_closed() {
async_std::task::block_on(async move {
let mem_transport = transport::MemoryTransport::default();
let mut listeners = ListenersStream::new(mem_transport);
let id = listeners.listen_on("/memory/0".parse().unwrap()).unwrap();
let event = listeners.next().await.unwrap();
let addr;
if let ListenersEvent::NewAddress { listen_addr, .. } = event {
addr = listen_addr
} else {
panic!("Was expecting the listen address to be reported")
}
assert!(listeners.remove_listener(id));
match listeners.next().await.unwrap() {
ListenersEvent::Closed {
listener_id,
addresses,
reason: Ok(()),
} => {
assert_eq!(listener_id, id);
assert!(addresses.contains(&addr));
}
other => panic!("Unexpected listeners event: {:?}", other),
}
});
}
}

View File

@ -147,8 +147,9 @@ where
/// Remove a previously added listener.
///
/// Returns `Ok(())` if a listener with this ID was in the list.
pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> {
/// Returns `true` if there was a listener with this ID, `false`
/// otherwise.
pub fn remove_listener(&mut self, id: ListenerId) -> bool {
self.listeners.remove_listener(id)
}