Add StreamMuxer::flush. (#534)

Update the `StreamMuxer` trait.

- `read_substream`, `write_substream` and `flush_substream` now return `Poll` instead of `Result`.
- A new `Shutdown` enum allows for half-closing of substreams and is used in `shutdown_substream`.
- `close_inbound` and `close_outbound` have been merged into `shutdown` which takes a `Shutdown` parameter to allow closing only one direction.
- Add a new `flush_all` method  to allow flushing after a series of actions (e.g. multiple `shutdown_substream`).

W.r.t. flushing the general idea is that normal use drains buffers over time. Shutting down a substream does not imply flushing, so can be followed by `flush_substream` or (if multiple substreams are to be shut down) a single `flush_all`. Shutting down the muxer itself proceeds likewise, i.e. `shutdown` followed by `flush_all`.
This commit is contained in:
Toralf Wittner
2018-10-11 10:35:14 +02:00
committed by GitHub
parent 0c7f313146
commit 7016b86b3a
7 changed files with 223 additions and 203 deletions

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use futures::{prelude::*, future};
use muxing::StreamMuxer;
use muxing::{Shutdown, StreamMuxer};
use std::io::{Error as IoError, Read, Write};
use tokio_io::{AsyncRead, AsyncWrite};
@ -146,49 +146,49 @@ where
}
}
fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Result<usize, IoError> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => {
inner.read_substream(substream, buf)
fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.read_substream(sub, buf)
},
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => {
inner.read_substream(substream, buf)
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.read_substream(sub, buf)
},
_ => panic!("Wrong API usage")
}
}
fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Result<usize, IoError> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => {
inner.write_substream(substream, buf)
fn write_substream(&self, sub: &mut Self::Substream, buf: &[u8]) -> Poll<usize, IoError> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.write_substream(sub, buf)
},
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => {
inner.write_substream(substream, buf)
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.write_substream(sub, buf)
},
_ => panic!("Wrong API usage")
}
}
fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => {
inner.flush_substream(substream)
fn flush_substream(&self, sub: &mut Self::Substream) -> Poll<(), IoError> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.flush_substream(sub)
},
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => {
inner.flush_substream(substream)
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.flush_substream(sub)
},
_ => panic!("Wrong API usage")
}
}
fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => {
inner.shutdown_substream(substream)
fn shutdown_substream(&self, sub: &mut Self::Substream, kind: Shutdown) -> Poll<(), IoError> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.shutdown_substream(sub, kind)
},
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => {
inner.shutdown_substream(substream)
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.shutdown_substream(sub, kind)
},
_ => panic!("Wrong API usage")
}
@ -211,17 +211,17 @@ where
}
}
fn close_inbound(&self) {
fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError> {
match *self {
EitherOutput::First(ref inner) => inner.close_inbound(),
EitherOutput::Second(ref inner) => inner.close_inbound(),
EitherOutput::First(ref inner) => inner.shutdown(kind),
EitherOutput::Second(ref inner) => inner.shutdown(kind)
}
}
fn close_outbound(&self) {
fn flush_all(&self) -> Poll<(), IoError> {
match *self {
EitherOutput::First(ref inner) => inner.close_outbound(),
EitherOutput::Second(ref inner) => inner.close_outbound(),
EitherOutput::First(ref inner) => inner.flush_all(),
EitherOutput::Second(ref inner) => inner.flush_all()
}
}
}

View File

@ -21,12 +21,22 @@
use fnv::FnvHashMap;
use futures::{future, prelude::*};
use parking_lot::Mutex;
use std::io::{Error as IoError, Read, Write};
use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write};
use std::ops::Deref;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio_io::{AsyncRead, AsyncWrite};
/// Ways to shutdown a substream or stream muxer.
pub enum Shutdown {
/// Shutdown inbound direction.
Inbound,
/// Shutdown outbound direction.
Outbound,
/// Shutdown everything.
All
}
/// Implemented on objects that can open and manage substreams.
pub trait StreamMuxer {
/// Type of the object that represents the raw substream where data can be read and written.
@ -61,66 +71,61 @@ pub trait StreamMuxer {
///
/// May panic or produce an undefined result if an earlier polling of the same substream
/// returned `Ready` or `Err`.
fn poll_outbound(
&self,
substream: &mut Self::OutboundSubstream,
) -> Poll<Option<Self::Substream>, IoError>;
fn poll_outbound(&self, s: &mut Self::OutboundSubstream) -> Poll<Option<Self::Substream>, IoError>;
/// Destroys an outbound substream. Use this after the outbound substream has finished, or if
/// you want to interrupt it.
fn destroy_outbound(&self, substream: Self::OutboundSubstream);
fn destroy_outbound(&self, s: Self::OutboundSubstream);
/// Reads data from a substream. The behaviour is the same as `std::io::Read::read`.
///
/// If `WouldBlock` is returned, then the current task will be notified once the substream
/// is ready to be read, similar to the API of `AsyncRead`.
/// However, for each individual substream, only the latest task that was used to call this
/// method may be notified.
fn read_substream(
&self,
substream: &mut Self::Substream,
buf: &mut [u8],
) -> Result<usize, IoError>;
/// Write data to a substream. The behaviour is the same as `std::io::Write::write`.
///
/// If `WouldBlock` is returned, then the current task will be notified once the substream
/// is ready to be written, similar to the API of `AsyncWrite`.
/// However, for each individual substream, only the latest task that was used to call this
/// method may be notified.
fn write_substream(
&self,
substream: &mut Self::Substream,
buf: &[u8],
) -> Result<usize, IoError>;
/// Flushes a substream. The behaviour is the same as `std::io::Write::flush`.
///
/// If `WouldBlock` is returned, then the current task will be notified once the substream
/// is ready to be flushed, similar to the API of `AsyncWrite`.
/// However, for each individual substream, only the latest task that was used to call this
/// method may be notified.
fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError>;
/// Attempts to shut down a substream. The behaviour is the same as
/// `tokio_io::AsyncWrite::shutdown`.
/// Reads data from a substream. The behaviour is the same as `tokio_io::AsyncRead::poll_read`.
///
/// If `NotReady` is returned, then the current task will be notified once the substream
/// is ready to be shut down, similar to the API of `AsyncWrite::shutdown()`.
/// However, for each individual substream, only the latest task that was used to call this
/// method may be notified.
fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError>;
/// is ready to be read. However, for each individual substream, only the latest task that
/// was used to call this method may be notified.
fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError>;
/// Write data to a substream. The behaviour is the same as `tokio_io::AsyncWrite::poll_write`.
///
/// If `NotReady` is returned, then the current task will be notified once the substream
/// is ready to be read. However, for each individual substream, only the latest task that
/// was used to call this method may be notified.
fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll<usize, IoError>;
/// Flushes a substream. The behaviour is the same as `tokio_io::AsyncWrite::poll_flush`.
///
/// If `NotReady` is returned, then the current task will be notified once the substream
/// is ready to be read. However, for each individual substream, only the latest task that
/// was used to call this method may be notified.
fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), IoError>;
/// Attempts to shut down a substream. The behaviour is similar to
/// `tokio_io::AsyncWrite::shutdown`.
///
/// Shutting down a substream does not imply `flush_substream`. If you want to make sure
/// that the remote is immediately informed about the shutdown, use `flush_substream` or
/// `flush`.
fn shutdown_substream(&self, s: &mut Self::Substream, kind: Shutdown) -> Poll<(), IoError>;
/// Destroys a substream.
fn destroy_substream(&self, substream: Self::Substream);
/// If supported, sends a hint to the remote that we may no longer accept any further inbound
/// substream. Calling `poll_inbound` afterwards may or may not produce `None`.
fn close_inbound(&self);
fn destroy_substream(&self, s: Self::Substream);
/// Shutdown this `StreamMuxer`.
///
/// If supported, sends a hint to the remote that we may no longer open any further outbound
/// substream. Calling `poll_outbound` afterwards may or may not produce `None`.
fn close_outbound(&self);
/// or inbound substream. Calling `poll_outbound` or `poll_inbound` afterwards may or may not
/// produce `None`.
///
/// Shutting down the muxer does not imply `flush_all`. If you want to make sure that the
/// remote is immediately informed about the shutdown, use `flush_all`.
fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError>;
/// Flush this `StreamMuxer`.
///
/// This drains any write buffers of substreams and otherwise and delivers any pending shutdown
/// notifications due to `shutdown_substream` or `shutdown`. One may thus shutdown groups of
/// substreams followed by a final `flush_all` instead of having to do `flush_substream` for
/// each.
fn flush_all(&self) -> Poll<(), IoError>;
}
/// Polls for an inbound from the muxer but wraps the output in an object that
@ -276,8 +281,11 @@ where
{
#[inline]
fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
self.muxer
.read_substream(self.substream.as_mut().expect("substream was empty"), buf)
let s = self.substream.as_mut().expect("substream was empty");
match self.muxer.read_substream(s, buf)? {
Async::Ready(n) => Ok(n),
Async::NotReady => Err(IoErrorKind::WouldBlock.into())
}
}
}
@ -286,6 +294,11 @@ where
P: Deref,
P::Target: StreamMuxer,
{
#[inline]
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, IoError> {
let s = self.substream.as_mut().expect("substream was empty");
self.muxer.read_substream(s, buf)
}
}
impl<P> Write for SubstreamRef<P>
@ -295,14 +308,20 @@ where
{
#[inline]
fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
self.muxer
.write_substream(self.substream.as_mut().expect("substream was empty"), buf)
let s = self.substream.as_mut().expect("substream was empty");
match self.muxer.write_substream(s, buf)? {
Async::Ready(n) => Ok(n),
Async::NotReady => Err(IoErrorKind::WouldBlock.into())
}
}
#[inline]
fn flush(&mut self) -> Result<(), IoError> {
self.muxer
.flush_substream(self.substream.as_mut().expect("substream was empty"))
let s = self.substream.as_mut().expect("substream was empty");
match self.muxer.flush_substream(s)? {
Async::Ready(()) => Ok(()),
Async::NotReady => Err(IoErrorKind::WouldBlock.into())
}
}
}
@ -311,10 +330,23 @@ where
P: Deref,
P::Target: StreamMuxer,
{
#[inline]
fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, IoError> {
let s = self.substream.as_mut().expect("substream was empty");
self.muxer.write_substream(s, buf)
}
#[inline]
fn shutdown(&mut self) -> Poll<(), IoError> {
self.muxer
.shutdown_substream(self.substream.as_mut().expect("substream was empty"))
let s = self.substream.as_mut().expect("substream was empty");
self.muxer.shutdown_substream(s, Shutdown::All)?;
Ok(Async::Ready(()))
}
#[inline]
fn poll_flush(&mut self) -> Poll<(), IoError> {
let s = self.substream.as_mut().expect("substream was empty");
self.muxer.flush_substream(s)
}
}
@ -325,8 +357,7 @@ where
{
#[inline]
fn drop(&mut self) {
self.muxer
.destroy_substream(self.substream.take().expect("substream was empty"))
self.muxer.destroy_substream(self.substream.take().expect("substream was empty"))
}
}
@ -372,11 +403,8 @@ impl StreamMuxer for StreamMuxerBox {
}
#[inline]
fn poll_outbound(
&self,
substream: &mut Self::OutboundSubstream,
) -> Poll<Option<Self::Substream>, IoError> {
self.inner.poll_outbound(substream)
fn poll_outbound(&self, s: &mut Self::OutboundSubstream) -> Poll<Option<Self::Substream>, IoError> {
self.inner.poll_outbound(s)
}
#[inline]
@ -385,47 +413,38 @@ impl StreamMuxer for StreamMuxerBox {
}
#[inline]
fn read_substream(
&self,
substream: &mut Self::Substream,
buf: &mut [u8],
) -> Result<usize, IoError>
{
self.inner.read_substream(substream, buf)
fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError> {
self.inner.read_substream(s, buf)
}
#[inline]
fn write_substream(
&self,
substream: &mut Self::Substream,
buf: &[u8],
) -> Result<usize, IoError> {
self.inner.write_substream(substream, buf)
fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll<usize, IoError> {
self.inner.write_substream(s, buf)
}
#[inline]
fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError> {
self.inner.flush_substream(substream)
fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), IoError> {
self.inner.flush_substream(s)
}
#[inline]
fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> {
self.inner.shutdown_substream(substream)
fn shutdown_substream(&self, s: &mut Self::Substream, kind: Shutdown) -> Poll<(), IoError> {
self.inner.shutdown_substream(s, kind)
}
#[inline]
fn destroy_substream(&self, substream: Self::Substream) {
self.inner.destroy_substream(substream)
fn destroy_substream(&self, s: Self::Substream) {
self.inner.destroy_substream(s)
}
#[inline]
fn close_inbound(&self) {
self.inner.close_inbound()
fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError> {
self.inner.shutdown(kind)
}
#[inline]
fn close_outbound(&self) {
self.inner.close_outbound()
fn flush_all(&self) -> Poll<(), IoError> {
self.inner.flush_all()
}
}
@ -484,36 +503,27 @@ impl<T> StreamMuxer for Wrap<T> where T: StreamMuxer {
}
#[inline]
fn read_substream(
&self,
substream: &mut Self::Substream,
buf: &mut [u8],
) -> Result<usize, IoError>
{
fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError> {
let mut list = self.substreams.lock();
self.inner.read_substream(list.get_mut(substream).unwrap(), buf)
self.inner.read_substream(list.get_mut(s).unwrap(), buf)
}
#[inline]
fn write_substream(
&self,
substream: &mut Self::Substream,
buf: &[u8],
) -> Result<usize, IoError> {
fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll<usize, IoError> {
let mut list = self.substreams.lock();
self.inner.write_substream(list.get_mut(substream).unwrap(), buf)
self.inner.write_substream(list.get_mut(s).unwrap(), buf)
}
#[inline]
fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError> {
fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), IoError> {
let mut list = self.substreams.lock();
self.inner.flush_substream(list.get_mut(substream).unwrap())
self.inner.flush_substream(list.get_mut(s).unwrap())
}
#[inline]
fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> {
fn shutdown_substream(&self, s: &mut Self::Substream, kind: Shutdown) -> Poll<(), IoError> {
let mut list = self.substreams.lock();
self.inner.shutdown_substream(list.get_mut(substream).unwrap())
self.inner.shutdown_substream(list.get_mut(s).unwrap(), kind)
}
#[inline]
@ -523,12 +533,12 @@ impl<T> StreamMuxer for Wrap<T> where T: StreamMuxer {
}
#[inline]
fn close_inbound(&self) {
self.inner.close_inbound()
fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError> {
self.inner.shutdown(kind)
}
#[inline]
fn close_outbound(&self) {
self.inner.close_outbound()
fn flush_all(&self) -> Poll<(), IoError> {
self.inner.flush_all()
}
}

View File

@ -266,7 +266,7 @@ where
mod tests {
use super::*;
use futures::future;
use muxing::StreamMuxer;
use muxing::{StreamMuxer, Shutdown};
use tokio::runtime::current_thread;
// TODO: move somewhere? this could be useful as a dummy
@ -278,13 +278,13 @@ mod tests {
fn open_outbound(&self) -> Self::OutboundSubstream { () }
fn poll_outbound(&self, _: &mut Self::OutboundSubstream) -> Poll<Option<Self::Substream>, IoError> { Ok(Async::Ready(None)) }
fn destroy_outbound(&self, _: Self::OutboundSubstream) {}
fn read_substream(&self, _: &mut Self::Substream, _: &mut [u8]) -> Result<usize, IoError> { panic!() }
fn write_substream(&self, _: &mut Self::Substream, _: &[u8]) -> Result<usize, IoError> { panic!() }
fn flush_substream(&self, _: &mut Self::Substream) -> Result<(), IoError> { panic!() }
fn shutdown_substream(&self, _: &mut Self::Substream) -> Poll<(), IoError> { panic!() }
fn read_substream(&self, _: &mut Self::Substream, _: &mut [u8]) -> Poll<usize, IoError> { panic!() }
fn write_substream(&self, _: &mut Self::Substream, _: &[u8]) -> Poll<usize, IoError> { panic!() }
fn flush_substream(&self, _: &mut Self::Substream) -> Poll<(), IoError> { panic!() }
fn shutdown_substream(&self, _: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> { panic!() }
fn destroy_substream(&self, _: Self::Substream) { panic!() }
fn close_inbound(&self) {}
fn close_outbound(&self) {}
fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> { Ok(Async::Ready(())) }
fn flush_all(&self) -> Poll<(), IoError> { Ok(Async::Ready(())) }
}
#[test]

View File

@ -310,11 +310,18 @@ where
for (_, outbound) in self.outbound_substreams.drain() {
self.muxer.destroy_outbound(outbound);
}
if !self.inbound_finished {
self.muxer.close_inbound();
}
if !self.outbound_finished {
self.muxer.close_outbound();
// TODO: Maybe the shutdown logic should not be part of the destructor?
match (self.inbound_finished, self.outbound_finished) {
(true, true) => {}
(true, false) => {
let _ = self.muxer.shutdown(muxing::Shutdown::Outbound);
}
(false, true) => {
let _ = self.muxer.shutdown(muxing::Shutdown::Inbound);
}
(false, false) => {
let _ = self.muxer.shutdown(muxing::Shutdown::All);
}
}
}
}

View File

@ -25,7 +25,7 @@
extern crate futures;
use std::io::Error as IoError;
use muxing::StreamMuxer;
use muxing::{StreamMuxer, Shutdown};
use futures::prelude::*;
/// Substream type
@ -91,12 +91,12 @@ impl StreamMuxer for DummyMuxer {
DummyConnectionState::Opened => Ok(Async::Ready(Some(Self::Substream{}))),
}
}
fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {}
fn read_substream(&self, _substream: &mut Self::Substream, _buf: &mut [u8]) -> Result<usize, IoError> { unreachable!() }
fn write_substream(&self, _substream: &mut Self::Substream, _buf: &[u8]) -> Result<usize, IoError> { unreachable!() }
fn flush_substream(&self, _substream: &mut Self::Substream) -> Result<(), IoError> { unreachable!() }
fn shutdown_substream(&self, _substream: &mut Self::Substream) -> Poll<(), IoError> { unreachable!() }
fn destroy_substream(&self, _substream: Self::Substream) {}
fn close_inbound(&self) {}
fn close_outbound(&self) {}
fn destroy_outbound(&self, _: Self::OutboundSubstream) {}
fn read_substream(&self, _: &mut Self::Substream, _buf: &mut [u8]) -> Poll<usize, IoError> { unreachable!() }
fn write_substream(&self, _: &mut Self::Substream, _buf: &[u8]) -> Poll<usize, IoError> { unreachable!() }
fn flush_substream(&self, _: &mut Self::Substream) -> Poll<(), IoError> { unreachable!() }
fn shutdown_substream(&self, _: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> { unreachable!() }
fn destroy_substream(&self, _: Self::Substream) {}
fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> { Ok(Async::Ready(())) }
fn flush_all(&self) -> Poll<(), IoError> { Ok(Async::Ready(())) }
}

View File

@ -36,7 +36,7 @@ use std::{cmp, iter, mem};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc};
use bytes::Bytes;
use core::{ConnectionUpgrade, Endpoint, StreamMuxer};
use core::{ConnectionUpgrade, Endpoint, StreamMuxer, muxing::Shutdown};
use parking_lot::Mutex;
use fnv::{FnvHashMap, FnvHashSet};
use futures::prelude::*;
@ -423,13 +423,13 @@ where C: AsyncRead + AsyncWrite
// Nothing to do.
}
fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Result<usize, IoError> {
fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError> {
loop {
// First, transfer from `current_data`.
if substream.current_data.len() != 0 {
let len = cmp::min(substream.current_data.len(), buf.len());
buf[..len].copy_from_slice(&substream.current_data.split_to(len));
return Ok(len);
return Ok(Async::Ready(len));
}
// Try to find a packet of data in the buffer.
@ -449,22 +449,22 @@ where C: AsyncRead + AsyncWrite
// just read and wait for the next iteration.
match next_data_poll {
Ok(Async::Ready(Some(data))) => substream.current_data = data,
Ok(Async::Ready(None)) => return Ok(0),
Ok(Async::Ready(None)) => return Ok(Async::Ready(0)),
Ok(Async::NotReady) => {
// There was no data packet in the buffer about this substream ; maybe it's
// because it has been closed.
if inner.opened_substreams.contains(&(substream.num, substream.endpoint)) {
return Err(IoErrorKind::WouldBlock.into());
return Ok(Async::NotReady)
} else {
return Ok(0);
return Ok(Async::Ready(0))
}
},
Err(err) => return Err(err),
Err(err) => return Err(err)
}
}
}
fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Result<usize, IoError> {
fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Poll<usize, IoError> {
let mut inner = self.inner.lock();
let to_write = cmp::min(buf.len(), inner.config.split_send_size);
@ -475,50 +475,51 @@ where C: AsyncRead + AsyncWrite
endpoint: substream.endpoint,
};
match poll_send(&mut inner, elem) {
Ok(Async::Ready(())) => Ok(to_write),
Ok(Async::NotReady) => Err(IoErrorKind::WouldBlock.into()),
Err(err) => Err(err),
match poll_send(&mut inner, elem)? {
Async::Ready(()) => Ok(Async::Ready(to_write)),
Async::NotReady => Ok(Async::NotReady)
}
}
fn flush_substream(&self, _substream: &mut Self::Substream) -> Result<(), IoError> {
fn flush_substream(&self, _substream: &mut Self::Substream) -> Poll<(), IoError> {
let mut inner = self.inner.lock();
let inner = &mut *inner; // Avoids borrow errors
match inner.inner.poll_flush_notify(&inner.notifier_write, 0) {
Ok(Async::Ready(())) => Ok(()),
Ok(Async::NotReady) => {
match inner.inner.poll_flush_notify(&inner.notifier_write, 0)? {
Async::Ready(()) => Ok(Async::Ready(())),
Async::NotReady => {
inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
Err(IoErrorKind::WouldBlock.into())
},
Err(err) => Err(err),
Ok(Async::NotReady)
}
}
}
fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> {
fn shutdown_substream(&self, sub: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> {
let elem = codec::Elem::Reset {
substream_id: substream.num,
endpoint: substream.endpoint,
substream_id: sub.num,
endpoint: sub.endpoint,
};
let mut inner = self.inner.lock();
poll_send(&mut inner, elem)
}
fn destroy_substream(&self, mut substream: Self::Substream) {
let _ = self.shutdown_substream(&mut substream); // TODO: this doesn't necessarily send the close message
fn destroy_substream(&self, sub: Self::Substream) {
self.inner.lock().buffer.retain(|elem| {
elem.substream_id() != substream.num || elem.endpoint() == Some(substream.endpoint)
elem.substream_id() != sub.num || elem.endpoint() == Some(sub.endpoint)
})
}
#[inline]
fn close_inbound(&self) {
fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> {
let inner = &mut *self.inner.lock();
inner.inner.close_notify(&inner.notifier_write, 0)
}
#[inline]
fn close_outbound(&self) {
fn flush_all(&self) -> Poll<(), IoError> {
let inner = &mut *self.inner.lock();
inner.inner.poll_flush_notify(&inner.notifier_write, 0)
}
}

View File

@ -28,11 +28,11 @@ extern crate tokio_io;
extern crate yamux;
use bytes::Bytes;
use core::Endpoint;
use core::{Endpoint, muxing::Shutdown};
use futures::{future::{self, FutureResult}, prelude::*};
use parking_lot::Mutex;
use std::{io, iter};
use std::io::{Read, Write, Error as IoError};
use std::io::{Error as IoError};
use tokio_io::{AsyncRead, AsyncWrite};
@ -79,39 +79,41 @@ where
}
#[inline]
fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
fn destroy_outbound(&self, _: Self::OutboundSubstream) {
}
#[inline]
fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Result<usize, IoError> {
substream.read(buf)
fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError> {
sub.poll_read(buf)
}
#[inline]
fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Result<usize, IoError> {
substream.write(buf)
fn write_substream(&self, sub: &mut Self::Substream, buf: &[u8]) -> Poll<usize, IoError> {
sub.poll_write(buf)
}
#[inline]
fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError> {
substream.flush()
fn flush_substream(&self, sub: &mut Self::Substream) -> Poll<(), IoError> {
sub.poll_flush()
}
#[inline]
fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> {
substream.shutdown()
fn shutdown_substream(&self, sub: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> {
sub.shutdown()
}
#[inline]
fn destroy_substream(&self, _substream: Self::Substream) {
fn destroy_substream(&self, _: Self::Substream) {
}
#[inline]
fn close_inbound(&self) {
fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> {
Ok(Async::Ready(())) // TODO
}
#[inline]
fn close_outbound(&self) {
fn flush_all(&self) -> Poll<(), IoError> {
self.0.lock().flush()
}
}