diff --git a/Cargo.toml b/Cargo.toml
index 6f1accf0..35a89307 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,13 +2,15 @@
 members = [
     "multistream-select",
     "datastore",
+    "multihash",
     "example",
     "libp2p-peerstore",
     "libp2p-ping",
     "libp2p-secio",
     "libp2p-swarm",
-    "libp2p-tcp-transport",
     "rw-stream-sink",
+    "varint-rs",
+    "multiplex-rs"
 ]
 
 [replace]
diff --git a/libp2p-stream-muxer/Cargo.toml b/libp2p-stream-muxer/Cargo.toml
new file mode 100644
index 00000000..51a831d1
--- /dev/null
+++ b/libp2p-stream-muxer/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "libp2p-stream-muxer"
+version = "0.1.0"
+authors = ["Vurich "]
+
+[dependencies]
+futures = "0.1"
+tokio-io = "0.1"
diff --git a/libp2p-stream-muxer/src/lib.rs b/libp2p-stream-muxer/src/lib.rs
new file mode 100644
index 00000000..5479c1bd
--- /dev/null
+++ b/libp2p-stream-muxer/src/lib.rs
@@ -0,0 +1,22 @@
+extern crate tokio_io;
+extern crate futures;
+
+use futures::stream::Stream;
+use tokio_io::{AsyncRead, AsyncWrite};
+
+pub trait StreamMuxer {
+    type Substream: AsyncRead + AsyncWrite;
+    type InboundSubstreams: Stream<Item = Self::Substream>;
+    type OutboundSubstreams: Stream<Item = Self::Substream>;
+
+    fn inbound(&mut self) -> Self::InboundSubstreams;
+    fn outbound(&mut self) -> Self::OutboundSubstreams;
+}
+
+#[cfg(test)]
+mod tests {
+    #[test]
+    fn it_works() {
+        assert_eq!(2 + 2, 4);
+    }
+}
diff --git a/multiplex-rs/Cargo.toml b/multiplex-rs/Cargo.toml
new file mode 100644
index 00000000..346598fb
--- /dev/null
+++ b/multiplex-rs/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "multiplex"
+version = "0.1.0"
+authors = ["Vurich "]
+
+[dependencies]
+bytes = "0.4.5"
+num-traits = "0.1.40"
+num-bigint = "0.1.40"
+tokio-io = "0.1"
+futures = "0.1"
+parking_lot = "0.4.8"
+libp2p-stream-muxer = { path = "../libp2p-stream-muxer" }
+varint = { path = "../varint-rs" }
diff --git a/multiplex-rs/src/lib.rs b/multiplex-rs/src/lib.rs
new file mode 100644
index 00000000..b53cd87a
--- /dev/null
+++ b/multiplex-rs/src/lib.rs
@@ -0,0 +1,501 @@
+extern crate bytes;
+extern crate futures;
+extern crate libp2p_stream_muxer;
+extern crate tokio_io;
+extern crate varint;
+extern crate num_bigint;
+extern crate num_traits;
+extern crate parking_lot;
+
+use bytes::Bytes;
+use futures::prelude::*;
+use libp2p_stream_muxer::StreamMuxer;
+use parking_lot::Mutex;
+use std::collections::{HashSet, HashMap};
+use std::io::{self, Read, Write};
+use std::sync::Arc;
+use tokio_io::{AsyncRead, AsyncWrite};
+
+// The multiplex is essentially a distributed finite state machine.
+//
+// In the first state the header must be read so that we know which substream to hand off the
+// upcoming packet to. This is first-come, first-served - whichever substream begins reading the
+// packet will be locked into reading the header until it is consumed (this may be changed in
+// the future, for example by allowing the streams to cooperate on parsing headers). This
+// implementation of `Multiplex` operates under the assumption that all substreams are consumed
+// relatively equally. A higher-level wrapper may wrap this and add some level of buffering.
+//
+// In the second state, the substream ID is known. Only this substream can progress until the
+// packet is consumed.
+
+/// Number of bits used for the metadata on multiplex packets
+enum NextMultiplexState {
+    NewStream(usize),
+    ParsingMessageBody(usize),
+    Ignore,
+}
+
+enum MultiplexState {
+    Header { state: varint::DecoderState },
+    BodyLength {
+        state: varint::DecoderState,
+        next: NextMultiplexState,
+    },
+    NewStream {
+        substream_id: usize,
+        name: bytes::BytesMut,
+        remaining_bytes: usize,
+    },
+    ParsingMessageBody {
+        substream_id: usize,
+        remaining_bytes: usize,
+    },
+    Ignore { remaining_bytes: usize },
+}
+
+impl Default for MultiplexState {
+    fn default() -> Self {
+        MultiplexState::Header { state: Default::default() }
+    }
+}
+
+// TODO: Add writing. We should also add some form of "pending packet" so that we can always open
+//       at least one new substream. If this is stored on the substream itself then we can open
+//       arbitrarily many new substreams.
+//
+//       When we've implemented writing, we should send the close message on `Substream` drop.
+//       This should probably be implemented with some kind of "pending close message" queue. The
+//       priority should go:
+//       1. Open new stream messages
+//       2. Regular messages
+//       3. Close messages
+//       Since if we receive a message to a closed stream we just drop it anyway.
+struct MultiplexShared<T> {
+    // We use `Option` in order to take ownership of heap allocations within `DecoderState` and
+    // `BytesMut`. If this is ever observably `None` then something has panicked and the `Mutex`
+    // will be poisoned.
+    read_state: Option<MultiplexState>,
+    stream: T,
+    // true if the stream is open, false otherwise
+    open_streams: HashMap<usize, bool>,
+    // TODO: Should we use a version of this with a fixed size that doesn't allocate and return
+    //       `WouldBlock` if it's full?
+    to_open: HashMap<usize, Bytes>,
+}
+
+pub struct Substream<T> {
+    id: usize,
+    name: Option<Bytes>,
+    state: Arc<Mutex<MultiplexShared<T>>>,
+}
+
+impl<T> Drop for Substream<T> {
+    fn drop(&mut self) {
+        let mut lock = self.state.lock();
+
+        lock.open_streams.insert(self.id, false);
+    }
+}
+
+impl<T> Substream<T> {
+    fn new<B: Into<Option<Bytes>>>(
+        id: usize,
+        name: B,
+        state: Arc<Mutex<MultiplexShared<T>>>,
+    ) -> Self {
+        let name = name.into();
+
+        Substream { id, name, state }
+    }
+
+    pub fn name(&self) -> Option<&Bytes> {
+        self.name.as_ref()
+    }
+}
+
+/// This is unsafe because you must ensure that only the `AsyncRead` that was passed in is later
+/// used to write to the returned buffer.
+unsafe fn create_buffer_for<R: AsyncRead>(capacity: usize, inner: &R) -> bytes::BytesMut {
+    let mut buffer = bytes::BytesMut::with_capacity(capacity);
+    buffer.set_len(capacity);
+    inner.prepare_uninitialized_buffer(&mut buffer);
+    buffer
+}
+
+fn read_stream<T: AsyncRead>(
+    mut stream_data: Option<(usize, &mut [u8])>,
+    lock: &mut MultiplexShared<T>,
+) -> io::Result<usize> {
+    use num_traits::cast::ToPrimitive;
+    use MultiplexState::*;
+
+    let stream_has_been_gracefully_closed = stream_data
+        .as_ref()
+        .and_then(|&(id, _)| lock.open_streams.get(&id))
+        .map(|is_open| !is_open)
+        .unwrap_or(false);
+
+    let mut on_block: io::Result<usize> = if stream_has_been_gracefully_closed {
+        Ok(0)
+    } else {
+        Err(io::Error::from(io::ErrorKind::WouldBlock))
+    };
+
+    loop {
+        match lock.read_state.take().expect("Logic error or panic") {
+            Header { state: varint_state } => {
+                match varint_state
+                    .read(&mut lock.stream)
+                    .map_err(|_| io::Error::from(io::ErrorKind::Other))?
+                {
+                    Ok(header) => {
+                        let MultiplexHeader {
+                            substream_id,
+                            packet_type,
+                        } = MultiplexHeader::parse(header)
+                            .map_err(|_| io::Error::from(io::ErrorKind::Other))?;
+
+                        match packet_type {
+                            PacketType::Open => {
+                                lock.read_state = Some(BodyLength {
+                                    state: Default::default(),
+                                    next: NextMultiplexState::NewStream(substream_id),
+                                })
+                            }
+                            PacketType::Message(_) => {
+                                lock.read_state = Some(BodyLength {
+                                    state: Default::default(),
+                                    next: NextMultiplexState::ParsingMessageBody(substream_id),
+                                })
+                            }
+                            // NOTE: What's the difference between close and reset?
+                            PacketType::Close(_) |
+                            PacketType::Reset(_) => {
+                                lock.read_state = Some(BodyLength {
+                                    state: Default::default(),
+                                    next: NextMultiplexState::Ignore,
+                                });
+
+                                lock.open_streams.remove(&substream_id);
+                            }
+                        }
+                    }
+                    Err(new_state) => {
+                        lock.read_state = Some(Header { state: new_state });
+                        return on_block;
+                    }
+                }
+            }
+            BodyLength {
+                state: varint_state,
+                next,
+            } => {
+                match varint_state
+                    .read(&mut lock.stream)
+                    .map_err(|_| io::Error::from(io::ErrorKind::Other))?
+                {
+                    Ok(length) => {
+                        use NextMultiplexState::*;
+
+                        let length = length
+                            .to_usize()
+                            .ok_or(io::Error::from(io::ErrorKind::Other))?;
+
+                        lock.read_state = Some(match next {
+                            Ignore => MultiplexState::Ignore { remaining_bytes: length },
+                            NewStream(substream_id) => MultiplexState::NewStream {
+                                // This is safe as long as we only use `lock.stream` to write to
+                                // this field
+                                name: unsafe { create_buffer_for(length, &lock.stream) },
+                                remaining_bytes: length,
+                                substream_id,
+                            },
+                            ParsingMessageBody(substream_id) => {
+                                let is_open = lock.open_streams
+                                    .get(&substream_id)
+                                    .map(|is_open| *is_open)
+                                    .unwrap_or(false);
+
+                                if is_open {
+                                    MultiplexState::ParsingMessageBody {
+                                        remaining_bytes: length,
+                                        substream_id,
+                                    }
+                                } else {
+                                    MultiplexState::Ignore { remaining_bytes: length }
+                                }
+                            }
+                        });
+                    }
+                    Err(new_state) => {
+                        lock.read_state = Some(BodyLength {
+                            state: new_state,
+                            next,
+                        });
+
+                        return on_block;
+                    }
+                }
+            }
+            NewStream {
+                substream_id,
+                mut name,
+                remaining_bytes,
+            } => {
+                if remaining_bytes == 0 {
+                    lock.to_open.insert(substream_id, name.freeze());
+
+                    lock.read_state = Some(Default::default());
+                } else {
+                    let cursor_pos = name.len() - remaining_bytes;
+                    let consumed = lock.stream.read(&mut name[cursor_pos..]);
+
+                    match consumed {
+                        Ok(consumed) => {
+                            let new_remaining = remaining_bytes - consumed;
+
+                            lock.read_state = Some(NewStream {
+                                substream_id,
+                                name,
+                                remaining_bytes: new_remaining,
+                            })
+                        }
+                        Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
+                            lock.read_state = Some(NewStream {
+                                substream_id,
+                                name,
+                                remaining_bytes,
+                            });
+
+                            return on_block;
+                        }
+                        Err(other) => return Err(other),
+                    }
+                }
+            }
+            ParsingMessageBody {
+                substream_id,
+                remaining_bytes,
+            } => {
+                if let Some((ref mut id, ref mut buf)) = stream_data {
+                    use MultiplexState::*;
+
+                    if substream_id == *id {
+                        if remaining_bytes == 0 {
+                            lock.read_state = Some(Default::default());
+                        } else {
+                            let read_result = {
+                                let new_len = buf.len().min(remaining_bytes);
+                                let slice = &mut buf[..new_len];
+
+                                lock.stream.read(slice)
+                            };
+
+                            match read_result {
+                                Ok(consumed) => {
+                                    let new_remaining = remaining_bytes - consumed;
+
+                                    lock.read_state = Some(ParsingMessageBody {
+                                        substream_id,
+                                        remaining_bytes: new_remaining,
+                                    });
+
+                                    on_block = Ok(on_block.unwrap_or(0) + consumed);
+                                }
+                                Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
+                                    lock.read_state = Some(ParsingMessageBody {
+                                        substream_id,
+                                        remaining_bytes,
+                                    });
+
+                                    return on_block;
+                                }
+                                Err(other) => return Err(other),
+                            }
+                        }
+                    } else {
+                        lock.read_state = Some(ParsingMessageBody {
+                            substream_id,
+                            remaining_bytes,
+                        });
+
+                        // We cannot make progress here, another stream has to accept this packet
+                        return on_block;
+                    }
+                } else {
+                    // No caller buffer was supplied (for example, we were only polled for new
+                    // inbound substreams), so put the state back for a substream to pick up.
+                    lock.read_state = Some(ParsingMessageBody {
+                        substream_id,
+                        remaining_bytes,
+                    });
+
+                    return on_block;
+                }
+            }
+            Ignore { mut remaining_bytes } => {
+                let mut ignore_buf: [u8; 256] = [0; 256];
+
+                loop {
+                    if remaining_bytes == 0 {
+                        lock.read_state = Some(Default::default());
+
+                        break;
+                    } else {
+                        let new_len = ignore_buf.len().min(remaining_bytes);
+                        match lock.stream.read(&mut ignore_buf[..new_len]) {
+                            Ok(consumed) => remaining_bytes -= consumed,
+                            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
+                                lock.read_state = Some(Ignore { remaining_bytes });
+
+                                return on_block;
+                            }
+                            Err(other) => return Err(other),
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+// TODO: We always zero the buffer, we should delegate to the inner stream. Maybe use an
+//       `RwLock` instead?
+impl<T: AsyncRead> Read for Substream<T> {
+    // TODO: Is it wasteful to have all of our substreams try to make progress? Can we use an
+    //       `AtomicBool` or `AtomicUsize` to limit the substreams that try to progress?
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        let mut lock = match self.state.try_lock() {
+            Some(lock) => lock,
+            None => return Err(io::Error::from(io::ErrorKind::WouldBlock)),
+        };
+
+        read_stream(Some((self.id, buf)), &mut lock)
+    }
+}
+
+impl<T: AsyncRead> AsyncRead for Substream<T> {}
+
+impl<T: AsyncWrite> Write for Substream<T> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        unimplemented!()
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        unimplemented!()
+    }
+}
+
+impl<T: AsyncWrite> AsyncWrite for Substream<T> {
+    fn shutdown(&mut self) -> Poll<(), io::Error> {
+        unimplemented!()
+    }
+}
+
+struct ParseError;
+
+enum MultiplexEnd {
+    Initiator,
+    Receiver,
+}
+
+struct MultiplexHeader {
+    pub packet_type: PacketType,
+    pub substream_id: usize,
+}
+
+enum PacketType {
+    Open,
+    Close(MultiplexEnd),
+    Reset(MultiplexEnd),
+    Message(MultiplexEnd),
+}
+
+impl MultiplexHeader {
+    // TODO: Use `u128` or another large integer type instead of bigint since we never use more
+    //       than `pointer width + FLAG_BITS` bits and unconditionally allocating 1-3 `u32`s for
+    //       that is ridiculous (especially since even for small numbers we have to allocate
+    //       1 `u32`). If this is the future and `BigUint` is better-optimised (maybe by using
+    //       `Bytes`) then forget it.
+    fn parse(header: num_bigint::BigUint) -> Result<MultiplexHeader, ParseError> {
+        use num_traits::cast::ToPrimitive;
+
+        const FLAG_BITS: usize = 3;
+
+        // `&header` to make `>>` produce a new `BigUint` instead of consuming the old `BigUint`
+        let substream_id = ((&header) >> FLAG_BITS).to_usize().ok_or(ParseError)?;
+
+        // Mask covering only the `FLAG_BITS` low bits
+        let flag_mask = (1usize << FLAG_BITS) - 1;
+        let flags = header.to_usize().ok_or(ParseError)? & flag_mask;
+
+        // Yes, this is really how it works. No, I don't know why.
+        let packet_type = match flags {
+            0 => PacketType::Open,
+
+            1 => PacketType::Message(MultiplexEnd::Receiver),
+            2 => PacketType::Message(MultiplexEnd::Initiator),
+
+            3 => PacketType::Close(MultiplexEnd::Receiver),
+            4 => PacketType::Close(MultiplexEnd::Initiator),
+
+            5 => PacketType::Reset(MultiplexEnd::Receiver),
+            6 => PacketType::Reset(MultiplexEnd::Initiator),
+
+            _ => return Err(ParseError),
+        };
+
+        Ok(MultiplexHeader {
+            substream_id,
+            packet_type,
+        })
+    }
+}
+
+pub struct Multiplex<T> {
+    state: Arc<Mutex<MultiplexShared<T>>>,
+}
+
+pub struct InboundStream<T> {
+    state: Arc<Mutex<MultiplexShared<T>>>,
+}
+
+impl<T: AsyncRead> Stream for InboundStream<T> {
+    type Item = Substream<T>;
+    type Error = io::Error;
+
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        let mut lock = match self.state.try_lock() {
+            Some(lock) => lock,
+            None => return Ok(Async::NotReady),
+        };
+
+        // Attempt to make progress, but don't block if we can't
+        match read_stream(None, &mut lock) {
+            Ok(_) => (),
+            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => (),
+            Err(err) => return Err(err),
+        }
+
+        let id = if let Some((id, _)) = lock.to_open.iter().next() {
+            *id
+        } else {
+            return Ok(Async::NotReady);
+        };
+
+        let name = lock.to_open.remove(&id).expect(
+            "We just checked that this key exists and we have exclusive access to the map, QED",
+        );
+
+        Ok(Async::Ready(
+            Some(Substream::new(id, name, self.state.clone())),
+        ))
+    }
+}
+
+impl<T: AsyncRead + AsyncWrite> StreamMuxer for Multiplex<T> {
+    type Substream = Substream<T>;
+    type OutboundSubstreams = Box<Stream<Item = Self::Substream, Error = io::Error>>;
+    type InboundSubstreams = InboundStream<T>;
+
+    fn inbound(&mut self) -> Self::InboundSubstreams {
+        InboundStream { state: self.state.clone() }
+    }
+
+    fn outbound(&mut self) -> Self::OutboundSubstreams {
+        unimplemented!()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    #[test]
+    fn it_works() {
+        assert_eq!(2 + 2, 4);
+    }
+}
diff --git a/varint-rs/.gitignore b/varint-rs/.gitignore
new file mode 100644
index 00000000..6aa10640
--- /dev/null
+++ b/varint-rs/.gitignore
@@ -0,0 +1,3 @@
+/target/
+**/*.rs.bk
+Cargo.lock
diff --git a/varint-rs/Cargo.toml b/varint-rs/Cargo.toml
new file mode 100644
index 00000000..ffe819b0
--- /dev/null
+++ b/varint-rs/Cargo.toml
@@ -0,0 +1,10 @@
+[package]
+name = "varint"
+version = "0.1.0"
+authors = ["Parity Technologies "]
+
+[dependencies]
+num-bigint = "0.1.40"
+bytes = "0.4.5"
+tokio-io = "0.1"
+futures = "0.1"
diff --git a/varint-rs/src/lib.rs b/varint-rs/src/lib.rs
new file mode 100644
index 00000000..f126536b
--- /dev/null
+++ b/varint-rs/src/lib.rs
@@ -0,0 +1,161 @@
+extern crate num_bigint;
+extern crate tokio_io;
+extern crate bytes;
+extern crate futures;
+
+use bytes::BytesMut;
+use num_bigint::BigUint;
+use tokio_io::AsyncRead;
+use tokio_io::codec::Decoder;
+use std::io;
+use std::io::prelude::*;
+
+// TODO: error-chain
+pub struct ParseError;
+
+#[derive(Default)]
+pub struct DecoderState {
+    // TODO: Non-allocating `BigUint`?
+    accumulator: BigUint,
+    shift: usize,
+}
+
+impl DecoderState {
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    fn decode_one(mut self, byte: u8) -> Result<BigUint, Self> {
+        self.accumulator = self.accumulator | (BigUint::from(byte & 0x7F) << self.shift);
+        self.shift += 7;
+
+        if byte & 0x80 == 0 {
+            Ok(self.accumulator)
+        } else {
+            Err(self)
+        }
+    }
+
+    // Why the weird type signature? Well, `BigUint` owns its storage, and we don't want to clone
+    // it. So, we want the accumulator to be moved out when it is ready. We could also have used
+    // `Option<BigUint>`, but taking `self` by value means it's impossible to end up in an
+    // inconsistent state (`shift != 0 && accumulator.is_none()`).
+    pub fn read<R: Read>(self, mut input: R) -> Result<Result<BigUint, Self>, ParseError> {
+        let mut state = self;
+
+        loop {
+            // We read one byte at a time to prevent consuming too much of the buffer.
+            let mut buffer: [u8; 1] = [0];
+
+            match input.read_exact(&mut buffer) {
+                Ok(_) => {
+                    state = match state.decode_one(buffer[0]) {
+                        Ok(out) => break Ok(Ok(out)),
+                        Err(state) => state,
+                    };
+                }
+                Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => break Ok(Err(state)),
+                Err(_) => break Err(ParseError),
+            }
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct VarintDecoder {
+    state: Option<DecoderState>,
+}
+
+impl VarintDecoder {
+    pub fn new() -> Self {
+        Default::default()
+    }
+}
+
+impl Decoder for VarintDecoder {
+    type Item = BigUint;
+    type Error = io::Error;
+
+    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+        loop {
+            if src.len() == 0 {
+                break Err(io::Error::from(io::ErrorKind::UnexpectedEof));
+            } else {
+                // We know that the length is not 0, so this cannot fail.
+                let first_byte = src.split_to(1)[0];
+                let new_state = match self.state.take() {
+                    Some(state) => state.decode_one(first_byte),
+                    None => DecoderState::new().decode_one(first_byte),
+                };
+
+                match new_state {
+                    Ok(out) => break Ok(Some(out)),
+                    Err(state) => self.state = Some(state),
+                }
+            }
+        }
+    }
+}
+
+pub fn decode<R: Read>(stream: R) -> io::Result<BigUint> {
+    let mut out = BigUint::from(0u8);
+    let mut shift = 0;
+    let mut finished_cleanly = false;
+
+    for i in stream.bytes() {
+        let i = i?;
+
+        out = out | (BigUint::from(i & 0x7F) << shift);
+        shift += 7;
+
+        if i & 0x80 == 0 {
+            finished_cleanly = true;
+            break;
+        }
+    }
+
+    if finished_cleanly {
+        Ok(out)
+    } else {
+        Err(io::Error::from(io::ErrorKind::UnexpectedEof))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{decode, VarintDecoder};
+    use tokio_io::codec::FramedRead;
+    use num_bigint::BigUint;
+    use futures::{Future, Stream};
+
+    #[test]
+    fn can_decode_basic_uint() {
+        assert_eq!(
+            BigUint::from(300u16),
+            decode(&[0b10101100, 0b00000010][..]).unwrap()
+        );
+    }
+
+    #[test]
+    fn can_decode_basic_uint_async() {
+        let result = FramedRead::new(&[0b10101100, 0b00000010][..], VarintDecoder::new())
+            .into_future()
+            .map(|(out, _)| out)
+            .map_err(|(err, _)| err)
+            .wait();
+
+        assert_eq!(result.unwrap(), Some(BigUint::from(300u16)));
+    }
+
+    #[test]
+    fn unexpected_eof_async() {
+        use std::io;
+
+        let result = FramedRead::new(&[0b10101100, 0b10000010][..], VarintDecoder::new())
+            .into_future()
+            .map(|(out, _)| out)
+            .map_err(|(err, _)| err)
+            .wait();
+
+        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
+    }
+}
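
For readers unfamiliar with the wire format the parser above implements: headers and body lengths are unsigned LEB128 varints, and a header value is (substream_id << 3) | flags, with the flag values listed in MultiplexHeader::parse. The standalone sketch below (not part of the changeset; it uses plain u64 instead of BigUint, and the helper names are hypothetical) walks through both rules on concrete values.

    // Illustrative sketch only: mirrors the varint encoding and the header split used above.

    /// Encode `value` as an unsigned LEB128 varint: 7 data bits per byte,
    /// high bit set on every byte except the last.
    fn encode_varint(mut value: u64) -> Vec<u8> {
        let mut out = Vec::new();
        loop {
            let byte = (value & 0x7F) as u8;
            value >>= 7;
            if value == 0 {
                out.push(byte);
                break;
            } else {
                out.push(byte | 0x80);
            }
        }
        out
    }

    /// Split a header the same way `MultiplexHeader::parse` does: the low three
    /// bits are the packet-type flags, the remaining bits are the substream id.
    fn split_header(header: u64) -> (u64, u8) {
        const FLAG_BITS: u32 = 3;
        let substream_id = header >> FLAG_BITS;
        let flags = (header & ((1 << FLAG_BITS) - 1)) as u8;
        (substream_id, flags)
    }

    fn main() {
        // 300 encodes as [0xAC, 0x02], the byte pair exercised by the varint tests above.
        assert_eq!(encode_varint(300), vec![0b1010_1100, 0b0000_0010]);

        // A header for substream 5 with flag 2 (`Message(Initiator)` in the table above)
        // is (5 << 3) | 2 = 42.
        let (id, flags) = split_header((5 << 3) | 2);
        assert_eq!((id, flags), (5, 2));
    }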