From 097666b09ee0eae0fb8c7918099454b792622a0d Mon Sep 17 00:00:00 2001 From: Vurich Date: Fri, 10 Nov 2017 12:27:11 +0100 Subject: [PATCH] Add circular-buffer --- Cargo.toml | 15 +- circular-buffer/Cargo.toml | 7 + circular-buffer/src/lib.rs | 693 +++++++++++++++++++++++++++++++++++++ multiplex-rs/Cargo.toml | 1 + multiplex-rs/src/lib.rs | 41 ++- 5 files changed, 735 insertions(+), 22 deletions(-) create mode 100644 circular-buffer/Cargo.toml create mode 100644 circular-buffer/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 35a89307..3c1e4621 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,16 +1,21 @@ [workspace] members = [ - "multistream-select", - "datastore", - "multihash", - "example", "libp2p-peerstore", "libp2p-ping", "libp2p-secio", "libp2p-swarm", + "libp2p-transport", + "libp2p-host", + "libp2p-tcp-transport", + "libp2p-stream-muxer", + "multihash", + "multistream-select", + "datastore", "rw-stream-sink", + "circular-buffer", "varint-rs", - "multiplex-rs" + "multiplex-rs", + "example", ] [replace] diff --git a/circular-buffer/Cargo.toml b/circular-buffer/Cargo.toml new file mode 100644 index 00000000..01490924 --- /dev/null +++ b/circular-buffer/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "circular-buffer" +version = "0.1.0" +authors = ["Vurich "] + +[dependencies] +smallvec = "0.5.0" \ No newline at end of file diff --git a/circular-buffer/src/lib.rs b/circular-buffer/src/lib.rs new file mode 100644 index 00000000..b7f8bd05 --- /dev/null +++ b/circular-buffer/src/lib.rs @@ -0,0 +1,693 @@ +#![warn(missing_docs)] + +//! # `circular-buffer` +//! +//! An optimized FIFO queue that allows safe access to the internal storage as a slice (i.e. not +//! just element-by-element). This is useful for circular buffers of bytes. Since it uses +//! `smallvec`'s `Array` trait it can only be backed by an array of static size, this may change in +//! the future. + +extern crate smallvec; + +use std::ops::{Deref, DerefMut, Drop}; +use std::mem::ManuallyDrop; + +use smallvec::Array; + +/// A slice that owns its elements, but not their storage. This is useful for things like +/// `Vec::retain` and `CircularBuffer::pop_slice`, since these functions can return a slice but the +/// elements of these slices would be leaked after the slice goes out of scope. `OwnedSlice` simply +/// manually drops all its elements when it goes out of scope. +#[derive(Debug, Eq, PartialEq)] +pub struct OwnedSlice<'a, T: 'a>(&'a mut [T]); + +impl<'a, T: 'a> OwnedSlice<'a, T> { + /// Construct an owned slice from a mutable slice pointer. + /// + /// # Unsafety + /// You must ensure that the memory pointed to by `inner` will not be accessible after the + /// lifetime of the `OwnedSlice`. + pub unsafe fn new(inner: &'a mut [T]) -> Self { + OwnedSlice(inner) + } +} + +impl<'a, T> AsRef<[T]> for OwnedSlice<'a, T> { + fn as_ref(&self) -> &[T] { + self.0 + } +} + +impl<'a, T> AsMut<[T]> for OwnedSlice<'a, T> { + fn as_mut(&mut self) -> &mut [T] { + self.0 + } +} + +impl<'a, T> Deref for OwnedSlice<'a, T> { + type Target = [T]; + + fn deref(&self) -> &Self::Target { + self.0 + } +} + +impl<'a, T> DerefMut for OwnedSlice<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0 + } +} + +impl<'a, T> Drop for OwnedSlice<'a, T> { + fn drop(&mut self) { + use std::ptr; + + for element in self.iter_mut() { + unsafe { + ptr::drop_in_place(element); + } + } + } +} + +/// A fixed-size FIFO queue with safe access to the backing storage. +/// +/// This type allows access to slices of the backing storage, for efficient, safe circular buffers +/// of bytes or other `Copy` types. +#[derive(Debug)] +pub struct CircularBuffer { + // This must be manually dropped, as some or all of the elements may be uninitialized + buffer: ManuallyDrop, + start: usize, + len: usize, +} + +impl PartialEq for CircularBuffer +where + B::Item: PartialEq, +{ + fn eq(&self, other: &Self) -> bool { + if self.len() != other.len() { + return false; + } + + for (a, b) in self.iter().zip(other.iter()) { + if a != b { + return false; + } + } + + return true; + } +} + +impl CircularBuffer { + /// Create an empty `CircularBuffer`. + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let mut buffer = CircularBuffer::<[usize; 4]>::new(); + /// + /// assert!(buffer.is_empty()); + /// ``` + pub fn new() -> Self { + use std::mem; + + CircularBuffer { + buffer: unsafe { mem::uninitialized() }, + start: 0, + len: 0, + } + } + + /// Pop a slice containing the maximum possible contiguous number of elements. Since this buffer + /// is circular it will take a maximum of two calls to this function to drain the buffer + /// entirely. + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let mut buffer = CircularBuffer::from_array([1, 2, 3, 4]); + /// + /// assert_eq!(buffer.pop(), Some(1)); + /// assert!(buffer.push(1).is_none()); + /// + /// assert_eq!( + /// buffer.pop_slice().as_ref().map(|s| &s[..]), + /// Some(&[2, 3, 4][..]) + /// ); + /// assert_eq!(buffer.pop_slice().as_ref().map(|s| &s[..]), Some(&[1][..])); + /// assert!(buffer.pop_slice().is_none()); + /// + /// assert_eq!(buffer.len(), 0); + /// ``` + /// + /// This returns an `OwnedSlice`, which owns the items but not the storage (you cannot have two + /// slices returned from `pop_slice` alive at once, but the elements will be have `drop` called + /// when the slice goes out of scope), if you're using non-`Drop` types you can use + /// `pop_slice_leaky`. + pub fn pop_slice(&mut self) -> Option> { + self.pop_slice_leaky().map(OwnedSlice) + } + + /// Pop a slice containing the maximum possible contiguous number of elements. Since this buffer + /// is circular it will take a maximum of two calls to this function to drain the buffer + /// entirely. This returns a slice and so any `Drop` types returned from this function will be + /// leaked. + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let mut buffer = CircularBuffer::from_array([1, 2, 3, 4]); + /// + /// assert_eq!(buffer.pop(), Some(1)); + /// assert!(buffer.push(1).is_none()); + /// + /// assert_eq!( + /// buffer.pop_slice_leaky(), + /// Some(&mut [2, 3, 4][..]) + /// ); + /// assert_eq!(buffer.pop_slice_leaky(), Some(&mut [1][..])); + /// assert!(buffer.pop_slice_leaky().is_none()); + /// + /// assert_eq!(buffer.len(), 0); + /// ``` + pub fn pop_slice_leaky(&mut self) -> Option<&mut [B::Item]> { + use std::slice; + + if self.is_empty() { + None + } else { + let (start, out_length) = (self.start, self.len.min(B::size() - self.start)); + + self.advance(out_length); + + unsafe { + Some(slice::from_raw_parts_mut( + self.buffer.ptr_mut().offset(start as isize), + out_length, + )) + } + } + } + + /// A borrowed iterator of this buffer's elements + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// assert_eq!( + /// CircularBuffer::from_array([1, 2, 3, 4]).iter().cloned().collect::>(), + /// vec![1, 2, 3, 4] + /// ); + /// ``` + pub fn iter(&self) -> Iter { + self.into_iter() + } + + /// Iterate over slices of the buffer (of arbitrary size, but in order). + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let mut buffer = CircularBuffer::from_array([1, 2, 3, 4]); + /// + /// assert_eq!(buffer.pop(), Some(1)); + /// assert!(buffer.push(1).is_none()); + /// + /// let mut iter = buffer.slices(); + /// + /// assert_eq!( + /// iter.collect::>(), + /// vec![&[2, 3, 4][..], &[1]] + /// ); + /// ``` + pub fn slices(&self) -> SlicesIter { + SlicesIter { + buffer: self, + start: self.start, + len: self.len, + } + } + + /// Whether the buffer is empty. + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let mut buffer = CircularBuffer::<[usize; 4]>::new(); + /// + /// assert!(buffer.is_empty()); + /// ``` + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Whether the buffer is full (i.e. it is no longer possible to push new elements without + /// popping old ones first). + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let mut buffer = CircularBuffer::from_array([1, 2, 3, 4]); + /// + /// assert!(buffer.is_full()); + /// ``` + pub fn is_full(&self) -> bool { + self.len == B::size() + } + + /// The number of elements in the buffer. + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let mut buffer: CircularBuffer<[usize; 4]> = CircularBuffer::from_slice(&[1, 2]).unwrap(); + /// + /// assert_eq!(buffer.len(), 2); + /// + /// assert!(buffer.push(1).is_none()); + /// assert!(buffer.push(2).is_none()); + /// + /// assert_eq!(buffer.len(), 4); + /// ``` + pub fn len(&self) -> usize { + self.len + } + + /// The maximum number of elements this buffer can take. + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let mut buffer: CircularBuffer<[usize; 4]> = CircularBuffer::new(); + /// + /// assert_eq!(buffer.capacity(), 4); + /// ``` + pub fn capacity(&self) -> usize { + B::size() + } + + /// Append a single element to the end of the buffer, returning it if it could not be added. + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let mut buffer: CircularBuffer<[usize; 4]> = CircularBuffer::new(); + /// + /// assert_eq!(buffer.len(), 0); + /// + /// assert!(buffer.push(1).is_none()); + /// assert!(buffer.push(2).is_none()); + /// assert!(buffer.push(3).is_none()); + /// assert!(buffer.push(4).is_none()); + /// + /// assert!(buffer.push(5).is_some()); + /// + /// assert_eq!(buffer.len(), 4); + /// ``` + pub fn push(&mut self, element: B::Item) -> Option { + use std::ptr; + + debug_assert!(self.len <= B::size()); + + if self.is_full() { + Some(element) + } else { + let dest = (self.start + self.len) % B::size(); + + unsafe { + ptr::write(self.buffer.ptr_mut().offset(dest as isize), element); + } + self.len += 1; + None + } + } + + /// Remove a single element from the start of the buffer. + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let mut buffer = CircularBuffer::from_array([1, 2, 3, 4]); + /// + /// assert_eq!(buffer.pop(), Some(1)); + /// ``` + pub fn pop(&mut self) -> Option { + use std::ptr; + + if self.is_empty() { + None + } else { + let offset = self.start; + self.advance(1); + + unsafe { Some(ptr::read(self.buffer.ptr_mut().offset(offset as _))) } + } + } + + /// Get a borrow to an element at an index safely (if the index is out of bounds, return + /// `None`). + pub fn get(&self, index: usize) -> Option<&B::Item> { + if index < self.len { + unsafe { Some(self.get_unchecked(index)) } + } else { + None + } + } + + /// Get a borrow to an element at an index unsafely (causes undefined behaviour if the index is + /// out of bounds). + pub unsafe fn get_unchecked(&self, index: usize) -> &B::Item { + use std::mem; + + mem::transmute(self.buffer.ptr().offset( + ((index + self.start) % B::size()) as isize, + )) + } + + // This is not unsafe because it can only leak data, not cause uninit to be read. + fn advance(&mut self, by: usize) { + assert!(by <= self.len); + + self.start = (self.start + by) % B::size(); + self.len -= by; + } +} + +impl Drop for CircularBuffer { + fn drop(&mut self) { + while self.pop_slice().is_some() {} + } +} + +impl IntoIterator for CircularBuffer { + type Item = B::Item; + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + IntoIter { buffer: self } + } +} + +impl<'a, B: Array + 'a> IntoIterator for &'a CircularBuffer { + type Item = &'a B::Item; + type IntoIter = Iter<'a, B>; + + fn into_iter(self) -> Self::IntoIter { + Iter { + buffer: self, + remaining: self.len(), + } + } +} + +/// The iteration type returning owned elements of the buffer +pub struct IntoIter { + buffer: CircularBuffer, +} + +impl Iterator for IntoIter { + type Item = B::Item; + + fn next(&mut self) -> Option { + self.buffer.pop() + } +} + +/// The iteration type returning borrows to elements of the buffer +pub struct Iter<'a, B: Array + 'a> { + buffer: &'a CircularBuffer, + remaining: usize, +} + +impl<'a, B: Array + 'a> Iterator for Iter<'a, B> { + type Item = &'a B::Item; + + fn next(&mut self) -> Option { + if self.remaining == 0 { + None + } else { + let remaining = self.remaining; + self.remaining -= 1; + self.buffer.get(self.buffer.len() - remaining) + } + } +} + +/// The iteration type for immutable slices of the circular buffer. See `CircularBuffer::slices`. +pub struct SlicesIter<'a, B: Array + 'a> { + buffer: &'a CircularBuffer, + start: usize, + len: usize, +} + +impl<'a, B: Array + 'a> Iterator for SlicesIter<'a, B> { + type Item = &'a [B::Item]; + + fn next(&mut self) -> Option { + use std::slice; + + if self.len == 0 { + None + } else { + let (start, out_length) = (self.start, self.len.min(B::size() - self.start)); + + self.start = (self.start + out_length) % B::size(); + self.len -= out_length; + + unsafe { + Some(slice::from_raw_parts( + self.buffer.buffer.ptr().offset(start as isize), + out_length, + )) + } + } + } +} + +impl CircularBuffer +where + B::Item: Copy, +{ + /// Create a `CircularBuffer` from a slice of elements, returning `None` if not all the elements + /// can fit in the buffer. + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// assert!(CircularBuffer::<[usize; 5]>::from_slice(&[1, 2, 3, 4, 5, 6]).is_none()); + /// assert!(CircularBuffer::<[usize; 5]>::from_slice(&[1, 2, 3, 4]).is_some()); + /// ``` + pub fn from_slice(slice: &[B::Item]) -> Option { + let mut out = Self::new(); + if out.extend_from_slice(slice) { + Some(out) + } else { + None + } + } + + /// Create a `CircularBuffer` from a slice of elements, returning the number of elements copied. + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let result = CircularBuffer::<[usize; 5]>::from_slice_prefix(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 20]); + /// assert_eq!(result, (CircularBuffer::from_array([1, 2, 3, 4, 5]), 5)); + /// ``` + pub fn from_slice_prefix(slice: &[B::Item]) -> (Self, usize) { + let mut out = Self::new(); + let num_copied = out.extend_from_slice_prefix(slice); + (out, num_copied) + } + + /// Create a circular buffer from a fixed-size array + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let result = CircularBuffer::from_array([1, 2, 3, 4, 5]); + /// assert_eq!(result.into_iter().collect::>(), vec![1, 2, 3, 4, 5]); + /// ``` + pub fn from_array(slice: B) -> Self { + CircularBuffer { + buffer: ManuallyDrop::new(slice), + start: 0, + len: B::size(), + } + } + + fn write_slice(&mut self, index: usize, slice: &[B::Item]) -> bool { + use std::ptr; + + let mut offset = 0; + + assert!(index <= self.len); + + if slice.len() > self.capacity() - index { + return false; + } + + while offset < slice.len() { + unsafe { + let dest = (index + self.start + offset) % B::size(); + let copy_len = if dest < self.start { + self.start - dest + } else { + B::size() - dest + }.min(slice.len() - offset); + + let slice_ptr = slice.as_ptr().offset(offset as isize); + let ptr = self.buffer.ptr_mut().offset(dest as isize); + + ptr::copy(slice_ptr, ptr, copy_len); + + self.len = self.len.max(index + offset + copy_len); + offset += copy_len; + } + } + + true + } + + fn write_slice_prefix(&mut self, index: usize, slice: &[B::Item]) -> usize { + use std::ptr; + + let mut offset = 0; + + assert!(index <= self.len); + + while !self.is_full() && offset < slice.len() { + unsafe { + let dest = (index + self.start + offset) % B::size(); + let copy_len = if dest < self.start { + self.start - dest + } else { + B::size() - dest + }.min(slice.len() - offset); + + let slice_ptr = slice.as_ptr().offset(offset as isize); + let ptr = self.buffer.ptr_mut().offset(dest as isize); + + ptr::copy(slice_ptr, ptr, copy_len); + + self.len = self.len.max(index + offset + copy_len); + offset += copy_len; + } + } + + offset + } + + /// Append the elements from a slice to the buffer, returning the number of elements copied + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let mut buffer = CircularBuffer::<[usize; 5]>::from_slice(&[1, 2]).unwrap(); + /// + /// assert_eq!(buffer.iter().cloned().collect::>(), vec![1, 2]); + /// + /// let consumed = buffer.extend_from_slice_prefix(&[1, 2, 3, 4, 5]); + /// + /// assert_eq!(consumed, 3); + /// assert_eq!(buffer.iter().cloned().collect::>(), vec![1, 2, 1, 2, 3]); + /// ``` + #[inline] + pub fn extend_from_slice_prefix(&mut self, slice: &[B::Item]) -> usize { + let len = self.len(); + self.write_slice_prefix(len, slice) + } + + /// Append the elements from a slice to the buffer iff there is enough space for all the + /// elements + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let mut buffer = CircularBuffer::<[usize; 5]>::from_slice(&[1, 2]).unwrap(); + /// + /// assert_eq!(buffer.iter().cloned().collect::>(), vec![1, 2]); + /// + /// assert!(!buffer.extend_from_slice(&[1, 2, 3, 4, 5])); + /// assert!(buffer.extend_from_slice(&[1, 2, 3])); + /// + /// assert_eq!(buffer.iter().cloned().collect::>(), vec![1, 2, 1, 2, 3]); + /// ``` + #[inline] + pub fn extend_from_slice(&mut self, slice: &[B::Item]) -> bool { + let len = self.len(); + self.write_slice(len, slice) + } +} + +#[cfg(test)] +mod tests { + use super::CircularBuffer; + + #[test] + fn push_pop() { + let mut buffer: CircularBuffer<[usize; 4]> = CircularBuffer::new(); + + assert_eq!(buffer.len(), 0); + + assert!(buffer.push(1).is_none()); + assert!(buffer.push(2).is_none()); + assert!(buffer.push(3).is_none()); + assert!(buffer.push(4).is_none()); + + assert!(buffer.push(5).is_some()); + + assert_eq!(buffer.len(), 4); + + assert_eq!(buffer.pop(), Some(1)); + assert_eq!(buffer.pop(), Some(2)); + assert_eq!(buffer.pop(), Some(3)); + assert_eq!(buffer.pop(), Some(4)); + assert_eq!(buffer.pop(), None); + + assert_eq!(buffer.len(), 0); + } + + #[test] + fn pop_slice() { + let mut buffer: CircularBuffer<[usize; 4]> = CircularBuffer::new(); + + assert_eq!(buffer.len(), 0); + + assert!(buffer.push(1).is_none()); + assert!(buffer.push(2).is_none()); + assert!(buffer.push(3).is_none()); + assert!(buffer.push(4).is_none()); + + assert!(buffer.push(5).is_some()); + + assert_eq!(buffer.len(), 4); + + assert_eq!(buffer.pop(), Some(1)); + assert!(buffer.push(1).is_none()); + + assert_eq!( + buffer.pop_slice().as_ref().map(|s| &s[..]), + Some(&[2, 3, 4][..]) + ); + assert_eq!(buffer.pop_slice().as_ref().map(|s| &s[..]), Some(&[1][..])); + assert!(buffer.pop_slice().is_none()); + + assert_eq!(buffer.len(), 0); + } + + #[test] + fn extend_from_slice() { + let mut buffer: CircularBuffer<[usize; 4]> = CircularBuffer::from_slice(&[1, 2]).unwrap(); + + assert_eq!(buffer.pop(), Some(1)); + assert_eq!(buffer.pop(), Some(2)); + + assert!(buffer.extend_from_slice(&[1, 2, 3, 4])); + + assert_eq!(buffer.iter().cloned().collect::>(), vec![1, 2, 3, 4]) + } +} diff --git a/multiplex-rs/Cargo.toml b/multiplex-rs/Cargo.toml index 346598fb..9f3ccfca 100644 --- a/multiplex-rs/Cargo.toml +++ b/multiplex-rs/Cargo.toml @@ -12,3 +12,4 @@ futures = "0.1" parking_lot = "0.4.8" libp2p-stream-muxer = { path = "../libp2p-stream-muxer" } varint = { path = "../varint-rs" } +circular-buffer = { path = "../circular-buffer" } diff --git a/multiplex-rs/src/lib.rs b/multiplex-rs/src/lib.rs index b53cd87a..b32f55bb 100644 --- a/multiplex-rs/src/lib.rs +++ b/multiplex-rs/src/lib.rs @@ -6,12 +6,14 @@ extern crate varint; extern crate num_bigint; extern crate num_traits; extern crate parking_lot; +extern crate circular_buffer; use bytes::Bytes; +use circular_buffer::CircularBuffer; use futures::prelude::*; use libp2p_stream_muxer::StreamMuxer; use parking_lot::Mutex; -use std::collections::{HashSet, HashMap}; +use std::collections::HashMap; use std::io::{self, Read, Write}; use std::sync::Arc; use tokio_io::{AsyncRead, AsyncWrite}; @@ -35,7 +37,7 @@ enum NextMultiplexState { Ignore, } -enum MultiplexState { +enum MultiplexReadState { Header { state: varint::DecoderState }, BodyLength { state: varint::DecoderState, @@ -53,12 +55,16 @@ enum MultiplexState { Ignore { remaining_bytes: usize }, } -impl Default for MultiplexState { +impl Default for MultiplexReadState { fn default() -> Self { - MultiplexState::Header { state: Default::default() } + MultiplexReadState::Header { state: Default::default() } } } +struct MultiplexWriteState { + buffer: CircularBuffer<[u8; 1024]>, +} + // TODO: Add writing. We should also add some form of "pending packet" so that we can always open at // least one new substream. If this is stored on the substream itself then we can open // infinite new substreams. @@ -72,9 +78,9 @@ impl Default for MultiplexState { // Since if we receive a message to a closed stream we just drop it anyway. struct MultiplexShared { // We use `Option` in order to take ownership of heap allocations within `DecoderState` and - // `BytesMut`. If this is ever observably `None` then something has panicked and the `Mutex` - // will be poisoned. - read_state: Option, + // `BytesMut`. If this is ever observably `None` then something has panicked or the underlying + // stream returned an error. + read_state: Option, stream: T, // true if the stream is open, false otherwise open_streams: HashMap, @@ -122,13 +128,14 @@ unsafe fn create_buffer_for(capacity: usize, inner: &R) -> bytes:: buffer } -fn read_stream( - mut stream_data: Option<(usize, &mut [u8])>, +fn read_stream<'a, O: Into>, T: AsyncRead>( lock: &mut MultiplexShared, + stream_data: O, ) -> io::Result { use num_traits::cast::ToPrimitive; - use MultiplexState::*; + use MultiplexReadState::*; + let mut stream_data = stream_data.into(); let stream_has_been_gracefully_closed = stream_data .as_ref() .and_then(|&(id, _)| lock.open_streams.get(&id)) @@ -201,8 +208,8 @@ fn read_stream( )?; lock.read_state = Some(match next { - Ignore => MultiplexState::Ignore { remaining_bytes: length }, - NewStream(substream_id) => MultiplexState::NewStream { + Ignore => MultiplexReadState::Ignore { remaining_bytes: length }, + NewStream(substream_id) => MultiplexReadState::NewStream { // This is safe as long as we only use `lock.stream` to write to // this field name: unsafe { create_buffer_for(length, &lock.stream) }, @@ -215,12 +222,12 @@ fn read_stream( .map(|is_open| *is_open) .unwrap_or(false); if is_open { - MultiplexState::ParsingMessageBody { + MultiplexReadState::ParsingMessageBody { remaining_bytes: length, substream_id, } } else { - MultiplexState::Ignore { remaining_bytes: length } + MultiplexReadState::Ignore { remaining_bytes: length } } } }); @@ -276,7 +283,7 @@ fn read_stream( remaining_bytes, } => { if let Some((ref mut id, ref mut buf)) = stream_data { - use MultiplexState::*; + use MultiplexReadState::*; if substream_id == *id { if remaining_bytes == 0 { @@ -357,7 +364,7 @@ impl Read for Substream { None => return Err(io::Error::from(io::ErrorKind::WouldBlock)), }; - read_stream(Some((self.id, buf)), &mut lock) + read_stream(&mut lock, (self.id, buf)) } } @@ -456,7 +463,7 @@ impl Stream for InboundStream { }; // Attempt to make progress, but don't block if we can't - match read_stream(None, &mut lock) { + match read_stream(&mut lock, None) { Ok(_) => (), Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => (), Err(err) => return Err(err),