core/muxing: Flatten StreamMuxer interface to poll_{inbound,outbound,address_change,close} (#2724)

Instead of having a mix of `poll_event`, `poll_outbound` and `poll_close`, we
flatten the entire interface of `StreamMuxer` into 4 individual functions:

- `poll_inbound`
- `poll_outbound`
- `poll_address_change`
- `poll_close`

This design is closer to the design of other async traits like `AsyncRead` and
`AsyncWrite`. It also allows us to delete the `StreamMuxerEvent`.
This commit is contained in:
Thomas Eizinger
2022-07-18 04:20:11 +01:00
committed by GitHub
parent d4f8ec2d48
commit 1a553db596
61 changed files with 281 additions and 696 deletions

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use crate::{
muxing::{StreamMuxer, StreamMuxerEvent},
muxing::StreamMuxer,
transport::{ListenerId, Transport, TransportError, TransportEvent},
Multiaddr, ProtocolName,
};
@ -202,60 +202,38 @@ where
B: StreamMuxer,
{
type Substream = EitherOutput<A::Substream, B::Substream>;
type OutboundSubstream = EitherOutbound<A, B>;
type Error = EitherError<A::Error, B::Error>;
fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self {
EitherOutput::First(inner) => inner
.poll_event(cx)
.map_err(EitherError::A)
.map_ok(|event| event.map_inbound_stream(EitherOutput::First)),
EitherOutput::Second(inner) => inner
.poll_event(cx)
.map_err(EitherError::B)
.map_ok(|event| event.map_inbound_stream(EitherOutput::Second)),
}
}
fn open_outbound(&self) -> Self::OutboundSubstream {
match self {
EitherOutput::First(inner) => EitherOutbound::A(inner.open_outbound()),
EitherOutput::Second(inner) => EitherOutbound::B(inner.open_outbound()),
}
}
fn poll_outbound(
&self,
cx: &mut Context<'_>,
substream: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::First))
.poll_inbound(cx)
.map_ok(EitherOutput::First)
.map_err(EitherError::A),
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::Second))
EitherOutput::Second(inner) => inner
.poll_inbound(cx)
.map_ok(EitherOutput::Second)
.map_err(EitherError::B),
_ => panic!("Wrong API usage"),
}
}
fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self {
EitherOutput::First(inner) => match substream {
EitherOutbound::A(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage"),
},
EitherOutput::Second(inner) => match substream {
EitherOutbound::B(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage"),
},
EitherOutput::First(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::First)
.map_err(EitherError::A),
EitherOutput::Second(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::Second)
.map_err(EitherError::B),
}
}
fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>> {
match self {
EitherOutput::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A),
EitherOutput::Second(inner) => inner.poll_address_change(cx).map_err(EitherError::B),
}
}
@ -267,13 +245,6 @@ where
}
}
#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
A(A::OutboundSubstream),
B(B::OutboundSubstream),
}
/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherFutureProj)]
#[derive(Debug, Copy, Clone)]

View File

@ -63,62 +63,25 @@ mod singleton;
/// Provides multiplexing for a connection by allowing users to open substreams.
///
/// A substream created by a [`StreamMuxer`] is a type that implements [`AsyncRead`] and [`AsyncWrite`].
///
/// Inbound substreams are reported via [`StreamMuxer::poll_event`].
/// Outbound substreams can be opened via [`StreamMuxer::open_outbound`] and subsequent polling via
/// [`StreamMuxer::poll_outbound`].
/// The [`StreamMuxer`] itself is modelled closely after [`AsyncWrite`]. It features `poll`-style
/// functions that allow the implementation to make progress on various tasks.
pub trait StreamMuxer {
/// Type of the object that represents the raw substream where data can be read and written.
type Substream: AsyncRead + AsyncWrite;
/// Future that will be resolved when the outgoing substream is open.
type OutboundSubstream;
/// Error type of the muxer
type Error: std::error::Error;
/// Polls for a connection-wide event.
///
/// This function behaves the same as a `Stream`.
///
/// If `Pending` is returned, then the current task will be notified once the muxer
/// is ready to be polled, similar to the API of `Stream::poll()`.
/// Only the latest task that was used to call this method may be notified.
///
/// It is permissible and common to use this method to perform background
/// work, such as processing incoming packets and polling timers.
///
/// An error can be generated if the connection has been closed.
fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>>;
/// Poll for new inbound substreams.
fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;
/// Opens a new outgoing substream, and produces the equivalent to a future that will be
/// resolved when it becomes available.
///
/// The API of `OutboundSubstream` is totally opaque, and the object can only be interfaced
/// through the methods on the `StreamMuxer` trait.
fn open_outbound(&self) -> Self::OutboundSubstream;
/// Poll for a new, outbound substream.
fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;
/// Polls the outbound substream.
/// Poll for an address change of the underlying connection.
///
/// If `Pending` is returned, then the current task will be notified once the substream
/// is ready to be polled, similar to the API of `Future::poll()`.
/// However, for each individual outbound substream, only the latest task that was used to
/// call this method may be notified.
///
/// May panic or produce an undefined result if an earlier polling of the same substream
/// returned `Ready` or `Err`.
fn poll_outbound(
&self,
cx: &mut Context<'_>,
s: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>>;
/// Destroys an outbound substream future. Use this after the outbound substream has finished,
/// or if you want to interrupt it.
fn destroy_outbound(&self, s: Self::OutboundSubstream);
/// Not all implementations may support this feature.
fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>>;
/// Closes this `StreamMuxer`.
///
@ -132,38 +95,3 @@ pub trait StreamMuxer {
/// > immediately dropping the muxer.
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}
/// Event about a connection, reported by an implementation of [`StreamMuxer`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamMuxerEvent<T> {
/// Remote has opened a new substream. Contains the substream in question.
InboundSubstream(T),
/// Address to the remote has changed. The previous one is now obsolete.
///
/// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes
/// > can change their IP address while retaining the same QUIC connection.
AddressChange(Multiaddr),
}
impl<T> StreamMuxerEvent<T> {
/// If `self` is a [`StreamMuxerEvent::InboundSubstream`], returns the content. Otherwise
/// returns `None`.
pub fn into_inbound_substream(self) -> Option<T> {
if let StreamMuxerEvent::InboundSubstream(s) = self {
Some(s)
} else {
None
}
}
/// Map the stream within [`StreamMuxerEvent::InboundSubstream`] to a new type.
pub fn map_inbound_stream<O>(self, map: impl FnOnce(T) -> O) -> StreamMuxerEvent<O> {
match self {
StreamMuxerEvent::InboundSubstream(stream) => {
StreamMuxerEvent::InboundSubstream(map(stream))
}
StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
}
}
}

View File

@ -1,23 +1,16 @@
use crate::muxing::StreamMuxerEvent;
use crate::StreamMuxer;
use fnv::FnvHashMap;
use futures::{ready, AsyncRead, AsyncWrite};
use parking_lot::Mutex;
use futures::{AsyncRead, AsyncWrite};
use multiaddr::Multiaddr;
use std::error::Error;
use std::fmt;
use std::io;
use std::io::{IoSlice, IoSliceMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
/// Abstract `StreamMuxer`.
pub struct StreamMuxerBox {
inner: Box<
dyn StreamMuxer<Substream = SubstreamBox, OutboundSubstream = usize, Error = io::Error>
+ Send
+ Sync,
>,
inner: Box<dyn StreamMuxer<Substream = SubstreamBox, Error = io::Error> + Send + Sync>,
}
/// Abstract type for asynchronous reading and writing.
@ -31,8 +24,6 @@ where
T: StreamMuxer,
{
inner: T,
outbound: Mutex<FnvHashMap<usize, T::OutboundSubstream>>,
next_outbound: AtomicUsize,
}
impl<T> StreamMuxer for Wrap<T>
@ -42,54 +33,30 @@ where
T::Error: Send + Sync + 'static,
{
type Substream = SubstreamBox;
type OutboundSubstream = usize; // TODO: use a newtype
type Error = io::Error;
#[inline]
fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
let event = ready!(self.inner.poll_event(cx).map_err(into_io_error)?)
.map_inbound_stream(SubstreamBox::new);
Poll::Ready(Ok(event))
}
#[inline]
fn open_outbound(&self) -> Self::OutboundSubstream {
let outbound = self.inner.open_outbound();
let id = self.next_outbound.fetch_add(1, Ordering::Relaxed);
self.outbound.lock().insert(id, outbound);
id
}
#[inline]
fn poll_outbound(
&self,
cx: &mut Context<'_>,
substream: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>> {
let mut list = self.outbound.lock();
let stream = ready!(self
.inner
.poll_outbound(cx, list.get_mut(substream).unwrap())
.map_err(into_io_error)?);
Poll::Ready(Ok(SubstreamBox::new(stream)))
}
#[inline]
fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
let mut list = self.outbound.lock();
self.inner
.destroy_outbound(list.remove(&substream).unwrap())
}
#[inline]
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_close(cx).map_err(into_io_error)
}
fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
self.inner
.poll_inbound(cx)
.map_ok(SubstreamBox::new)
.map_err(into_io_error)
}
fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
self.inner
.poll_outbound(cx)
.map_ok(SubstreamBox::new)
.map_err(into_io_error)
}
fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>> {
self.inner.poll_address_change(cx).map_err(into_io_error)
}
}
fn into_io_error<E>(err: E) -> io::Error
@ -104,15 +71,10 @@ impl StreamMuxerBox {
pub fn new<T>(muxer: T) -> StreamMuxerBox
where
T: StreamMuxer + Send + Sync + 'static,
T::OutboundSubstream: Send,
T::Substream: Send + Unpin + 'static,
T::Error: Send + Sync + 'static,
{
let wrap = Wrap {
inner: muxer,
outbound: Mutex::new(Default::default()),
next_outbound: AtomicUsize::new(0),
};
let wrap = Wrap { inner: muxer };
StreamMuxerBox {
inner: Box::new(wrap),
@ -122,40 +84,24 @@ impl StreamMuxerBox {
impl StreamMuxer for StreamMuxerBox {
type Substream = SubstreamBox;
type OutboundSubstream = usize; // TODO: use a newtype
type Error = io::Error;
#[inline]
fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
self.inner.poll_event(cx)
}
#[inline]
fn open_outbound(&self) -> Self::OutboundSubstream {
self.inner.open_outbound()
}
#[inline]
fn poll_outbound(
&self,
cx: &mut Context<'_>,
s: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>> {
self.inner.poll_outbound(cx, s)
}
#[inline]
fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
self.inner.destroy_outbound(substream)
}
#[inline]
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_close(cx)
}
fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
self.inner.poll_inbound(cx)
}
fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
self.inner.poll_outbound(cx)
}
fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>> {
self.inner.poll_address_change(cx)
}
}
impl SubstreamBox {

View File

@ -18,12 +18,10 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{
connection::Endpoint,
muxing::{StreamMuxer, StreamMuxerEvent},
};
use crate::{connection::Endpoint, muxing::StreamMuxer};
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::cell::Cell;
use std::{io, task::Context, task::Poll};
@ -52,55 +50,36 @@ impl<TSocket> SingletonMuxer<TSocket> {
}
}
/// Outbound substream attempt of the `SingletonMuxer`.
pub struct OutboundSubstream {}
impl<TSocket> StreamMuxer for SingletonMuxer<TSocket>
where
TSocket: AsyncRead + AsyncWrite + Unpin,
{
type Substream = TSocket;
type OutboundSubstream = OutboundSubstream;
type Error = io::Error;
fn poll_event(
&self,
_: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, io::Error>> {
fn poll_inbound(&self, _: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self.endpoint {
Endpoint::Dialer => return Poll::Pending,
Endpoint::Listener => {}
}
if let Some(stream) = self.inner.replace(None) {
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream)))
} else {
Poll::Pending
Endpoint::Dialer => Poll::Pending,
Endpoint::Listener => match self.inner.replace(None) {
None => Poll::Pending,
Some(stream) => Poll::Ready(Ok(stream)),
},
}
}
fn open_outbound(&self) -> Self::OutboundSubstream {
OutboundSubstream {}
}
fn poll_outbound(
&self,
_: &mut Context<'_>,
_: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, io::Error>> {
fn poll_outbound(&self, _: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self.endpoint {
Endpoint::Listener => return Poll::Pending,
Endpoint::Dialer => {}
}
if let Some(stream) = self.inner.replace(None) {
Poll::Ready(Ok(stream))
} else {
Poll::Pending
Endpoint::Listener => Poll::Pending,
Endpoint::Dialer => match self.inner.replace(None) {
None => Poll::Pending,
Some(stream) => Poll::Ready(Ok(stream)),
},
}
}
fn destroy_outbound(&self, _: Self::OutboundSubstream) {}
fn poll_address_change(&self, _: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>> {
Poll::Pending
}
fn poll_close(&self, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))

View File

@ -301,7 +301,6 @@ impl<T> Multiplexed<T> {
T::Error: Send + Sync,
M: StreamMuxer + Send + Sync + 'static,
M::Substream: Send + Unpin + 'static,
M::OutboundSubstream: Send + 'static,
M::Error: Send + Sync + 'static,
{
boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m))))