swarm/src/connection: Prioritize handler over connection (#2626)

Prioritize work in handler over work on connection, thus prioritizing local work
over work coming from a remote.
This commit is contained in:
Max Inden
2022-05-03 22:26:28 +02:00
committed by GitHub
parent 70d38520fd
commit 3e1ed95cf6

View File

@ -116,28 +116,42 @@ where
(self.handler.into_protocols_handler(), self.muxing.close().0) (self.handler.into_protocols_handler(), self.muxing.close().0)
} }
/// Polls the connection for events produced by the associated handler /// Polls the handler and the substream, forwarding events from the former to the latter and
/// as a result of I/O activity on the substream multiplexer. /// vice versa.
pub fn poll( pub fn poll(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>> { ) -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>> {
loop { loop {
let mut io_pending = false; // Poll the handler for new events.
match self.handler.poll(cx) {
Poll::Pending => {}
Poll::Ready(Ok(handler_wrapper::Event::OutboundSubstreamRequest(user_data))) => {
self.muxing.open_substream(user_data);
continue;
}
Poll::Ready(Ok(handler_wrapper::Event::Custom(event))) => {
return Poll::Ready(Ok(Event::Handler(event)));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
}
// Perform I/O on the connection through the muxer, informing the handler // Perform I/O on the connection through the muxer, informing the handler
// of new substreams. // of new substreams.
match self.muxing.poll(cx) { match self.muxing.poll(cx) {
Poll::Pending => io_pending = true, Poll::Pending => {}
Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => self Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => {
.handler self.handler
.inject_substream(substream, SubstreamEndpoint::Listener), .inject_substream(substream, SubstreamEndpoint::Listener);
continue;
}
Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { Poll::Ready(Ok(SubstreamEvent::OutboundSubstream {
user_data, user_data,
substream, substream,
})) => { })) => {
let endpoint = SubstreamEndpoint::Dialer(user_data); let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint) self.handler.inject_substream(substream, endpoint);
continue;
} }
Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => { Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => {
self.handler.inject_address_change(&address); self.handler.inject_address_change(&address);
@ -146,21 +160,7 @@ where
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))), Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))),
} }
// Poll the handler for new events. return Poll::Pending;
match self.handler.poll(cx) {
Poll::Pending => {
if io_pending {
return Poll::Pending; // Nothing to do
}
}
Poll::Ready(Ok(handler_wrapper::Event::OutboundSubstreamRequest(user_data))) => {
self.muxing.open_substream(user_data);
}
Poll::Ready(Ok(handler_wrapper::Event::Custom(event))) => {
return Poll::Ready(Ok(Event::Handler(event)));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
}
} }
} }
} }