Add half-closed mplex substream support (#984)

This commit is contained in:
Pierre Krieger 2019-03-12 15:59:37 +01:00 committed by GitHub
parent 8059a693a3
commit 1eb7dd8582
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -372,6 +372,8 @@ where C: AsyncRead + AsyncWrite
current_data: Bytes::new(), current_data: Bytes::new(),
num, num,
endpoint: Endpoint::Listener, endpoint: Endpoint::Listener,
local_open: true,
remote_open: true,
})) }))
} }
@ -443,6 +445,8 @@ where C: AsyncRead + AsyncWrite
num: substream.num, num: substream.num,
current_data: Bytes::new(), current_data: Bytes::new(),
endpoint: Endpoint::Dialer, endpoint: Endpoint::Dialer,
local_open: true,
remote_open: true,
})); }));
}, },
OutboundSubstreamState::Done => unreachable!(), OutboundSubstreamState::Done => unreachable!(),
@ -464,6 +468,11 @@ where C: AsyncRead + AsyncWrite
return Ok(Async::Ready(len)); return Ok(Async::Ready(len));
} }
// If the remote writing side is closed, return EOF.
if !substream.remote_open {
return Ok(Async::Ready(0));
}
// Try to find a packet of data in the buffer. // Try to find a packet of data in the buffer.
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
let next_data_poll = next_match(&mut inner, |elem| { let next_data_poll = next_match(&mut inner, |elem| {
@ -471,7 +480,12 @@ where C: AsyncRead + AsyncWrite
codec::Elem::Data { substream_id, endpoint, data, .. } codec::Elem::Data { substream_id, endpoint, data, .. }
if *substream_id == substream.num && *endpoint != substream.endpoint => // see note [StreamId] if *substream_id == substream.num && *endpoint != substream.endpoint => // see note [StreamId]
{ {
Some(data.clone()) Some(Some(data.clone()))
}
codec::Elem::Close { substream_id, endpoint }
if *substream_id == substream.num && *endpoint != substream.endpoint => // see note [StreamId]
{
Some(None)
} }
_ => None _ => None
} }
@ -479,9 +493,13 @@ where C: AsyncRead + AsyncWrite
// We're in a loop, so all we need to do is set `substream.current_data` to the data we // We're in a loop, so all we need to do is set `substream.current_data` to the data we
// just read and wait for the next iteration. // just read and wait for the next iteration.
match next_data_poll { match next_data_poll? {
Ok(Async::Ready(data)) => substream.current_data = data, Async::Ready(Some(data)) => substream.current_data = data,
Ok(Async::NotReady) => { Async::Ready(None) | Async::Ready(None) => {
substream.remote_open = false;
return Ok(Async::Ready(0));
},
Async::NotReady => {
// There was no data packet in the buffer about this substream; maybe it's // There was no data packet in the buffer about this substream; maybe it's
// because it has been closed. // because it has been closed.
if inner.opened_substreams.contains(&(substream.num, substream.endpoint)) { if inner.opened_substreams.contains(&(substream.num, substream.endpoint)) {
@ -490,12 +508,15 @@ where C: AsyncRead + AsyncWrite
return Ok(Async::Ready(0)) return Ok(Async::Ready(0))
} }
}, },
Err(err) => return Err(err)
} }
} }
} }
fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Poll<usize, IoError> { fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Poll<usize, IoError> {
if !substream.local_open {
return Err(IoErrorKind::BrokenPipe.into());
}
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
let to_write = cmp::min(buf.len(), inner.config.split_send_size); let to_write = cmp::min(buf.len(), inner.config.split_send_size);
@ -529,13 +550,21 @@ where C: AsyncRead + AsyncWrite
} }
fn shutdown_substream(&self, sub: &mut Self::Substream) -> Poll<(), IoError> { fn shutdown_substream(&self, sub: &mut Self::Substream) -> Poll<(), IoError> {
if !sub.local_open {
return Ok(Async::Ready(()));
}
let elem = codec::Elem::Close { let elem = codec::Elem::Close {
substream_id: sub.num, substream_id: sub.num,
endpoint: sub.endpoint, endpoint: sub.endpoint,
}; };
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
poll_send(&mut inner, elem) let result = poll_send(&mut inner, elem);
if let Ok(Async::Ready(())) = result {
sub.local_open = false;
}
result
} }
fn destroy_substream(&self, sub: Self::Substream) { fn destroy_substream(&self, sub: Self::Substream) {
@ -589,4 +618,8 @@ pub struct Substream {
// Read buffer. Contains data read from `inner` but not yet dispatched by a call to `read()`. // Read buffer. Contains data read from `inner` but not yet dispatched by a call to `read()`.
current_data: Bytes, current_data: Bytes,
endpoint: Endpoint, endpoint: Endpoint,
/// If true, our writing side is still open.
local_open: bool,
/// If true, the remote writing side is still open.
remote_open: bool,
} }