From 96c93b9a5299349d85828fc8e5c45051c421e7bb Mon Sep 17 00:00:00 2001 From: Victor Ermolaev <16148931+vnermolaev@users.noreply.github.com> Date: Wed, 1 Feb 2023 06:34:48 +0100 Subject: [PATCH] fix(quic): unwoken task in quic transport (#3407) Add an explicit waker to report the last event. --- transports/quic/src/transport.rs | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 30dba090..54c1fec9 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -40,12 +40,11 @@ use std::collections::{HashMap, VecDeque}; use std::fmt; use std::hash::{Hash, Hasher}; use std::net::IpAddr; -use std::task::Waker; use std::time::Duration; use std::{ net::SocketAddr, pin::Pin, - task::{Context, Poll}, + task::{Context, Poll, Waker}, }; /// Implementation of the [`Transport`] trait for QUIC. @@ -353,6 +352,9 @@ struct Listener { /// Pending event to reported. pending_event: Option<::Item>, + + /// The stream must be awaken after it has been closed to deliver the last event. + close_listener_waker: Option, } impl Listener

{ @@ -390,6 +392,7 @@ impl Listener

{ is_closed: false, pending_event, dialer_state: DialerState::default(), + close_listener_waker: None, }) } @@ -404,6 +407,11 @@ impl Listener

{ reason, }); self.is_closed = true; + + // Wake the stream to deliver the last event. + if let Some(waker) = self.close_listener_waker.take() { + waker.wake(); + } } /// Poll for a next If Event. @@ -472,16 +480,12 @@ impl Stream for Listener

{ if self.is_closed { return Poll::Ready(None); } - match self.poll_if_addr(cx) { - Poll::Ready(event) => return Poll::Ready(Some(event)), - Poll::Pending => {} + if let Poll::Ready(event) = self.poll_if_addr(cx) { + return Poll::Ready(Some(event)); } - match self.poll_dialer(cx) { - Poll::Ready(error) => { - self.close(Err(error)); - continue; - } - Poll::Pending => {} + if let Poll::Ready(error) = self.poll_dialer(cx) { + self.close(Err(error)); + continue; } match self.new_connections_rx.poll_next_unpin(cx) { Poll::Ready(Some(connection)) => { @@ -502,6 +506,9 @@ impl Stream for Listener

{ } Poll::Pending => {} }; + + self.close_listener_waker = Some(cx.waker().clone()); + return Poll::Pending; } }