fix(quic): unwoken task in quic transport (#3407)

Add an explicit waker to report the last event.
This commit is contained in:
Victor Ermolaev
2023-02-01 06:34:48 +01:00
committed by GitHub
parent 0c94237e16
commit 96c93b9a52

View File

@ -40,12 +40,11 @@ use std::collections::{HashMap, VecDeque};
use std::fmt; use std::fmt;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::net::IpAddr; use std::net::IpAddr;
use std::task::Waker;
use std::time::Duration; use std::time::Duration;
use std::{ use std::{
net::SocketAddr, net::SocketAddr,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll, Waker},
}; };
/// Implementation of the [`Transport`] trait for QUIC. /// Implementation of the [`Transport`] trait for QUIC.
@ -353,6 +352,9 @@ struct Listener<P: Provider> {
/// Pending event to reported. /// Pending event to reported.
pending_event: Option<<Self as Stream>::Item>, pending_event: Option<<Self as Stream>::Item>,
/// The stream must be awaken after it has been closed to deliver the last event.
close_listener_waker: Option<Waker>,
} }
impl<P: Provider> Listener<P> { impl<P: Provider> Listener<P> {
@ -390,6 +392,7 @@ impl<P: Provider> Listener<P> {
is_closed: false, is_closed: false,
pending_event, pending_event,
dialer_state: DialerState::default(), dialer_state: DialerState::default(),
close_listener_waker: None,
}) })
} }
@ -404,6 +407,11 @@ impl<P: Provider> Listener<P> {
reason, reason,
}); });
self.is_closed = true; self.is_closed = true;
// Wake the stream to deliver the last event.
if let Some(waker) = self.close_listener_waker.take() {
waker.wake();
}
} }
/// Poll for a next If Event. /// Poll for a next If Event.
@ -472,16 +480,12 @@ impl<P: Provider> Stream for Listener<P> {
if self.is_closed { if self.is_closed {
return Poll::Ready(None); return Poll::Ready(None);
} }
match self.poll_if_addr(cx) { if let Poll::Ready(event) = self.poll_if_addr(cx) {
Poll::Ready(event) => return Poll::Ready(Some(event)), return Poll::Ready(Some(event));
Poll::Pending => {}
} }
match self.poll_dialer(cx) { if let Poll::Ready(error) = self.poll_dialer(cx) {
Poll::Ready(error) => { self.close(Err(error));
self.close(Err(error)); continue;
continue;
}
Poll::Pending => {}
} }
match self.new_connections_rx.poll_next_unpin(cx) { match self.new_connections_rx.poll_next_unpin(cx) {
Poll::Ready(Some(connection)) => { Poll::Ready(Some(connection)) => {
@ -502,6 +506,9 @@ impl<P: Provider> Stream for Listener<P> {
} }
Poll::Pending => {} Poll::Pending => {}
}; };
self.close_listener_waker = Some(cx.waker().clone());
return Poll::Pending; return Poll::Pending;
} }
} }