muxing: adds an error type to streammuxer (#1083)

* muxing: adds an error type to streammuxer

* Update examples/chat.rs

Co-Authored-By: montekki <fedor.sakharov@gmail.com>

* make the trait error type bound to io error
This commit is contained in:
Fedor Sakharov 2019-04-28 14:42:18 +03:00 committed by Pierre Krieger
parent 47a775dbce
commit 68df8c07cf
11 changed files with 91 additions and 78 deletions

View File

@ -134,11 +134,12 @@ where
{ {
type Substream = EitherOutput<A::Substream, B::Substream>; type Substream = EitherOutput<A::Substream, B::Substream>;
type OutboundSubstream = EitherOutbound<A, B>; type OutboundSubstream = EitherOutbound<A, B>;
type Error = IoError;
fn poll_inbound(&self) -> Poll<Self::Substream, IoError> { fn poll_inbound(&self) -> Poll<Self::Substream, Self::Error> {
match self { match self {
EitherOutput::First(inner) => inner.poll_inbound().map(|p| p.map(EitherOutput::First)), EitherOutput::First(inner) => inner.poll_inbound().map(|p| p.map(EitherOutput::First)).map_err(|e| e.into()),
EitherOutput::Second(inner) => inner.poll_inbound().map(|p| p.map(EitherOutput::Second)), EitherOutput::Second(inner) => inner.poll_inbound().map(|p| p.map(EitherOutput::Second)).map_err(|e| e.into()),
} }
} }
@ -149,13 +150,13 @@ where
} }
} }
fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll<Self::Substream, IoError> { fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll<Self::Substream, Self::Error> {
match (self, substream) { match (self, substream) {
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => { (EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => {
inner.poll_outbound(substream).map(|p| p.map(EitherOutput::First)) inner.poll_outbound(substream).map(|p| p.map(EitherOutput::First)).map_err(|e| e.into())
}, },
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => { (EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => {
inner.poll_outbound(substream).map(|p| p.map(EitherOutput::Second)) inner.poll_outbound(substream).map(|p| p.map(EitherOutput::Second)).map_err(|e| e.into())
}, },
_ => panic!("Wrong API usage") _ => panic!("Wrong API usage")
} }
@ -178,49 +179,49 @@ where
} }
} }
fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError> { fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, Self::Error> {
match (self, sub) { match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.read_substream(sub, buf) inner.read_substream(sub, buf).map_err(|e| e.into())
}, },
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.read_substream(sub, buf) inner.read_substream(sub, buf).map_err(|e| e.into())
}, },
_ => panic!("Wrong API usage") _ => panic!("Wrong API usage")
} }
} }
fn write_substream(&self, sub: &mut Self::Substream, buf: &[u8]) -> Poll<usize, IoError> { fn write_substream(&self, sub: &mut Self::Substream, buf: &[u8]) -> Poll<usize, Self::Error> {
match (self, sub) { match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.write_substream(sub, buf) inner.write_substream(sub, buf).map_err(|e| e.into())
}, },
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.write_substream(sub, buf) inner.write_substream(sub, buf).map_err(|e| e.into())
}, },
_ => panic!("Wrong API usage") _ => panic!("Wrong API usage")
} }
} }
fn flush_substream(&self, sub: &mut Self::Substream) -> Poll<(), IoError> { fn flush_substream(&self, sub: &mut Self::Substream) -> Poll<(), Self::Error> {
match (self, sub) { match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.flush_substream(sub) inner.flush_substream(sub).map_err(|e| e.into())
}, },
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.flush_substream(sub) inner.flush_substream(sub).map_err(|e| e.into())
}, },
_ => panic!("Wrong API usage") _ => panic!("Wrong API usage")
} }
} }
fn shutdown_substream(&self, sub: &mut Self::Substream) -> Poll<(), IoError> { fn shutdown_substream(&self, sub: &mut Self::Substream) -> Poll<(), Self::Error> {
match (self, sub) { match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.shutdown_substream(sub) inner.shutdown_substream(sub).map_err(|e| e.into())
}, },
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.shutdown_substream(sub) inner.shutdown_substream(sub).map_err(|e| e.into())
}, },
_ => panic!("Wrong API usage") _ => panic!("Wrong API usage")
} }
@ -250,17 +251,17 @@ where
} }
} }
fn close(&self) -> Poll<(), IoError> { fn close(&self) -> Poll<(), Self::Error> {
match self { match self {
EitherOutput::First(inner) => inner.close(), EitherOutput::First(inner) => inner.close().map_err(|e| e.into()),
EitherOutput::Second(inner) => inner.close() EitherOutput::Second(inner) => inner.close().map_err(|e| e.into()),
} }
} }
fn flush_all(&self) -> Poll<(), IoError> { fn flush_all(&self) -> Poll<(), Self::Error> {
match self { match self {
EitherOutput::First(inner) => inner.flush_all(), EitherOutput::First(inner) => inner.flush_all().map_err(|e| e.into()),
EitherOutput::Second(inner) => inner.flush_all() EitherOutput::Second(inner) => inner.flush_all().map_err(|e| e.into()),
} }
} }
} }

View File

@ -83,6 +83,9 @@ pub trait StreamMuxer {
/// Future that will be resolved when the outgoing substream is open. /// Future that will be resolved when the outgoing substream is open.
type OutboundSubstream; type OutboundSubstream;
/// Error type of the muxer
type Error: Into<io::Error>;
/// Polls for an inbound substream. /// Polls for an inbound substream.
/// ///
/// This function behaves the same as a `Stream`. /// This function behaves the same as a `Stream`.
@ -92,7 +95,7 @@ pub trait StreamMuxer {
/// Only the latest task that was used to call this method may be notified. /// Only the latest task that was used to call this method may be notified.
/// ///
/// An error can be generated if the connection has been closed. /// An error can be generated if the connection has been closed.
fn poll_inbound(&self) -> Poll<Self::Substream, io::Error>; fn poll_inbound(&self) -> Poll<Self::Substream, Self::Error>;
/// Opens a new outgoing substream, and produces the equivalent to a future that will be /// Opens a new outgoing substream, and produces the equivalent to a future that will be
/// resolved when it becomes available. /// resolved when it becomes available.
@ -110,7 +113,7 @@ pub trait StreamMuxer {
/// ///
/// May panic or produce an undefined result if an earlier polling of the same substream /// May panic or produce an undefined result if an earlier polling of the same substream
/// returned `Ready` or `Err`. /// returned `Ready` or `Err`.
fn poll_outbound(&self, s: &mut Self::OutboundSubstream) -> Poll<Self::Substream, io::Error>; fn poll_outbound(&self, s: &mut Self::OutboundSubstream) -> Poll<Self::Substream, Self::Error>;
/// Destroys an outbound substream future. Use this after the outbound substream has finished, /// Destroys an outbound substream future. Use this after the outbound substream has finished,
/// or if you want to interrupt it. /// or if you want to interrupt it.
@ -127,7 +130,7 @@ pub trait StreamMuxer {
/// ///
/// An error can be generated if the connection has been closed, or if a protocol misbehaviour /// An error can be generated if the connection has been closed, or if a protocol misbehaviour
/// happened. /// happened.
fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, io::Error>; fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, Self::Error>;
/// Write data to a substream. The behaviour is the same as `tokio_io::AsyncWrite::poll_write`. /// Write data to a substream. The behaviour is the same as `tokio_io::AsyncWrite::poll_write`.
/// ///
@ -140,7 +143,7 @@ pub trait StreamMuxer {
/// ///
/// It is incorrect to call this method on a substream if you called `shutdown_substream` on /// It is incorrect to call this method on a substream if you called `shutdown_substream` on
/// this substream earlier. /// this substream earlier.
fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll<usize, io::Error>; fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll<usize, Self::Error>;
/// Flushes a substream. The behaviour is the same as `tokio_io::AsyncWrite::poll_flush`. /// Flushes a substream. The behaviour is the same as `tokio_io::AsyncWrite::poll_flush`.
/// ///
@ -152,7 +155,7 @@ pub trait StreamMuxer {
/// call this method may be notified. /// call this method may be notified.
/// ///
/// > **Note**: This method may be implemented as a call to `flush_all`. /// > **Note**: This method may be implemented as a call to `flush_all`.
fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), io::Error>; fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), Self::Error>;
/// Attempts to shut down the writing side of a substream. The behaviour is similar to /// Attempts to shut down the writing side of a substream. The behaviour is similar to
/// `tokio_io::AsyncWrite::shutdown`. /// `tokio_io::AsyncWrite::shutdown`.
@ -165,7 +168,7 @@ pub trait StreamMuxer {
/// ///
/// An error can be generated if the connection has been closed, or if a protocol misbehaviour /// An error can be generated if the connection has been closed, or if a protocol misbehaviour
/// happened. /// happened.
fn shutdown_substream(&self, s: &mut Self::Substream) -> Poll<(), io::Error>; fn shutdown_substream(&self, s: &mut Self::Substream) -> Poll<(), Self::Error>;
/// Destroys a substream. /// Destroys a substream.
fn destroy_substream(&self, s: Self::Substream); fn destroy_substream(&self, s: Self::Substream);
@ -190,14 +193,14 @@ pub trait StreamMuxer {
/// > that the remote is properly informed of the shutdown. However, apart from /// > that the remote is properly informed of the shutdown. However, apart from
/// > properly informing the remote, there is no difference between this and /// > properly informing the remote, there is no difference between this and
/// > immediately dropping the muxer. /// > immediately dropping the muxer.
fn close(&self) -> Poll<(), io::Error>; fn close(&self) -> Poll<(), Self::Error>;
/// Flush this `StreamMuxer`. /// Flush this `StreamMuxer`.
/// ///
/// This drains any write buffers of substreams and delivers any pending shutdown notifications /// This drains any write buffers of substreams and delivers any pending shutdown notifications
/// due to `shutdown_substream` or `close`. One may thus shutdown groups of substreams /// due to `shutdown_substream` or `close`. One may thus shutdown groups of substreams
/// followed by a final `flush_all` instead of having to do `flush_substream` for each. /// followed by a final `flush_all` instead of having to do `flush_substream` for each.
fn flush_all(&self) -> Poll<(), io::Error>; fn flush_all(&self) -> Poll<(), Self::Error>;
} }
/// Polls for an inbound from the muxer but wraps the output in an object that /// Polls for an inbound from the muxer but wraps the output in an object that
@ -205,7 +208,7 @@ pub trait StreamMuxer {
#[inline] #[inline]
pub fn inbound_from_ref_and_wrap<P>( pub fn inbound_from_ref_and_wrap<P>(
muxer: P, muxer: P,
) -> impl Future<Item = SubstreamRef<P>, Error = io::Error> ) -> impl Future<Item = SubstreamRef<P>, Error = <P::Target as StreamMuxer>::Error>
where where
P: Deref + Clone, P: Deref + Clone,
P::Target: StreamMuxer, P::Target: StreamMuxer,
@ -242,7 +245,7 @@ where
P::Target: StreamMuxer, P::Target: StreamMuxer,
{ {
type Item = SubstreamRef<P>; type Item = SubstreamRef<P>;
type Error = io::Error; type Error = <P::Target as StreamMuxer>::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner.poll() { match self.inner.poll() {
@ -286,7 +289,7 @@ where
P::Target: StreamMuxer, P::Target: StreamMuxer,
{ {
type Item = <P::Target as StreamMuxer>::Substream; type Item = <P::Target as StreamMuxer>::Substream;
type Error = io::Error; type Error = <P::Target as StreamMuxer>::Error;
#[inline] #[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -354,7 +357,7 @@ where
#[inline] #[inline]
fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> { fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
let s = self.substream.as_mut().expect("substream was empty"); let s = self.substream.as_mut().expect("substream was empty");
match self.muxer.read_substream(s, buf)? { match self.muxer.read_substream(s, buf).map_err(|e| e.into())? {
Async::Ready(n) => Ok(n), Async::Ready(n) => Ok(n),
Async::NotReady => Err(io::ErrorKind::WouldBlock.into()) Async::NotReady => Err(io::ErrorKind::WouldBlock.into())
} }
@ -369,7 +372,7 @@ where
#[inline] #[inline]
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> { fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
let s = self.substream.as_mut().expect("substream was empty"); let s = self.substream.as_mut().expect("substream was empty");
self.muxer.read_substream(s, buf) self.muxer.read_substream(s, buf).map_err(|e| e.into())
} }
} }
@ -381,7 +384,7 @@ where
#[inline] #[inline]
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> { fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
let s = self.substream.as_mut().expect("substream was empty"); let s = self.substream.as_mut().expect("substream was empty");
match self.muxer.write_substream(s, buf)? { match self.muxer.write_substream(s, buf).map_err(|e| e.into())? {
Async::Ready(n) => Ok(n), Async::Ready(n) => Ok(n),
Async::NotReady => Err(io::ErrorKind::WouldBlock.into()) Async::NotReady => Err(io::ErrorKind::WouldBlock.into())
} }
@ -390,7 +393,7 @@ where
#[inline] #[inline]
fn flush(&mut self) -> Result<(), io::Error> { fn flush(&mut self) -> Result<(), io::Error> {
let s = self.substream.as_mut().expect("substream was empty"); let s = self.substream.as_mut().expect("substream was empty");
match self.muxer.flush_substream(s)? { match self.muxer.flush_substream(s).map_err(|e| e.into())? {
Async::Ready(()) => Ok(()), Async::Ready(()) => Ok(()),
Async::NotReady => Err(io::ErrorKind::WouldBlock.into()) Async::NotReady => Err(io::ErrorKind::WouldBlock.into())
} }
@ -405,20 +408,20 @@ where
#[inline] #[inline]
fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, io::Error> { fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, io::Error> {
let s = self.substream.as_mut().expect("substream was empty"); let s = self.substream.as_mut().expect("substream was empty");
self.muxer.write_substream(s, buf) self.muxer.write_substream(s, buf).map_err(|e| e.into())
} }
#[inline] #[inline]
fn shutdown(&mut self) -> Poll<(), io::Error> { fn shutdown(&mut self) -> Poll<(), io::Error> {
let s = self.substream.as_mut().expect("substream was empty"); let s = self.substream.as_mut().expect("substream was empty");
self.muxer.shutdown_substream(s)?; self.muxer.shutdown_substream(s).map_err(|e| e.into())?;
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
#[inline] #[inline]
fn poll_flush(&mut self) -> Poll<(), io::Error> { fn poll_flush(&mut self) -> Poll<(), io::Error> {
let s = self.substream.as_mut().expect("substream was empty"); let s = self.substream.as_mut().expect("substream was empty");
self.muxer.flush_substream(s) self.muxer.flush_substream(s).map_err(|e| e.into())
} }
} }
@ -435,7 +438,7 @@ where
/// Abstract `StreamMuxer`. /// Abstract `StreamMuxer`.
pub struct StreamMuxerBox { pub struct StreamMuxerBox {
inner: Box<dyn StreamMuxer<Substream = usize, OutboundSubstream = usize> + Send + Sync>, inner: Box<dyn StreamMuxer<Substream = usize, OutboundSubstream = usize, Error = io::Error> + Send + Sync>,
} }
impl StreamMuxerBox { impl StreamMuxerBox {
@ -463,9 +466,10 @@ impl StreamMuxerBox {
impl StreamMuxer for StreamMuxerBox { impl StreamMuxer for StreamMuxerBox {
type Substream = usize; // TODO: use a newtype type Substream = usize; // TODO: use a newtype
type OutboundSubstream = usize; // TODO: use a newtype type OutboundSubstream = usize; // TODO: use a newtype
type Error = io::Error;
#[inline] #[inline]
fn poll_inbound(&self) -> Poll<Self::Substream, io::Error> { fn poll_inbound(&self) -> Poll<Self::Substream, Self::Error> {
self.inner.poll_inbound() self.inner.poll_inbound()
} }
@ -475,7 +479,7 @@ impl StreamMuxer for StreamMuxerBox {
} }
#[inline] #[inline]
fn poll_outbound(&self, s: &mut Self::OutboundSubstream) -> Poll<Self::Substream, io::Error> { fn poll_outbound(&self, s: &mut Self::OutboundSubstream) -> Poll<Self::Substream, Self::Error> {
self.inner.poll_outbound(s) self.inner.poll_outbound(s)
} }
@ -485,22 +489,22 @@ impl StreamMuxer for StreamMuxerBox {
} }
#[inline] #[inline]
fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, io::Error> { fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, Self::Error> {
self.inner.read_substream(s, buf) self.inner.read_substream(s, buf)
} }
#[inline] #[inline]
fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll<usize, io::Error> { fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll<usize, Self::Error> {
self.inner.write_substream(s, buf) self.inner.write_substream(s, buf)
} }
#[inline] #[inline]
fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), io::Error> { fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), Self::Error> {
self.inner.flush_substream(s) self.inner.flush_substream(s)
} }
#[inline] #[inline]
fn shutdown_substream(&self, s: &mut Self::Substream) -> Poll<(), io::Error> { fn shutdown_substream(&self, s: &mut Self::Substream) -> Poll<(), Self::Error> {
self.inner.shutdown_substream(s) self.inner.shutdown_substream(s)
} }
@ -510,7 +514,7 @@ impl StreamMuxer for StreamMuxerBox {
} }
#[inline] #[inline]
fn close(&self) -> Poll<(), io::Error> { fn close(&self) -> Poll<(), Self::Error> {
self.inner.close() self.inner.close()
} }
@ -520,7 +524,7 @@ impl StreamMuxer for StreamMuxerBox {
} }
#[inline] #[inline]
fn flush_all(&self) -> Poll<(), io::Error> { fn flush_all(&self) -> Poll<(), Self::Error> {
self.inner.flush_all() self.inner.flush_all()
} }
} }
@ -533,13 +537,17 @@ struct Wrap<T> where T: StreamMuxer {
next_outbound: AtomicUsize, next_outbound: AtomicUsize,
} }
impl<T> StreamMuxer for Wrap<T> where T: StreamMuxer { impl<T> StreamMuxer for Wrap<T>
where
T: StreamMuxer,
{
type Substream = usize; // TODO: use a newtype type Substream = usize; // TODO: use a newtype
type OutboundSubstream = usize; // TODO: use a newtype type OutboundSubstream = usize; // TODO: use a newtype
type Error = io::Error;
#[inline] #[inline]
fn poll_inbound(&self) -> Poll<Self::Substream, io::Error> { fn poll_inbound(&self) -> Poll<Self::Substream, Self::Error> {
let substream = try_ready!(self.inner.poll_inbound()); let substream = try_ready!(self.inner.poll_inbound().map_err(|e| e.into()));
let id = self.next_substream.fetch_add(1, Ordering::Relaxed); let id = self.next_substream.fetch_add(1, Ordering::Relaxed);
self.substreams.lock().insert(id, substream); self.substreams.lock().insert(id, substream);
Ok(Async::Ready(id)) Ok(Async::Ready(id))
@ -557,9 +565,9 @@ impl<T> StreamMuxer for Wrap<T> where T: StreamMuxer {
fn poll_outbound( fn poll_outbound(
&self, &self,
substream: &mut Self::OutboundSubstream, substream: &mut Self::OutboundSubstream,
) -> Poll<Self::Substream, io::Error> { ) -> Poll<Self::Substream, Self::Error> {
let mut list = self.outbound.lock(); let mut list = self.outbound.lock();
let substream = try_ready!(self.inner.poll_outbound(list.get_mut(substream).unwrap())); let substream = try_ready!(self.inner.poll_outbound(list.get_mut(substream).unwrap()).map_err(|e| e.into()));
let id = self.next_substream.fetch_add(1, Ordering::Relaxed); let id = self.next_substream.fetch_add(1, Ordering::Relaxed);
self.substreams.lock().insert(id, substream); self.substreams.lock().insert(id, substream);
Ok(Async::Ready(id)) Ok(Async::Ready(id))
@ -572,27 +580,27 @@ impl<T> StreamMuxer for Wrap<T> where T: StreamMuxer {
} }
#[inline] #[inline]
fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, io::Error> { fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, Self::Error> {
let mut list = self.substreams.lock(); let mut list = self.substreams.lock();
self.inner.read_substream(list.get_mut(s).unwrap(), buf) self.inner.read_substream(list.get_mut(s).unwrap(), buf).map_err(|e| e.into())
} }
#[inline] #[inline]
fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll<usize, io::Error> { fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll<usize, Self::Error> {
let mut list = self.substreams.lock(); let mut list = self.substreams.lock();
self.inner.write_substream(list.get_mut(s).unwrap(), buf) self.inner.write_substream(list.get_mut(s).unwrap(), buf).map_err(|e| e.into())
} }
#[inline] #[inline]
fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), io::Error> { fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), Self::Error> {
let mut list = self.substreams.lock(); let mut list = self.substreams.lock();
self.inner.flush_substream(list.get_mut(s).unwrap()) self.inner.flush_substream(list.get_mut(s).unwrap()).map_err(|e| e.into())
} }
#[inline] #[inline]
fn shutdown_substream(&self, s: &mut Self::Substream) -> Poll<(), io::Error> { fn shutdown_substream(&self, s: &mut Self::Substream) -> Poll<(), Self::Error> {
let mut list = self.substreams.lock(); let mut list = self.substreams.lock();
self.inner.shutdown_substream(list.get_mut(s).unwrap()) self.inner.shutdown_substream(list.get_mut(s).unwrap()).map_err(|e| e.into())
} }
#[inline] #[inline]
@ -602,8 +610,8 @@ impl<T> StreamMuxer for Wrap<T> where T: StreamMuxer {
} }
#[inline] #[inline]
fn close(&self) -> Poll<(), io::Error> { fn close(&self) -> Poll<(), Self::Error> {
self.inner.close() self.inner.close().map_err(|e| e.into())
} }
#[inline] #[inline]
@ -612,7 +620,7 @@ impl<T> StreamMuxer for Wrap<T> where T: StreamMuxer {
} }
#[inline] #[inline]
fn flush_all(&self) -> Poll<(), io::Error> { fn flush_all(&self) -> Poll<(), Self::Error> {
self.inner.flush_all() self.inner.flush_all().map_err(|e| e.into())
} }
} }

View File

@ -66,6 +66,7 @@ where
{ {
type Substream = Substream; type Substream = Substream;
type OutboundSubstream = OutboundSubstream; type OutboundSubstream = OutboundSubstream;
type Error = io::Error;
fn poll_inbound(&self) -> Poll<Self::Substream, io::Error> { fn poll_inbound(&self) -> Poll<Self::Substream, io::Error> {
match self.endpoint { match self.endpoint {

View File

@ -145,7 +145,7 @@ where
/// Provides an API similar to `Future`. /// Provides an API similar to `Future`.
pub fn poll(&mut self) -> Poll<NodeEvent<TMuxer, TUserData>, IoError> { pub fn poll(&mut self) -> Poll<NodeEvent<TMuxer, TUserData>, IoError> {
// Polling inbound substream. // Polling inbound substream.
match self.muxer.poll_inbound()? { match self.muxer.poll_inbound().map_err(|e| e.into())? {
Async::Ready(substream) => { Async::Ready(substream) => {
let substream = muxing::substream_from_ref(self.muxer.clone(), substream); let substream = muxing::substream_from_ref(self.muxer.clone(), substream);
return Ok(Async::Ready(NodeEvent::InboundSubstream { return Ok(Async::Ready(NodeEvent::InboundSubstream {
@ -173,7 +173,7 @@ where
} }
Err(err) => { Err(err) => {
self.muxer.destroy_outbound(outbound); self.muxer.destroy_outbound(outbound);
return Err(err); return Err(err.into());
} }
} }
} }
@ -216,7 +216,7 @@ where
type Error = IoError; type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.muxer.close() self.muxer.close().map_err(|e| e.into())
} }
} }

View File

@ -79,6 +79,7 @@ impl DummyMuxer {
impl StreamMuxer for DummyMuxer { impl StreamMuxer for DummyMuxer {
type Substream = DummySubstream; type Substream = DummySubstream;
type OutboundSubstream = DummyOutboundSubstream; type OutboundSubstream = DummyOutboundSubstream;
type Error = IoError;
fn poll_inbound(&self) -> Poll<Self::Substream, IoError> { fn poll_inbound(&self) -> Poll<Self::Substream, IoError> {
match self.in_connection.state { match self.in_connection.state {
DummyConnectionState::Pending => Ok(Async::NotReady), DummyConnectionState::Pending => Ok(Async::NotReady),

View File

@ -350,6 +350,7 @@ where C: AsyncRead + AsyncWrite
{ {
type Substream = Substream; type Substream = Substream;
type OutboundSubstream = OutboundSubstream; type OutboundSubstream = OutboundSubstream;
type Error = IoError;
fn poll_inbound(&self) -> Poll<Self::Substream, IoError> { fn poll_inbound(&self) -> Poll<Self::Substream, IoError> {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();

View File

@ -47,6 +47,7 @@ where
{ {
type Substream = yamux::StreamHandle<C>; type Substream = yamux::StreamHandle<C>;
type OutboundSubstream = FutureResult<Option<Self::Substream>, io::Error>; type OutboundSubstream = FutureResult<Option<Self::Substream>, io::Error>;
type Error = IoError;
fn poll_inbound(&self) -> Poll<Self::Substream, IoError> { fn poll_inbound(&self) -> Poll<Self::Substream, IoError> {
match self.0.poll() { match self.0.poll() {

View File

@ -150,7 +150,7 @@ where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
self.state = IdRetrieverState::OpeningSubstream(muxer, opening, config); self.state = IdRetrieverState::OpeningSubstream(muxer, opening, config);
return Ok(Async::NotReady); return Ok(Async::NotReady);
}, },
Err(err) => return Err(UpgradeError::Apply(err)) Err(err) => return Err(UpgradeError::Apply(err.into()))
} }
}, },
IdRetrieverState::NegotiatingIdentify(muxer, mut nego) => { IdRetrieverState::NegotiatingIdentify(muxer, mut nego) => {

View File

@ -238,7 +238,7 @@ mod tests {
use tokio::runtime::current_thread; use tokio::runtime::current_thread;
fn transport() -> (identity::PublicKey, impl Transport< fn transport() -> (identity::PublicKey, impl Transport<
Output = (PeerId, impl StreamMuxer<Substream = impl Send, OutboundSubstream = impl Send>), Output = (PeerId, impl StreamMuxer<Substream = impl Send, OutboundSubstream = impl Send, Error = impl Into<io::Error>>),
Listener = impl Send, Listener = impl Send,
ListenerUpgrade = impl Send, ListenerUpgrade = impl Send,
Dial = impl Send, Dial = impl Send,

View File

@ -34,7 +34,7 @@ use libp2p_yamux as yamux;
use libp2p_secio::SecioConfig; use libp2p_secio::SecioConfig;
use libp2p_tcp::TcpConfig; use libp2p_tcp::TcpConfig;
use futures::{future, prelude::*}; use futures::{future, prelude::*};
use std::{fmt, time::Duration, sync::mpsc::sync_channel}; use std::{fmt, io, time::Duration, sync::mpsc::sync_channel};
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
#[test] #[test]
@ -102,7 +102,7 @@ fn ping() {
} }
fn mk_transport() -> (PeerId, impl Transport< fn mk_transport() -> (PeerId, impl Transport<
Output = (PeerId, impl StreamMuxer<Substream = impl Send, OutboundSubstream = impl Send>), Output = (PeerId, impl StreamMuxer<Substream = impl Send, OutboundSubstream = impl Send, Error = impl Into<io::Error>>),
Listener = impl Send, Listener = impl Send,
ListenerUpgrade = impl Send, ListenerUpgrade = impl Send,
Dial = impl Send, Dial = impl Send,

View File

@ -216,14 +216,14 @@ pub use self::simple::SimpleProtocol;
pub use self::transport_ext::TransportExt; pub use self::transport_ext::TransportExt;
use futures::prelude::*; use futures::prelude::*;
use std::{error, time::Duration}; use std::{error, io, time::Duration};
/// Builds a `Transport` that supports the most commonly-used protocols that libp2p supports. /// Builds a `Transport` that supports the most commonly-used protocols that libp2p supports.
/// ///
/// > **Note**: This `Transport` is not suitable for production usage, as its implementation /// > **Note**: This `Transport` is not suitable for production usage, as its implementation
/// > reserves the right to support additional protocols or remove deprecated protocols. /// > reserves the right to support additional protocols or remove deprecated protocols.
pub fn build_development_transport(keypair: identity::Keypair) pub fn build_development_transport(keypair: identity::Keypair)
-> impl Transport<Output = (PeerId, impl core::muxing::StreamMuxer<OutboundSubstream = impl Send, Substream = impl Send> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone -> impl Transport<Output = (PeerId, impl core::muxing::StreamMuxer<OutboundSubstream = impl Send, Substream = impl Send, Error = impl Into<io::Error>> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone
{ {
build_tcp_ws_secio_mplex_yamux(keypair) build_tcp_ws_secio_mplex_yamux(keypair)
} }
@ -235,7 +235,7 @@ pub fn build_development_transport(keypair: identity::Keypair)
/// ///
/// > **Note**: If you ever need to express the type of this `Transport`. /// > **Note**: If you ever need to express the type of this `Transport`.
pub fn build_tcp_ws_secio_mplex_yamux(keypair: identity::Keypair) pub fn build_tcp_ws_secio_mplex_yamux(keypair: identity::Keypair)
-> impl Transport<Output = (PeerId, impl core::muxing::StreamMuxer<OutboundSubstream = impl Send, Substream = impl Send> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone -> impl Transport<Output = (PeerId, impl core::muxing::StreamMuxer<OutboundSubstream = impl Send, Substream = impl Send, Error = impl Into<io::Error>> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone
{ {
CommonTransport::new() CommonTransport::new()
.with_upgrade(secio::SecioConfig::new(keypair)) .with_upgrade(secio::SecioConfig::new(keypair))