diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index e5bbaa97..8b689f0b 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -372,6 +372,8 @@ where C: AsyncRead + AsyncWrite current_data: Bytes::new(), num, endpoint: Endpoint::Listener, + local_open: true, + remote_open: true, })) } @@ -443,6 +445,8 @@ where C: AsyncRead + AsyncWrite num: substream.num, current_data: Bytes::new(), endpoint: Endpoint::Dialer, + local_open: true, + remote_open: true, })); }, OutboundSubstreamState::Done => unreachable!(), @@ -464,6 +468,11 @@ where C: AsyncRead + AsyncWrite return Ok(Async::Ready(len)); } + // If the remote writing side is closed, return EOF. + if !substream.remote_open { + return Ok(Async::Ready(0)); + } + // Try to find a packet of data in the buffer. let mut inner = self.inner.lock(); let next_data_poll = next_match(&mut inner, |elem| { @@ -471,7 +480,12 @@ where C: AsyncRead + AsyncWrite codec::Elem::Data { substream_id, endpoint, data, .. } if *substream_id == substream.num && *endpoint != substream.endpoint => // see note [StreamId] { - Some(data.clone()) + Some(Some(data.clone())) + } + codec::Elem::Close { substream_id, endpoint } + if *substream_id == substream.num && *endpoint != substream.endpoint => // see note [StreamId] + { + Some(None) } _ => None } @@ -479,9 +493,13 @@ where C: AsyncRead + AsyncWrite // We're in a loop, so all we need to do is set `substream.current_data` to the data we // just read and wait for the next iteration. - match next_data_poll { - Ok(Async::Ready(data)) => substream.current_data = data, - Ok(Async::NotReady) => { + match next_data_poll? { + Async::Ready(Some(data)) => substream.current_data = data, + Async::Ready(None) | Async::Ready(None) => { + substream.remote_open = false; + return Ok(Async::Ready(0)); + }, + Async::NotReady => { // There was no data packet in the buffer about this substream; maybe it's // because it has been closed. if inner.opened_substreams.contains(&(substream.num, substream.endpoint)) { @@ -490,12 +508,15 @@ where C: AsyncRead + AsyncWrite return Ok(Async::Ready(0)) } }, - Err(err) => return Err(err) } } } fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Poll { + if !substream.local_open { + return Err(IoErrorKind::BrokenPipe.into()); + } + let mut inner = self.inner.lock(); let to_write = cmp::min(buf.len(), inner.config.split_send_size); @@ -529,13 +550,21 @@ where C: AsyncRead + AsyncWrite } fn shutdown_substream(&self, sub: &mut Self::Substream) -> Poll<(), IoError> { + if !sub.local_open { + return Ok(Async::Ready(())); + } + let elem = codec::Elem::Close { substream_id: sub.num, endpoint: sub.endpoint, }; let mut inner = self.inner.lock(); - poll_send(&mut inner, elem) + let result = poll_send(&mut inner, elem); + if let Ok(Async::Ready(())) = result { + sub.local_open = false; + } + result } fn destroy_substream(&self, sub: Self::Substream) { @@ -589,4 +618,8 @@ pub struct Substream { // Read buffer. Contains data read from `inner` but not yet dispatched by a call to `read()`. current_data: Bytes, endpoint: Endpoint, + /// If true, our writing side is still open. + local_open: bool, + /// If true, the remote writing side is still open. + remote_open: bool, }