diff --git a/core/src/either.rs b/core/src/either.rs index 25498801..30a8b18d 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::{prelude::*, future}; -use muxing::StreamMuxer; +use muxing::{Shutdown, StreamMuxer}; use std::io::{Error as IoError, Read, Write}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -146,49 +146,49 @@ where } } - fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Result { - match (self, substream) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => { - inner.read_substream(substream, buf) + fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll { + match (self, sub) { + (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { + inner.read_substream(sub, buf) }, - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => { - inner.read_substream(substream, buf) + (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { + inner.read_substream(sub, buf) }, _ => panic!("Wrong API usage") } } - fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Result { - match (self, substream) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => { - inner.write_substream(substream, buf) + fn write_substream(&self, sub: &mut Self::Substream, buf: &[u8]) -> Poll { + match (self, sub) { + (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { + inner.write_substream(sub, buf) }, - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => { - inner.write_substream(substream, buf) + (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { + inner.write_substream(sub, buf) }, _ => panic!("Wrong API usage") } } - fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError> { - match (self, substream) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => { - inner.flush_substream(substream) + fn flush_substream(&self, sub: &mut Self::Substream) -> Poll<(), IoError> { + match (self, sub) { + (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { + inner.flush_substream(sub) }, - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => { - inner.flush_substream(substream) + (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { + inner.flush_substream(sub) }, _ => panic!("Wrong API usage") } } - fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> { - match (self, substream) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => { - inner.shutdown_substream(substream) + fn shutdown_substream(&self, sub: &mut Self::Substream, kind: Shutdown) -> Poll<(), IoError> { + match (self, sub) { + (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { + inner.shutdown_substream(sub, kind) }, - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => { - inner.shutdown_substream(substream) + (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { + inner.shutdown_substream(sub, kind) }, _ => panic!("Wrong API usage") } @@ -211,17 +211,17 @@ where } } - fn close_inbound(&self) { + fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError> { match *self { - EitherOutput::First(ref inner) => inner.close_inbound(), - EitherOutput::Second(ref inner) => inner.close_inbound(), + EitherOutput::First(ref inner) => inner.shutdown(kind), + EitherOutput::Second(ref inner) => inner.shutdown(kind) } } - fn close_outbound(&self) { + fn flush_all(&self) -> Poll<(), IoError> { match *self { - EitherOutput::First(ref inner) => inner.close_outbound(), - EitherOutput::Second(ref inner) => inner.close_outbound(), + EitherOutput::First(ref inner) => inner.flush_all(), + EitherOutput::Second(ref inner) => inner.flush_all() } } } diff --git a/core/src/muxing.rs b/core/src/muxing.rs index b3c0cc8a..46dc28b8 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -21,12 +21,22 @@ use fnv::FnvHashMap; use futures::{future, prelude::*}; use parking_lot::Mutex; -use std::io::{Error as IoError, Read, Write}; +use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write}; use std::ops::Deref; use std::fmt; use std::sync::atomic::{AtomicUsize, Ordering}; use tokio_io::{AsyncRead, AsyncWrite}; +/// Ways to shutdown a substream or stream muxer. +pub enum Shutdown { + /// Shutdown inbound direction. + Inbound, + /// Shutdown outbound direction. + Outbound, + /// Shutdown everything. + All +} + /// Implemented on objects that can open and manage substreams. pub trait StreamMuxer { /// Type of the object that represents the raw substream where data can be read and written. @@ -61,66 +71,61 @@ pub trait StreamMuxer { /// /// May panic or produce an undefined result if an earlier polling of the same substream /// returned `Ready` or `Err`. - fn poll_outbound( - &self, - substream: &mut Self::OutboundSubstream, - ) -> Poll, IoError>; + fn poll_outbound(&self, s: &mut Self::OutboundSubstream) -> Poll, IoError>; /// Destroys an outbound substream. Use this after the outbound substream has finished, or if /// you want to interrupt it. - fn destroy_outbound(&self, substream: Self::OutboundSubstream); + fn destroy_outbound(&self, s: Self::OutboundSubstream); - /// Reads data from a substream. The behaviour is the same as `std::io::Read::read`. - /// - /// If `WouldBlock` is returned, then the current task will be notified once the substream - /// is ready to be read, similar to the API of `AsyncRead`. - /// However, for each individual substream, only the latest task that was used to call this - /// method may be notified. - fn read_substream( - &self, - substream: &mut Self::Substream, - buf: &mut [u8], - ) -> Result; - - /// Write data to a substream. The behaviour is the same as `std::io::Write::write`. - /// - /// If `WouldBlock` is returned, then the current task will be notified once the substream - /// is ready to be written, similar to the API of `AsyncWrite`. - /// However, for each individual substream, only the latest task that was used to call this - /// method may be notified. - fn write_substream( - &self, - substream: &mut Self::Substream, - buf: &[u8], - ) -> Result; - - /// Flushes a substream. The behaviour is the same as `std::io::Write::flush`. - /// - /// If `WouldBlock` is returned, then the current task will be notified once the substream - /// is ready to be flushed, similar to the API of `AsyncWrite`. - /// However, for each individual substream, only the latest task that was used to call this - /// method may be notified. - fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError>; - - /// Attempts to shut down a substream. The behaviour is the same as - /// `tokio_io::AsyncWrite::shutdown`. + /// Reads data from a substream. The behaviour is the same as `tokio_io::AsyncRead::poll_read`. /// /// If `NotReady` is returned, then the current task will be notified once the substream - /// is ready to be shut down, similar to the API of `AsyncWrite::shutdown()`. - /// However, for each individual substream, only the latest task that was used to call this - /// method may be notified. - fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError>; + /// is ready to be read. However, for each individual substream, only the latest task that + /// was used to call this method may be notified. + fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll; + + /// Write data to a substream. The behaviour is the same as `tokio_io::AsyncWrite::poll_write`. + /// + /// If `NotReady` is returned, then the current task will be notified once the substream + /// is ready to be read. However, for each individual substream, only the latest task that + /// was used to call this method may be notified. + fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll; + + /// Flushes a substream. The behaviour is the same as `tokio_io::AsyncWrite::poll_flush`. + /// + /// If `NotReady` is returned, then the current task will be notified once the substream + /// is ready to be read. However, for each individual substream, only the latest task that + /// was used to call this method may be notified. + fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), IoError>; + + /// Attempts to shut down a substream. The behaviour is similar to + /// `tokio_io::AsyncWrite::shutdown`. + /// + /// Shutting down a substream does not imply `flush_substream`. If you want to make sure + /// that the remote is immediately informed about the shutdown, use `flush_substream` or + /// `flush`. + fn shutdown_substream(&self, s: &mut Self::Substream, kind: Shutdown) -> Poll<(), IoError>; /// Destroys a substream. - fn destroy_substream(&self, substream: Self::Substream); - - /// If supported, sends a hint to the remote that we may no longer accept any further inbound - /// substream. Calling `poll_inbound` afterwards may or may not produce `None`. - fn close_inbound(&self); + fn destroy_substream(&self, s: Self::Substream); + /// Shutdown this `StreamMuxer`. + /// /// If supported, sends a hint to the remote that we may no longer open any further outbound - /// substream. Calling `poll_outbound` afterwards may or may not produce `None`. - fn close_outbound(&self); + /// or inbound substream. Calling `poll_outbound` or `poll_inbound` afterwards may or may not + /// produce `None`. + /// + /// Shutting down the muxer does not imply `flush_all`. If you want to make sure that the + /// remote is immediately informed about the shutdown, use `flush_all`. + fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError>; + + /// Flush this `StreamMuxer`. + /// + /// This drains any write buffers of substreams and otherwise and delivers any pending shutdown + /// notifications due to `shutdown_substream` or `shutdown`. One may thus shutdown groups of + /// substreams followed by a final `flush_all` instead of having to do `flush_substream` for + /// each. + fn flush_all(&self) -> Poll<(), IoError>; } /// Polls for an inbound from the muxer but wraps the output in an object that @@ -276,8 +281,11 @@ where { #[inline] fn read(&mut self, buf: &mut [u8]) -> Result { - self.muxer - .read_substream(self.substream.as_mut().expect("substream was empty"), buf) + let s = self.substream.as_mut().expect("substream was empty"); + match self.muxer.read_substream(s, buf)? { + Async::Ready(n) => Ok(n), + Async::NotReady => Err(IoErrorKind::WouldBlock.into()) + } } } @@ -286,6 +294,11 @@ where P: Deref, P::Target: StreamMuxer, { + #[inline] + fn poll_read(&mut self, buf: &mut [u8]) -> Poll { + let s = self.substream.as_mut().expect("substream was empty"); + self.muxer.read_substream(s, buf) + } } impl

Write for SubstreamRef

@@ -295,14 +308,20 @@ where { #[inline] fn write(&mut self, buf: &[u8]) -> Result { - self.muxer - .write_substream(self.substream.as_mut().expect("substream was empty"), buf) + let s = self.substream.as_mut().expect("substream was empty"); + match self.muxer.write_substream(s, buf)? { + Async::Ready(n) => Ok(n), + Async::NotReady => Err(IoErrorKind::WouldBlock.into()) + } } #[inline] fn flush(&mut self) -> Result<(), IoError> { - self.muxer - .flush_substream(self.substream.as_mut().expect("substream was empty")) + let s = self.substream.as_mut().expect("substream was empty"); + match self.muxer.flush_substream(s)? { + Async::Ready(()) => Ok(()), + Async::NotReady => Err(IoErrorKind::WouldBlock.into()) + } } } @@ -311,10 +330,23 @@ where P: Deref, P::Target: StreamMuxer, { + #[inline] + fn poll_write(&mut self, buf: &[u8]) -> Poll { + let s = self.substream.as_mut().expect("substream was empty"); + self.muxer.write_substream(s, buf) + } + #[inline] fn shutdown(&mut self) -> Poll<(), IoError> { - self.muxer - .shutdown_substream(self.substream.as_mut().expect("substream was empty")) + let s = self.substream.as_mut().expect("substream was empty"); + self.muxer.shutdown_substream(s, Shutdown::All)?; + Ok(Async::Ready(())) + } + + #[inline] + fn poll_flush(&mut self) -> Poll<(), IoError> { + let s = self.substream.as_mut().expect("substream was empty"); + self.muxer.flush_substream(s) } } @@ -325,8 +357,7 @@ where { #[inline] fn drop(&mut self) { - self.muxer - .destroy_substream(self.substream.take().expect("substream was empty")) + self.muxer.destroy_substream(self.substream.take().expect("substream was empty")) } } @@ -372,11 +403,8 @@ impl StreamMuxer for StreamMuxerBox { } #[inline] - fn poll_outbound( - &self, - substream: &mut Self::OutboundSubstream, - ) -> Poll, IoError> { - self.inner.poll_outbound(substream) + fn poll_outbound(&self, s: &mut Self::OutboundSubstream) -> Poll, IoError> { + self.inner.poll_outbound(s) } #[inline] @@ -385,47 +413,38 @@ impl StreamMuxer for StreamMuxerBox { } #[inline] - fn read_substream( - &self, - substream: &mut Self::Substream, - buf: &mut [u8], - ) -> Result - { - self.inner.read_substream(substream, buf) + fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll { + self.inner.read_substream(s, buf) } #[inline] - fn write_substream( - &self, - substream: &mut Self::Substream, - buf: &[u8], - ) -> Result { - self.inner.write_substream(substream, buf) + fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll { + self.inner.write_substream(s, buf) } #[inline] - fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError> { - self.inner.flush_substream(substream) + fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), IoError> { + self.inner.flush_substream(s) } #[inline] - fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> { - self.inner.shutdown_substream(substream) + fn shutdown_substream(&self, s: &mut Self::Substream, kind: Shutdown) -> Poll<(), IoError> { + self.inner.shutdown_substream(s, kind) } #[inline] - fn destroy_substream(&self, substream: Self::Substream) { - self.inner.destroy_substream(substream) + fn destroy_substream(&self, s: Self::Substream) { + self.inner.destroy_substream(s) } #[inline] - fn close_inbound(&self) { - self.inner.close_inbound() + fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError> { + self.inner.shutdown(kind) } #[inline] - fn close_outbound(&self) { - self.inner.close_outbound() + fn flush_all(&self) -> Poll<(), IoError> { + self.inner.flush_all() } } @@ -484,36 +503,27 @@ impl StreamMuxer for Wrap where T: StreamMuxer { } #[inline] - fn read_substream( - &self, - substream: &mut Self::Substream, - buf: &mut [u8], - ) -> Result - { + fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll { let mut list = self.substreams.lock(); - self.inner.read_substream(list.get_mut(substream).unwrap(), buf) + self.inner.read_substream(list.get_mut(s).unwrap(), buf) } #[inline] - fn write_substream( - &self, - substream: &mut Self::Substream, - buf: &[u8], - ) -> Result { + fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll { let mut list = self.substreams.lock(); - self.inner.write_substream(list.get_mut(substream).unwrap(), buf) + self.inner.write_substream(list.get_mut(s).unwrap(), buf) } #[inline] - fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError> { + fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), IoError> { let mut list = self.substreams.lock(); - self.inner.flush_substream(list.get_mut(substream).unwrap()) + self.inner.flush_substream(list.get_mut(s).unwrap()) } #[inline] - fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> { + fn shutdown_substream(&self, s: &mut Self::Substream, kind: Shutdown) -> Poll<(), IoError> { let mut list = self.substreams.lock(); - self.inner.shutdown_substream(list.get_mut(substream).unwrap()) + self.inner.shutdown_substream(list.get_mut(s).unwrap(), kind) } #[inline] @@ -523,12 +533,12 @@ impl StreamMuxer for Wrap where T: StreamMuxer { } #[inline] - fn close_inbound(&self) { - self.inner.close_inbound() + fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError> { + self.inner.shutdown(kind) } #[inline] - fn close_outbound(&self) { - self.inner.close_outbound() + fn flush_all(&self) -> Poll<(), IoError> { + self.inner.flush_all() } } diff --git a/core/src/nodes/handled_node.rs b/core/src/nodes/handled_node.rs index 7035698a..65c5bac7 100644 --- a/core/src/nodes/handled_node.rs +++ b/core/src/nodes/handled_node.rs @@ -266,7 +266,7 @@ where mod tests { use super::*; use futures::future; - use muxing::StreamMuxer; + use muxing::{StreamMuxer, Shutdown}; use tokio::runtime::current_thread; // TODO: move somewhere? this could be useful as a dummy @@ -278,13 +278,13 @@ mod tests { fn open_outbound(&self) -> Self::OutboundSubstream { () } fn poll_outbound(&self, _: &mut Self::OutboundSubstream) -> Poll, IoError> { Ok(Async::Ready(None)) } fn destroy_outbound(&self, _: Self::OutboundSubstream) {} - fn read_substream(&self, _: &mut Self::Substream, _: &mut [u8]) -> Result { panic!() } - fn write_substream(&self, _: &mut Self::Substream, _: &[u8]) -> Result { panic!() } - fn flush_substream(&self, _: &mut Self::Substream) -> Result<(), IoError> { panic!() } - fn shutdown_substream(&self, _: &mut Self::Substream) -> Poll<(), IoError> { panic!() } + fn read_substream(&self, _: &mut Self::Substream, _: &mut [u8]) -> Poll { panic!() } + fn write_substream(&self, _: &mut Self::Substream, _: &[u8]) -> Poll { panic!() } + fn flush_substream(&self, _: &mut Self::Substream) -> Poll<(), IoError> { panic!() } + fn shutdown_substream(&self, _: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> { panic!() } fn destroy_substream(&self, _: Self::Substream) { panic!() } - fn close_inbound(&self) {} - fn close_outbound(&self) {} + fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> { Ok(Async::Ready(())) } + fn flush_all(&self) -> Poll<(), IoError> { Ok(Async::Ready(())) } } #[test] diff --git a/core/src/nodes/node.rs b/core/src/nodes/node.rs index c5eafbaf..b22384ab 100644 --- a/core/src/nodes/node.rs +++ b/core/src/nodes/node.rs @@ -310,11 +310,18 @@ where for (_, outbound) in self.outbound_substreams.drain() { self.muxer.destroy_outbound(outbound); } - if !self.inbound_finished { - self.muxer.close_inbound(); - } - if !self.outbound_finished { - self.muxer.close_outbound(); + // TODO: Maybe the shutdown logic should not be part of the destructor? + match (self.inbound_finished, self.outbound_finished) { + (true, true) => {} + (true, false) => { + let _ = self.muxer.shutdown(muxing::Shutdown::Outbound); + } + (false, true) => { + let _ = self.muxer.shutdown(muxing::Shutdown::Inbound); + } + (false, false) => { + let _ = self.muxer.shutdown(muxing::Shutdown::All); + } } } } diff --git a/core/src/tests/dummy_muxer.rs b/core/src/tests/dummy_muxer.rs index bfd2852f..e91620b7 100644 --- a/core/src/tests/dummy_muxer.rs +++ b/core/src/tests/dummy_muxer.rs @@ -25,7 +25,7 @@ extern crate futures; use std::io::Error as IoError; -use muxing::StreamMuxer; +use muxing::{StreamMuxer, Shutdown}; use futures::prelude::*; /// Substream type @@ -91,12 +91,12 @@ impl StreamMuxer for DummyMuxer { DummyConnectionState::Opened => Ok(Async::Ready(Some(Self::Substream{}))), } } - fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {} - fn read_substream(&self, _substream: &mut Self::Substream, _buf: &mut [u8]) -> Result { unreachable!() } - fn write_substream(&self, _substream: &mut Self::Substream, _buf: &[u8]) -> Result { unreachable!() } - fn flush_substream(&self, _substream: &mut Self::Substream) -> Result<(), IoError> { unreachable!() } - fn shutdown_substream(&self, _substream: &mut Self::Substream) -> Poll<(), IoError> { unreachable!() } - fn destroy_substream(&self, _substream: Self::Substream) {} - fn close_inbound(&self) {} - fn close_outbound(&self) {} + fn destroy_outbound(&self, _: Self::OutboundSubstream) {} + fn read_substream(&self, _: &mut Self::Substream, _buf: &mut [u8]) -> Poll { unreachable!() } + fn write_substream(&self, _: &mut Self::Substream, _buf: &[u8]) -> Poll { unreachable!() } + fn flush_substream(&self, _: &mut Self::Substream) -> Poll<(), IoError> { unreachable!() } + fn shutdown_substream(&self, _: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> { unreachable!() } + fn destroy_substream(&self, _: Self::Substream) {} + fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> { Ok(Async::Ready(())) } + fn flush_all(&self) -> Poll<(), IoError> { Ok(Async::Ready(())) } } diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 9f5ed99f..5959512e 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -36,7 +36,7 @@ use std::{cmp, iter, mem}; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc}; use bytes::Bytes; -use core::{ConnectionUpgrade, Endpoint, StreamMuxer}; +use core::{ConnectionUpgrade, Endpoint, StreamMuxer, muxing::Shutdown}; use parking_lot::Mutex; use fnv::{FnvHashMap, FnvHashSet}; use futures::prelude::*; @@ -423,13 +423,13 @@ where C: AsyncRead + AsyncWrite // Nothing to do. } - fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Result { + fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Poll { loop { // First, transfer from `current_data`. if substream.current_data.len() != 0 { let len = cmp::min(substream.current_data.len(), buf.len()); buf[..len].copy_from_slice(&substream.current_data.split_to(len)); - return Ok(len); + return Ok(Async::Ready(len)); } // Try to find a packet of data in the buffer. @@ -449,22 +449,22 @@ where C: AsyncRead + AsyncWrite // just read and wait for the next iteration. match next_data_poll { Ok(Async::Ready(Some(data))) => substream.current_data = data, - Ok(Async::Ready(None)) => return Ok(0), + Ok(Async::Ready(None)) => return Ok(Async::Ready(0)), Ok(Async::NotReady) => { // There was no data packet in the buffer about this substream ; maybe it's // because it has been closed. if inner.opened_substreams.contains(&(substream.num, substream.endpoint)) { - return Err(IoErrorKind::WouldBlock.into()); + return Ok(Async::NotReady) } else { - return Ok(0); + return Ok(Async::Ready(0)) } }, - Err(err) => return Err(err), + Err(err) => return Err(err) } } } - fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Result { + fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Poll { let mut inner = self.inner.lock(); let to_write = cmp::min(buf.len(), inner.config.split_send_size); @@ -475,50 +475,51 @@ where C: AsyncRead + AsyncWrite endpoint: substream.endpoint, }; - match poll_send(&mut inner, elem) { - Ok(Async::Ready(())) => Ok(to_write), - Ok(Async::NotReady) => Err(IoErrorKind::WouldBlock.into()), - Err(err) => Err(err), + match poll_send(&mut inner, elem)? { + Async::Ready(()) => Ok(Async::Ready(to_write)), + Async::NotReady => Ok(Async::NotReady) } } - fn flush_substream(&self, _substream: &mut Self::Substream) -> Result<(), IoError> { + fn flush_substream(&self, _substream: &mut Self::Substream) -> Poll<(), IoError> { let mut inner = self.inner.lock(); let inner = &mut *inner; // Avoids borrow errors - match inner.inner.poll_flush_notify(&inner.notifier_write, 0) { - Ok(Async::Ready(())) => Ok(()), - Ok(Async::NotReady) => { + match inner.inner.poll_flush_notify(&inner.notifier_write, 0)? { + Async::Ready(()) => Ok(Async::Ready(())), + Async::NotReady => { inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current()); - Err(IoErrorKind::WouldBlock.into()) - }, - Err(err) => Err(err), + Ok(Async::NotReady) + } } } - fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> { + fn shutdown_substream(&self, sub: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> { let elem = codec::Elem::Reset { - substream_id: substream.num, - endpoint: substream.endpoint, + substream_id: sub.num, + endpoint: sub.endpoint, }; let mut inner = self.inner.lock(); poll_send(&mut inner, elem) } - fn destroy_substream(&self, mut substream: Self::Substream) { - let _ = self.shutdown_substream(&mut substream); // TODO: this doesn't necessarily send the close message + fn destroy_substream(&self, sub: Self::Substream) { self.inner.lock().buffer.retain(|elem| { - elem.substream_id() != substream.num || elem.endpoint() == Some(substream.endpoint) + elem.substream_id() != sub.num || elem.endpoint() == Some(sub.endpoint) }) } #[inline] - fn close_inbound(&self) { + fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> { + let inner = &mut *self.inner.lock(); + inner.inner.close_notify(&inner.notifier_write, 0) } #[inline] - fn close_outbound(&self) { + fn flush_all(&self) -> Poll<(), IoError> { + let inner = &mut *self.inner.lock(); + inner.inner.poll_flush_notify(&inner.notifier_write, 0) } } diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 05e0a36a..50fb410d 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -28,11 +28,11 @@ extern crate tokio_io; extern crate yamux; use bytes::Bytes; -use core::Endpoint; +use core::{Endpoint, muxing::Shutdown}; use futures::{future::{self, FutureResult}, prelude::*}; use parking_lot::Mutex; use std::{io, iter}; -use std::io::{Read, Write, Error as IoError}; +use std::io::{Error as IoError}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -79,39 +79,41 @@ where } #[inline] - fn destroy_outbound(&self, _substream: Self::OutboundSubstream) { + fn destroy_outbound(&self, _: Self::OutboundSubstream) { } #[inline] - fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Result { - substream.read(buf) + fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll { + sub.poll_read(buf) } #[inline] - fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Result { - substream.write(buf) + fn write_substream(&self, sub: &mut Self::Substream, buf: &[u8]) -> Poll { + sub.poll_write(buf) } #[inline] - fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError> { - substream.flush() + fn flush_substream(&self, sub: &mut Self::Substream) -> Poll<(), IoError> { + sub.poll_flush() } #[inline] - fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> { - substream.shutdown() + fn shutdown_substream(&self, sub: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> { + sub.shutdown() } #[inline] - fn destroy_substream(&self, _substream: Self::Substream) { + fn destroy_substream(&self, _: Self::Substream) { } #[inline] - fn close_inbound(&self) { + fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> { + Ok(Async::Ready(())) // TODO } #[inline] - fn close_outbound(&self) { + fn flush_all(&self) -> Poll<(), IoError> { + self.0.lock().flush() } }