diff --git a/circular-buffer/Cargo.toml b/circular-buffer/Cargo.toml index d0b977c4..2f95cf7e 100644 --- a/circular-buffer/Cargo.toml +++ b/circular-buffer/Cargo.toml @@ -4,4 +4,4 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] -smallvec = "0.5.0" \ No newline at end of file +smallvec = { git = "https://github.com/Vurich/rust-smallvec.git", branch = "array-zero" } \ No newline at end of file diff --git a/circular-buffer/src/lib.rs b/circular-buffer/src/lib.rs index fe171460..c74505ee 100644 --- a/circular-buffer/src/lib.rs +++ b/circular-buffer/src/lib.rs @@ -32,7 +32,7 @@ extern crate smallvec; use std::ops::Drop; use std::mem::ManuallyDrop; -use smallvec::Array; +pub use smallvec::Array; use owned_slice::OwnedSlice; @@ -248,12 +248,59 @@ impl CircularBuffer { /// assert_eq!(buffer.len(), 0); /// ``` pub fn pop_slice_leaky(&mut self) -> Option<&mut [B::Item]> { + let len = self.len; + self.pop_first_n_leaky(len) + } + + /// Pop a slice containing up to `n` contiguous elements. Since this buffer is circular it will + /// take a maximum of two calls to this function to drain the buffer entirely. + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let mut buffer = CircularBuffer::from_array([1, 2, 3, 4]); + /// + /// assert_eq!(buffer.pop(), Some(1)); + /// assert!(buffer.push(1).is_none()); + /// + /// assert_eq!(buffer.pop_first_n(2).as_ref().map(<_>::as_ref), Some(&[2, 3][..])); + /// assert_eq!(buffer.pop_first_n(2).as_ref().map(<_>::as_ref), Some(&[4][..])); + /// assert_eq!(buffer.pop_first_n(2).as_ref().map(<_>::as_ref), Some(&[1][..])); + /// assert!(buffer.pop_first_n(2).is_none()); + /// + /// assert_eq!(buffer.len(), 0); + /// ``` + pub fn pop_first_n(&mut self, n: usize) -> Option> { + self.pop_first_n_leaky(n) + .map(|x| unsafe { OwnedSlice::new(x) }) + } + + /// Pop a slice containing up to `n` contiguous elements. Since this buffer is circular it will + /// take a maximum of two calls to this function to drain the buffer entirely. This returns a + /// slice and so any `Drop` types returned from this function will be leaked. + /// + /// ```rust + /// use circular_buffer::CircularBuffer; + /// + /// let mut buffer = CircularBuffer::from_array([1, 2, 3, 4]); + /// + /// assert_eq!(buffer.pop(), Some(1)); + /// assert!(buffer.push(1).is_none()); + /// + /// assert_eq!(buffer.pop_first_n_leaky(2), Some(&mut [2, 3][..])); + /// assert_eq!(buffer.pop_first_n_leaky(2), Some(&mut [4][..])); + /// assert_eq!(buffer.pop_first_n_leaky(2), Some(&mut [1][..])); + /// assert!(buffer.pop_first_n_leaky(2).is_none()); + /// + /// assert_eq!(buffer.len(), 0); + /// ``` + pub fn pop_first_n_leaky(&mut self, n: usize) -> Option<&mut [B::Item]> { use std::slice; if self.is_empty() { None } else { - let (start, out_length) = (self.start, self.len.min(B::size() - self.start)); + let (start, out_length) = (self.start, self.len.min(B::size() - self.start).min(n)); self.advance(out_length); @@ -359,6 +406,9 @@ impl CircularBuffer { /// /// assert_eq!(buffer.capacity(), 4); /// ``` + // We inline-always so that this can get const-folded properly. This is really important for + // stuff like `if buf.capacity() > 0 { ... }`. + #[inline(always)] pub fn capacity(&self) -> usize { B::size() } diff --git a/example/examples/echo-dialer.rs b/example/examples/echo-dialer.rs index d490e7d1..8cbbd0f3 100644 --- a/example/examples/echo-dialer.rs +++ b/example/examples/echo-dialer.rs @@ -76,8 +76,8 @@ fn main() { }) // On top of plaintext or secio, we will use the multiplex protocol. - .with_upgrade(multiplex::MultiplexConfig) - // The object returned by the call to `with_upgrade(MultiplexConfig)` can't be used as a + .with_upgrade(multiplex::MultiplexConfig::new()) + // The object returned by the call to `with_upgrade(MultiplexConfig::new())` can't be used as a // `Transport` because the output of the upgrade is not a stream but a controller for // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // a `Transport`. diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs index 5055e2c8..aa0fca83 100644 --- a/example/examples/echo-server.rs +++ b/example/examples/echo-server.rs @@ -75,8 +75,8 @@ fn main() { }) // On top of plaintext or secio, we will use the multiplex protocol. - .with_upgrade(multiplex::MultiplexConfig) - // The object returned by the call to `with_upgrade(MultiplexConfig)` can't be used as a + .with_upgrade(multiplex::MultiplexConfig::new()) + // The object returned by the call to `with_upgrade(MultiplexConfig::new())` can't be used as a // `Transport` because the output of the upgrade is not a stream but a controller for // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // a `Transport`. diff --git a/example/examples/floodsub.rs b/example/examples/floodsub.rs index 4b5dd4c6..e8228615 100644 --- a/example/examples/floodsub.rs +++ b/example/examples/floodsub.rs @@ -78,8 +78,8 @@ fn main() { }) // On top of plaintext or secio, we will use the multiplex protocol. - .with_upgrade(multiplex::MultiplexConfig) - // The object returned by the call to `with_upgrade(MultiplexConfig)` can't be used as a + .with_upgrade(multiplex::MultiplexConfig::new()) + // The object returned by the call to `with_upgrade(MultiplexConfig::new())` can't be used as a // `Transport` because the output of the upgrade is not a stream but a controller for // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // a `Transport`. diff --git a/example/examples/kademlia.rs b/example/examples/kademlia.rs index f30b9a52..cfb3b763 100644 --- a/example/examples/kademlia.rs +++ b/example/examples/kademlia.rs @@ -83,8 +83,8 @@ fn main() { }) // On top of plaintext or secio, we will use the multiplex protocol. - .with_upgrade(multiplex::MultiplexConfig) - // The object returned by the call to `with_upgrade(MultiplexConfig)` can't be used as a + .with_upgrade(multiplex::MultiplexConfig::new()) + // The object returned by the call to `with_upgrade(MultiplexConfig::new())` can't be used as a // `Transport` because the output of the upgrade is not a stream but a controller for // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // a `Transport`. diff --git a/example/examples/ping-client.rs b/example/examples/ping-client.rs index e3c98ad3..2a721408 100644 --- a/example/examples/ping-client.rs +++ b/example/examples/ping-client.rs @@ -68,8 +68,8 @@ fn main() { }) // On top of plaintext or secio, we will use the multiplex protocol. - .with_upgrade(multiplex::MultiplexConfig) - // The object returned by the call to `with_upgrade(MultiplexConfig)` can't be used as a + .with_upgrade(multiplex::MultiplexConfig::new()) + // The object returned by the call to `with_upgrade(MultiplexConfig::new())` can't be used as a // `Transport` because the output of the upgrade is not a stream but a controller for // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // a `Transport`. diff --git a/multiplex-rs/Cargo.toml b/multiplex-rs/Cargo.toml index f1f5516f..7afbb8b1 100644 --- a/multiplex-rs/Cargo.toml +++ b/multiplex-rs/Cargo.toml @@ -12,6 +12,7 @@ futures = "0.1" parking_lot = "0.4.8" arrayvec = "0.4.6" rand = "0.3.17" +circular-buffer = { path = "../circular-buffer" } libp2p-swarm = { path = "../swarm" } varint = { path = "../varint-rs" } error-chain = "0.11.0" diff --git a/multiplex-rs/src/header.rs b/multiplex-rs/src/header.rs index e3a01de1..f0346eb3 100644 --- a/multiplex-rs/src/header.rs +++ b/multiplex-rs/src/header.rs @@ -53,20 +53,6 @@ impl MultiplexHeader { } } - pub fn close(id: u32, end: Endpoint) -> Self { - MultiplexHeader { - substream_id: id, - packet_type: PacketType::Close(end), - } - } - - pub fn reset(id: u32, end: Endpoint) -> Self { - MultiplexHeader { - substream_id: id, - packet_type: PacketType::Reset(end), - } - } - pub fn message(id: u32, end: Endpoint) -> Self { MultiplexHeader { substream_id: id, diff --git a/multiplex-rs/src/lib.rs b/multiplex-rs/src/lib.rs index 2a6d665e..ba85fc81 100644 --- a/multiplex-rs/src/lib.rs +++ b/multiplex-rs/src/lib.rs @@ -20,6 +20,7 @@ extern crate arrayvec; extern crate bytes; +extern crate circular_buffer; #[macro_use] extern crate error_chain; extern crate futures; @@ -38,6 +39,7 @@ mod shared; mod header; use bytes::Bytes; +use circular_buffer::Array; use futures::{Async, Future, Poll}; use futures::future::{self, FutureResult}; use header::MultiplexHeader; @@ -65,15 +67,15 @@ use write::write_stream; // In the second state, the substream ID is known. Only this substream can progress until the packet // is consumed. -pub struct Substream { +pub struct Substream { id: u32, end: Endpoint, name: Option, - state: Arc>>, + state: Arc>>, buffer: Option>, } -impl Drop for Substream { +impl Drop for Substream { fn drop(&mut self) { let mut lock = self.state.lock().wait().expect("This should never fail"); @@ -81,12 +83,12 @@ impl Drop for Substream { } } -impl Substream { +impl Substream { fn new>>( id: u32, end: Endpoint, name: B, - state: Arc>>, + state: Arc>>, ) -> Self { let name = name.into(); @@ -109,7 +111,7 @@ impl Substream { } // TODO: We always zero the buffer, we should delegate to the inner stream. -impl Read for Substream { +impl> Read for Substream { fn read(&mut self, buf: &mut [u8]) -> io::Result { let mut lock = match self.state.poll_lock() { Async::Ready(lock) => lock, @@ -120,9 +122,9 @@ impl Read for Substream { } } -impl AsyncRead for Substream {} +impl> AsyncRead for Substream {} -impl Write for Substream { +impl Write for Substream { fn write(&mut self, buf: &[u8]) -> io::Result { let mut lock = match self.state.poll_lock() { Async::Ready(lock) => lock, @@ -156,19 +158,19 @@ impl Write for Substream { } } -impl AsyncWrite for Substream { +impl AsyncWrite for Substream { fn shutdown(&mut self) -> Poll<(), io::Error> { Ok(Async::Ready(())) } } -pub struct InboundFuture { +pub struct InboundFuture { end: Endpoint, - state: Arc>>, + state: Arc>>, } -impl Future for InboundFuture { - type Item = Substream; +impl> Future for InboundFuture { + type Item = Substream; type Error = io::Error; fn poll(&mut self) -> Poll { @@ -205,14 +207,14 @@ impl Future for InboundFuture { } } -pub struct OutboundFuture { +pub struct OutboundFuture { meta: Arc, current_id: Option<(io::Cursor, u32)>, - state: Arc>>, + state: Arc>>, } -impl OutboundFuture { - fn new(muxer: Multiplex) -> Self { +impl OutboundFuture { + fn new(muxer: BufferedMultiplex) -> Self { OutboundFuture { current_id: None, meta: muxer.meta, @@ -225,8 +227,8 @@ fn nonce_to_id(id: usize, end: Endpoint) -> u32 { id as u32 * 2 + if end == Endpoint::Dialer { 0 } else { 1 } } -impl Future for OutboundFuture { - type Item = Substream; +impl Future for OutboundFuture { + type Item = Substream; type Error = io::Error; fn poll(&mut self) -> Poll { @@ -284,23 +286,25 @@ pub struct MultiplexMetadata { end: Endpoint, } -pub struct Multiplex { +pub type Multiplex = BufferedMultiplex; + +pub struct BufferedMultiplex { meta: Arc, - state: Arc>>, + state: Arc>>, } -impl Clone for Multiplex { +impl Clone for BufferedMultiplex { fn clone(&self) -> Self { - Multiplex { + BufferedMultiplex { meta: self.meta.clone(), state: self.state.clone(), } } } -impl Multiplex { +impl BufferedMultiplex { pub fn new(stream: T, end: Endpoint) -> Self { - Multiplex { + BufferedMultiplex { meta: Arc::new(MultiplexMetadata { nonce: AtomicUsize::new(0), end, @@ -318,10 +322,10 @@ impl Multiplex { } } -impl StreamMuxer for Multiplex { - type Substream = Substream; - type OutboundSubstream = OutboundFuture; - type InboundSubstream = InboundFuture; +impl> StreamMuxer for BufferedMultiplex { + type Substream = Substream; + type OutboundSubstream = OutboundFuture; + type InboundSubstream = InboundFuture; fn inbound(self) -> Self::InboundSubstream { InboundFuture { @@ -335,21 +339,35 @@ impl StreamMuxer for Multiplex { } } -#[derive(Debug, Copy, Clone)] -pub struct MultiplexConfig; +pub type MultiplexConfig = BufferedMultiplexConfig<[u8; 0]>; -impl ConnectionUpgrade for MultiplexConfig +#[derive(Debug, Copy, Clone)] +pub struct BufferedMultiplexConfig(std::marker::PhantomData); + +impl Default for BufferedMultiplexConfig { + fn default() -> Self { + BufferedMultiplexConfig(std::marker::PhantomData) + } +} + +impl BufferedMultiplexConfig { + pub fn new() -> Self { + Self::default() + } +} + +impl ConnectionUpgrade for BufferedMultiplexConfig where C: AsyncRead + AsyncWrite, { - type Output = Multiplex; - type Future = FutureResult, io::Error>; + type Output = BufferedMultiplex; + type Future = FutureResult, io::Error>; type UpgradeIdentifier = (); type NamesIter = iter::Once<(Bytes, ())>; #[inline] fn upgrade(self, i: C, _: (), end: Endpoint, _: &Multiaddr) -> Self::Future { - future::ok(Multiplex::new(i, end)) + future::ok(BufferedMultiplex::new(i, end)) } #[inline] @@ -361,6 +379,7 @@ where #[cfg(test)] mod tests { use super::*; + use header::PacketType; use std::io; use tokio_io::io as tokio; @@ -523,7 +542,10 @@ mod tests { .chain( varint::encode( // ID for an unopened stream: 1 - MultiplexHeader::close(0, Endpoint::Dialer).to_u64(), + MultiplexHeader { + packet_type: PacketType::Close(Endpoint::Dialer), + substream_id: 0, + }.to_u64(), ).into_iter(), ) .chain(varint::encode(dummy_length)) @@ -614,4 +636,70 @@ mod tests { assert_eq!(&out[1..0x14 - 1], b"/multistream/1.0.0"); assert_eq!(out[0x14 - 1], 0x0a); } + + #[test] + fn can_buffer() { + type Buffer = [u8; 1024]; + + let stream = io::Cursor::new(Vec::new()); + + let mplex = BufferedMultiplex::<_, Buffer>::dial(stream); + + let mut outbound: Vec> = vec![]; + + for _ in 0..5 { + outbound.push(mplex.clone().outbound().wait().unwrap()); + } + + outbound.sort_by_key(|a| a.id()); + + for (i, substream) in outbound.iter_mut().enumerate() { + assert!( + tokio::write_all(substream, i.to_string().as_bytes()) + .wait() + .is_ok() + ); + } + + let stream = io::Cursor::new(mplex.state.lock().wait().unwrap().stream.get_ref().clone()); + + let mplex = BufferedMultiplex::<_, Buffer>::listen(stream); + + let mut inbound: Vec> = vec![]; + + for _ in 0..5 { + let inb: Substream<_, Buffer> = mplex.clone().inbound().wait().unwrap(); + inbound.push(inb); + } + + inbound.sort_by_key(|a| a.id()); + + // Skip the first substream and let it be cached. + for (mut substream, outbound) in inbound.iter_mut().zip(outbound.iter()).skip(1) { + let id = outbound.id(); + assert_eq!(id, substream.id()); + assert_eq!( + substream + .name() + .and_then(|bytes| String::from_utf8(bytes.to_vec()).ok()), + Some(id.to_string()) + ); + + let mut buf = [0; 3]; + assert_eq!(tokio::read(&mut substream, &mut buf).wait().unwrap().2, 1); + } + + let (mut substream, outbound) = (&mut inbound[0], &outbound[0]); + let id = outbound.id(); + assert_eq!(id, substream.id()); + assert_eq!( + substream + .name() + .and_then(|bytes| String::from_utf8(bytes.to_vec()).ok()), + Some(id.to_string()) + ); + + let mut buf = [0; 3]; + assert_eq!(tokio::read(&mut substream, &mut buf).wait().unwrap().2, 1); + } } diff --git a/multiplex-rs/src/read.rs b/multiplex-rs/src/read.rs index ab6a36c9..2559d80e 100644 --- a/multiplex-rs/src/read.rs +++ b/multiplex-rs/src/read.rs @@ -25,6 +25,7 @@ use header::{MultiplexHeader, PacketType}; use std::io; use tokio_io::AsyncRead; use shared::SubstreamMetadata; +use circular_buffer::Array; pub enum NextMultiplexState { NewStream(u32), @@ -76,14 +77,97 @@ fn create_buffer(capacity: usize) -> bytes::BytesMut { buffer } -pub fn read_stream<'a, O: Into>, T: AsyncRead>( - lock: &mut ::shared::MultiplexShared, +fn block_on_wrong_stream>( + substream_id: u32, + remaining_bytes: usize, + lock: &mut ::shared::MultiplexShared, +) -> io::Result<()> { + use std::{mem, slice}; + + lock.read_state = Some(MultiplexReadState::ParsingMessageBody { + substream_id, + remaining_bytes, + }); + + if let Some((tasks, cache)) = lock.open_streams + .entry(substream_id) + .or_insert_with(|| SubstreamMetadata::new_open()) + .open_meta_mut() + .map(|cur| { + ( + mem::replace(&mut cur.read, Default::default()), + &mut cur.read_cache, + ) + }) { + // We check `cache.capacity()` since that can totally statically remove this branch in the + // `== 0` path. + if cache.capacity() > 0 && cache.len() < cache.capacity() { + let mut buf: Buf = unsafe { mem::uninitialized() }; + + // Can't fail because `cache.len() >= 0`, + // `cache.len() <= cache.capacity()` and + // `cache.capacity() == mem::size_of::()` + let buf_prefix = unsafe { + let max_that_fits_in_buffer = cache.capacity() - cache.len(); + // We know this won't panic because of the earlier + // `number_read >= buf.len()` check + let new_len = max_that_fits_in_buffer.min(remaining_bytes); + + slice::from_raw_parts_mut(buf.ptr_mut(), new_len) + }; + + match lock.stream.read(buf_prefix) { + Ok(consumed) => { + let new_remaining = remaining_bytes - consumed; + + assert!(cache.extend_from_slice(&buf_prefix[..consumed])); + + lock.read_state = Some(MultiplexReadState::ParsingMessageBody { + substream_id, + remaining_bytes: new_remaining, + }); + } + Err(err) => { + if err.kind() != io::ErrorKind::WouldBlock { + for task in tasks { + task.notify(); + } + + return Err(err); + } + } + } + } + + for task in tasks { + task.notify(); + } + } + + Ok(()) +} + +pub fn read_stream< + 'a, + Buf: Array, + O: Into>, + T: AsyncRead, +>( + lock: &mut ::shared::MultiplexShared, stream_data: O, ) -> io::Result { - use self::MultiplexReadState::*; - use std::mem; + read_stream_internal(lock, stream_data.into()) +} - let mut stream_data = stream_data.into(); +fn read_stream_internal>( + lock: &mut ::shared::MultiplexShared, + mut stream_data: Option<(u32, &mut [u8])>, +) -> io::Result { + use self::MultiplexReadState::*; + + // This is only true if a stream exists and it has been closed in a "graceful" manner, so we + // can return `Ok(0)` like the `Read` trait requests. In any other case we want to return + // `WouldBlock` let stream_has_been_gracefully_closed = stream_data .as_ref() .and_then(|&(id, _)| lock.open_streams.get(&id)) @@ -96,13 +180,34 @@ pub fn read_stream<'a, O: Into>, T: AsyncRead>( Err(io::ErrorKind::WouldBlock.into()) }; - if let Some((ref id, ..)) = stream_data { + if let Some((ref mut id, ref mut buf)) = stream_data { if let Some(cur) = lock.open_streams .entry(*id) .or_insert_with(|| SubstreamMetadata::new_open()) - .read_tasks_mut() + .open_meta_mut() { - cur.push(task::current()); + cur.read.push(task::current()); + + let cache = &mut cur.read_cache; + + if !cache.is_empty() { + let mut consumed = 0; + loop { + let cur_buf = &mut buf[consumed..]; + if cur_buf.is_empty() { + break; + } + + if let Some(out) = cache.pop_first_n_leaky(cur_buf.len()) { + cur_buf[..out.len()].copy_from_slice(out); + consumed += out.len(); + } else { + break; + }; + } + + on_block = Ok(consumed); + } } } @@ -284,11 +389,11 @@ pub fn read_stream<'a, O: Into>, T: AsyncRead>( if let Some((ref mut id, ref mut buf)) = stream_data { use MultiplexReadState::*; + let number_read = *on_block.as_ref().unwrap_or(&0); + if remaining_bytes == 0 { lock.read_state = None; } else if substream_id == *id { - let number_read = *on_block.as_ref().unwrap_or(&0); - if number_read >= buf.len() { lock.read_state = Some(ParsingMessageBody { substream_id, @@ -307,6 +412,11 @@ pub fn read_stream<'a, O: Into>, T: AsyncRead>( lock.stream.read(slice) }; + lock.read_state = Some(ParsingMessageBody { + substream_id, + remaining_bytes, + }); + match read_result { Ok(consumed) => { let new_remaining = remaining_bytes - consumed; @@ -319,58 +429,22 @@ pub fn read_stream<'a, O: Into>, T: AsyncRead>( on_block = Ok(number_read + consumed); } Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - lock.read_state = Some(ParsingMessageBody { - substream_id, - remaining_bytes, - }); - return on_block; } Err(other) => { - lock.read_state = Some(ParsingMessageBody { - substream_id, - remaining_bytes, - }); - return Err(other); } } } else { - lock.read_state = Some(ParsingMessageBody { - substream_id, - remaining_bytes, - }); - - if let Some(tasks) = lock.open_streams - .get_mut(&substream_id) - .and_then(SubstreamMetadata::read_tasks_mut) - .map(|cur| mem::replace(cur, Default::default())) - { - for task in tasks { - task.notify(); - } - } - // We cannot make progress here, another stream has to accept this packet + block_on_wrong_stream(substream_id, remaining_bytes, lock)?; + return on_block; } } else { - lock.read_state = Some(ParsingMessageBody { - substream_id, - remaining_bytes, - }); + // We cannot make progress here, another stream has to accept this packet + block_on_wrong_stream(substream_id, remaining_bytes, lock)?; - if let Some(tasks) = lock.open_streams - .get_mut(&substream_id) - .and_then(SubstreamMetadata::read_tasks_mut) - .map(|cur| mem::replace(cur, Default::default())) - { - for task in tasks { - task.notify(); - } - } - - // We cannot make progress here, a stream has to accept this packet return on_block; } } diff --git a/multiplex-rs/src/shared.rs b/multiplex-rs/src/shared.rs index 9d4fce22..c16b8f5a 100644 --- a/multiplex-rs/src/shared.rs +++ b/multiplex-rs/src/shared.rs @@ -21,6 +21,7 @@ use read::MultiplexReadState; use write::MultiplexWriteState; +use circular_buffer::{Array, CircularBuffer}; use std::collections::HashMap; use bytes::Bytes; use arrayvec::ArrayVec; @@ -30,17 +31,24 @@ const BUF_SIZE: usize = 1024; pub type ByteBuf = ArrayVec<[u8; BUF_SIZE]>; -pub enum SubstreamMetadata { +pub enum SubstreamMetadata { Closed, - Open { read: Vec, write: Vec }, + Open(OpenSubstreamMetadata), } -impl SubstreamMetadata { +pub struct OpenSubstreamMetadata { + pub read_cache: CircularBuffer, + pub read: Vec, + pub write: Vec, +} + +impl SubstreamMetadata { pub fn new_open() -> Self { - SubstreamMetadata::Open { + SubstreamMetadata::Open(OpenSubstreamMetadata { + read_cache: Default::default(), read: Default::default(), write: Default::default(), - } + }) } pub fn open(&self) -> bool { @@ -50,24 +58,17 @@ impl SubstreamMetadata { } } - pub fn read_tasks_mut(&mut self) -> Option<&mut Vec> { + pub fn open_meta_mut(&mut self) -> Option<&mut OpenSubstreamMetadata> { match *self { SubstreamMetadata::Closed => None, - SubstreamMetadata::Open { ref mut read, .. } => Some(read), - } - } - - pub fn write_tasks_mut(&mut self) -> Option<&mut Vec> { - match *self { - SubstreamMetadata::Closed => None, - SubstreamMetadata::Open { ref mut write, .. } => Some(write), + SubstreamMetadata::Open(ref mut meta) => Some(meta), } } } // TODO: Split reading and writing into different structs and have information shared between the // two in a `RwLock`, since `open_streams` and `to_open` are mostly read-only. -pub struct MultiplexShared { +pub struct MultiplexShared { // We use `Option` in order to take ownership of heap allocations within `DecoderState` and // `BytesMut`. If this is ever observably `None` then something has panicked or the underlying // stream returned an error. @@ -75,7 +76,7 @@ pub struct MultiplexShared { pub write_state: Option, pub stream: T, // true if the stream is open, false otherwise - pub open_streams: HashMap, + pub open_streams: HashMap>, pub meta_write_tasks: Vec, // TODO: Should we use a version of this with a fixed size that doesn't allocate and return // `WouldBlock` if it's full? Even if we ignore or size-cap names you can still open 2^32 @@ -83,7 +84,7 @@ pub struct MultiplexShared { pub to_open: HashMap>, } -impl MultiplexShared { +impl MultiplexShared { pub fn new(stream: T) -> Self { MultiplexShared { read_state: Default::default(), @@ -98,10 +99,7 @@ impl MultiplexShared { pub fn open_stream(&mut self, id: u32) -> bool { self.open_streams .entry(id) - .or_insert(SubstreamMetadata::Open { - read: Default::default(), - write: Default::default(), - }) + .or_insert(SubstreamMetadata::new_open()) .open() } diff --git a/multiplex-rs/src/write.rs b/multiplex-rs/src/write.rs index 5eaf40f1..958d792c 100644 --- a/multiplex-rs/src/write.rs +++ b/multiplex-rs/src/write.rs @@ -21,6 +21,7 @@ use shared::{ByteBuf, MultiplexShared, SubstreamMetadata}; use header::MultiplexHeader; +use circular_buffer; use varint; use futures::task; use std::io; @@ -69,8 +70,8 @@ pub enum MultiplexWriteStateInner { Body { size: usize }, } -pub fn write_stream( - lock: &mut MultiplexShared, +pub fn write_stream( + lock: &mut MultiplexShared, write_request: WriteRequest, buf: &mut io::Cursor, ) -> io::Result { @@ -103,15 +104,15 @@ pub fn write_stream( if let Some(cur) = lock.open_streams .entry(id) .or_insert_with(|| SubstreamMetadata::new_open()) - .write_tasks_mut() + .open_meta_mut() { - cur.push(task::current()); + cur.write.push(task::current()); } if let Some(tasks) = lock.open_streams .get_mut(&request.header.substream_id) - .and_then(SubstreamMetadata::write_tasks_mut) - .map(|cur| mem::replace(cur, Default::default())) + .and_then(SubstreamMetadata::open_meta_mut) + .map(|cur| mem::replace(&mut cur.write, Default::default())) { for task in tasks { task.notify(); @@ -129,8 +130,8 @@ pub fn write_stream( if let Some(tasks) = lock.open_streams .get_mut(&request.header.substream_id) - .and_then(SubstreamMetadata::write_tasks_mut) - .map(|cur| mem::replace(cur, Default::default())) + .and_then(SubstreamMetadata::open_meta_mut) + .map(|cur| mem::replace(&mut cur.write, Default::default())) { for task in tasks { task.notify(); @@ -147,9 +148,9 @@ pub fn write_stream( if let Some(cur) = lock.open_streams .entry(id) .or_insert_with(|| SubstreamMetadata::new_open()) - .write_tasks_mut() + .open_meta_mut() { - cur.push(task::current()); + cur.write.push(task::current()); } for task in mem::replace(&mut lock.meta_write_tasks, Default::default()) { diff --git a/multiplex-rs/tests/two_peers.rs b/multiplex-rs/tests/two_peers.rs index ad30f22c..c3e00ba6 100644 --- a/multiplex-rs/tests/two_peers.rs +++ b/multiplex-rs/tests/two_peers.rs @@ -43,7 +43,8 @@ fn client_to_server_outbound() { let bg_thread = thread::spawn(move || { let mut core = Core::new().unwrap(); - let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig); + let transport = + TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig::new()); let (listener, addr) = transport .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) @@ -72,7 +73,7 @@ fn client_to_server_outbound() { }); let mut core = Core::new().unwrap(); - let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig); + let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig::new()); let future = transport .dial(rx.recv().unwrap()) @@ -94,7 +95,8 @@ fn client_to_server_inbound() { let bg_thread = thread::spawn(move || { let mut core = Core::new().unwrap(); - let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig); + let transport = + TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig::new()); let (listener, addr) = transport .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) @@ -123,7 +125,7 @@ fn client_to_server_inbound() { }); let mut core = Core::new().unwrap(); - let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig); + let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig::new()); let future = transport .dial(rx.recv().unwrap()) diff --git a/swarm/tests/multiplex.rs b/swarm/tests/multiplex.rs index 97ff2557..19c6c58e 100644 --- a/swarm/tests/multiplex.rs +++ b/swarm/tests/multiplex.rs @@ -79,7 +79,7 @@ fn client_to_server_outbound() { let bg_thread = thread::spawn(move || { let mut core = Core::new().unwrap(); let transport = TcpConfig::new(core.handle()) - .with_upgrade(multiplex::MultiplexConfig) + .with_upgrade(multiplex::MultiplexConfig::new()) .into_connection_reuse(); let (listener, addr) = transport @@ -109,7 +109,7 @@ fn client_to_server_outbound() { }); let mut core = Core::new().unwrap(); - let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig); + let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig::new()); let future = transport .dial(rx.recv().unwrap()) @@ -133,7 +133,7 @@ fn connection_reused_for_dialing() { let bg_thread = thread::spawn(move || { let mut core = Core::new().unwrap(); let transport = OnlyOnce::from(TcpConfig::new(core.handle())) - .with_upgrade(multiplex::MultiplexConfig) + .with_upgrade(multiplex::MultiplexConfig::new()) .into_connection_reuse(); let (listener, addr) = transport @@ -175,7 +175,7 @@ fn connection_reused_for_dialing() { let mut core = Core::new().unwrap(); let transport = OnlyOnce::from(TcpConfig::new(core.handle())) - .with_upgrade(multiplex::MultiplexConfig) + .with_upgrade(multiplex::MultiplexConfig::new()) .into_connection_reuse(); let listen_addr = rx.recv().unwrap(); @@ -211,8 +211,8 @@ fn use_opened_listen_to_dial() { let bg_thread = thread::spawn(move || { let mut core = Core::new().unwrap(); - let transport = - OnlyOnce::from(TcpConfig::new(core.handle())).with_upgrade(multiplex::MultiplexConfig); + let transport = OnlyOnce::from(TcpConfig::new(core.handle())) + .with_upgrade(multiplex::MultiplexConfig::new()); let (listener, addr) = transport .clone() @@ -254,7 +254,7 @@ fn use_opened_listen_to_dial() { let mut core = Core::new().unwrap(); let transport = OnlyOnce::from(TcpConfig::new(core.handle())) - .with_upgrade(multiplex::MultiplexConfig) + .with_upgrade(multiplex::MultiplexConfig::new()) .into_connection_reuse(); let listen_addr = rx.recv().unwrap();