diff --git a/mplex/src/read.rs b/mplex/src/read.rs index b7b4df13..47459f6f 100644 --- a/mplex/src/read.rs +++ b/mplex/src/read.rs @@ -101,6 +101,7 @@ fn block_on_wrong_stream>( }); let mut out_consumed = 0; + let mut stream_eof = false; if let Some((tasks, cache)) = lock.open_streams .entry(substream_id) .or_insert_with(|| SubstreamMetadata::new_open()) @@ -130,6 +131,10 @@ fn block_on_wrong_stream>( match lock.stream.read(buf_prefix) { Ok(consumed) => { + if consumed == 0 && !buf_prefix.is_empty() { + stream_eof = true + } + let new_remaining = remaining_bytes - consumed; assert!(cache.extend_from_slice(&buf_prefix[..consumed])); @@ -158,6 +163,10 @@ fn block_on_wrong_stream>( } } + if stream_eof { + lock.close() + } + Ok(out_consumed) }