Rewrite multiplex (#261)

* Rewrite multiplex

* Increase the packet size limit to 32 MB

* Fix waiting for poll_complete to finish

* Typo

* Properly close substreams

* Add a limit to the number of substreams

* Add a limit to the length of the internal buffer

* Fix concerns
This commit is contained in:
Pierre Krieger
2018-07-17 16:31:32 +02:00
committed by GitHub
parent 053197bd1c
commit 294310cd07
14 changed files with 544 additions and 1731 deletions

View File

@ -78,7 +78,7 @@ fn client_to_server_outbound() {
let bg_thread = thread::spawn(move || { let bg_thread = thread::spawn(move || {
let transport = TcpConfig::new() let transport = TcpConfig::new()
.with_upgrade(multiplex::MultiplexConfig::new()) .with_upgrade(multiplex::MplexConfig::new())
.into_connection_reuse(); .into_connection_reuse();
let (listener, addr) = transport let (listener, addr) = transport
@ -107,7 +107,7 @@ fn client_to_server_outbound() {
tokio_current_thread::block_on_all(future).unwrap(); tokio_current_thread::block_on_all(future).unwrap();
}); });
let transport = TcpConfig::new().with_upgrade(multiplex::MultiplexConfig::new()); let transport = TcpConfig::new().with_upgrade(multiplex::MplexConfig::new());
let future = transport let future = transport
.dial(rx.recv().unwrap()) .dial(rx.recv().unwrap())
@ -130,7 +130,7 @@ fn connection_reused_for_dialing() {
let bg_thread = thread::spawn(move || { let bg_thread = thread::spawn(move || {
let transport = OnlyOnce::from(TcpConfig::new()) let transport = OnlyOnce::from(TcpConfig::new())
.with_upgrade(multiplex::MultiplexConfig::new()) .with_upgrade(multiplex::MplexConfig::new())
.into_connection_reuse(); .into_connection_reuse();
let (listener, addr) = transport let (listener, addr) = transport
@ -171,7 +171,7 @@ fn connection_reused_for_dialing() {
}); });
let transport = OnlyOnce::from(TcpConfig::new()) let transport = OnlyOnce::from(TcpConfig::new())
.with_upgrade(multiplex::MultiplexConfig::new()) .with_upgrade(multiplex::MplexConfig::new())
.into_connection_reuse(); .into_connection_reuse();
let listen_addr = rx.recv().unwrap(); let listen_addr = rx.recv().unwrap();
@ -207,7 +207,7 @@ fn use_opened_listen_to_dial() {
let bg_thread = thread::spawn(move || { let bg_thread = thread::spawn(move || {
let transport = OnlyOnce::from(TcpConfig::new()) let transport = OnlyOnce::from(TcpConfig::new())
.with_upgrade(multiplex::MultiplexConfig::new()); .with_upgrade(multiplex::MplexConfig::new());
let (listener, addr) = transport let (listener, addr) = transport
.clone() .clone()
@ -248,7 +248,7 @@ fn use_opened_listen_to_dial() {
}); });
let transport = OnlyOnce::from(TcpConfig::new()) let transport = OnlyOnce::from(TcpConfig::new())
.with_upgrade(multiplex::MultiplexConfig::new()) .with_upgrade(multiplex::MplexConfig::new())
.into_connection_reuse(); .into_connection_reuse();
let listen_addr = rx.recv().unwrap(); let listen_addr = rx.recv().unwrap();

View File

@ -70,8 +70,8 @@ fn main() {
}) })
// On top of plaintext or secio, we will use the multiplex protocol. // On top of plaintext or secio, we will use the multiplex protocol.
.with_upgrade(libp2p::mplex::MultiplexConfig::new()) .with_upgrade(libp2p::mplex::MplexConfig::new())
// The object returned by the call to `with_upgrade(MultiplexConfig::new())` can't be used as a // The object returned by the call to `with_upgrade(MplexConfig::new())` can't be used as a
// `Transport` because the output of the upgrade is not a stream but a controller for // `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`. // a `Transport`.

View File

@ -70,8 +70,8 @@ fn main() {
}) })
// On top of plaintext or secio, we will use the multiplex protocol. // On top of plaintext or secio, we will use the multiplex protocol.
.with_upgrade(libp2p::mplex::MultiplexConfig::new()) .with_upgrade(libp2p::mplex::MplexConfig::new())
// The object returned by the call to `with_upgrade(MultiplexConfig::new())` can't be used as a // The object returned by the call to `with_upgrade(MplexConfig::new())` can't be used as a
// `Transport` because the output of the upgrade is not a stream but a controller for // `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`. // a `Transport`.

View File

@ -71,8 +71,8 @@ fn main() {
}) })
// On top of plaintext or secio, we will use the multiplex protocol. // On top of plaintext or secio, we will use the multiplex protocol.
.with_upgrade(libp2p::mplex::MultiplexConfig::new()) .with_upgrade(libp2p::mplex::MplexConfig::new())
// The object returned by the call to `with_upgrade(MultiplexConfig::new())` can't be used as a // The object returned by the call to `with_upgrade(MplexConfig::new())` can't be used as a
// `Transport` because the output of the upgrade is not a stream but a controller for // `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`. // a `Transport`.

View File

@ -78,8 +78,8 @@ fn main() {
}) })
// On top of plaintext or secio, we will use the multiplex protocol. // On top of plaintext or secio, we will use the multiplex protocol.
.with_upgrade(libp2p::mplex::MultiplexConfig::new()) .with_upgrade(libp2p::mplex::MplexConfig::new())
// The object returned by the call to `with_upgrade(MultiplexConfig::new())` can't be used as a // The object returned by the call to `with_upgrade(MplexConfig::new())` can't be used as a
// `Transport` because the output of the upgrade is not a stream but a controller for // `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`. // a `Transport`.

View File

@ -63,8 +63,8 @@ fn main() {
}) })
// On top of plaintext or secio, we will use the multiplex protocol. // On top of plaintext or secio, we will use the multiplex protocol.
.with_upgrade(libp2p::mplex::MultiplexConfig::new()) .with_upgrade(libp2p::mplex::MplexConfig::new())
// The object returned by the call to `with_upgrade(MultiplexConfig::new())` can't be used as a // The object returned by the call to `with_upgrade(MplexConfig::new())` can't be used as a
// `Transport` because the output of the upgrade is not a stream but a controller for // `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`. // a `Transport`.

View File

@ -8,6 +8,7 @@ arrayvec = "0.4.6"
bytes = "0.4.5" bytes = "0.4.5"
circular-buffer = { path = "../circular-buffer" } circular-buffer = { path = "../circular-buffer" }
error-chain = "0.11.0" error-chain = "0.11.0"
fnv = "1.0"
futures = "0.1" futures = "0.1"
futures-mutex = { git = "https://github.com/paritytech/futures-mutex" } futures-mutex = { git = "https://github.com/paritytech/futures-mutex" }
libp2p-core = { path = "../core" } libp2p-core = { path = "../core" }

187
mplex/src/codec.rs Normal file
View File

@ -0,0 +1,187 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::cmp;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::mem;
use bytes::{BufMut, BytesMut};
use core::Endpoint;
use tokio_io::codec::{Decoder, Encoder};
use varint;
// Arbitrary maximum size for a packet.
// Since data is entirely buffered before being dispatched, we need a limit or remotes could just
// send a 4 TB-long packet full of zeroes that we kill our process with an OOM error.
const MAX_FRAME_SIZE: usize = 32 * 1024 * 1024;
#[derive(Debug, Clone)]
pub enum Elem {
Open { substream_id: u32 },
Data { substream_id: u32, endpoint: Endpoint, data: BytesMut },
Close { substream_id: u32, endpoint: Endpoint },
Reset { substream_id: u32, endpoint: Endpoint },
}
impl Elem {
/// Returns the ID of the substream of the message.
pub fn substream_id(&self) -> u32 {
match *self {
Elem::Open { substream_id } => substream_id,
Elem::Data { substream_id, .. } => substream_id,
Elem::Close { substream_id, .. } => substream_id,
Elem::Reset { substream_id, .. } => substream_id,
}
}
}
pub struct Codec {
varint_decoder: varint::VarintDecoder<u32>,
decoder_state: CodecDecodeState,
}
#[derive(Debug, Clone)]
enum CodecDecodeState {
Begin,
HasHeader(u32),
HasHeaderAndLen(u32, usize, BytesMut),
Poisoned,
}
impl Codec {
pub fn new() -> Codec {
Codec {
varint_decoder: varint::VarintDecoder::new(),
decoder_state: CodecDecodeState::Begin,
}
}
}
impl Decoder for Codec {
type Item = Elem;
type Error = IoError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
loop {
match mem::replace(&mut self.decoder_state, CodecDecodeState::Poisoned) {
CodecDecodeState::Begin => {
match self.varint_decoder.decode(src)? {
Some(header) => {
self.decoder_state = CodecDecodeState::HasHeader(header);
},
None => {
self.decoder_state = CodecDecodeState::Begin;
return Ok(None);
},
}
},
CodecDecodeState::HasHeader(header) => {
match self.varint_decoder.decode(src)? {
Some(len) => {
if len as usize > MAX_FRAME_SIZE {
return Err(IoErrorKind::InvalidData.into());
}
self.decoder_state = CodecDecodeState::HasHeaderAndLen(header, len as usize, BytesMut::with_capacity(len as usize));
},
None => {
self.decoder_state = CodecDecodeState::HasHeader(header);
return Ok(None);
},
}
},
CodecDecodeState::HasHeaderAndLen(header, len, mut buf) => {
debug_assert!(len == 0 || buf.len() < len);
let to_transfer = cmp::min(src.len(), len - buf.len());
buf.put(src.split_to(to_transfer)); // TODO: more optimal?
if buf.len() < len {
self.decoder_state = CodecDecodeState::HasHeaderAndLen(header, len, buf);
return Ok(None);
}
self.decoder_state = CodecDecodeState::Begin;
let substream_id = (header >> 3) as u32;
let out = match header & 7 {
0 => Elem::Open { substream_id },
1 => Elem::Data { substream_id, endpoint: Endpoint::Listener, data: buf },
2 => Elem::Data { substream_id, endpoint: Endpoint::Dialer, data: buf },
3 => Elem::Close { substream_id, endpoint: Endpoint::Listener },
4 => Elem::Close { substream_id, endpoint: Endpoint::Dialer },
5 => Elem::Reset { substream_id, endpoint: Endpoint::Listener },
6 => Elem::Reset { substream_id, endpoint: Endpoint::Dialer },
_ => return Err(IoErrorKind::InvalidData.into()),
};
return Ok(Some(out));
},
CodecDecodeState::Poisoned => {
return Err(IoErrorKind::InvalidData.into());
}
}
}
}
}
impl Encoder for Codec {
type Item = Elem;
type Error = IoError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let (header, data) = match item {
Elem::Open { substream_id } => {
((substream_id as u64) << 3, BytesMut::new())
},
Elem::Data { substream_id, endpoint: Endpoint::Listener, data } => {
((substream_id as u64) << 3 | 1, data)
},
Elem::Data { substream_id, endpoint: Endpoint::Dialer, data } => {
((substream_id as u64) << 3 | 2, data)
},
Elem::Close { substream_id, endpoint: Endpoint::Listener } => {
((substream_id as u64) << 3 | 3, BytesMut::new())
},
Elem::Close { substream_id, endpoint: Endpoint::Dialer } => {
((substream_id as u64) << 3 | 4, BytesMut::new())
},
Elem::Reset { substream_id, endpoint: Endpoint::Listener } => {
((substream_id as u64) << 3 | 5, BytesMut::new())
},
Elem::Reset { substream_id, endpoint: Endpoint::Dialer } => {
((substream_id as u64) << 3 | 6, BytesMut::new())
},
};
let header_bytes = varint::encode(header);
let data_len = data.as_ref().len();
let data_len_bytes = varint::encode(data_len);
if data_len > MAX_FRAME_SIZE {
return Err(IoError::new(IoErrorKind::InvalidData, "data size exceed maximum"));
}
dst.reserve(header_bytes.len() + data_len_bytes.len() + data_len);
dst.put(header_bytes);
dst.put(data_len_bytes);
dst.put(data);
Ok(())
}
}

View File

@ -1,127 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use swarm::Endpoint;
const FLAG_BITS: usize = 3;
const FLAG_MASK: usize = (1usize << FLAG_BITS) - 1;
pub mod errors {
error_chain! {
errors {
ParseError
}
}
}
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub struct MultiplexHeader {
pub packet_type: PacketType,
pub substream_id: u32,
}
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum PacketType {
Open,
Close(Endpoint),
Reset(Endpoint),
Message(Endpoint),
}
impl MultiplexHeader {
pub fn open(id: u32) -> Self {
MultiplexHeader {
substream_id: id,
packet_type: PacketType::Open,
}
}
pub fn message(id: u32, end: Endpoint) -> Self {
MultiplexHeader {
substream_id: id,
packet_type: PacketType::Message(end),
}
}
// TODO: Use `u128` or another large integer type instead of bigint since we never use more than
// `pointer width + FLAG_BITS` bits and unconditionally allocating 1-3 `u32`s for that is
// ridiculous (especially since even for small numbers we have to allocate 1 `u32`).
// If this is the future and `BigUint` is better-optimised (maybe by using `Bytes`) then
// forget it.
pub fn parse(header: u64) -> Result<MultiplexHeader, errors::Error> {
use num_traits::cast::ToPrimitive;
let flags = header & FLAG_MASK as u64;
let substream_id = (header >> FLAG_BITS)
.to_u32()
.ok_or(errors::ErrorKind::ParseError)?;
// Yes, this is really how it works. No, I don't know why.
let packet_type = match flags {
0 => PacketType::Open,
1 => PacketType::Message(Endpoint::Listener),
2 => PacketType::Message(Endpoint::Dialer),
3 => PacketType::Close(Endpoint::Listener),
4 => PacketType::Close(Endpoint::Dialer),
5 => PacketType::Reset(Endpoint::Listener),
6 => PacketType::Reset(Endpoint::Dialer),
_ => {
use std::io;
return Err(errors::Error::with_chain(
io::Error::new(
io::ErrorKind::Other,
format!("Unexpected packet type: {}", flags),
),
errors::ErrorKind::ParseError,
));
}
};
Ok(MultiplexHeader {
substream_id,
packet_type,
})
}
pub fn to_u64(&self) -> u64 {
let packet_type_id = match self.packet_type {
PacketType::Open => 0,
PacketType::Message(Endpoint::Listener) => 1,
PacketType::Message(Endpoint::Dialer) => 2,
PacketType::Close(Endpoint::Listener) => 3,
PacketType::Close(Endpoint::Dialer) => 4,
PacketType::Reset(Endpoint::Listener) => 5,
PacketType::Reset(Endpoint::Dialer) => 6,
};
let substream_id = (self.substream_id as u64) << FLAG_BITS;
substream_id | packet_type_id
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,542 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use circular_buffer::Array;
use futures::Async;
use futures::task;
use header::{MultiplexHeader, PacketType};
use shared::SubstreamMetadata;
use std::io;
use tokio_io::AsyncRead;
use {bytes, varint};
pub enum NextMultiplexState {
NewStream(u32),
ParsingMessageBody(u32),
Ignore(u32),
}
impl NextMultiplexState {
pub fn substream_id(&self) -> u32 {
match *self {
NextMultiplexState::NewStream(id)
| NextMultiplexState::ParsingMessageBody(id)
| NextMultiplexState::Ignore(id) => id,
}
}
}
pub enum MultiplexReadState {
Header {
state: varint::DecoderState<u64>,
},
BodyLength {
state: varint::DecoderState<usize>,
next: NextMultiplexState,
},
NewStream {
substream_id: u32,
name: bytes::BytesMut,
remaining_bytes: usize,
},
ParsingMessageBody {
substream_id: u32,
remaining_bytes: usize,
},
Ignore {
substream_id: u32,
remaining_bytes: usize,
},
}
impl Default for MultiplexReadState {
fn default() -> Self {
MultiplexReadState::Header {
state: Default::default(),
}
}
}
fn create_buffer(capacity: usize) -> bytes::BytesMut {
let mut buffer = bytes::BytesMut::with_capacity(capacity);
let zeroes = [0; 1024];
let mut cap = capacity;
while cap > 0 {
let len = cap.min(zeroes.len());
buffer.extend_from_slice(&zeroes[..len]);
cap -= len;
}
buffer
}
fn block_on_wrong_stream<T: AsyncRead, Buf: Array<Item = u8>>(
substream_id: u32,
remaining_bytes: usize,
lock: &mut ::shared::MultiplexShared<T, Buf>,
) -> io::Result<usize> {
use std::{mem, slice};
lock.read_state = Some(MultiplexReadState::ParsingMessageBody {
substream_id,
remaining_bytes,
});
let mut out_consumed = 0;
let mut stream_eof = false;
if let Some((tasks, cache)) = lock.open_streams
.entry(substream_id)
.or_insert_with(|| SubstreamMetadata::new_open())
.open_meta_mut()
.map(|cur| {
(
mem::replace(&mut cur.read, Default::default()),
&mut cur.read_cache,
)
}) {
// We check `cache.capacity()` since that can totally statically remove this branch in the
// `== 0` path.
if cache.capacity() > 0 && cache.len() < cache.capacity() {
let mut buf: Buf = unsafe { mem::uninitialized() };
// Can't fail because `cache.len() >= 0`,
// `cache.len() <= cache.capacity()` and
// `cache.capacity() == mem::size_of::<Buf>()`
let buf_prefix = unsafe {
let max_that_fits_in_buffer = cache.capacity() - cache.len();
// We know this won't panic because of the earlier
// `number_read >= buf.len()` check
let new_len = max_that_fits_in_buffer.min(remaining_bytes);
slice::from_raw_parts_mut(buf.ptr_mut(), new_len)
};
match lock.stream.read(buf_prefix) {
Ok(consumed) => {
if consumed == 0 && !buf_prefix.is_empty() {
stream_eof = true
}
let new_remaining = remaining_bytes - consumed;
assert!(cache.extend_from_slice(&buf_prefix[..consumed]));
out_consumed = consumed;
lock.read_state = Some(MultiplexReadState::ParsingMessageBody {
substream_id,
remaining_bytes: new_remaining,
});
}
Err(err) => {
if err.kind() != io::ErrorKind::WouldBlock {
for task in tasks {
task.notify();
}
return Err(err);
}
}
}
}
for task in tasks {
task.notify();
}
}
if stream_eof {
lock.close()
}
Ok(out_consumed)
}
pub fn read_stream<
'a,
Buf: Array<Item = u8>,
O: Into<Option<(u32, &'a mut [u8])>>,
T: AsyncRead,
>(
lock: &mut ::shared::MultiplexShared<T, Buf>,
stream_data: O,
) -> io::Result<usize> {
read_stream_internal(lock, stream_data.into())
}
fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
lock: &mut ::shared::MultiplexShared<T, Buf>,
mut stream_data: Option<(u32, &mut [u8])>,
) -> io::Result<usize> {
use self::MultiplexReadState::*;
// This is only true if a stream exists and it has been closed in a "graceful" manner, so we
// can return `Ok(0)` like the `Read` trait requests. In any other case we want to return
// `WouldBlock`
let stream_has_been_gracefully_closed = stream_data
.as_ref()
.and_then(|&(id, _)| lock.open_streams.get(&id))
.map(|meta| !meta.open())
.unwrap_or(false);
let mut on_block: io::Result<usize> = if stream_has_been_gracefully_closed {
Ok(0)
} else {
Err(io::ErrorKind::WouldBlock.into())
};
if let Some((ref mut id, ref mut buf)) = stream_data {
if let Some(cur) = lock.open_streams
.entry(*id)
.or_insert_with(|| SubstreamMetadata::new_open())
.open_meta_mut()
{
cur.read.push(task::current());
let cache = &mut cur.read_cache;
if !cache.is_empty() {
let mut consumed = 0;
loop {
let cur_buf = &mut buf[consumed..];
if cur_buf.is_empty() {
break;
}
if let Some(out) = cache.pop_first_n_leaky(cur_buf.len()) {
cur_buf[..out.len()].copy_from_slice(out);
consumed += out.len();
} else {
break;
};
}
on_block = Ok(consumed);
}
}
}
loop {
match lock.read_state.take().unwrap_or_default() {
Header {
state: mut varint_state,
} => {
match varint_state.read(&mut lock.stream) {
Ok(Async::Ready(header)) => {
let header = if let Some(header) = header {
header
} else {
lock.close();
return Ok(on_block.unwrap_or(0));
};
let MultiplexHeader {
substream_id,
packet_type,
} = MultiplexHeader::parse(header).map_err(|err| {
debug!("failed to parse header: {}", err);
io::Error::new(
io::ErrorKind::Other,
format!("Error parsing header: {:?}", err),
)
})?;
match packet_type {
PacketType::Open => {
lock.read_state = Some(BodyLength {
state: Default::default(),
next: NextMultiplexState::NewStream(substream_id),
})
}
PacketType::Message(_) => {
lock.read_state = Some(BodyLength {
state: Default::default(),
next: NextMultiplexState::ParsingMessageBody(substream_id),
})
}
// NOTE: What's the difference between close and reset?
PacketType::Close(_) | PacketType::Reset(_) => {
lock.read_state = Some(BodyLength {
state: Default::default(),
next: NextMultiplexState::Ignore(substream_id),
});
lock.close_stream(substream_id);
}
}
}
Ok(Async::NotReady) => {
lock.read_state = Some(Header {
state: varint_state,
});
return on_block;
}
Err(error) => {
return if let varint::Error(varint::ErrorKind::Io(inner), ..) = error {
debug!("failed to read header: {}", inner);
Err(inner)
} else {
debug!("failed to read header: {}", error);
Err(io::Error::new(io::ErrorKind::Other, error.description()))
};
}
}
}
BodyLength {
state: mut varint_state,
next,
} => {
use self::NextMultiplexState::*;
let body_len = varint_state.read(&mut lock.stream).map_err(|e| {
debug!("substream {}: failed to read body length: {}", next.substream_id(), e);
io::Error::new(io::ErrorKind::Other, "Error reading varint")
})?;
match body_len {
Async::Ready(length) => {
// TODO: Limit `length` to prevent resource-exhaustion DOS
let length = if let Some(length) = length {
length
} else {
lock.close();
return Ok(on_block.unwrap_or(0));
};
lock.read_state = match next {
Ignore(substream_id) => Some(MultiplexReadState::Ignore {
substream_id,
remaining_bytes: length,
}),
NewStream(substream_id) => {
if length == 0 {
lock.to_open.insert(substream_id, None);
None
} else {
Some(MultiplexReadState::NewStream {
// TODO: Uninit buffer
name: create_buffer(length),
remaining_bytes: length,
substream_id,
})
}
}
ParsingMessageBody(substream_id) => {
let is_open = lock.open_streams
.get(&substream_id)
.map(SubstreamMetadata::open)
.unwrap_or_else(|| lock.to_open.contains_key(&substream_id));
if is_open {
Some(MultiplexReadState::ParsingMessageBody {
remaining_bytes: length,
substream_id,
})
} else {
Some(MultiplexReadState::Ignore {
substream_id,
remaining_bytes: length,
})
}
}
};
}
Async::NotReady => {
lock.read_state = Some(BodyLength {
state: varint_state,
next,
});
return on_block;
}
}
}
NewStream {
substream_id,
mut name,
remaining_bytes,
} => {
if remaining_bytes == 0 {
lock.to_open.insert(substream_id, Some(name.freeze()));
lock.read_state = None;
} else {
let cursor_pos = name.len() - remaining_bytes;
let consumed = lock.stream.read(&mut name[cursor_pos..]);
match consumed {
Ok(consumed) => {
if consumed == 0 {
lock.close()
}
let new_remaining = remaining_bytes - consumed;
lock.read_state = Some(NewStream {
substream_id,
name,
remaining_bytes: new_remaining,
});
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
lock.read_state = Some(NewStream {
substream_id,
name,
remaining_bytes,
});
return on_block;
}
Err(other) => {
debug!("substream {}: failed to read new stream: {}",
substream_id,
other);
lock.read_state = Some(NewStream {
substream_id,
name,
remaining_bytes,
});
return Err(other);
}
}
}
}
ParsingMessageBody {
substream_id,
remaining_bytes,
} => {
if let Some((ref mut id, ref mut buf)) = stream_data {
use MultiplexReadState::*;
let number_read = *on_block.as_ref().unwrap_or(&0);
if remaining_bytes == 0 {
lock.read_state = None;
} else if substream_id == *id {
if number_read >= buf.len() {
lock.read_state = Some(ParsingMessageBody {
substream_id,
remaining_bytes,
});
return Ok(number_read);
}
let read_result = {
// We know this won't panic because of the earlier
// `number_read >= buf.len()` check
let new_len = (buf.len() - number_read).min(remaining_bytes);
let slice = &mut buf[number_read..number_read + new_len];
lock.stream.read(slice)
};
lock.read_state = Some(ParsingMessageBody {
substream_id,
remaining_bytes,
});
match read_result {
Ok(consumed) => {
if consumed == 0 {
lock.close()
}
let new_remaining = remaining_bytes - consumed;
lock.read_state = Some(ParsingMessageBody {
substream_id,
remaining_bytes: new_remaining,
});
on_block = Ok(number_read + consumed);
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
return on_block;
}
Err(other) => {
debug!("substream {}: failed to read message body: {}",
substream_id,
other);
return Err(other);
}
}
} else {
// We cannot make progress here, another stream has to accept this packet
if block_on_wrong_stream(substream_id, remaining_bytes, lock)? == 0 {
return on_block;
}
}
} else {
// We cannot make progress here, another stream has to accept this packet
if block_on_wrong_stream(substream_id, remaining_bytes, lock)? == 0 {
return on_block;
}
}
}
Ignore {
substream_id,
mut remaining_bytes,
} => {
let mut ignore_buf: [u8; 256] = [0; 256];
loop {
if remaining_bytes == 0 {
lock.read_state = None;
break;
} else {
let new_len = ignore_buf.len().min(remaining_bytes);
match lock.stream.read(&mut ignore_buf[..new_len]) {
Ok(consumed) => {
if consumed == 0 {
lock.close()
}
remaining_bytes -= consumed;
lock.read_state = Some(Ignore {
substream_id,
remaining_bytes: remaining_bytes,
});
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
lock.read_state = Some(Ignore {
substream_id,
remaining_bytes,
});
return on_block;
}
Err(other) => {
debug!("substream {}: failed to read ignore bytes: {}",
substream_id,
other);
lock.read_state = Some(Ignore {
substream_id,
remaining_bytes,
});
return Err(other);
}
}
}
}
}
}
}
}

View File

@ -1,124 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use read::MultiplexReadState;
use write::MultiplexWriteState;
use arrayvec::ArrayVec;
use bytes::Bytes;
use circular_buffer::{Array, CircularBuffer};
use futures::task::Task;
use std::collections::HashMap;
const BUF_SIZE: usize = 1024;
pub type ByteBuf = ArrayVec<[u8; BUF_SIZE]>;
pub enum SubstreamMetadata<Buf: Array> {
Closed,
Open(OpenSubstreamMetadata<Buf>),
}
pub struct OpenSubstreamMetadata<Buf: Array> {
pub read_cache: CircularBuffer<Buf>,
pub read: Vec<Task>,
pub write: Vec<Task>,
}
impl<Buf: Array> SubstreamMetadata<Buf> {
pub fn new_open() -> Self {
SubstreamMetadata::Open(OpenSubstreamMetadata {
read_cache: Default::default(),
read: Default::default(),
write: Default::default(),
})
}
pub fn open(&self) -> bool {
match *self {
SubstreamMetadata::Closed => false,
SubstreamMetadata::Open { .. } => true,
}
}
pub fn open_meta_mut(&mut self) -> Option<&mut OpenSubstreamMetadata<Buf>> {
match *self {
SubstreamMetadata::Closed => None,
SubstreamMetadata::Open(ref mut meta) => Some(meta),
}
}
}
// TODO: Split reading and writing into different structs and have information shared between the
// two in a `RwLock`, since `open_streams` and `to_open` are mostly read-only.
pub struct MultiplexShared<T, Buf: Array> {
// We use `Option` in order to take ownership of heap allocations within `DecoderState` and
// `BytesMut`. If this is ever observably `None` then something has panicked or the underlying
// stream returned an error.
pub read_state: Option<MultiplexReadState>,
pub write_state: Option<MultiplexWriteState>,
pub stream: T,
eof: bool, // true, if `stream` has been exhausted
pub open_streams: HashMap<u32, SubstreamMetadata<Buf>>,
pub meta_write_tasks: Vec<Task>,
// TODO: Should we use a version of this with a fixed size that doesn't allocate and return
// `WouldBlock` if it's full? Even if we ignore or size-cap names you can still open 2^32
// streams.
pub to_open: HashMap<u32, Option<Bytes>>,
}
impl<T, Buf: Array> MultiplexShared<T, Buf> {
pub fn new(stream: T) -> Self {
MultiplexShared {
read_state: Default::default(),
write_state: Default::default(),
open_streams: Default::default(),
meta_write_tasks: Default::default(),
to_open: Default::default(),
stream: stream,
eof: false,
}
}
pub fn open_stream(&mut self, id: u32) -> bool {
trace!("open stream {}", id);
self.open_streams
.entry(id)
.or_insert(SubstreamMetadata::new_open())
.open()
}
pub fn close_stream(&mut self, id: u32) {
trace!("close stream {}", id);
self.open_streams.insert(id, SubstreamMetadata::Closed);
}
pub fn close(&mut self) {
self.eof = true
}
pub fn is_closed(&self) -> bool {
self.eof
}
}
pub fn buf_from_slice(slice: &[u8]) -> ByteBuf {
slice.iter().cloned().take(BUF_SIZE).collect()
}

View File

@ -1,227 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use header::MultiplexHeader;
use shared::{ByteBuf, MultiplexShared, SubstreamMetadata};
use circular_buffer;
use futures::task;
use std::io;
use tokio_io::AsyncWrite;
use varint;
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum RequestType {
Meta,
Substream,
}
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub struct WriteRequest {
header: MultiplexHeader,
request_type: RequestType,
}
impl WriteRequest {
pub fn substream(header: MultiplexHeader) -> Self {
WriteRequest {
header,
request_type: RequestType::Substream,
}
}
pub fn meta(header: MultiplexHeader) -> Self {
WriteRequest {
header,
request_type: RequestType::Meta,
}
}
}
#[derive(Default, Debug)]
pub struct MultiplexWriteState {
current: Option<(WriteRequest, MultiplexWriteStateInner)>,
queued: Option<WriteRequest>,
// TODO: Actually close these
to_close: Vec<u32>,
}
#[derive(Debug)]
pub enum MultiplexWriteStateInner {
WriteHeader { state: varint::EncoderState<u64> },
BodyLength { state: varint::EncoderState<usize> },
Body { size: usize },
}
pub fn write_stream<Buf: circular_buffer::Array, T: AsyncWrite>(
lock: &mut MultiplexShared<T, Buf>,
write_request: WriteRequest,
buf: &mut io::Cursor<ByteBuf>,
) -> io::Result<usize> {
use futures::Async;
use num_traits::cast::ToPrimitive;
use varint::WriteState;
use write::MultiplexWriteStateInner::*;
let mut on_block = Err(io::ErrorKind::WouldBlock.into());
let mut write_state = lock.write_state.take().unwrap_or_default();
let (request, mut state) = write_state.current.take().unwrap_or_else(|| {
(
write_request,
MultiplexWriteStateInner::WriteHeader {
state: varint::EncoderState::new(write_request.header.to_u64()),
},
)
});
let id = write_request.header.substream_id;
if buf.get_ref().len() as u64 - buf.position() == 0 {
return Ok(0);
}
match (request.request_type, write_request.request_type) {
(RequestType::Substream, RequestType::Substream) if request.header.substream_id != id => {
use std::mem;
if let Some(cur) = lock.open_streams
.entry(id)
.or_insert_with(|| SubstreamMetadata::new_open())
.open_meta_mut()
{
cur.write.push(task::current());
}
if let Some(tasks) = lock.open_streams
.get_mut(&request.header.substream_id)
.and_then(SubstreamMetadata::open_meta_mut)
.map(|cur| mem::replace(&mut cur.write, Default::default()))
{
for task in tasks {
task.notify();
}
}
lock.write_state = Some(write_state);
return on_block;
}
(RequestType::Substream, RequestType::Meta) => {
use std::mem;
lock.write_state = Some(write_state);
lock.meta_write_tasks.push(task::current());
if let Some(tasks) = lock.open_streams
.get_mut(&request.header.substream_id)
.and_then(SubstreamMetadata::open_meta_mut)
.map(|cur| mem::replace(&mut cur.write, Default::default()))
{
for task in tasks {
task.notify();
}
}
return on_block;
}
(RequestType::Meta, RequestType::Substream) => {
use std::mem;
lock.write_state = Some(write_state);
if let Some(cur) = lock.open_streams
.entry(id)
.or_insert_with(|| SubstreamMetadata::new_open())
.open_meta_mut()
{
cur.write.push(task::current());
}
for task in mem::replace(&mut lock.meta_write_tasks, Default::default()) {
task.notify();
}
return on_block;
}
_ => {}
}
loop {
// Err = should return, Ok = continue
let new_state = match state {
WriteHeader {
state: mut inner_state,
} => match inner_state
.write(&mut lock.stream)
.map_err(|_| io::ErrorKind::Other)?
{
Async::Ready(WriteState::Done(_)) => Ok(BodyLength {
state: varint::EncoderState::new(buf.get_ref().len()),
}),
Async::Ready(WriteState::Pending(_)) | Async::NotReady => {
Err(Some(WriteHeader { state: inner_state }))
}
},
BodyLength {
state: mut inner_state,
} => match inner_state
.write(&mut lock.stream)
.map_err(|_| io::ErrorKind::Other)?
{
Async::Ready(WriteState::Done(_)) => Ok(Body {
size: inner_state.source().to_usize().unwrap_or(::std::usize::MAX),
}),
Async::Ready(WriteState::Pending(_)) => Ok(BodyLength { state: inner_state }),
Async::NotReady => Err(Some(BodyLength { state: inner_state })),
},
Body { size } => {
if buf.position() == buf.get_ref().len() as u64 {
Err(None)
} else {
match lock.stream.write(&buf.get_ref()[buf.position() as usize..]) {
Ok(just_written) => {
let cur_pos = buf.position();
buf.set_position(cur_pos + just_written as u64);
on_block = Ok(on_block.unwrap_or(0) + just_written);
Ok(Body {
size: size - just_written,
})
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
Err(Some(Body { size }))
}
Err(other) => {
debug!("substream {}: failed to write body: {}", id, other);
return Err(other);
}
}
}
}
};
match new_state {
Ok(new_state) => state = new_state,
Err(new_state) => {
write_state.current = new_state.map(|state| (request, state));
lock.write_state = Some(write_state);
return on_block;
}
}
}
}

View File

@ -42,7 +42,7 @@ fn client_to_server_outbound() {
let bg_thread = thread::spawn(move || { let bg_thread = thread::spawn(move || {
let transport = let transport =
TcpConfig::new().with_upgrade(multiplex::MultiplexConfig::new()); TcpConfig::new().with_upgrade(multiplex::MplexConfig::new());
let (listener, addr) = transport let (listener, addr) = transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
@ -70,7 +70,7 @@ fn client_to_server_outbound() {
tokio_current_thread::block_on_all(future).unwrap(); tokio_current_thread::block_on_all(future).unwrap();
}); });
let transport = TcpConfig::new().with_upgrade(multiplex::MultiplexConfig::new()); let transport = TcpConfig::new().with_upgrade(multiplex::MplexConfig::new());
let future = transport let future = transport
.dial(rx.recv().unwrap()) .dial(rx.recv().unwrap())
@ -92,7 +92,7 @@ fn client_to_server_inbound() {
let bg_thread = thread::spawn(move || { let bg_thread = thread::spawn(move || {
let transport = let transport =
TcpConfig::new().with_upgrade(multiplex::MultiplexConfig::new()); TcpConfig::new().with_upgrade(multiplex::MplexConfig::new());
let (listener, addr) = transport let (listener, addr) = transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
@ -120,7 +120,7 @@ fn client_to_server_inbound() {
tokio_current_thread::block_on_all(future).unwrap(); tokio_current_thread::block_on_all(future).unwrap();
}); });
let transport = TcpConfig::new().with_upgrade(multiplex::MultiplexConfig::new()); let transport = TcpConfig::new().with_upgrade(multiplex::MplexConfig::new());
let future = transport let future = transport
.dial(rx.recv().unwrap()) .dial(rx.recv().unwrap())