mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-04-25 11:02:12 +00:00
Add buffering to Substream
using circular-buffer
(#154)
* Add buffering to `Substream` using `circular-buffer` * Fix freeze and add tests * Fix tests * fmt
This commit is contained in:
parent
5c4aefe457
commit
c487b489ce
@ -4,4 +4,4 @@ version = "0.1.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
|
||||
[dependencies]
|
||||
smallvec = "0.5.0"
|
||||
smallvec = { git = "https://github.com/Vurich/rust-smallvec.git", branch = "array-zero" }
|
@ -32,7 +32,7 @@ extern crate smallvec;
|
||||
use std::ops::Drop;
|
||||
use std::mem::ManuallyDrop;
|
||||
|
||||
use smallvec::Array;
|
||||
pub use smallvec::Array;
|
||||
|
||||
use owned_slice::OwnedSlice;
|
||||
|
||||
@ -248,12 +248,59 @@ impl<B: Array> CircularBuffer<B> {
|
||||
/// assert_eq!(buffer.len(), 0);
|
||||
/// ```
|
||||
pub fn pop_slice_leaky(&mut self) -> Option<&mut [B::Item]> {
|
||||
let len = self.len;
|
||||
self.pop_first_n_leaky(len)
|
||||
}
|
||||
|
||||
/// Pop a slice containing up to `n` contiguous elements. Since this buffer is circular it will
|
||||
/// take a maximum of two calls to this function to drain the buffer entirely.
|
||||
///
|
||||
/// ```rust
|
||||
/// use circular_buffer::CircularBuffer;
|
||||
///
|
||||
/// let mut buffer = CircularBuffer::from_array([1, 2, 3, 4]);
|
||||
///
|
||||
/// assert_eq!(buffer.pop(), Some(1));
|
||||
/// assert!(buffer.push(1).is_none());
|
||||
///
|
||||
/// assert_eq!(buffer.pop_first_n(2).as_ref().map(<_>::as_ref), Some(&[2, 3][..]));
|
||||
/// assert_eq!(buffer.pop_first_n(2).as_ref().map(<_>::as_ref), Some(&[4][..]));
|
||||
/// assert_eq!(buffer.pop_first_n(2).as_ref().map(<_>::as_ref), Some(&[1][..]));
|
||||
/// assert!(buffer.pop_first_n(2).is_none());
|
||||
///
|
||||
/// assert_eq!(buffer.len(), 0);
|
||||
/// ```
|
||||
pub fn pop_first_n(&mut self, n: usize) -> Option<OwnedSlice<B::Item>> {
|
||||
self.pop_first_n_leaky(n)
|
||||
.map(|x| unsafe { OwnedSlice::new(x) })
|
||||
}
|
||||
|
||||
/// Pop a slice containing up to `n` contiguous elements. Since this buffer is circular it will
|
||||
/// take a maximum of two calls to this function to drain the buffer entirely. This returns a
|
||||
/// slice and so any `Drop` types returned from this function will be leaked.
|
||||
///
|
||||
/// ```rust
|
||||
/// use circular_buffer::CircularBuffer;
|
||||
///
|
||||
/// let mut buffer = CircularBuffer::from_array([1, 2, 3, 4]);
|
||||
///
|
||||
/// assert_eq!(buffer.pop(), Some(1));
|
||||
/// assert!(buffer.push(1).is_none());
|
||||
///
|
||||
/// assert_eq!(buffer.pop_first_n_leaky(2), Some(&mut [2, 3][..]));
|
||||
/// assert_eq!(buffer.pop_first_n_leaky(2), Some(&mut [4][..]));
|
||||
/// assert_eq!(buffer.pop_first_n_leaky(2), Some(&mut [1][..]));
|
||||
/// assert!(buffer.pop_first_n_leaky(2).is_none());
|
||||
///
|
||||
/// assert_eq!(buffer.len(), 0);
|
||||
/// ```
|
||||
pub fn pop_first_n_leaky(&mut self, n: usize) -> Option<&mut [B::Item]> {
|
||||
use std::slice;
|
||||
|
||||
if self.is_empty() {
|
||||
None
|
||||
} else {
|
||||
let (start, out_length) = (self.start, self.len.min(B::size() - self.start));
|
||||
let (start, out_length) = (self.start, self.len.min(B::size() - self.start).min(n));
|
||||
|
||||
self.advance(out_length);
|
||||
|
||||
@ -359,6 +406,9 @@ impl<B: Array> CircularBuffer<B> {
|
||||
///
|
||||
/// assert_eq!(buffer.capacity(), 4);
|
||||
/// ```
|
||||
// We inline-always so that this can get const-folded properly. This is really important for
|
||||
// stuff like `if buf.capacity() > 0 { ... }`.
|
||||
#[inline(always)]
|
||||
pub fn capacity(&self) -> usize {
|
||||
B::size()
|
||||
}
|
||||
|
@ -76,8 +76,8 @@ fn main() {
|
||||
})
|
||||
|
||||
// On top of plaintext or secio, we will use the multiplex protocol.
|
||||
.with_upgrade(multiplex::MultiplexConfig)
|
||||
// The object returned by the call to `with_upgrade(MultiplexConfig)` can't be used as a
|
||||
.with_upgrade(multiplex::MultiplexConfig::new())
|
||||
// The object returned by the call to `with_upgrade(MultiplexConfig::new())` can't be used as a
|
||||
// `Transport` because the output of the upgrade is not a stream but a controller for
|
||||
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
|
||||
// a `Transport`.
|
||||
|
@ -75,8 +75,8 @@ fn main() {
|
||||
})
|
||||
|
||||
// On top of plaintext or secio, we will use the multiplex protocol.
|
||||
.with_upgrade(multiplex::MultiplexConfig)
|
||||
// The object returned by the call to `with_upgrade(MultiplexConfig)` can't be used as a
|
||||
.with_upgrade(multiplex::MultiplexConfig::new())
|
||||
// The object returned by the call to `with_upgrade(MultiplexConfig::new())` can't be used as a
|
||||
// `Transport` because the output of the upgrade is not a stream but a controller for
|
||||
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
|
||||
// a `Transport`.
|
||||
|
@ -78,8 +78,8 @@ fn main() {
|
||||
})
|
||||
|
||||
// On top of plaintext or secio, we will use the multiplex protocol.
|
||||
.with_upgrade(multiplex::MultiplexConfig)
|
||||
// The object returned by the call to `with_upgrade(MultiplexConfig)` can't be used as a
|
||||
.with_upgrade(multiplex::MultiplexConfig::new())
|
||||
// The object returned by the call to `with_upgrade(MultiplexConfig::new())` can't be used as a
|
||||
// `Transport` because the output of the upgrade is not a stream but a controller for
|
||||
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
|
||||
// a `Transport`.
|
||||
|
@ -83,8 +83,8 @@ fn main() {
|
||||
})
|
||||
|
||||
// On top of plaintext or secio, we will use the multiplex protocol.
|
||||
.with_upgrade(multiplex::MultiplexConfig)
|
||||
// The object returned by the call to `with_upgrade(MultiplexConfig)` can't be used as a
|
||||
.with_upgrade(multiplex::MultiplexConfig::new())
|
||||
// The object returned by the call to `with_upgrade(MultiplexConfig::new())` can't be used as a
|
||||
// `Transport` because the output of the upgrade is not a stream but a controller for
|
||||
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
|
||||
// a `Transport`.
|
||||
|
@ -68,8 +68,8 @@ fn main() {
|
||||
})
|
||||
|
||||
// On top of plaintext or secio, we will use the multiplex protocol.
|
||||
.with_upgrade(multiplex::MultiplexConfig)
|
||||
// The object returned by the call to `with_upgrade(MultiplexConfig)` can't be used as a
|
||||
.with_upgrade(multiplex::MultiplexConfig::new())
|
||||
// The object returned by the call to `with_upgrade(MultiplexConfig::new())` can't be used as a
|
||||
// `Transport` because the output of the upgrade is not a stream but a controller for
|
||||
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
|
||||
// a `Transport`.
|
||||
|
@ -12,6 +12,7 @@ futures = "0.1"
|
||||
parking_lot = "0.4.8"
|
||||
arrayvec = "0.4.6"
|
||||
rand = "0.3.17"
|
||||
circular-buffer = { path = "../circular-buffer" }
|
||||
libp2p-swarm = { path = "../swarm" }
|
||||
varint = { path = "../varint-rs" }
|
||||
error-chain = "0.11.0"
|
||||
|
@ -53,20 +53,6 @@ impl MultiplexHeader {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn close(id: u32, end: Endpoint) -> Self {
|
||||
MultiplexHeader {
|
||||
substream_id: id,
|
||||
packet_type: PacketType::Close(end),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn reset(id: u32, end: Endpoint) -> Self {
|
||||
MultiplexHeader {
|
||||
substream_id: id,
|
||||
packet_type: PacketType::Reset(end),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn message(id: u32, end: Endpoint) -> Self {
|
||||
MultiplexHeader {
|
||||
substream_id: id,
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
extern crate arrayvec;
|
||||
extern crate bytes;
|
||||
extern crate circular_buffer;
|
||||
#[macro_use]
|
||||
extern crate error_chain;
|
||||
extern crate futures;
|
||||
@ -38,6 +39,7 @@ mod shared;
|
||||
mod header;
|
||||
|
||||
use bytes::Bytes;
|
||||
use circular_buffer::Array;
|
||||
use futures::{Async, Future, Poll};
|
||||
use futures::future::{self, FutureResult};
|
||||
use header::MultiplexHeader;
|
||||
@ -65,15 +67,15 @@ use write::write_stream;
|
||||
// In the second state, the substream ID is known. Only this substream can progress until the packet
|
||||
// is consumed.
|
||||
|
||||
pub struct Substream<T> {
|
||||
pub struct Substream<T, Buf: Array = [u8; 0]> {
|
||||
id: u32,
|
||||
end: Endpoint,
|
||||
name: Option<Bytes>,
|
||||
state: Arc<Mutex<MultiplexShared<T>>>,
|
||||
state: Arc<Mutex<MultiplexShared<T, Buf>>>,
|
||||
buffer: Option<io::Cursor<ByteBuf>>,
|
||||
}
|
||||
|
||||
impl<T> Drop for Substream<T> {
|
||||
impl<T, Buf: Array> Drop for Substream<T, Buf> {
|
||||
fn drop(&mut self) {
|
||||
let mut lock = self.state.lock().wait().expect("This should never fail");
|
||||
|
||||
@ -81,12 +83,12 @@ impl<T> Drop for Substream<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Substream<T> {
|
||||
impl<T, Buf: Array> Substream<T, Buf> {
|
||||
fn new<B: Into<Option<Bytes>>>(
|
||||
id: u32,
|
||||
end: Endpoint,
|
||||
name: B,
|
||||
state: Arc<Mutex<MultiplexShared<T>>>,
|
||||
state: Arc<Mutex<MultiplexShared<T, Buf>>>,
|
||||
) -> Self {
|
||||
let name = name.into();
|
||||
|
||||
@ -109,7 +111,7 @@ impl<T> Substream<T> {
|
||||
}
|
||||
|
||||
// TODO: We always zero the buffer, we should delegate to the inner stream.
|
||||
impl<T: AsyncRead> Read for Substream<T> {
|
||||
impl<T: AsyncRead, Buf: Array<Item = u8>> Read for Substream<T, Buf> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
let mut lock = match self.state.poll_lock() {
|
||||
Async::Ready(lock) => lock,
|
||||
@ -120,9 +122,9 @@ impl<T: AsyncRead> Read for Substream<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead> AsyncRead for Substream<T> {}
|
||||
impl<T: AsyncRead, Buf: Array<Item = u8>> AsyncRead for Substream<T, Buf> {}
|
||||
|
||||
impl<T: AsyncWrite> Write for Substream<T> {
|
||||
impl<T: AsyncWrite, Buf: Array> Write for Substream<T, Buf> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
let mut lock = match self.state.poll_lock() {
|
||||
Async::Ready(lock) => lock,
|
||||
@ -156,19 +158,19 @@ impl<T: AsyncWrite> Write for Substream<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncWrite> AsyncWrite for Substream<T> {
|
||||
impl<T: AsyncWrite, Buf: Array> AsyncWrite for Substream<T, Buf> {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InboundFuture<T> {
|
||||
pub struct InboundFuture<T, Buf: Array> {
|
||||
end: Endpoint,
|
||||
state: Arc<Mutex<MultiplexShared<T>>>,
|
||||
state: Arc<Mutex<MultiplexShared<T, Buf>>>,
|
||||
}
|
||||
|
||||
impl<T: AsyncRead> Future for InboundFuture<T> {
|
||||
type Item = Substream<T>;
|
||||
impl<T: AsyncRead, Buf: Array<Item = u8>> Future for InboundFuture<T, Buf> {
|
||||
type Item = Substream<T, Buf>;
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
@ -205,14 +207,14 @@ impl<T: AsyncRead> Future for InboundFuture<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OutboundFuture<T> {
|
||||
pub struct OutboundFuture<T, Buf: Array> {
|
||||
meta: Arc<MultiplexMetadata>,
|
||||
current_id: Option<(io::Cursor<ByteBuf>, u32)>,
|
||||
state: Arc<Mutex<MultiplexShared<T>>>,
|
||||
state: Arc<Mutex<MultiplexShared<T, Buf>>>,
|
||||
}
|
||||
|
||||
impl<T> OutboundFuture<T> {
|
||||
fn new(muxer: Multiplex<T>) -> Self {
|
||||
impl<T, Buf: Array> OutboundFuture<T, Buf> {
|
||||
fn new(muxer: BufferedMultiplex<T, Buf>) -> Self {
|
||||
OutboundFuture {
|
||||
current_id: None,
|
||||
meta: muxer.meta,
|
||||
@ -225,8 +227,8 @@ fn nonce_to_id(id: usize, end: Endpoint) -> u32 {
|
||||
id as u32 * 2 + if end == Endpoint::Dialer { 0 } else { 1 }
|
||||
}
|
||||
|
||||
impl<T: AsyncWrite> Future for OutboundFuture<T> {
|
||||
type Item = Substream<T>;
|
||||
impl<T: AsyncWrite, Buf: Array> Future for OutboundFuture<T, Buf> {
|
||||
type Item = Substream<T, Buf>;
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
@ -284,23 +286,25 @@ pub struct MultiplexMetadata {
|
||||
end: Endpoint,
|
||||
}
|
||||
|
||||
pub struct Multiplex<T> {
|
||||
pub type Multiplex<T> = BufferedMultiplex<T, [u8; 0]>;
|
||||
|
||||
pub struct BufferedMultiplex<T, Buf: Array> {
|
||||
meta: Arc<MultiplexMetadata>,
|
||||
state: Arc<Mutex<MultiplexShared<T>>>,
|
||||
state: Arc<Mutex<MultiplexShared<T, Buf>>>,
|
||||
}
|
||||
|
||||
impl<T> Clone for Multiplex<T> {
|
||||
impl<T, Buf: Array> Clone for BufferedMultiplex<T, Buf> {
|
||||
fn clone(&self) -> Self {
|
||||
Multiplex {
|
||||
BufferedMultiplex {
|
||||
meta: self.meta.clone(),
|
||||
state: self.state.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Multiplex<T> {
|
||||
impl<T, Buf: Array> BufferedMultiplex<T, Buf> {
|
||||
pub fn new(stream: T, end: Endpoint) -> Self {
|
||||
Multiplex {
|
||||
BufferedMultiplex {
|
||||
meta: Arc::new(MultiplexMetadata {
|
||||
nonce: AtomicUsize::new(0),
|
||||
end,
|
||||
@ -318,10 +322,10 @@ impl<T> Multiplex<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite> StreamMuxer for Multiplex<T> {
|
||||
type Substream = Substream<T>;
|
||||
type OutboundSubstream = OutboundFuture<T>;
|
||||
type InboundSubstream = InboundFuture<T>;
|
||||
impl<T: AsyncRead + AsyncWrite, Buf: Array<Item = u8>> StreamMuxer for BufferedMultiplex<T, Buf> {
|
||||
type Substream = Substream<T, Buf>;
|
||||
type OutboundSubstream = OutboundFuture<T, Buf>;
|
||||
type InboundSubstream = InboundFuture<T, Buf>;
|
||||
|
||||
fn inbound(self) -> Self::InboundSubstream {
|
||||
InboundFuture {
|
||||
@ -335,21 +339,35 @@ impl<T: AsyncRead + AsyncWrite> StreamMuxer for Multiplex<T> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct MultiplexConfig;
|
||||
pub type MultiplexConfig = BufferedMultiplexConfig<[u8; 0]>;
|
||||
|
||||
impl<C> ConnectionUpgrade<C> for MultiplexConfig
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct BufferedMultiplexConfig<Buf: Array>(std::marker::PhantomData<Buf>);
|
||||
|
||||
impl<Buf: Array> Default for BufferedMultiplexConfig<Buf> {
|
||||
fn default() -> Self {
|
||||
BufferedMultiplexConfig(std::marker::PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Buf: Array> BufferedMultiplexConfig<Buf> {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, Buf: Array> ConnectionUpgrade<C> for BufferedMultiplexConfig<Buf>
|
||||
where
|
||||
C: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Output = Multiplex<C>;
|
||||
type Future = FutureResult<Multiplex<C>, io::Error>;
|
||||
type Output = BufferedMultiplex<C, Buf>;
|
||||
type Future = FutureResult<BufferedMultiplex<C, Buf>, io::Error>;
|
||||
type UpgradeIdentifier = ();
|
||||
type NamesIter = iter::Once<(Bytes, ())>;
|
||||
|
||||
#[inline]
|
||||
fn upgrade(self, i: C, _: (), end: Endpoint, _: &Multiaddr) -> Self::Future {
|
||||
future::ok(Multiplex::new(i, end))
|
||||
future::ok(BufferedMultiplex::new(i, end))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@ -361,6 +379,7 @@ where
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use header::PacketType;
|
||||
use std::io;
|
||||
use tokio_io::io as tokio;
|
||||
|
||||
@ -523,7 +542,10 @@ mod tests {
|
||||
.chain(
|
||||
varint::encode(
|
||||
// ID for an unopened stream: 1
|
||||
MultiplexHeader::close(0, Endpoint::Dialer).to_u64(),
|
||||
MultiplexHeader {
|
||||
packet_type: PacketType::Close(Endpoint::Dialer),
|
||||
substream_id: 0,
|
||||
}.to_u64(),
|
||||
).into_iter(),
|
||||
)
|
||||
.chain(varint::encode(dummy_length))
|
||||
@ -614,4 +636,70 @@ mod tests {
|
||||
assert_eq!(&out[1..0x14 - 1], b"/multistream/1.0.0");
|
||||
assert_eq!(out[0x14 - 1], 0x0a);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_buffer() {
|
||||
type Buffer = [u8; 1024];
|
||||
|
||||
let stream = io::Cursor::new(Vec::new());
|
||||
|
||||
let mplex = BufferedMultiplex::<_, Buffer>::dial(stream);
|
||||
|
||||
let mut outbound: Vec<Substream<_, Buffer>> = vec![];
|
||||
|
||||
for _ in 0..5 {
|
||||
outbound.push(mplex.clone().outbound().wait().unwrap());
|
||||
}
|
||||
|
||||
outbound.sort_by_key(|a| a.id());
|
||||
|
||||
for (i, substream) in outbound.iter_mut().enumerate() {
|
||||
assert!(
|
||||
tokio::write_all(substream, i.to_string().as_bytes())
|
||||
.wait()
|
||||
.is_ok()
|
||||
);
|
||||
}
|
||||
|
||||
let stream = io::Cursor::new(mplex.state.lock().wait().unwrap().stream.get_ref().clone());
|
||||
|
||||
let mplex = BufferedMultiplex::<_, Buffer>::listen(stream);
|
||||
|
||||
let mut inbound: Vec<Substream<_, Buffer>> = vec![];
|
||||
|
||||
for _ in 0..5 {
|
||||
let inb: Substream<_, Buffer> = mplex.clone().inbound().wait().unwrap();
|
||||
inbound.push(inb);
|
||||
}
|
||||
|
||||
inbound.sort_by_key(|a| a.id());
|
||||
|
||||
// Skip the first substream and let it be cached.
|
||||
for (mut substream, outbound) in inbound.iter_mut().zip(outbound.iter()).skip(1) {
|
||||
let id = outbound.id();
|
||||
assert_eq!(id, substream.id());
|
||||
assert_eq!(
|
||||
substream
|
||||
.name()
|
||||
.and_then(|bytes| String::from_utf8(bytes.to_vec()).ok()),
|
||||
Some(id.to_string())
|
||||
);
|
||||
|
||||
let mut buf = [0; 3];
|
||||
assert_eq!(tokio::read(&mut substream, &mut buf).wait().unwrap().2, 1);
|
||||
}
|
||||
|
||||
let (mut substream, outbound) = (&mut inbound[0], &outbound[0]);
|
||||
let id = outbound.id();
|
||||
assert_eq!(id, substream.id());
|
||||
assert_eq!(
|
||||
substream
|
||||
.name()
|
||||
.and_then(|bytes| String::from_utf8(bytes.to_vec()).ok()),
|
||||
Some(id.to_string())
|
||||
);
|
||||
|
||||
let mut buf = [0; 3];
|
||||
assert_eq!(tokio::read(&mut substream, &mut buf).wait().unwrap().2, 1);
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ use header::{MultiplexHeader, PacketType};
|
||||
use std::io;
|
||||
use tokio_io::AsyncRead;
|
||||
use shared::SubstreamMetadata;
|
||||
use circular_buffer::Array;
|
||||
|
||||
pub enum NextMultiplexState {
|
||||
NewStream(u32),
|
||||
@ -76,14 +77,97 @@ fn create_buffer(capacity: usize) -> bytes::BytesMut {
|
||||
buffer
|
||||
}
|
||||
|
||||
pub fn read_stream<'a, O: Into<Option<(u32, &'a mut [u8])>>, T: AsyncRead>(
|
||||
lock: &mut ::shared::MultiplexShared<T>,
|
||||
fn block_on_wrong_stream<T: AsyncRead, Buf: Array<Item = u8>>(
|
||||
substream_id: u32,
|
||||
remaining_bytes: usize,
|
||||
lock: &mut ::shared::MultiplexShared<T, Buf>,
|
||||
) -> io::Result<()> {
|
||||
use std::{mem, slice};
|
||||
|
||||
lock.read_state = Some(MultiplexReadState::ParsingMessageBody {
|
||||
substream_id,
|
||||
remaining_bytes,
|
||||
});
|
||||
|
||||
if let Some((tasks, cache)) = lock.open_streams
|
||||
.entry(substream_id)
|
||||
.or_insert_with(|| SubstreamMetadata::new_open())
|
||||
.open_meta_mut()
|
||||
.map(|cur| {
|
||||
(
|
||||
mem::replace(&mut cur.read, Default::default()),
|
||||
&mut cur.read_cache,
|
||||
)
|
||||
}) {
|
||||
// We check `cache.capacity()` since that can totally statically remove this branch in the
|
||||
// `== 0` path.
|
||||
if cache.capacity() > 0 && cache.len() < cache.capacity() {
|
||||
let mut buf: Buf = unsafe { mem::uninitialized() };
|
||||
|
||||
// Can't fail because `cache.len() >= 0`,
|
||||
// `cache.len() <= cache.capacity()` and
|
||||
// `cache.capacity() == mem::size_of::<Buf>()`
|
||||
let buf_prefix = unsafe {
|
||||
let max_that_fits_in_buffer = cache.capacity() - cache.len();
|
||||
// We know this won't panic because of the earlier
|
||||
// `number_read >= buf.len()` check
|
||||
let new_len = max_that_fits_in_buffer.min(remaining_bytes);
|
||||
|
||||
slice::from_raw_parts_mut(buf.ptr_mut(), new_len)
|
||||
};
|
||||
|
||||
match lock.stream.read(buf_prefix) {
|
||||
Ok(consumed) => {
|
||||
let new_remaining = remaining_bytes - consumed;
|
||||
|
||||
assert!(cache.extend_from_slice(&buf_prefix[..consumed]));
|
||||
|
||||
lock.read_state = Some(MultiplexReadState::ParsingMessageBody {
|
||||
substream_id,
|
||||
remaining_bytes: new_remaining,
|
||||
});
|
||||
}
|
||||
Err(err) => {
|
||||
if err.kind() != io::ErrorKind::WouldBlock {
|
||||
for task in tasks {
|
||||
task.notify();
|
||||
}
|
||||
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for task in tasks {
|
||||
task.notify();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn read_stream<
|
||||
'a,
|
||||
Buf: Array<Item = u8>,
|
||||
O: Into<Option<(u32, &'a mut [u8])>>,
|
||||
T: AsyncRead,
|
||||
>(
|
||||
lock: &mut ::shared::MultiplexShared<T, Buf>,
|
||||
stream_data: O,
|
||||
) -> io::Result<usize> {
|
||||
use self::MultiplexReadState::*;
|
||||
use std::mem;
|
||||
read_stream_internal(lock, stream_data.into())
|
||||
}
|
||||
|
||||
let mut stream_data = stream_data.into();
|
||||
fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
|
||||
lock: &mut ::shared::MultiplexShared<T, Buf>,
|
||||
mut stream_data: Option<(u32, &mut [u8])>,
|
||||
) -> io::Result<usize> {
|
||||
use self::MultiplexReadState::*;
|
||||
|
||||
// This is only true if a stream exists and it has been closed in a "graceful" manner, so we
|
||||
// can return `Ok(0)` like the `Read` trait requests. In any other case we want to return
|
||||
// `WouldBlock`
|
||||
let stream_has_been_gracefully_closed = stream_data
|
||||
.as_ref()
|
||||
.and_then(|&(id, _)| lock.open_streams.get(&id))
|
||||
@ -96,13 +180,34 @@ pub fn read_stream<'a, O: Into<Option<(u32, &'a mut [u8])>>, T: AsyncRead>(
|
||||
Err(io::ErrorKind::WouldBlock.into())
|
||||
};
|
||||
|
||||
if let Some((ref id, ..)) = stream_data {
|
||||
if let Some((ref mut id, ref mut buf)) = stream_data {
|
||||
if let Some(cur) = lock.open_streams
|
||||
.entry(*id)
|
||||
.or_insert_with(|| SubstreamMetadata::new_open())
|
||||
.read_tasks_mut()
|
||||
.open_meta_mut()
|
||||
{
|
||||
cur.push(task::current());
|
||||
cur.read.push(task::current());
|
||||
|
||||
let cache = &mut cur.read_cache;
|
||||
|
||||
if !cache.is_empty() {
|
||||
let mut consumed = 0;
|
||||
loop {
|
||||
let cur_buf = &mut buf[consumed..];
|
||||
if cur_buf.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(out) = cache.pop_first_n_leaky(cur_buf.len()) {
|
||||
cur_buf[..out.len()].copy_from_slice(out);
|
||||
consumed += out.len();
|
||||
} else {
|
||||
break;
|
||||
};
|
||||
}
|
||||
|
||||
on_block = Ok(consumed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -284,11 +389,11 @@ pub fn read_stream<'a, O: Into<Option<(u32, &'a mut [u8])>>, T: AsyncRead>(
|
||||
if let Some((ref mut id, ref mut buf)) = stream_data {
|
||||
use MultiplexReadState::*;
|
||||
|
||||
let number_read = *on_block.as_ref().unwrap_or(&0);
|
||||
|
||||
if remaining_bytes == 0 {
|
||||
lock.read_state = None;
|
||||
} else if substream_id == *id {
|
||||
let number_read = *on_block.as_ref().unwrap_or(&0);
|
||||
|
||||
if number_read >= buf.len() {
|
||||
lock.read_state = Some(ParsingMessageBody {
|
||||
substream_id,
|
||||
@ -307,6 +412,11 @@ pub fn read_stream<'a, O: Into<Option<(u32, &'a mut [u8])>>, T: AsyncRead>(
|
||||
lock.stream.read(slice)
|
||||
};
|
||||
|
||||
lock.read_state = Some(ParsingMessageBody {
|
||||
substream_id,
|
||||
remaining_bytes,
|
||||
});
|
||||
|
||||
match read_result {
|
||||
Ok(consumed) => {
|
||||
let new_remaining = remaining_bytes - consumed;
|
||||
@ -319,58 +429,22 @@ pub fn read_stream<'a, O: Into<Option<(u32, &'a mut [u8])>>, T: AsyncRead>(
|
||||
on_block = Ok(number_read + consumed);
|
||||
}
|
||||
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
|
||||
lock.read_state = Some(ParsingMessageBody {
|
||||
substream_id,
|
||||
remaining_bytes,
|
||||
});
|
||||
|
||||
return on_block;
|
||||
}
|
||||
Err(other) => {
|
||||
lock.read_state = Some(ParsingMessageBody {
|
||||
substream_id,
|
||||
remaining_bytes,
|
||||
});
|
||||
|
||||
return Err(other);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
lock.read_state = Some(ParsingMessageBody {
|
||||
substream_id,
|
||||
remaining_bytes,
|
||||
});
|
||||
|
||||
if let Some(tasks) = lock.open_streams
|
||||
.get_mut(&substream_id)
|
||||
.and_then(SubstreamMetadata::read_tasks_mut)
|
||||
.map(|cur| mem::replace(cur, Default::default()))
|
||||
{
|
||||
for task in tasks {
|
||||
task.notify();
|
||||
}
|
||||
}
|
||||
|
||||
// We cannot make progress here, another stream has to accept this packet
|
||||
block_on_wrong_stream(substream_id, remaining_bytes, lock)?;
|
||||
|
||||
return on_block;
|
||||
}
|
||||
} else {
|
||||
lock.read_state = Some(ParsingMessageBody {
|
||||
substream_id,
|
||||
remaining_bytes,
|
||||
});
|
||||
// We cannot make progress here, another stream has to accept this packet
|
||||
block_on_wrong_stream(substream_id, remaining_bytes, lock)?;
|
||||
|
||||
if let Some(tasks) = lock.open_streams
|
||||
.get_mut(&substream_id)
|
||||
.and_then(SubstreamMetadata::read_tasks_mut)
|
||||
.map(|cur| mem::replace(cur, Default::default()))
|
||||
{
|
||||
for task in tasks {
|
||||
task.notify();
|
||||
}
|
||||
}
|
||||
|
||||
// We cannot make progress here, a stream has to accept this packet
|
||||
return on_block;
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@
|
||||
use read::MultiplexReadState;
|
||||
use write::MultiplexWriteState;
|
||||
|
||||
use circular_buffer::{Array, CircularBuffer};
|
||||
use std::collections::HashMap;
|
||||
use bytes::Bytes;
|
||||
use arrayvec::ArrayVec;
|
||||
@ -30,17 +31,24 @@ const BUF_SIZE: usize = 1024;
|
||||
|
||||
pub type ByteBuf = ArrayVec<[u8; BUF_SIZE]>;
|
||||
|
||||
pub enum SubstreamMetadata {
|
||||
pub enum SubstreamMetadata<Buf: Array> {
|
||||
Closed,
|
||||
Open { read: Vec<Task>, write: Vec<Task> },
|
||||
Open(OpenSubstreamMetadata<Buf>),
|
||||
}
|
||||
|
||||
impl SubstreamMetadata {
|
||||
pub struct OpenSubstreamMetadata<Buf: Array> {
|
||||
pub read_cache: CircularBuffer<Buf>,
|
||||
pub read: Vec<Task>,
|
||||
pub write: Vec<Task>,
|
||||
}
|
||||
|
||||
impl<Buf: Array> SubstreamMetadata<Buf> {
|
||||
pub fn new_open() -> Self {
|
||||
SubstreamMetadata::Open {
|
||||
SubstreamMetadata::Open(OpenSubstreamMetadata {
|
||||
read_cache: Default::default(),
|
||||
read: Default::default(),
|
||||
write: Default::default(),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn open(&self) -> bool {
|
||||
@ -50,24 +58,17 @@ impl SubstreamMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read_tasks_mut(&mut self) -> Option<&mut Vec<Task>> {
|
||||
pub fn open_meta_mut(&mut self) -> Option<&mut OpenSubstreamMetadata<Buf>> {
|
||||
match *self {
|
||||
SubstreamMetadata::Closed => None,
|
||||
SubstreamMetadata::Open { ref mut read, .. } => Some(read),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write_tasks_mut(&mut self) -> Option<&mut Vec<Task>> {
|
||||
match *self {
|
||||
SubstreamMetadata::Closed => None,
|
||||
SubstreamMetadata::Open { ref mut write, .. } => Some(write),
|
||||
SubstreamMetadata::Open(ref mut meta) => Some(meta),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Split reading and writing into different structs and have information shared between the
|
||||
// two in a `RwLock`, since `open_streams` and `to_open` are mostly read-only.
|
||||
pub struct MultiplexShared<T> {
|
||||
pub struct MultiplexShared<T, Buf: Array> {
|
||||
// We use `Option` in order to take ownership of heap allocations within `DecoderState` and
|
||||
// `BytesMut`. If this is ever observably `None` then something has panicked or the underlying
|
||||
// stream returned an error.
|
||||
@ -75,7 +76,7 @@ pub struct MultiplexShared<T> {
|
||||
pub write_state: Option<MultiplexWriteState>,
|
||||
pub stream: T,
|
||||
// true if the stream is open, false otherwise
|
||||
pub open_streams: HashMap<u32, SubstreamMetadata>,
|
||||
pub open_streams: HashMap<u32, SubstreamMetadata<Buf>>,
|
||||
pub meta_write_tasks: Vec<Task>,
|
||||
// TODO: Should we use a version of this with a fixed size that doesn't allocate and return
|
||||
// `WouldBlock` if it's full? Even if we ignore or size-cap names you can still open 2^32
|
||||
@ -83,7 +84,7 @@ pub struct MultiplexShared<T> {
|
||||
pub to_open: HashMap<u32, Option<Bytes>>,
|
||||
}
|
||||
|
||||
impl<T> MultiplexShared<T> {
|
||||
impl<T, Buf: Array> MultiplexShared<T, Buf> {
|
||||
pub fn new(stream: T) -> Self {
|
||||
MultiplexShared {
|
||||
read_state: Default::default(),
|
||||
@ -98,10 +99,7 @@ impl<T> MultiplexShared<T> {
|
||||
pub fn open_stream(&mut self, id: u32) -> bool {
|
||||
self.open_streams
|
||||
.entry(id)
|
||||
.or_insert(SubstreamMetadata::Open {
|
||||
read: Default::default(),
|
||||
write: Default::default(),
|
||||
})
|
||||
.or_insert(SubstreamMetadata::new_open())
|
||||
.open()
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@
|
||||
use shared::{ByteBuf, MultiplexShared, SubstreamMetadata};
|
||||
use header::MultiplexHeader;
|
||||
|
||||
use circular_buffer;
|
||||
use varint;
|
||||
use futures::task;
|
||||
use std::io;
|
||||
@ -69,8 +70,8 @@ pub enum MultiplexWriteStateInner {
|
||||
Body { size: usize },
|
||||
}
|
||||
|
||||
pub fn write_stream<T: AsyncWrite>(
|
||||
lock: &mut MultiplexShared<T>,
|
||||
pub fn write_stream<Buf: circular_buffer::Array, T: AsyncWrite>(
|
||||
lock: &mut MultiplexShared<T, Buf>,
|
||||
write_request: WriteRequest,
|
||||
buf: &mut io::Cursor<ByteBuf>,
|
||||
) -> io::Result<usize> {
|
||||
@ -103,15 +104,15 @@ pub fn write_stream<T: AsyncWrite>(
|
||||
if let Some(cur) = lock.open_streams
|
||||
.entry(id)
|
||||
.or_insert_with(|| SubstreamMetadata::new_open())
|
||||
.write_tasks_mut()
|
||||
.open_meta_mut()
|
||||
{
|
||||
cur.push(task::current());
|
||||
cur.write.push(task::current());
|
||||
}
|
||||
|
||||
if let Some(tasks) = lock.open_streams
|
||||
.get_mut(&request.header.substream_id)
|
||||
.and_then(SubstreamMetadata::write_tasks_mut)
|
||||
.map(|cur| mem::replace(cur, Default::default()))
|
||||
.and_then(SubstreamMetadata::open_meta_mut)
|
||||
.map(|cur| mem::replace(&mut cur.write, Default::default()))
|
||||
{
|
||||
for task in tasks {
|
||||
task.notify();
|
||||
@ -129,8 +130,8 @@ pub fn write_stream<T: AsyncWrite>(
|
||||
|
||||
if let Some(tasks) = lock.open_streams
|
||||
.get_mut(&request.header.substream_id)
|
||||
.and_then(SubstreamMetadata::write_tasks_mut)
|
||||
.map(|cur| mem::replace(cur, Default::default()))
|
||||
.and_then(SubstreamMetadata::open_meta_mut)
|
||||
.map(|cur| mem::replace(&mut cur.write, Default::default()))
|
||||
{
|
||||
for task in tasks {
|
||||
task.notify();
|
||||
@ -147,9 +148,9 @@ pub fn write_stream<T: AsyncWrite>(
|
||||
if let Some(cur) = lock.open_streams
|
||||
.entry(id)
|
||||
.or_insert_with(|| SubstreamMetadata::new_open())
|
||||
.write_tasks_mut()
|
||||
.open_meta_mut()
|
||||
{
|
||||
cur.push(task::current());
|
||||
cur.write.push(task::current());
|
||||
}
|
||||
|
||||
for task in mem::replace(&mut lock.meta_write_tasks, Default::default()) {
|
||||
|
@ -43,7 +43,8 @@ fn client_to_server_outbound() {
|
||||
|
||||
let bg_thread = thread::spawn(move || {
|
||||
let mut core = Core::new().unwrap();
|
||||
let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig);
|
||||
let transport =
|
||||
TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig::new());
|
||||
|
||||
let (listener, addr) = transport
|
||||
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
|
||||
@ -72,7 +73,7 @@ fn client_to_server_outbound() {
|
||||
});
|
||||
|
||||
let mut core = Core::new().unwrap();
|
||||
let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig);
|
||||
let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig::new());
|
||||
|
||||
let future = transport
|
||||
.dial(rx.recv().unwrap())
|
||||
@ -94,7 +95,8 @@ fn client_to_server_inbound() {
|
||||
|
||||
let bg_thread = thread::spawn(move || {
|
||||
let mut core = Core::new().unwrap();
|
||||
let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig);
|
||||
let transport =
|
||||
TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig::new());
|
||||
|
||||
let (listener, addr) = transport
|
||||
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
|
||||
@ -123,7 +125,7 @@ fn client_to_server_inbound() {
|
||||
});
|
||||
|
||||
let mut core = Core::new().unwrap();
|
||||
let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig);
|
||||
let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig::new());
|
||||
|
||||
let future = transport
|
||||
.dial(rx.recv().unwrap())
|
||||
|
@ -79,7 +79,7 @@ fn client_to_server_outbound() {
|
||||
let bg_thread = thread::spawn(move || {
|
||||
let mut core = Core::new().unwrap();
|
||||
let transport = TcpConfig::new(core.handle())
|
||||
.with_upgrade(multiplex::MultiplexConfig)
|
||||
.with_upgrade(multiplex::MultiplexConfig::new())
|
||||
.into_connection_reuse();
|
||||
|
||||
let (listener, addr) = transport
|
||||
@ -109,7 +109,7 @@ fn client_to_server_outbound() {
|
||||
});
|
||||
|
||||
let mut core = Core::new().unwrap();
|
||||
let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig);
|
||||
let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig::new());
|
||||
|
||||
let future = transport
|
||||
.dial(rx.recv().unwrap())
|
||||
@ -133,7 +133,7 @@ fn connection_reused_for_dialing() {
|
||||
let bg_thread = thread::spawn(move || {
|
||||
let mut core = Core::new().unwrap();
|
||||
let transport = OnlyOnce::from(TcpConfig::new(core.handle()))
|
||||
.with_upgrade(multiplex::MultiplexConfig)
|
||||
.with_upgrade(multiplex::MultiplexConfig::new())
|
||||
.into_connection_reuse();
|
||||
|
||||
let (listener, addr) = transport
|
||||
@ -175,7 +175,7 @@ fn connection_reused_for_dialing() {
|
||||
|
||||
let mut core = Core::new().unwrap();
|
||||
let transport = OnlyOnce::from(TcpConfig::new(core.handle()))
|
||||
.with_upgrade(multiplex::MultiplexConfig)
|
||||
.with_upgrade(multiplex::MultiplexConfig::new())
|
||||
.into_connection_reuse();
|
||||
|
||||
let listen_addr = rx.recv().unwrap();
|
||||
@ -211,8 +211,8 @@ fn use_opened_listen_to_dial() {
|
||||
|
||||
let bg_thread = thread::spawn(move || {
|
||||
let mut core = Core::new().unwrap();
|
||||
let transport =
|
||||
OnlyOnce::from(TcpConfig::new(core.handle())).with_upgrade(multiplex::MultiplexConfig);
|
||||
let transport = OnlyOnce::from(TcpConfig::new(core.handle()))
|
||||
.with_upgrade(multiplex::MultiplexConfig::new());
|
||||
|
||||
let (listener, addr) = transport
|
||||
.clone()
|
||||
@ -254,7 +254,7 @@ fn use_opened_listen_to_dial() {
|
||||
|
||||
let mut core = Core::new().unwrap();
|
||||
let transport = OnlyOnce::from(TcpConfig::new(core.handle()))
|
||||
.with_upgrade(multiplex::MultiplexConfig)
|
||||
.with_upgrade(multiplex::MultiplexConfig::new())
|
||||
.into_connection_reuse();
|
||||
|
||||
let listen_addr = rx.recv().unwrap();
|
||||
|
Loading…
x
Reference in New Issue
Block a user