fix(webrtc): unwoken task in webrtc transport (#3408)

Add an explicit waker to report the last event.
This commit is contained in:
Victor Ermolaev
2023-02-01 07:11:28 +01:00
committed by GitHub
parent 96c93b9a52
commit 6880469340

View File

@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::{future::BoxFuture, prelude::*, ready, stream::SelectAll, stream::Stream}; use futures::{future::BoxFuture, prelude::*, stream::SelectAll, stream::Stream};
use if_watch::{tokio::IfWatcher, IfEvent}; use if_watch::{tokio::IfWatcher, IfEvent};
use libp2p_core::{ use libp2p_core::{
identity, identity,
@ -33,7 +33,7 @@ use std::{
io, io,
net::SocketAddr, net::SocketAddr,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll, Waker},
}; };
use crate::tokio::{ use crate::tokio::{
@ -196,6 +196,9 @@ struct ListenStream {
/// Pending event to reported. /// Pending event to reported.
pending_event: Option<<Self as Stream>::Item>, pending_event: Option<<Self as Stream>::Item>,
/// The stream must be awaken after it has been closed to deliver the last event.
close_listener_waker: Option<Waker>,
} }
impl ListenStream { impl ListenStream {
@ -225,6 +228,7 @@ impl ListenStream {
report_closed: None, report_closed: None,
if_watcher, if_watcher,
pending_event, pending_event,
close_listener_waker: None,
}) })
} }
@ -241,6 +245,11 @@ impl ListenStream {
listener_id: self.listener_id, listener_id: self.listener_id,
reason, reason,
})); }));
// Wake the stream to deliver the last event.
if let Some(waker) = self.close_listener_waker.take() {
waker.wake();
}
} }
} }
} }
@ -316,8 +325,8 @@ impl Stream for ListenStream {
} }
// Poll UDP muxer for new addresses or incoming data for streams. // Poll UDP muxer for new addresses or incoming data for streams.
match ready!(self.udp_mux.poll(cx)) { match self.udp_mux.poll(cx) {
UDPMuxEvent::NewAddr(new_addr) => { Poll::Ready(UDPMuxEvent::NewAddr(new_addr)) => {
let local_addr = let local_addr =
socketaddr_to_multiaddr(&self.listen_addr, Some(self.config.fingerprint)); socketaddr_to_multiaddr(&self.listen_addr, Some(self.config.fingerprint));
let send_back_addr = socketaddr_to_multiaddr(&new_addr.addr, None); let send_back_addr = socketaddr_to_multiaddr(&new_addr.addr, None);
@ -339,10 +348,16 @@ impl Stream for ListenStream {
listener_id: self.listener_id, listener_id: self.listener_id,
})); }));
} }
UDPMuxEvent::Error(e) => { Poll::Ready(UDPMuxEvent::Error(e)) => {
self.close(Err(Error::UDPMux(e))); self.close(Err(Error::UDPMux(e)));
continue;
} }
Poll::Pending => {}
} }
self.close_listener_waker = Some(cx.waker().clone());
return Poll::Pending;
} }
} }
} }