core/muxing: Generalise StreamMuxer::poll_address_change to poll (#2797)

This is to allow general-purpose background work to be performed
by implementations.
This commit is contained in:
Thomas Eizinger
2022-08-16 04:50:17 +02:00
committed by GitHub
parent 06aaea67f3
commit cef505685c
8 changed files with 101 additions and 89 deletions

View File

@ -1,16 +1,18 @@
# 0.35.0 [unreleased] # 0.35.0 [unreleased]
- Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound`
and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724].
- Drop `Unpin` requirement from `SubstreamBox`. See [PR 2762] and [PR 2776]. - Drop `Unpin` requirement from `SubstreamBox`. See [PR 2762] and [PR 2776].
- Drop `Sync` requirement on `StreamMuxer` for constructing `StreamMuxerBox`. See [PR 2775]. - Drop `Sync` requirement on `StreamMuxer` for constructing `StreamMuxerBox`. See [PR 2775].
- Use `Pin<&mut Self>` as the receiver type for all `StreamMuxer` poll functions. See [PR 2765]. - Use `Pin<&mut Self>` as the receiver type for all `StreamMuxer` poll functions. See [PR 2765].
- Change `StreamMuxer` interface to be entirely poll-based. All functions on `StreamMuxer` now
require a `Context` and return `Poll`. This gives callers fine-grained control over what they
would like to make progress on. See [PR 2724] and [PR 2797].
[PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724 [PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724
[PR 2762]: https://github.com/libp2p/rust-libp2p/pull/2762 [PR 2762]: https://github.com/libp2p/rust-libp2p/pull/2762
[PR 2775]: https://github.com/libp2p/rust-libp2p/pull/2775 [PR 2775]: https://github.com/libp2p/rust-libp2p/pull/2775
[PR 2776]: https://github.com/libp2p/rust-libp2p/pull/2776 [PR 2776]: https://github.com/libp2p/rust-libp2p/pull/2776
[PR 2765]: https://github.com/libp2p/rust-libp2p/pull/2765 [PR 2765]: https://github.com/libp2p/rust-libp2p/pull/2765
[PR 2797]: https://github.com/libp2p/rust-libp2p/pull/2797
# 0.34.0 # 0.34.0

View File

@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::muxing::StreamMuxerEvent;
use crate::{ use crate::{
muxing::StreamMuxer, muxing::StreamMuxer,
transport::{ListenerId, Transport, TransportError, TransportEvent}, transport::{ListenerId, Transport, TransportError, TransportEvent},
@ -236,24 +237,22 @@ where
} }
} }
fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => {
inner.poll_address_change(cx).map_err(EitherError::B)
}
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() { match self.project() {
EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A), EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B), EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
} }
} }
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => inner.poll(cx).map_err(EitherError::B),
}
}
} }
/// Implements `Future` and dispatches all method calls to either `First` or `Second`. /// Implements `Future` and dispatches all method calls to either `First` or `Second`.

View File

@ -75,6 +75,10 @@ pub trait StreamMuxer {
type Error: std::error::Error; type Error: std::error::Error;
/// Poll for new inbound substreams. /// Poll for new inbound substreams.
///
/// This function should be called whenever callers are ready to accept more inbound streams. In
/// other words, callers may exercise back-pressure on incoming streams by not calling this
/// function if a certain limit is hit.
fn poll_inbound( fn poll_inbound(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -86,25 +90,33 @@ pub trait StreamMuxer {
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>; ) -> Poll<Result<Self::Substream, Self::Error>>;
/// Poll for an address change of the underlying connection. /// Poll to close this [`StreamMuxer`].
/// ///
/// Not all implementations may support this feature. /// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless and may be safely
fn poll_address_change( /// dropped.
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>>;
/// Closes this `StreamMuxer`.
///
/// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless. All
/// subsequent reads must return either `EOF` or an error. All subsequent writes, shutdowns,
/// or polls must generate an error or be ignored.
/// ///
/// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so /// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so
/// > that the remote is properly informed of the shutdown. However, apart from /// > that the remote is properly informed of the shutdown. However, apart from
/// > properly informing the remote, there is no difference between this and /// > properly informing the remote, there is no difference between this and
/// > immediately dropping the muxer. /// > immediately dropping the muxer.
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
/// Poll to allow the underlying connection to make progress.
///
/// In contrast to all other `poll`-functions on [`StreamMuxer`], this function MUST be called
/// unconditionally. Because it will be called regardless, this function can be used by
/// implementations to return events about the underlying connection that the caller MUST deal
/// with.
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>>;
}
/// An event produced by a [`StreamMuxer`].
pub enum StreamMuxerEvent {
/// The address of the remote has changed.
AddressChange(Multiaddr),
} }
/// Extension trait for [`StreamMuxer`]. /// Extension trait for [`StreamMuxer`].
@ -131,15 +143,12 @@ pub trait StreamMuxerExt: StreamMuxer + Sized {
Pin::new(self).poll_outbound(cx) Pin::new(self).poll_outbound(cx)
} }
/// Convenience function for calling [`StreamMuxer::poll_address_change`] for [`StreamMuxer`]s that are `Unpin`. /// Convenience function for calling [`StreamMuxer::poll`] for [`StreamMuxer`]s that are `Unpin`.
fn poll_address_change_unpin( fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent, Self::Error>>
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>>
where where
Self: Unpin, Self: Unpin,
{ {
Pin::new(self).poll_address_change(cx) Pin::new(self).poll(cx)
} }
/// Convenience function for calling [`StreamMuxer::poll_close`] for [`StreamMuxer`]s that are `Unpin`. /// Convenience function for calling [`StreamMuxer::poll_close`] for [`StreamMuxer`]s that are `Unpin`.

View File

@ -1,6 +1,5 @@
use crate::StreamMuxer; use crate::muxing::{StreamMuxer, StreamMuxerEvent};
use futures::{AsyncRead, AsyncWrite}; use futures::{AsyncRead, AsyncWrite};
use multiaddr::Multiaddr;
use pin_project::pin_project; use pin_project::pin_project;
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
@ -38,11 +37,6 @@ where
type Substream = SubstreamBox; type Substream = SubstreamBox;
type Error = io::Error; type Error = io::Error;
#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx).map_err(into_io_error)
}
fn poll_inbound( fn poll_inbound(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -65,14 +59,16 @@ where
.map_err(into_io_error) .map_err(into_io_error)
} }
fn poll_address_change( #[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx).map_err(into_io_error)
}
fn poll(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> { ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
self.project() self.project().inner.poll(cx).map_err(into_io_error)
.inner
.poll_address_change(cx)
.map_err(into_io_error)
} }
} }
@ -109,11 +105,6 @@ impl StreamMuxer for StreamMuxerBox {
type Substream = SubstreamBox; type Substream = SubstreamBox;
type Error = io::Error; type Error = io::Error;
#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().poll_close(cx)
}
fn poll_inbound( fn poll_inbound(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -128,11 +119,16 @@ impl StreamMuxer for StreamMuxerBox {
self.project().poll_outbound(cx) self.project().poll_outbound(cx)
} }
fn poll_address_change( #[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().poll_close(cx)
}
fn poll(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> { ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
self.project().poll_address_change(cx) self.project().poll(cx)
} }
} }

View File

@ -18,10 +18,10 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::{connection::Endpoint, muxing::StreamMuxer}; use crate::connection::Endpoint;
use crate::muxing::{StreamMuxer, StreamMuxerEvent};
use futures::prelude::*; use futures::prelude::*;
use multiaddr::Multiaddr;
use std::cell::Cell; use std::cell::Cell;
use std::pin::Pin; use std::pin::Pin;
use std::{io, task::Context, task::Poll}; use std::{io, task::Context, task::Poll};
@ -88,14 +88,14 @@ where
} }
} }
fn poll_address_change(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
Poll::Pending
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
fn poll(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
}
} }

View File

@ -27,10 +27,8 @@ pub use config::{MaxBufferBehaviour, MplexConfig};
use bytes::Bytes; use bytes::Bytes;
use codec::LocalStreamId; use codec::LocalStreamId;
use futures::{future, prelude::*, ready}; use futures::{future, prelude::*, ready};
use libp2p_core::{ use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
Multiaddr, StreamMuxer,
};
use parking_lot::Mutex; use parking_lot::Mutex;
use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll}; use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll};
@ -105,10 +103,10 @@ where
.map_ok(|stream_id| Substream::new(stream_id, self.io.clone())) .map_ok(|stream_id| Substream::new(stream_id, self.io.clone()))
} }
fn poll_address_change( fn poll(
self: Pin<&mut Self>, self: Pin<&mut Self>,
_: &mut Context<'_>, _: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> { ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending Poll::Pending
} }

View File

@ -26,9 +26,8 @@ use futures::{
prelude::*, prelude::*,
stream::{BoxStream, LocalBoxStream}, stream::{BoxStream, LocalBoxStream},
}; };
use libp2p_core::muxing::StreamMuxer; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p_core::Multiaddr;
use std::{ use std::{
fmt, io, iter, mem, fmt, io, iter, mem,
pin::Pin, pin::Pin,
@ -124,10 +123,10 @@ where
.map_err(YamuxError) .map_err(YamuxError)
} }
fn poll_address_change( fn poll(
self: Pin<&mut Self>, self: Pin<&mut Self>,
_: &mut Context<'_>, _: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> { ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending Poll::Pending
} }

View File

@ -35,7 +35,7 @@ use crate::IntoConnectionHandler;
use handler_wrapper::HandlerWrapper; use handler_wrapper::HandlerWrapper;
use libp2p_core::connection::ConnectedPoint; use libp2p_core::connection::ConnectedPoint;
use libp2p_core::multiaddr::Multiaddr; use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt};
use libp2p_core::upgrade; use libp2p_core::upgrade;
use libp2p_core::PeerId; use libp2p_core::PeerId;
use std::collections::VecDeque; use std::collections::VecDeque;
@ -153,27 +153,36 @@ where
} }
} }
if !self.open_info.is_empty() { match self.muxing.poll_unpin(cx)? {
if let Poll::Ready(substream) = self.muxing.poll_outbound_unpin(cx)? { Poll::Pending => {}
let user_data = self Poll::Ready(StreamMuxerEvent::AddressChange(address)) => {
.open_info self.handler.inject_address_change(&address);
.pop_front() return Poll::Ready(Ok(Event::AddressChange(address)));
.expect("`open_info` is not empty");
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint);
continue; // Go back to the top, handler can potentially make progress again.
} }
} }
if let Poll::Ready(substream) = self.muxing.poll_inbound_unpin(cx)? { if !self.open_info.is_empty() {
self.handler match self.muxing.poll_outbound_unpin(cx)? {
.inject_substream(substream, SubstreamEndpoint::Listener); Poll::Pending => {}
continue; // Go back to the top, handler can potentially make progress again. Poll::Ready(substream) => {
let user_data = self
.open_info
.pop_front()
.expect("`open_info` is not empty");
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint);
continue; // Go back to the top, handler can potentially make progress again.
}
}
} }
if let Poll::Ready(address) = self.muxing.poll_address_change_unpin(cx)? { match self.muxing.poll_inbound_unpin(cx)? {
self.handler.inject_address_change(&address); Poll::Pending => {}
return Poll::Ready(Ok(Event::AddressChange(address))); Poll::Ready(substream) => {
self.handler
.inject_substream(substream, SubstreamEndpoint::Listener);
continue; // Go back to the top, handler can potentially make progress again.
}
} }
return Poll::Pending; // Nothing can make progress, return `Pending`. return Poll::Pending; // Nothing can make progress, return `Pending`.