diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index d613b4cb..c058833d 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -116,28 +116,42 @@ where (self.handler.into_protocols_handler(), self.muxing.close().0) } - /// Polls the connection for events produced by the associated handler - /// as a result of I/O activity on the substream multiplexer. + /// Polls the handler and the substream, forwarding events from the former to the latter and + /// vice versa. pub fn poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, ConnectionError>> { loop { - let mut io_pending = false; + // Poll the handler for new events. + match self.handler.poll(cx) { + Poll::Pending => {} + Poll::Ready(Ok(handler_wrapper::Event::OutboundSubstreamRequest(user_data))) => { + self.muxing.open_substream(user_data); + continue; + } + Poll::Ready(Ok(handler_wrapper::Event::Custom(event))) => { + return Poll::Ready(Ok(Event::Handler(event))); + } + Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), + } // Perform I/O on the connection through the muxer, informing the handler // of new substreams. match self.muxing.poll(cx) { - Poll::Pending => io_pending = true, - Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => self - .handler - .inject_substream(substream, SubstreamEndpoint::Listener), + Poll::Pending => {} + Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => { + self.handler + .inject_substream(substream, SubstreamEndpoint::Listener); + continue; + } Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { user_data, substream, })) => { let endpoint = SubstreamEndpoint::Dialer(user_data); - self.handler.inject_substream(substream, endpoint) + self.handler.inject_substream(substream, endpoint); + continue; } Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => { self.handler.inject_address_change(&address); @@ -146,21 +160,7 @@ where Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))), } - // Poll the handler for new events. - match self.handler.poll(cx) { - Poll::Pending => { - if io_pending { - return Poll::Pending; // Nothing to do - } - } - Poll::Ready(Ok(handler_wrapper::Event::OutboundSubstreamRequest(user_data))) => { - self.muxing.open_substream(user_data); - } - Poll::Ready(Ok(handler_wrapper::Event::Custom(event))) => { - return Poll::Ready(Ok(Event::Handler(event))); - } - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), - } + return Poll::Pending; } } }