core/muxing: Force StreamMuxer::Substream to implement Async{Read,Write} (#2707)

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Thomas Eizinger
2022-06-23 13:52:11 +02:00
committed by GitHub
parent 118046c9e0
commit eb490c08e9
65 changed files with 380 additions and 792 deletions

View File

@ -259,85 +259,6 @@ where
}
}
fn read_substream(
&self,
cx: &mut Context<'_>,
sub: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<Result<usize, Self::Error>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.read_substream(cx, sub, buf).map_err(|e| e.into())
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.read_substream(cx, sub, buf).map_err(|e| e.into())
}
_ => panic!("Wrong API usage"),
}
}
fn write_substream(
&self,
cx: &mut Context<'_>,
sub: &mut Self::Substream,
buf: &[u8],
) -> Poll<Result<usize, Self::Error>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.write_substream(cx, sub, buf).map_err(|e| e.into())
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.write_substream(cx, sub, buf).map_err(|e| e.into())
}
_ => panic!("Wrong API usage"),
}
}
fn flush_substream(
&self,
cx: &mut Context<'_>,
sub: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.flush_substream(cx, sub).map_err(|e| e.into())
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.flush_substream(cx, sub).map_err(|e| e.into())
}
_ => panic!("Wrong API usage"),
}
}
fn shutdown_substream(
&self,
cx: &mut Context<'_>,
sub: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.shutdown_substream(cx, sub).map_err(|e| e.into())
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.shutdown_substream(cx, sub).map_err(|e| e.into())
}
_ => panic!("Wrong API usage"),
}
}
fn destroy_substream(&self, substream: Self::Substream) {
match self {
EitherOutput::First(inner) => match substream {
EitherOutput::First(substream) => inner.destroy_substream(substream),
_ => panic!("Wrong API usage"),
},
EitherOutput::Second(inner) => match substream {
EitherOutput::Second(substream) => inner.destroy_substream(substream),
_ => panic!("Wrong API usage"),
},
}
}
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self {
EitherOutput::First(inner) => inner.poll_close(cx).map_err(|e| e.into()),

View File

@ -21,8 +21,7 @@
//! Muxing is the process of splitting a connection into multiple substreams.
//!
//! The main item of this module is the `StreamMuxer` trait. An implementation of `StreamMuxer`
//! has ownership of a connection, lets you open and close substreams, and read/write data
//! on open substreams.
//! has ownership of a connection, lets you open and close substreams.
//!
//! > **Note**: You normally don't need to use the methods of the `StreamMuxer` directly, as this
//! > is managed by the library's internals.
@ -51,31 +50,27 @@
//! The upgrade process will take ownership of the connection, which makes it possible for the
//! implementation of `StreamMuxer` to control everything that happens on the wire.
use futures::{future, prelude::*, task::Context, task::Poll};
use futures::{task::Context, task::Poll, AsyncRead, AsyncWrite};
use multiaddr::Multiaddr;
use std::{fmt, io, ops::Deref, pin::Pin};
use std::io;
pub use self::boxed::StreamMuxerBox;
pub use self::boxed::SubstreamBox;
pub use self::singleton::SingletonMuxer;
mod boxed;
mod singleton;
/// Implemented on objects that can open and manage substreams.
/// Provides multiplexing for a connection by allowing users to open substreams.
///
/// The state of a muxer, as exposed by this API, is the following:
///
/// - A connection to the remote. The `poll_event`, `flush_all` and `close` methods operate
/// on this.
/// - A list of substreams that are open. The `poll_outbound`, `read_substream`, `write_substream`,
/// `flush_substream`, `shutdown_substream` and `destroy_substream` methods allow controlling
/// these entries.
/// - A list of outbound substreams being opened. The `open_outbound`, `poll_outbound` and
/// `destroy_outbound` methods allow controlling these entries.
/// A substream created by a [`StreamMuxer`] is a type that implements [`AsyncRead`] and [`AsyncWrite`].
///
/// Inbound substreams are reported via [`StreamMuxer::poll_event`].
/// Outbound substreams can be opened via [`StreamMuxer::open_outbound`] and subsequent polling via
/// [`StreamMuxer::poll_outbound`].
pub trait StreamMuxer {
/// Type of the object that represents the raw substream where data can be read and written.
type Substream;
type Substream: AsyncRead + AsyncWrite;
/// Future that will be resolved when the outgoing substream is open.
type OutboundSubstream;
@ -126,86 +121,12 @@ pub trait StreamMuxer {
/// or if you want to interrupt it.
fn destroy_outbound(&self, s: Self::OutboundSubstream);
/// Reads data from a substream. The behaviour is the same as `futures::AsyncRead::poll_read`.
///
/// If `Pending` is returned, then the current task will be notified once the substream
/// is ready to be read. However, for each individual substream, only the latest task that
/// was used to call this method may be notified.
///
/// If `Async::Ready(0)` is returned, the substream has been closed by the remote and should
/// no longer be read afterwards.
///
/// An error can be generated if the connection has been closed, or if a protocol misbehaviour
/// happened.
fn read_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<Result<usize, Self::Error>>;
/// Write data to a substream. The behaviour is the same as `futures::AsyncWrite::poll_write`.
///
/// If `Pending` is returned, then the current task will be notified once the substream
/// is ready to be read. For each individual substream, only the latest task that was used to
/// call this method may be notified.
///
/// Calling `write_substream` does not guarantee that data will arrive to the remote. To
/// ensure that, you should call `flush_substream`.
///
/// It is incorrect to call this method on a substream if you called `shutdown_substream` on
/// this substream earlier.
fn write_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &[u8],
) -> Poll<Result<usize, Self::Error>>;
/// Flushes a substream. The behaviour is the same as `futures::AsyncWrite::poll_flush`.
///
/// After this method has been called, data written earlier on the substream is guaranteed to
/// be received by the remote.
///
/// If `Pending` is returned, then the current task will be notified once the substream
/// is ready to be read. For each individual substream, only the latest task that was used to
/// call this method may be notified.
///
/// > **Note**: This method may be implemented as a call to `flush_all`.
fn flush_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>>;
/// Attempts to shut down the writing side of a substream. The behaviour is similar to
/// `AsyncWrite::poll_close`.
///
/// Contrary to `AsyncWrite::poll_close`, shutting down a substream does not imply
/// `flush_substream`. If you want to make sure that the remote is immediately informed about
/// the shutdown, use `flush_substream` or `flush_all`.
///
/// After this method has been called, you should no longer attempt to write to this substream.
///
/// An error can be generated if the connection has been closed, or if a protocol misbehaviour
/// happened.
fn shutdown_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>>;
/// Destroys a substream.
fn destroy_substream(&self, s: Self::Substream);
/// Closes this `StreamMuxer`.
///
/// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless. All
/// subsequent reads must return either `EOF` or an error. All subsequent writes, shutdowns,
/// or polls must generate an error or be ignored.
///
/// Calling this method implies `flush_all`.
///
/// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so
/// > that the remote is properly informed of the shutdown. However, apart from
/// > properly informing the remote, there is no difference between this and
@ -247,252 +168,3 @@ impl<T> StreamMuxerEvent<T> {
}
}
}
/// Polls for an event from the muxer and, if an inbound substream, wraps this substream in an
/// object that implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`.
pub fn event_from_ref_and_wrap<P>(
muxer: P,
) -> impl Future<Output = Result<StreamMuxerEvent<SubstreamRef<P>>, <P::Target as StreamMuxer>::Error>>
where
P: Deref + Clone,
P::Target: StreamMuxer,
{
let muxer2 = muxer.clone();
future::poll_fn(move |cx| muxer.poll_event(cx)).map_ok(|event| match event {
StreamMuxerEvent::InboundSubstream(substream) => {
StreamMuxerEvent::InboundSubstream(substream_from_ref(muxer2, substream))
}
StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
})
}
/// Same as `outbound_from_ref`, but wraps the output in an object that
/// implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`.
pub fn outbound_from_ref_and_wrap<P>(muxer: P) -> OutboundSubstreamRefWrapFuture<P>
where
P: Deref + Clone,
P::Target: StreamMuxer,
{
let inner = outbound_from_ref(muxer);
OutboundSubstreamRefWrapFuture { inner }
}
/// Future returned by `outbound_from_ref_and_wrap`.
pub struct OutboundSubstreamRefWrapFuture<P>
where
P: Deref + Clone,
P::Target: StreamMuxer,
{
inner: OutboundSubstreamRefFuture<P>,
}
impl<P> Future for OutboundSubstreamRefWrapFuture<P>
where
P: Deref + Clone,
P::Target: StreamMuxer,
{
type Output = Result<SubstreamRef<P>, <P::Target as StreamMuxer>::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Future::poll(Pin::new(&mut self.inner), cx) {
Poll::Ready(Ok(substream)) => {
let out = substream_from_ref(self.inner.muxer.clone(), substream);
Poll::Ready(Ok(out))
}
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
}
}
}
/// Builds a new future for an outbound substream, where the muxer is a reference.
pub fn outbound_from_ref<P>(muxer: P) -> OutboundSubstreamRefFuture<P>
where
P: Deref,
P::Target: StreamMuxer,
{
let outbound = muxer.open_outbound();
OutboundSubstreamRefFuture {
muxer,
outbound: Some(outbound),
}
}
/// Future returned by `outbound_from_ref`.
pub struct OutboundSubstreamRefFuture<P>
where
P: Deref,
P::Target: StreamMuxer,
{
muxer: P,
outbound: Option<<P::Target as StreamMuxer>::OutboundSubstream>,
}
impl<P> Unpin for OutboundSubstreamRefFuture<P>
where
P: Deref,
P::Target: StreamMuxer,
{
}
impl<P> Future for OutboundSubstreamRefFuture<P>
where
P: Deref,
P::Target: StreamMuxer,
{
type Output = Result<<P::Target as StreamMuxer>::Substream, <P::Target as StreamMuxer>::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// We use a `this` because the compiler isn't smart enough to allow mutably borrowing
// multiple different fields from the `Pin` at the same time.
let this = &mut *self;
this.muxer
.poll_outbound(cx, this.outbound.as_mut().expect("outbound was empty"))
}
}
impl<P> Drop for OutboundSubstreamRefFuture<P>
where
P: Deref,
P::Target: StreamMuxer,
{
fn drop(&mut self) {
self.muxer
.destroy_outbound(self.outbound.take().expect("outbound was empty"))
}
}
/// Builds an implementation of `Read`/`Write`/`AsyncRead`/`AsyncWrite` from an `Arc` to the
/// muxer and a substream.
pub fn substream_from_ref<P>(
muxer: P,
substream: <P::Target as StreamMuxer>::Substream,
) -> SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
SubstreamRef {
muxer,
substream: Some(substream),
shutdown_state: ShutdownState::Shutdown,
}
}
/// Stream returned by `substream_from_ref`.
pub struct SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
muxer: P,
substream: Option<<P::Target as StreamMuxer>::Substream>,
shutdown_state: ShutdownState,
}
enum ShutdownState {
Shutdown,
Flush,
Done,
}
impl<P> fmt::Debug for SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
<P::Target as StreamMuxer>::Substream: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "Substream({:?})", self.substream)
}
}
impl<P> Unpin for SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
}
impl<P> AsyncRead for SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
// We use a `this` because the compiler isn't smart enough to allow mutably borrowing
// multiple different fields from the `Pin` at the same time.
let this = &mut *self;
let s = this.substream.as_mut().expect("substream was empty");
this.muxer.read_substream(cx, s, buf).map_err(|e| e.into())
}
}
impl<P> AsyncWrite for SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
// We use a `this` because the compiler isn't smart enough to allow mutably borrowing
// multiple different fields from the `Pin` at the same time.
let this = &mut *self;
let s = this.substream.as_mut().expect("substream was empty");
this.muxer.write_substream(cx, s, buf).map_err(|e| e.into())
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
// We use a `this` because the compiler isn't smart enough to allow mutably borrowing
// multiple different fields from the `Pin` at the same time.
let this = &mut *self;
let s = this.substream.as_mut().expect("substream was empty");
loop {
match this.shutdown_state {
ShutdownState::Shutdown => match this.muxer.shutdown_substream(cx, s) {
Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Flush,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Pending => return Poll::Pending,
},
ShutdownState::Flush => match this.muxer.flush_substream(cx, s) {
Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Done,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Pending => return Poll::Pending,
},
ShutdownState::Done => {
return Poll::Ready(Ok(()));
}
}
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
// We use a `this` because the compiler isn't smart enough to allow mutably borrowing
// multiple different fields from the `Pin` at the same time.
let this = &mut *self;
let s = this.substream.as_mut().expect("substream was empty");
this.muxer.flush_substream(cx, s).map_err(|e| e.into())
}
}
impl<P> Drop for SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
fn drop(&mut self) {
self.muxer
.destroy_substream(self.substream.take().expect("substream was empty"))
}
}

View File

@ -1,27 +1,35 @@
use crate::muxing::StreamMuxerEvent;
use crate::StreamMuxer;
use fnv::FnvHashMap;
use futures::{AsyncRead, AsyncWrite};
use parking_lot::Mutex;
use std::fmt;
use std::io;
use std::io::{IoSlice, IoSliceMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
/// Abstract `StreamMuxer`.
pub struct StreamMuxerBox {
inner: Box<
dyn StreamMuxer<Substream = usize, OutboundSubstream = usize, Error = io::Error>
dyn StreamMuxer<Substream = SubstreamBox, OutboundSubstream = usize, Error = io::Error>
+ Send
+ Sync,
>,
}
/// Abstract type for asynchronous reading and writing.
///
/// A [`SubstreamBox`] erases the concrete type it is given and only retains its `AsyncRead`
/// and `AsyncWrite` capabilities.
pub struct SubstreamBox(Box<dyn AsyncReadWrite + Send + Unpin>);
struct Wrap<T>
where
T: StreamMuxer,
{
inner: T,
substreams: Mutex<FnvHashMap<usize, T::Substream>>,
next_substream: AtomicUsize,
outbound: Mutex<FnvHashMap<usize, T::OutboundSubstream>>,
next_outbound: AtomicUsize,
}
@ -29,8 +37,9 @@ where
impl<T> StreamMuxer for Wrap<T>
where
T: StreamMuxer,
T::Substream: Send + Unpin + 'static,
{
type Substream = usize; // TODO: use a newtype
type Substream = SubstreamBox;
type OutboundSubstream = usize; // TODO: use a newtype
type Error = io::Error;
@ -48,9 +57,9 @@ where
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
};
let id = self.next_substream.fetch_add(1, Ordering::Relaxed);
self.substreams.lock().insert(id, substream);
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(id)))
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(SubstreamBox::new(
substream,
))))
}
#[inline]
@ -76,9 +85,8 @@ where
Poll::Ready(Ok(s)) => s,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
};
let id = self.next_substream.fetch_add(1, Ordering::Relaxed);
self.substreams.lock().insert(id, substream);
Poll::Ready(Ok(id))
Poll::Ready(Ok(SubstreamBox::new(substream)))
}
#[inline]
@ -88,63 +96,6 @@ where
.destroy_outbound(list.remove(&substream).unwrap())
}
#[inline]
fn read_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<Result<usize, Self::Error>> {
let mut list = self.substreams.lock();
self.inner
.read_substream(cx, list.get_mut(s).unwrap(), buf)
.map_err(|e| e.into())
}
#[inline]
fn write_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &[u8],
) -> Poll<Result<usize, Self::Error>> {
let mut list = self.substreams.lock();
self.inner
.write_substream(cx, list.get_mut(s).unwrap(), buf)
.map_err(|e| e.into())
}
#[inline]
fn flush_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
let mut list = self.substreams.lock();
self.inner
.flush_substream(cx, list.get_mut(s).unwrap())
.map_err(|e| e.into())
}
#[inline]
fn shutdown_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
let mut list = self.substreams.lock();
self.inner
.shutdown_substream(cx, list.get_mut(s).unwrap())
.map_err(|e| e.into())
}
#[inline]
fn destroy_substream(&self, substream: Self::Substream) {
let mut list = self.substreams.lock();
self.inner
.destroy_substream(list.remove(&substream).unwrap())
}
#[inline]
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_close(cx).map_err(|e| e.into())
@ -157,12 +108,10 @@ impl StreamMuxerBox {
where
T: StreamMuxer + Send + Sync + 'static,
T::OutboundSubstream: Send,
T::Substream: Send,
T::Substream: Send + Unpin + 'static,
{
let wrap = Wrap {
inner: muxer,
substreams: Mutex::new(Default::default()),
next_substream: AtomicUsize::new(0),
outbound: Mutex::new(Default::default()),
next_outbound: AtomicUsize::new(0),
};
@ -174,7 +123,7 @@ impl StreamMuxerBox {
}
impl StreamMuxer for StreamMuxerBox {
type Substream = usize; // TODO: use a newtype
type Substream = SubstreamBox;
type OutboundSubstream = usize; // TODO: use a newtype
type Error = io::Error;
@ -205,51 +154,82 @@ impl StreamMuxer for StreamMuxerBox {
self.inner.destroy_outbound(substream)
}
#[inline]
fn read_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<Result<usize, Self::Error>> {
self.inner.read_substream(cx, s, buf)
}
#[inline]
fn write_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &[u8],
) -> Poll<Result<usize, Self::Error>> {
self.inner.write_substream(cx, s, buf)
}
#[inline]
fn flush_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
self.inner.flush_substream(cx, s)
}
#[inline]
fn shutdown_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
self.inner.shutdown_substream(cx, s)
}
#[inline]
fn destroy_substream(&self, s: Self::Substream) {
self.inner.destroy_substream(s)
}
#[inline]
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_close(cx)
}
}
impl SubstreamBox {
/// Construct a new [`SubstreamBox`] from something that implements [`AsyncRead`] and [`AsyncWrite`].
pub fn new<S: AsyncRead + AsyncWrite + Send + Unpin + 'static>(stream: S) -> Self {
Self(Box::new(stream))
}
}
impl fmt::Debug for SubstreamBox {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SubstreamBox({})", self.0.type_name())
}
}
/// Workaround because Rust does not allow `Box<dyn AsyncRead + AsyncWrite>`.
trait AsyncReadWrite: AsyncRead + AsyncWrite + Unpin {
/// Helper function to capture the erased inner type.
///
/// Used to make the [`Debug`] implementation of [`SubstreamBox`] more useful.
fn type_name(&self) -> &'static str;
}
impl<S> AsyncReadWrite for S
where
S: AsyncRead + AsyncWrite + Unpin,
{
fn type_name(&self) -> &'static str {
std::any::type_name::<S>()
}
}
impl AsyncRead for SubstreamBox {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
}
fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.get_mut().0).poll_read_vectored(cx, bufs)
}
}
impl AsyncWrite for SubstreamBox {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.get_mut().0).poll_write(cx, buf)
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.get_mut().0).poll_write_vectored(cx, bufs)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.get_mut().0).poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.get_mut().0).poll_close(cx)
}
}

View File

@ -24,14 +24,8 @@ use crate::{
};
use futures::prelude::*;
use parking_lot::Mutex;
use std::{
io,
pin::Pin,
sync::atomic::{AtomicBool, Ordering},
task::Context,
task::Poll,
};
use std::cell::Cell;
use std::{io, task::Context, task::Poll};
/// Implementation of `StreamMuxer` that allows only one substream on top of a connection,
/// yielding the connection itself.
@ -40,9 +34,7 @@ use std::{
/// Most notably, no protocol is negotiated.
pub struct SingletonMuxer<TSocket> {
/// The inner connection.
inner: Mutex<TSocket>,
/// If true, a substream has been produced and any further attempt should fail.
substream_extracted: AtomicBool,
inner: Cell<Option<TSocket>>,
/// Our local endpoint. Always the same value as was passed to `new`.
endpoint: Endpoint,
}
@ -54,15 +46,12 @@ impl<TSocket> SingletonMuxer<TSocket> {
/// If `endpoint` is `Listener`, then only one inbound substream will be permitted.
pub fn new(inner: TSocket, endpoint: Endpoint) -> Self {
SingletonMuxer {
inner: Mutex::new(inner),
substream_extracted: AtomicBool::new(false),
inner: Cell::new(Some(inner)),
endpoint,
}
}
}
/// Substream of the `SingletonMuxer`.
pub struct Substream {}
/// Outbound substream attempt of the `SingletonMuxer`.
pub struct OutboundSubstream {}
@ -70,7 +59,7 @@ impl<TSocket> StreamMuxer for SingletonMuxer<TSocket>
where
TSocket: AsyncRead + AsyncWrite + Unpin,
{
type Substream = Substream;
type Substream = TSocket;
type OutboundSubstream = OutboundSubstream;
type Error = io::Error;
@ -83,8 +72,8 @@ where
Endpoint::Listener => {}
}
if !self.substream_extracted.swap(true, Ordering::Relaxed) {
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(Substream {})))
if let Some(stream) = self.inner.replace(None) {
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream)))
} else {
Poll::Pending
}
@ -104,8 +93,8 @@ where
Endpoint::Dialer => {}
}
if !self.substream_extracted.swap(true, Ordering::Relaxed) {
Poll::Ready(Ok(Substream {}))
if let Some(stream) = self.inner.replace(None) {
Poll::Ready(Ok(stream))
} else {
Poll::Pending
}
@ -113,42 +102,6 @@ where
fn destroy_outbound(&self, _: Self::OutboundSubstream) {}
fn read_substream(
&self,
cx: &mut Context<'_>,
_: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
AsyncRead::poll_read(Pin::new(&mut *self.inner.lock()), cx, buf)
}
fn write_substream(
&self,
cx: &mut Context<'_>,
_: &mut Self::Substream,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
AsyncWrite::poll_write(Pin::new(&mut *self.inner.lock()), cx, buf)
}
fn flush_substream(
&self,
cx: &mut Context<'_>,
_: &mut Self::Substream,
) -> Poll<Result<(), io::Error>> {
AsyncWrite::poll_flush(Pin::new(&mut *self.inner.lock()), cx)
}
fn shutdown_substream(
&self,
cx: &mut Context<'_>,
_: &mut Self::Substream,
) -> Poll<Result<(), io::Error>> {
AsyncWrite::poll_close(Pin::new(&mut *self.inner.lock()), cx)
}
fn destroy_substream(&self, _: Self::Substream) {}
fn poll_close(&self, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}

View File

@ -300,7 +300,7 @@ impl<T> Multiplexed<T> {
T::ListenerUpgrade: Send + 'static,
T::Error: Send + Sync,
M: StreamMuxer + Send + Sync + 'static,
M::Substream: Send + 'static,
M::Substream: Send + Unpin + 'static,
M::OutboundSubstream: Send + 'static,
{
boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m))))