diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 1c4ace68..d013c7d6 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -36,6 +36,9 @@ - Add `SignedEnvelope` and `PeerRecord` according to [RFC0002] and [RFC0003] (see [PR 2107]). +- Report `ListenersEvent::Closed` when dropping a listener in `ListenersStream::remove_listener`, + return `bool` instead of `Result<(), ()>` (see [PR 2261]). + [PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145 [PR 2213]: https://github.com/libp2p/rust-libp2p/pull/2213 [PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142 @@ -44,6 +47,7 @@ [PR 2191]: https://github.com/libp2p/rust-libp2p/pull/2191 [PR 2195]: https://github.com/libp2p/rust-libp2p/pull/2195 [PR 2107]: https://github.com/libp2p/rust-libp2p/pull/2107 +[PR 2261]: https://github.com/libp2p/rust-libp2p/pull/2261 [RFC0002]: https://github.com/libp2p/specs/blob/master/RFC/0002-signed-envelopes.md [RFC0003]: https://github.com/libp2p/specs/blob/master/RFC/0003-routing-records.md diff --git a/core/src/connection/listeners.rs b/core/src/connection/listeners.rs index f9a0c6ac..4c394aeb 100644 --- a/core/src/connection/listeners.rs +++ b/core/src/connection/listeners.rs @@ -27,7 +27,7 @@ use crate::{ use futures::{prelude::*, task::Context, task::Poll}; use log::debug; use smallvec::SmallVec; -use std::{collections::VecDeque, fmt, pin::Pin}; +use std::{collections::VecDeque, fmt, mem, pin::Pin}; /// Implementation of `futures::Stream` that allows listening on multiaddresses. /// @@ -90,6 +90,8 @@ where listeners: VecDeque>>>, /// The next listener ID to assign. next_id: ListenerId, + /// Pending listeners events to return from [`ListenersStream::poll`]. + pending_events: VecDeque>, } /// The ID of a single listener. @@ -177,6 +179,7 @@ where transport, listeners: VecDeque::new(), next_id: ListenerId(1), + pending_events: VecDeque::new(), } } @@ -187,6 +190,7 @@ where transport, listeners: VecDeque::with_capacity(capacity), next_id: ListenerId(1), + pending_events: VecDeque::new(), } } @@ -213,13 +217,24 @@ where /// Remove the listener matching the given `ListenerId`. /// - /// Return `Ok(())` if a listener with this ID was in the list. - pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> { + /// Returns `true` if there was a listener with this ID, `false` + /// otherwise. + pub fn remove_listener(&mut self, id: ListenerId) -> bool { if let Some(i) = self.listeners.iter().position(|l| l.id == id) { - self.listeners.remove(i); - Ok(()) + let mut listener = self + .listeners + .remove(i) + .expect("Index can not be out of bounds."); + let listener_project = listener.as_mut().project(); + let addresses = mem::take(listener_project.addresses).into_vec(); + self.pending_events.push_back(ListenersEvent::Closed { + listener_id: *listener_project.id, + addresses, + reason: Ok(()), + }); + true } else { - Err(()) + false } } @@ -235,6 +250,10 @@ where /// Provides an API similar to `Stream`, except that it cannot end. pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Return pending events from closed listeners. + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(event); + } // We remove each element from `listeners` one by one and add them back. let mut remaining = self.listeners.len(); while let Some(mut listener) = self.listeners.pop_back() { @@ -292,18 +311,20 @@ where }); } Poll::Ready(None) => { + let addresses = mem::take(listener_project.addresses).into_vec(); return Poll::Ready(ListenersEvent::Closed { listener_id: *listener_project.id, - addresses: listener_project.addresses.drain(..).collect(), + addresses, reason: Ok(()), - }) + }); } Poll::Ready(Some(Err(err))) => { + let addresses = mem::take(listener_project.addresses).into_vec(); return Poll::Ready(ListenersEvent::Closed { listener_id: *listener_project.id, - addresses: listener_project.addresses.drain(..).collect(), + addresses, reason: Err(err), - }) + }); } } } @@ -537,4 +558,36 @@ mod tests { } }); } + + #[test] + fn listener_closed() { + async_std::task::block_on(async move { + let mem_transport = transport::MemoryTransport::default(); + + let mut listeners = ListenersStream::new(mem_transport); + let id = listeners.listen_on("/memory/0".parse().unwrap()).unwrap(); + + let event = listeners.next().await.unwrap(); + let addr; + if let ListenersEvent::NewAddress { listen_addr, .. } = event { + addr = listen_addr + } else { + panic!("Was expecting the listen address to be reported") + } + + assert!(listeners.remove_listener(id)); + + match listeners.next().await.unwrap() { + ListenersEvent::Closed { + listener_id, + addresses, + reason: Ok(()), + } => { + assert_eq!(listener_id, id); + assert!(addresses.contains(&addr)); + } + other => panic!("Unexpected listeners event: {:?}", other), + } + }); + } } diff --git a/core/src/network.rs b/core/src/network.rs index 831a99c4..8c68a55e 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -147,8 +147,9 @@ where /// Remove a previously added listener. /// - /// Returns `Ok(())` if a listener with this ID was in the list. - pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> { + /// Returns `true` if there was a listener with this ID, `false` + /// otherwise. + pub fn remove_listener(&mut self, id: ListenerId) -> bool { self.listeners.remove_listener(id) } diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 5894591c..8bc85e4b 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -42,11 +42,15 @@ parameters to `NetworkBehaviourAction`. See [PR 2191]. +- Return `bool` instead of `Result<(), ()>` for `Swarm::remove_listener`(see + [PR 2261]). + [PR 2150]: https://github.com/libp2p/rust-libp2p/pull/2150 [PR 2182]: https://github.com/libp2p/rust-libp2p/pull/2182 [PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183 [PR 2192]: https://github.com/libp2p/rust-libp2p/pull/2192 [PR 2191]: https://github.com/libp2p/rust-libp2p/pull/2191 +[PR 2261]: https://github.com/libp2p/rust-libp2p/pull/2261 # 0.30.0 [2021-07-12] diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index a903113c..2a2dca66 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -324,8 +324,9 @@ where /// Remove some listener. /// - /// Returns `Ok(())` if there was a listener with this ID. - pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> { + /// Returns `true` if there was a listener with this ID, `false` + /// otherwise. + pub fn remove_listener(&mut self, id: ListenerId) -> bool { self.network.remove_listener(id) }