Fix error being ignored in RwSinkStream (#989)

* Fix error being ignored in RwSinkStream

* Add test for partial reading

* Update misc/rw-stream-sink/src/lib.rs

Co-Authored-By: tomaka <pierre.krieger1708@gmail.com>
This commit is contained in:
Pierre Krieger
2019-03-01 12:08:49 +01:00
committed by GitHub
parent a64b18d5ab
commit f6cbafecbd
2 changed files with 45 additions and 61 deletions

View File

@ -2,7 +2,7 @@
name = "rw-stream-sink" name = "rw-stream-sink"
edition = "2018" edition = "2018"
description = "Adaptator between Stream/Sink and AsyncRead/AsyncWrite" description = "Adaptator between Stream/Sink and AsyncRead/AsyncWrite"
version = "0.1.0" version = "0.1.1"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT" license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p" repository = "https://github.com/libp2p/rust-libp2p"

View File

@ -63,61 +63,26 @@ where
S::Item: IntoBuf, S::Item: IntoBuf,
{ {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> { fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
let mut written = 0; // Grab the item to copy from.
let item_to_copy = loop {
if let Some(ref mut i) = self.current_item {
if i.has_remaining() {
break i;
}
}
loop { self.current_item = Some(match self.inner.poll()? {
let need_new_item = if let Some(ref i) = self.current_item { Async::Ready(Some(i)) => i.into_buf(),
!i.has_remaining() Async::Ready(None) => return Ok(0), // EOF
} else { Async::NotReady => return Err(IoErrorKind::WouldBlock.into()),
true });
}; };
if need_new_item { // Copy it!
loop { debug_assert!(item_to_copy.has_remaining());
self.current_item = match self.inner.poll() { let to_copy = cmp::min(buf.len(), item_to_copy.remaining());
Ok(Async::Ready(None)) => None, item_to_copy.take(to_copy).copy_to_slice(&mut buf[..to_copy]);
Ok(Async::Ready(Some(i))) => { Ok(to_copy)
let b = i.into_buf();
if !b.has_remaining() {
continue // skip over empty items
}
Some(b)
}
Ok(Async::NotReady) => {
if written == 0 {
return Err(IoError::new(IoErrorKind::WouldBlock, "stream not ready"));
} else {
return Ok(written);
}
}
Err(err) => {
if written == 0 {
return Err(err);
} else {
return Ok(written);
}
}
};
break
}
}
let current_item = match self.current_item {
Some(ref mut i) => i,
None => return Ok(written),
};
let to_copy = cmp::min(buf.len() - written, current_item.remaining());
if to_copy == 0 {
return Ok(written);
}
current_item
.by_ref()
.take(to_copy)
.copy_to_slice(&mut buf[written..(written + to_copy)]);
written += to_copy;
}
} }
} }
@ -216,13 +181,9 @@ mod tests {
.wait() .wait()
.unwrap(); .unwrap();
let mut data1 = [0u8; 5]; let mut data = Vec::new();
assert_eq!(wrapper.read(&mut data1).unwrap(), 5); wrapper.read_to_end(&mut data).unwrap();
assert_eq!(&data1, b"hello"); assert_eq!(data, b"hello world");
let mut data2 = Vec::new();
wrapper.read_to_end(&mut data2).unwrap();
assert_eq!(data2, b" world");
} }
#[test] #[test]
@ -230,7 +191,30 @@ mod tests {
let data: Vec<&[u8]> = vec![b"", b"foo", b"", b"bar", b"", b"baz", b""]; let data: Vec<&[u8]> = vec![b"", b"foo", b"", b"bar", b"", b"baz", b""];
let mut rws = RwStreamSink::new(stream::iter_ok::<_, std::io::Error>(data)); let mut rws = RwStreamSink::new(stream::iter_ok::<_, std::io::Error>(data));
let mut buf = [0; 9]; let mut buf = [0; 9];
assert_eq!(9, rws.read(&mut buf).unwrap()); assert_eq!(3, rws.read(&mut buf).unwrap());
assert_eq!(b"foobarbaz", &buf[..]) assert_eq!(3, rws.read(&mut buf[3..]).unwrap());
assert_eq!(3, rws.read(&mut buf[6..]).unwrap());
assert_eq!(0, rws.read(&mut buf).unwrap());
assert_eq!(b"foobarbaz", &buf[..]);
}
#[test]
fn partial_read() {
let data: Vec<&[u8]> = vec![b"hell", b"o world"];
let mut rws = RwStreamSink::new(stream::iter_ok::<_, std::io::Error>(data));
let mut buf = [0; 3];
assert_eq!(3, rws.read(&mut buf).unwrap());
assert_eq!(b"hel", &buf[..3]);
assert_eq!(0, rws.read(&mut buf[..0]).unwrap());
assert_eq!(1, rws.read(&mut buf).unwrap());
assert_eq!(b"l", &buf[..1]);
assert_eq!(3, rws.read(&mut buf).unwrap());
assert_eq!(b"o w", &buf[..3]);
assert_eq!(0, rws.read(&mut buf[..0]).unwrap());
assert_eq!(3, rws.read(&mut buf).unwrap());
assert_eq!(b"orl", &buf[..3]);
assert_eq!(1, rws.read(&mut buf).unwrap());
assert_eq!(b"d", &buf[..1]);
assert_eq!(0, rws.read(&mut buf).unwrap());
} }
} }