From f6cbafecbd734fd5ddf94797547f11acb5ef4664 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 1 Mar 2019 12:08:49 +0100 Subject: [PATCH] Fix error being ignored in RwSinkStream (#989) * Fix error being ignored in RwSinkStream * Add test for partial reading * Update misc/rw-stream-sink/src/lib.rs Co-Authored-By: tomaka --- misc/rw-stream-sink/Cargo.toml | 2 +- misc/rw-stream-sink/src/lib.rs | 104 ++++++++++++++------------------- 2 files changed, 45 insertions(+), 61 deletions(-) diff --git a/misc/rw-stream-sink/Cargo.toml b/misc/rw-stream-sink/Cargo.toml index 2f2d5f23..f3ab64f1 100644 --- a/misc/rw-stream-sink/Cargo.toml +++ b/misc/rw-stream-sink/Cargo.toml @@ -2,7 +2,7 @@ name = "rw-stream-sink" edition = "2018" description = "Adaptator between Stream/Sink and AsyncRead/AsyncWrite" -version = "0.1.0" +version = "0.1.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/misc/rw-stream-sink/src/lib.rs b/misc/rw-stream-sink/src/lib.rs index 6e1086af..69485961 100644 --- a/misc/rw-stream-sink/src/lib.rs +++ b/misc/rw-stream-sink/src/lib.rs @@ -63,61 +63,26 @@ where S::Item: IntoBuf, { fn read(&mut self, buf: &mut [u8]) -> Result { - let mut written = 0; - - loop { - let need_new_item = if let Some(ref i) = self.current_item { - !i.has_remaining() - } else { - true - }; - - if need_new_item { - loop { - self.current_item = match self.inner.poll() { - Ok(Async::Ready(None)) => None, - Ok(Async::Ready(Some(i))) => { - let b = i.into_buf(); - if !b.has_remaining() { - continue // skip over empty items - } - Some(b) - } - Ok(Async::NotReady) => { - if written == 0 { - return Err(IoError::new(IoErrorKind::WouldBlock, "stream not ready")); - } else { - return Ok(written); - } - } - Err(err) => { - if written == 0 { - return Err(err); - } else { - return Ok(written); - } - } - }; - break + // Grab the item to copy from. + let item_to_copy = loop { + if let Some(ref mut i) = self.current_item { + if i.has_remaining() { + break i; } } - let current_item = match self.current_item { - Some(ref mut i) => i, - None => return Ok(written), - }; + self.current_item = Some(match self.inner.poll()? { + Async::Ready(Some(i)) => i.into_buf(), + Async::Ready(None) => return Ok(0), // EOF + Async::NotReady => return Err(IoErrorKind::WouldBlock.into()), + }); + }; - let to_copy = cmp::min(buf.len() - written, current_item.remaining()); - if to_copy == 0 { - return Ok(written); - } - - current_item - .by_ref() - .take(to_copy) - .copy_to_slice(&mut buf[written..(written + to_copy)]); - written += to_copy; - } + // Copy it! + debug_assert!(item_to_copy.has_remaining()); + let to_copy = cmp::min(buf.len(), item_to_copy.remaining()); + item_to_copy.take(to_copy).copy_to_slice(&mut buf[..to_copy]); + Ok(to_copy) } } @@ -216,13 +181,9 @@ mod tests { .wait() .unwrap(); - let mut data1 = [0u8; 5]; - assert_eq!(wrapper.read(&mut data1).unwrap(), 5); - assert_eq!(&data1, b"hello"); - - let mut data2 = Vec::new(); - wrapper.read_to_end(&mut data2).unwrap(); - assert_eq!(data2, b" world"); + let mut data = Vec::new(); + wrapper.read_to_end(&mut data).unwrap(); + assert_eq!(data, b"hello world"); } #[test] @@ -230,7 +191,30 @@ mod tests { let data: Vec<&[u8]> = vec![b"", b"foo", b"", b"bar", b"", b"baz", b""]; let mut rws = RwStreamSink::new(stream::iter_ok::<_, std::io::Error>(data)); let mut buf = [0; 9]; - assert_eq!(9, rws.read(&mut buf).unwrap()); - assert_eq!(b"foobarbaz", &buf[..]) + assert_eq!(3, rws.read(&mut buf).unwrap()); + assert_eq!(3, rws.read(&mut buf[3..]).unwrap()); + assert_eq!(3, rws.read(&mut buf[6..]).unwrap()); + assert_eq!(0, rws.read(&mut buf).unwrap()); + assert_eq!(b"foobarbaz", &buf[..]); + } + + #[test] + fn partial_read() { + let data: Vec<&[u8]> = vec![b"hell", b"o world"]; + let mut rws = RwStreamSink::new(stream::iter_ok::<_, std::io::Error>(data)); + let mut buf = [0; 3]; + assert_eq!(3, rws.read(&mut buf).unwrap()); + assert_eq!(b"hel", &buf[..3]); + assert_eq!(0, rws.read(&mut buf[..0]).unwrap()); + assert_eq!(1, rws.read(&mut buf).unwrap()); + assert_eq!(b"l", &buf[..1]); + assert_eq!(3, rws.read(&mut buf).unwrap()); + assert_eq!(b"o w", &buf[..3]); + assert_eq!(0, rws.read(&mut buf[..0]).unwrap()); + assert_eq!(3, rws.read(&mut buf).unwrap()); + assert_eq!(b"orl", &buf[..3]); + assert_eq!(1, rws.read(&mut buf).unwrap()); + assert_eq!(b"d", &buf[..1]); + assert_eq!(0, rws.read(&mut buf).unwrap()); } }