diff --git a/transports/webrtc/src/tokio/transport.rs b/transports/webrtc/src/tokio/transport.rs index 0ace3cbb..29ab7614 100644 --- a/transports/webrtc/src/tokio/transport.rs +++ b/transports/webrtc/src/tokio/transport.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::{future::BoxFuture, prelude::*, ready, stream::SelectAll, stream::Stream}; +use futures::{future::BoxFuture, prelude::*, stream::SelectAll, stream::Stream}; use if_watch::{tokio::IfWatcher, IfEvent}; use libp2p_core::{ identity, @@ -33,7 +33,7 @@ use std::{ io, net::SocketAddr, pin::Pin, - task::{Context, Poll}, + task::{Context, Poll, Waker}, }; use crate::tokio::{ @@ -196,6 +196,9 @@ struct ListenStream { /// Pending event to reported. pending_event: Option<::Item>, + + /// The stream must be awaken after it has been closed to deliver the last event. + close_listener_waker: Option, } impl ListenStream { @@ -225,6 +228,7 @@ impl ListenStream { report_closed: None, if_watcher, pending_event, + close_listener_waker: None, }) } @@ -241,6 +245,11 @@ impl ListenStream { listener_id: self.listener_id, reason, })); + + // Wake the stream to deliver the last event. + if let Some(waker) = self.close_listener_waker.take() { + waker.wake(); + } } } } @@ -316,8 +325,8 @@ impl Stream for ListenStream { } // Poll UDP muxer for new addresses or incoming data for streams. - match ready!(self.udp_mux.poll(cx)) { - UDPMuxEvent::NewAddr(new_addr) => { + match self.udp_mux.poll(cx) { + Poll::Ready(UDPMuxEvent::NewAddr(new_addr)) => { let local_addr = socketaddr_to_multiaddr(&self.listen_addr, Some(self.config.fingerprint)); let send_back_addr = socketaddr_to_multiaddr(&new_addr.addr, None); @@ -339,10 +348,16 @@ impl Stream for ListenStream { listener_id: self.listener_id, })); } - UDPMuxEvent::Error(e) => { + Poll::Ready(UDPMuxEvent::Error(e)) => { self.close(Err(Error::UDPMux(e))); + continue; } + Poll::Pending => {} } + + self.close_listener_waker = Some(cx.waker().clone()); + + return Poll::Pending; } } }