mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-19 21:11:22 +00:00
Externalise unsigned-varint crate. (#329)
This commit is contained in:
committed by
Benjamin Kampmann
parent
eb53ea2515
commit
66db7f89b7
@ -22,7 +22,6 @@ members = [
|
||||
"tcp-transport",
|
||||
"transport-timeout",
|
||||
"uds",
|
||||
"varint-rs",
|
||||
"websocket",
|
||||
"yamux"
|
||||
]
|
||||
|
@ -18,4 +18,4 @@ protobuf = "2.0.2"
|
||||
smallvec = "0.6.0"
|
||||
tokio-codec = "0.1"
|
||||
tokio-io = "0.1"
|
||||
varint = { path = "../varint-rs" }
|
||||
unsigned-varint = { version = "0.1", features = ["codec"] }
|
||||
|
@ -32,7 +32,7 @@ extern crate protobuf;
|
||||
extern crate smallvec;
|
||||
extern crate tokio_codec;
|
||||
extern crate tokio_io;
|
||||
extern crate varint;
|
||||
extern crate unsigned_varint;
|
||||
|
||||
mod rpc_proto;
|
||||
mod topic;
|
||||
@ -58,7 +58,7 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use tokio_codec::Framed;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use varint::VarintCodec;
|
||||
use unsigned_varint::codec;
|
||||
|
||||
/// Implementation of the `ConnectionUpgrade` for the floodsub protocol.
|
||||
#[derive(Debug, Clone)]
|
||||
@ -136,7 +136,7 @@ where
|
||||
};
|
||||
|
||||
// Split the socket into writing and reading parts.
|
||||
let (floodsub_sink, floodsub_stream) = Framed::new(socket, VarintCodec::default())
|
||||
let (floodsub_sink, floodsub_stream) = Framed::new(socket, codec::UviBytes::default())
|
||||
.sink_map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
|
||||
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
|
||||
.split();
|
||||
|
@ -16,7 +16,7 @@ parking_lot = "0.6"
|
||||
protobuf = "2.0.2"
|
||||
tokio-codec = "0.1"
|
||||
tokio-io = "0.1.0"
|
||||
varint = { path = "../varint-rs" }
|
||||
unsigned-varint = { version = "0.1", features = ["codec"] }
|
||||
|
||||
[dev-dependencies]
|
||||
libp2p-tcp-transport = { path = "../tcp-transport" }
|
||||
|
@ -77,7 +77,7 @@ extern crate parking_lot;
|
||||
extern crate protobuf;
|
||||
extern crate tokio_codec;
|
||||
extern crate tokio_io;
|
||||
extern crate varint;
|
||||
extern crate unsigned_varint;
|
||||
|
||||
pub use self::identify_transport::IdentifyTransportOutcome;
|
||||
pub use self::peer_id_transport::{PeerIdTransport, PeerIdTransportOutput};
|
||||
|
@ -30,7 +30,7 @@ use std::iter;
|
||||
use structs_proto;
|
||||
use tokio_codec::Framed;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use varint::VarintCodec;
|
||||
use unsigned_varint::codec;
|
||||
|
||||
/// Configuration for an upgrade to the identity protocol.
|
||||
#[derive(Debug, Clone)]
|
||||
@ -56,7 +56,7 @@ pub enum IdentifyOutput<T> {
|
||||
|
||||
/// Object used to send back information to the client.
|
||||
pub struct IdentifySender<T> {
|
||||
inner: Framed<T, VarintCodec<Vec<u8>>>,
|
||||
inner: Framed<T, codec::UviBytes<Vec<u8>>>,
|
||||
}
|
||||
|
||||
impl<'a, T> IdentifySender<T>
|
||||
@ -130,7 +130,7 @@ where
|
||||
fn upgrade(self, socket: C, _: (), ty: Endpoint, remote_addr: Maf) -> Self::Future {
|
||||
trace!("Upgrading connection as {:?}", ty);
|
||||
|
||||
let socket = Framed::new(socket, VarintCodec::default());
|
||||
let socket = Framed::new(socket, codec::UviBytes::default());
|
||||
|
||||
match ty {
|
||||
Endpoint::Dialer => {
|
||||
|
@ -24,7 +24,7 @@ smallvec = "0.5"
|
||||
tokio-codec = "0.1"
|
||||
tokio-io = "0.1"
|
||||
tokio-timer = "0.2"
|
||||
varint = { path = "../varint-rs" }
|
||||
unsigned-varint = { version = "0.1", features = ["codec"] }
|
||||
|
||||
[dev-dependencies]
|
||||
libp2p-tcp-transport = { path = "../tcp-transport" }
|
||||
|
@ -76,7 +76,7 @@ extern crate smallvec;
|
||||
extern crate tokio_codec;
|
||||
extern crate tokio_io;
|
||||
extern crate tokio_timer;
|
||||
extern crate varint;
|
||||
extern crate unsigned_varint;
|
||||
|
||||
pub use self::high_level::{KadSystemConfig, KadSystem, KadQueryEvent};
|
||||
pub use self::kad_server::{KadConnecController, KadConnecConfig, KadIncomingRequest, KadFindNodeRespond};
|
||||
|
@ -34,7 +34,7 @@ use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||
use std::iter;
|
||||
use tokio_codec::Framed;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use varint::VarintCodec;
|
||||
use unsigned_varint::codec;
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
|
||||
pub enum KadConnectionType {
|
||||
@ -148,7 +148,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
type KadStreamSink<S> = stream::AndThen<sink::With<stream::FromErr<Framed<S, VarintCodec<Vec<u8>>>, IoError>, KadMsg, fn(KadMsg) -> Result<Vec<u8>, IoError>, Result<Vec<u8>, IoError>>, fn(BytesMut) -> Result<KadMsg, IoError>, Result<KadMsg, IoError>>;
|
||||
type KadStreamSink<S> = stream::AndThen<sink::With<stream::FromErr<Framed<S, codec::UviBytes<Vec<u8>>>, IoError>, KadMsg, fn(KadMsg) -> Result<Vec<u8>, IoError>, Result<Vec<u8>, IoError>>, fn(BytesMut) -> Result<KadMsg, IoError>, Result<KadMsg, IoError>>;
|
||||
|
||||
// Upgrades a socket to use the Kademlia protocol.
|
||||
fn kademlia_protocol<S>(
|
||||
@ -157,7 +157,7 @@ fn kademlia_protocol<S>(
|
||||
where
|
||||
S: AsyncRead + AsyncWrite,
|
||||
{
|
||||
Framed::new(socket, VarintCodec::default())
|
||||
Framed::new(socket, codec::UviBytes::default())
|
||||
.from_err::<IoError>()
|
||||
.with::<_, fn(_) -> _, _>(|request| -> Result<_, IoError> {
|
||||
let proto_struct = msg_to_proto(request);
|
||||
|
@ -12,7 +12,7 @@ libp2p-core = { path = "../core" }
|
||||
parking_lot = "0.6"
|
||||
tokio-codec = "0.1"
|
||||
tokio-io = "0.1"
|
||||
varint = { path = "../varint-rs" }
|
||||
unsigned-varint = { version = "0.1", features = ["codec"] }
|
||||
|
||||
[dev-dependencies]
|
||||
libp2p-tcp-transport = { path = "../tcp-transport" }
|
||||
|
@ -24,7 +24,7 @@ use std::mem;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use core::Endpoint;
|
||||
use tokio_io::codec::{Decoder, Encoder};
|
||||
use varint;
|
||||
use unsigned_varint::{codec, encode};
|
||||
|
||||
// Arbitrary maximum size for a packet.
|
||||
// Since data is entirely buffered before being dispatched, we need a limit or remotes could just
|
||||
@ -60,7 +60,7 @@ impl Elem {
|
||||
}
|
||||
|
||||
pub struct Codec {
|
||||
varint_decoder: varint::VarintDecoder<u32>,
|
||||
varint_decoder: codec::Uvi<u32>,
|
||||
decoder_state: CodecDecodeState,
|
||||
}
|
||||
|
||||
@ -75,7 +75,7 @@ enum CodecDecodeState {
|
||||
impl Codec {
|
||||
pub fn new() -> Codec {
|
||||
Codec {
|
||||
varint_decoder: varint::VarintDecoder::new(),
|
||||
varint_decoder: codec::Uvi::default(),
|
||||
decoder_state: CodecDecodeState::Begin,
|
||||
}
|
||||
}
|
||||
@ -178,9 +178,12 @@ impl Encoder for Codec {
|
||||
},
|
||||
};
|
||||
|
||||
let header_bytes = varint::encode(header);
|
||||
let mut header_buf = encode::u64_buffer();
|
||||
let header_bytes = encode::u64(header, &mut header_buf);
|
||||
|
||||
let data_len = data.as_ref().len();
|
||||
let data_len_bytes = varint::encode(data_len);
|
||||
let mut data_buf = encode::usize_buffer();
|
||||
let data_len_bytes = encode::usize(data_len, &mut data_buf);
|
||||
|
||||
if data_len > MAX_FRAME_SIZE {
|
||||
return Err(IoError::new(IoErrorKind::InvalidData, "data size exceed maximum"));
|
||||
|
@ -26,7 +26,7 @@ extern crate libp2p_core as core;
|
||||
extern crate parking_lot;
|
||||
extern crate tokio_codec;
|
||||
extern crate tokio_io;
|
||||
extern crate varint;
|
||||
extern crate unsigned_varint;
|
||||
|
||||
mod codec;
|
||||
|
||||
|
@ -10,7 +10,7 @@ futures = { version = "0.1" }
|
||||
log = "0.4"
|
||||
smallvec = "0.5"
|
||||
tokio-io = "0.1"
|
||||
varint = { path = "../varint-rs" }
|
||||
unsigned-varint = { version = "0.1", features = ["codec"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-current-thread = "0.1"
|
||||
|
@ -121,7 +121,7 @@ extern crate futures;
|
||||
extern crate log;
|
||||
extern crate smallvec;
|
||||
extern crate tokio_io;
|
||||
extern crate varint;
|
||||
extern crate unsigned_varint;
|
||||
|
||||
mod dialer_select;
|
||||
mod error;
|
||||
|
@ -27,11 +27,11 @@ use protocol::DialerToListenerMessage;
|
||||
use protocol::ListenerToDialerMessage;
|
||||
use protocol::MultistreamSelectError;
|
||||
use protocol::MULTISTREAM_PROTOCOL_WITH_LF;
|
||||
use std::io::{BufRead, Cursor, Read};
|
||||
use std::io::{BufRead, Cursor};
|
||||
use tokio_io::codec::length_delimited::Builder as LengthDelimitedBuilder;
|
||||
use tokio_io::codec::length_delimited::FramedWrite as LengthDelimitedFramedWrite;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use varint;
|
||||
use unsigned_varint::decode;
|
||||
|
||||
/// Wraps around a `AsyncRead+AsyncWrite`. Assumes that we're on the dialer's side. Produces and
|
||||
/// accepts messages.
|
||||
@ -154,9 +154,8 @@ where
|
||||
return Ok(Async::Ready(Some(ListenerToDialerMessage::NotAvailable)));
|
||||
} else {
|
||||
// A varint number of protocols
|
||||
let mut reader = Cursor::new(frame);
|
||||
let num_protocols: usize = varint::decode(reader.by_ref())?;
|
||||
|
||||
let (num_protocols, remaining) = decode::usize(&frame)?;
|
||||
let reader = Cursor::new(remaining);
|
||||
let mut iter = BufRead::split(reader, b'\r');
|
||||
if !iter.next()
|
||||
.ok_or(MultistreamSelectError::UnknownMessage)??
|
||||
|
@ -23,7 +23,7 @@
|
||||
use std::error;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use varint;
|
||||
use unsigned_varint::decode;
|
||||
|
||||
/// Error at the multistream-select layer of communication.
|
||||
#[derive(Debug)]
|
||||
@ -52,9 +52,9 @@ impl From<io::Error> for MultistreamSelectError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<varint::Error> for MultistreamSelectError {
|
||||
impl From<decode::Error> for MultistreamSelectError {
|
||||
#[inline]
|
||||
fn from(err: varint::Error) -> MultistreamSelectError {
|
||||
fn from(err: decode::Error) -> MultistreamSelectError {
|
||||
MultistreamSelectError::VarintParseError(err.to_string())
|
||||
}
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ use protocol::MULTISTREAM_PROTOCOL_WITH_LF;
|
||||
use tokio_io::codec::length_delimited::Builder as LengthDelimitedBuilder;
|
||||
use tokio_io::codec::length_delimited::FramedWrite as LengthDelimitedFramedWrite;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use varint;
|
||||
use unsigned_varint::encode;
|
||||
|
||||
/// Wraps around a `AsyncRead+AsyncWrite`. Assumes that we're on the listener's side. Produces and
|
||||
/// accepts messages.
|
||||
@ -126,7 +126,8 @@ where
|
||||
ListenerToDialerMessage::ProtocolsListResponse { list } => {
|
||||
use std::iter;
|
||||
|
||||
let mut out_msg = varint::encode(list.len());
|
||||
let mut buf = encode::usize_buffer();
|
||||
let mut out_msg = Vec::from(encode::usize(list.len(), &mut buf));
|
||||
for elem in &list {
|
||||
out_msg.extend(iter::once(b'\r'));
|
||||
out_msg.extend_from_slice(elem);
|
||||
|
@ -15,4 +15,4 @@ protobuf = "2.0.2"
|
||||
rand = "0.4"
|
||||
tokio-codec = "0.1"
|
||||
tokio-io = "0.1"
|
||||
varint = { path = "../varint-rs" }
|
||||
unsigned-varint = { version = "0.1", features = ["codec"] }
|
||||
|
@ -30,7 +30,7 @@ extern crate protobuf;
|
||||
extern crate rand;
|
||||
extern crate tokio_codec;
|
||||
extern crate tokio_io;
|
||||
extern crate varint;
|
||||
extern crate unsigned_varint;
|
||||
|
||||
mod copy;
|
||||
mod message;
|
||||
|
@ -26,7 +26,7 @@ use protobuf::{self, Message};
|
||||
use std::{io, error::Error, iter::FromIterator};
|
||||
use tokio_codec::Framed;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use varint::VarintCodec;
|
||||
use unsigned_varint::codec;
|
||||
|
||||
pub(crate) fn is_success(msg: &CircuitRelay) -> bool {
|
||||
msg.get_field_type() == CircuitRelay_Type::STATUS
|
||||
@ -41,13 +41,13 @@ pub(crate) fn status(s: CircuitRelay_Status) -> CircuitRelay {
|
||||
}
|
||||
|
||||
pub(crate) struct Io<T> {
|
||||
codec: Framed<T, VarintCodec<Vec<u8>>>,
|
||||
codec: Framed<T, codec::UviBytes<Vec<u8>>>,
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite> Io<T> {
|
||||
pub(crate) fn new(c: T) -> Io<T> {
|
||||
Io {
|
||||
codec: Framed::new(c, VarintCodec::default()),
|
||||
codec: Framed::new(c, codec::UviBytes::default()),
|
||||
}
|
||||
}
|
||||
|
||||
|
3
varint-rs/.gitignore
vendored
3
varint-rs/.gitignore
vendored
@ -1,3 +0,0 @@
|
||||
/target/
|
||||
**/*.rs.bk
|
||||
Cargo.lock
|
@ -1,16 +0,0 @@
|
||||
[package]
|
||||
name = "varint"
|
||||
version = "0.1.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
license = "MIT"
|
||||
|
||||
[dependencies]
|
||||
num-bigint = { version = "0.1.41", default-features = false }
|
||||
num-traits = "0.2"
|
||||
bytes = "0.4.5"
|
||||
tokio-io = "0.1"
|
||||
futures = "0.1"
|
||||
error-chain = "0.11.0"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-codec = "0.1"
|
@ -1,655 +0,0 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
#![warn(missing_docs)]
|
||||
|
||||
//! Encoding and decoding state machines for protobuf varints
|
||||
|
||||
// TODO: Non-allocating `BigUint`?
|
||||
extern crate bytes;
|
||||
#[macro_use]
|
||||
extern crate error_chain;
|
||||
extern crate futures;
|
||||
extern crate num_bigint;
|
||||
extern crate num_traits;
|
||||
extern crate tokio_io;
|
||||
|
||||
#[cfg(test)]
|
||||
extern crate tokio_codec;
|
||||
|
||||
use bytes::{BufMut, Bytes, BytesMut, IntoBuf};
|
||||
use futures::{Async, Poll};
|
||||
use num_bigint::BigUint;
|
||||
use num_traits::ToPrimitive;
|
||||
use std::io;
|
||||
use std::io::prelude::*;
|
||||
use std::marker::PhantomData;
|
||||
use std::mem;
|
||||
use tokio_io::codec::{Decoder, Encoder};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
mod errors {
|
||||
error_chain! {
|
||||
errors {
|
||||
ParseError {
|
||||
description("error parsing varint")
|
||||
display("error parsing varint")
|
||||
}
|
||||
WriteError {
|
||||
description("error writing varint")
|
||||
display("error writing varint")
|
||||
}
|
||||
}
|
||||
|
||||
foreign_links {
|
||||
Io(::std::io::Error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub use errors::{Error, ErrorKind};
|
||||
|
||||
const USABLE_BITS_PER_BYTE: usize = 7;
|
||||
|
||||
/// The state struct for the varint-to-bytes FSM
|
||||
#[derive(Debug)]
|
||||
pub struct EncoderState<T> {
|
||||
source: T,
|
||||
// A "chunk" is a section of the `source` `USABLE_BITS_PER_BYTE` bits long
|
||||
num_chunks: usize,
|
||||
cur_chunk: usize,
|
||||
}
|
||||
|
||||
/// Whether or not the varint writing was completed
|
||||
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
|
||||
pub enum WriteState {
|
||||
/// The encoder has finished writing
|
||||
Done(usize),
|
||||
/// The encoder still must write more bytes
|
||||
Pending(usize),
|
||||
}
|
||||
|
||||
fn ceil_div(a: usize, b: usize) -> usize {
|
||||
(a + b - 1) / b
|
||||
}
|
||||
|
||||
/// A trait to get the minimum number of bits required to represent a number
|
||||
pub trait Bits {
|
||||
/// The minimum number of bits required to represent `self`
|
||||
fn bits(&self) -> usize;
|
||||
}
|
||||
|
||||
impl Bits for BigUint {
|
||||
fn bits(&self) -> usize {
|
||||
BigUint::bits(self)
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! impl_bits {
|
||||
($t:ty) => {
|
||||
impl Bits for $t {
|
||||
fn bits(&self) -> usize {
|
||||
(std::mem::size_of::<$t>() * 8) - self.leading_zeros() as usize
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl_bits!(usize);
|
||||
impl_bits!(u64);
|
||||
impl_bits!(u32);
|
||||
impl_bits!(u16);
|
||||
impl_bits!(u8);
|
||||
|
||||
/// Helper trait to allow multiple integer types to be encoded
|
||||
pub trait EncoderHelper: Sized {
|
||||
/// Write as much as possible of the inner integer to the output `AsyncWrite`
|
||||
fn write<W: AsyncWrite>(encoder: &mut EncoderState<Self>, output: W)
|
||||
-> Poll<WriteState, Error>;
|
||||
}
|
||||
|
||||
/// Helper trait to allow multiple integer types to be encoded
|
||||
pub trait DecoderHelper: Sized {
|
||||
/// Decode a single byte
|
||||
fn decode_one(decoder: &mut DecoderState<Self>, byte: u8) -> errors::Result<Option<Self>>;
|
||||
|
||||
/// Read as much of the varint as possible
|
||||
fn read<R: AsyncRead>(decoder: &mut DecoderState<Self>, input: R) -> Poll<Option<Self>, Error>;
|
||||
}
|
||||
|
||||
macro_rules! impl_decoderstate {
|
||||
($t:ty) => {
|
||||
impl_decoderstate!(
|
||||
$t,
|
||||
|a| a as $t,
|
||||
|a: $t, b| -> Option<$t> { a.checked_shl(b as u32) }
|
||||
);
|
||||
};
|
||||
($t:ty, $make_fn:expr) => {
|
||||
impl_decoderstate!($t, $make_fn, $make_fn);
|
||||
};
|
||||
($t:ty, $make_fn:expr, $shift_fn:expr) => {
|
||||
impl DecoderHelper for $t {
|
||||
#[inline]
|
||||
fn decode_one(
|
||||
decoder: &mut DecoderState<Self>,
|
||||
byte: u8,
|
||||
) -> ::errors::Result<Option<$t>> {
|
||||
let res = decoder.accumulator.take().and_then(|accumulator| {
|
||||
let out = accumulator
|
||||
| match $shift_fn(
|
||||
$make_fn(byte & 0x7F),
|
||||
decoder.shift * USABLE_BITS_PER_BYTE,
|
||||
) {
|
||||
Some(a) => a,
|
||||
None => return Some(Err(ErrorKind::ParseError.into())),
|
||||
};
|
||||
decoder.shift += 1;
|
||||
|
||||
if byte & 0x80 == 0 {
|
||||
Some(Ok(out))
|
||||
} else {
|
||||
decoder.accumulator = AccumulatorState::InProgress(out);
|
||||
None
|
||||
}
|
||||
});
|
||||
|
||||
match res {
|
||||
Some(Ok(number)) => Ok(Some(number)),
|
||||
Some(Err(err)) => Err(err),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn read<R: AsyncRead>(
|
||||
decoder: &mut DecoderState<Self>,
|
||||
mut input: R,
|
||||
) -> Poll<Option<Self>, Error> {
|
||||
if decoder.accumulator == AccumulatorState::Finished {
|
||||
return Err(Error::with_chain(
|
||||
io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"Attempted to parse a second varint (create a new instance!)",
|
||||
),
|
||||
ErrorKind::ParseError,
|
||||
));
|
||||
}
|
||||
|
||||
loop {
|
||||
// We read one at a time to prevent consuming too much of the buffer.
|
||||
let mut buffer: [u8; 1] = [0];
|
||||
|
||||
match input.read_exact(&mut buffer) {
|
||||
Ok(()) => {
|
||||
if let Some(out) = Self::decode_one(decoder, buffer[0])? {
|
||||
break Ok(Async::Ready(Some(out)));
|
||||
}
|
||||
}
|
||||
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
|
||||
break Ok(Async::NotReady);
|
||||
}
|
||||
Err(inner) => if decoder.accumulator == AccumulatorState::NotStarted {
|
||||
break Ok(Async::Ready(None));
|
||||
} else {
|
||||
break Err(Error::with_chain(inner, ErrorKind::ParseError));
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! impl_encoderstate {
|
||||
($t:ty) => {
|
||||
impl_encoderstate!($t, <$t>::from);
|
||||
};
|
||||
($t:ty, $make_fn:expr) => {
|
||||
impl EncoderHelper for $t {
|
||||
/// Write as much as possible of the inner integer to the output `AsyncWrite`
|
||||
fn write<W: AsyncWrite>(
|
||||
encoder: &mut EncoderState<Self>,
|
||||
mut output: W,
|
||||
) -> Poll<WriteState, Error> {
|
||||
fn encode_one(encoder: &EncoderState<$t>) -> Option<u8> {
|
||||
let last_chunk = encoder.num_chunks - 1;
|
||||
|
||||
if encoder.cur_chunk > last_chunk {
|
||||
return None;
|
||||
}
|
||||
|
||||
let masked = (&encoder.source >> (encoder.cur_chunk * USABLE_BITS_PER_BYTE))
|
||||
& $make_fn((1 << USABLE_BITS_PER_BYTE) - 1usize);
|
||||
let masked = masked
|
||||
.to_u8()
|
||||
.expect("Masked with 0b0111_1111, is less than u8::MAX, QED");
|
||||
|
||||
if encoder.cur_chunk == last_chunk {
|
||||
Some(masked)
|
||||
} else {
|
||||
Some(masked | (1 << USABLE_BITS_PER_BYTE))
|
||||
}
|
||||
}
|
||||
|
||||
let mut written = 0usize;
|
||||
|
||||
loop {
|
||||
if let Some(byte) = encode_one(&encoder) {
|
||||
let buffer: [u8; 1] = [byte];
|
||||
|
||||
match output.write_all(&buffer) {
|
||||
Ok(()) => {
|
||||
written += 1;
|
||||
encoder.cur_chunk += 1;
|
||||
}
|
||||
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
|
||||
break if written == 0 {
|
||||
Ok(Async::NotReady)
|
||||
} else {
|
||||
Ok(Async::Ready(WriteState::Pending(written)))
|
||||
};
|
||||
}
|
||||
Err(inner) => {
|
||||
break Err(Error::with_chain(inner, ErrorKind::WriteError))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
break Ok(Async::Ready(WriteState::Done(written)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl_encoderstate!(usize);
|
||||
impl_encoderstate!(BigUint);
|
||||
impl_encoderstate!(u64, (|val| val as u64));
|
||||
impl_encoderstate!(u32, (|val| val as u32));
|
||||
|
||||
impl_decoderstate!(usize);
|
||||
impl_decoderstate!(BigUint, BigUint::from, |a, b| Some(a << b));
|
||||
impl_decoderstate!(u64);
|
||||
impl_decoderstate!(u32);
|
||||
|
||||
impl<T> EncoderState<T> {
|
||||
pub fn source(&self) -> &T {
|
||||
&self.source
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Bits> EncoderState<T> {
|
||||
/// Create a new encoder
|
||||
pub fn new(inner: T) -> Self {
|
||||
let bits = inner.bits();
|
||||
EncoderState {
|
||||
source: inner,
|
||||
num_chunks: ceil_div(bits, USABLE_BITS_PER_BYTE).max(1),
|
||||
cur_chunk: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: EncoderHelper> EncoderState<T> {
|
||||
/// Write as much as possible of the inner integer to the output `AsyncWrite`
|
||||
pub fn write<W: AsyncWrite>(&mut self, output: W) -> Poll<WriteState, Error> {
|
||||
T::write(self, output)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
enum AccumulatorState<T> {
|
||||
InProgress(T),
|
||||
NotStarted,
|
||||
Finished,
|
||||
}
|
||||
|
||||
impl<T: Default> AccumulatorState<T> {
|
||||
fn take(&mut self) -> Option<T> {
|
||||
use AccumulatorState::*;
|
||||
use std::mem;
|
||||
|
||||
match mem::replace(self, AccumulatorState::Finished) {
|
||||
InProgress(inner) => Some(inner),
|
||||
NotStarted => Some(Default::default()),
|
||||
Finished => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The state struct for the varint bytes-to-bigint FSM
|
||||
#[derive(Debug)]
|
||||
pub struct DecoderState<T> {
|
||||
accumulator: AccumulatorState<T>,
|
||||
shift: usize,
|
||||
}
|
||||
|
||||
impl<T: Default> Default for DecoderState<T> {
|
||||
fn default() -> Self {
|
||||
DecoderState {
|
||||
accumulator: AccumulatorState::NotStarted,
|
||||
shift: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Default> DecoderState<T> {
|
||||
/// Make a new decoder
|
||||
pub fn new() -> Self {
|
||||
Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: DecoderHelper> DecoderState<T> {
|
||||
/// Make a new decoder
|
||||
pub fn read<R: AsyncRead>(&mut self, input: R) -> Poll<Option<T>, Error> {
|
||||
T::read(self, input)
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper around `DecoderState` to make a `tokio` `Decoder`
|
||||
#[derive(Debug)]
|
||||
pub struct VarintDecoder<T> {
|
||||
state: Option<DecoderState<T>>,
|
||||
}
|
||||
|
||||
impl<T> Default for VarintDecoder<T> {
|
||||
fn default() -> Self {
|
||||
VarintDecoder { state: None }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> VarintDecoder<T> {
|
||||
/// Make a new decoder
|
||||
pub fn new() -> Self {
|
||||
Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Default + DecoderHelper> Decoder for VarintDecoder<T> {
|
||||
type Item = T;
|
||||
type Error = io::Error;
|
||||
|
||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||
loop {
|
||||
if src.is_empty() {
|
||||
break Ok(None);
|
||||
} else {
|
||||
// We know that the length is not 0, so this cannot fail.
|
||||
let first_byte = src.split_to(1)[0];
|
||||
let mut state = self.state.take().unwrap_or_default();
|
||||
let out = T::decode_one(&mut state, first_byte)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::Other))?;
|
||||
|
||||
if let Some(out) = out {
|
||||
break Ok(Some(out));
|
||||
} else {
|
||||
self.state = Some(state);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct VarintCodec<W> {
|
||||
inner: VarintCodecInner,
|
||||
marker: PhantomData<W>,
|
||||
}
|
||||
|
||||
impl<T> Default for VarintCodec<T> {
|
||||
#[inline]
|
||||
fn default() -> VarintCodec<T> {
|
||||
VarintCodec {
|
||||
inner: VarintCodecInner::WaitingForLen(VarintDecoder::default()),
|
||||
marker: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum VarintCodecInner {
|
||||
WaitingForLen(VarintDecoder<usize>),
|
||||
WaitingForData(usize),
|
||||
Poisoned,
|
||||
}
|
||||
|
||||
impl<T> Decoder for VarintCodec<T> {
|
||||
type Item = BytesMut;
|
||||
type Error = io::Error;
|
||||
|
||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||
loop {
|
||||
match mem::replace(&mut self.inner, VarintCodecInner::Poisoned) {
|
||||
VarintCodecInner::WaitingForData(len) => {
|
||||
if src.len() >= len {
|
||||
self.inner = VarintCodecInner::WaitingForLen(VarintDecoder::default());
|
||||
return Ok(Some(src.split_to(len)));
|
||||
} else {
|
||||
self.inner = VarintCodecInner::WaitingForData(len);
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
VarintCodecInner::WaitingForLen(mut decoder) => match decoder.decode(src)? {
|
||||
None => {
|
||||
self.inner = VarintCodecInner::WaitingForLen(decoder);
|
||||
return Ok(None);
|
||||
}
|
||||
Some(len) => {
|
||||
self.inner = VarintCodecInner::WaitingForData(len);
|
||||
}
|
||||
},
|
||||
VarintCodecInner::Poisoned => panic!("varint codec was poisoned"),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<D> Encoder for VarintCodec<D>
|
||||
where
|
||||
D: IntoBuf + AsRef<[u8]>,
|
||||
{
|
||||
type Item = D;
|
||||
type Error = io::Error;
|
||||
|
||||
fn encode(&mut self, item: D, dst: &mut BytesMut) -> Result<(), io::Error> {
|
||||
let encoded_len = encode(item.as_ref().len());
|
||||
dst.reserve(encoded_len.len() + item.as_ref().len());
|
||||
dst.put(encoded_len);
|
||||
dst.put(item);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Syncronously decode a number from a `Read`
|
||||
pub fn decode<R: Read, T: Default + DecoderHelper>(mut input: R) -> errors::Result<T> {
|
||||
let mut decoder = DecoderState::default();
|
||||
|
||||
loop {
|
||||
// We read one at a time to prevent consuming too much of the buffer.
|
||||
let mut buffer: [u8; 1] = [0];
|
||||
|
||||
match input.read_exact(&mut buffer) {
|
||||
Ok(()) => {
|
||||
if let Some(out) = T::decode_one(&mut decoder, buffer[0])
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::Other))?
|
||||
{
|
||||
break Ok(out);
|
||||
}
|
||||
}
|
||||
Err(inner) => break Err(Error::with_chain(inner, ErrorKind::ParseError)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Syncronously decode a number from a `Read`
|
||||
pub fn encode<T: EncoderHelper + Bits>(input: T) -> Bytes {
|
||||
use tokio_io::io::AllowStdIo;
|
||||
|
||||
let mut encoder = EncoderState::new(input);
|
||||
let mut out = AllowStdIo::new(BytesMut::new().writer());
|
||||
|
||||
match T::write(&mut encoder, &mut out).expect("Writing to a vec should never fail, Q.E.D") {
|
||||
Async::Ready(_) => out.into_inner().into_inner().freeze(),
|
||||
Async::NotReady => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{decode, EncoderState, VarintDecoder};
|
||||
use futures::{Future, Stream};
|
||||
use num_bigint::BigUint;
|
||||
use tokio_codec::FramedRead;
|
||||
|
||||
#[test]
|
||||
fn large_number_fails() {
|
||||
use super::WriteState;
|
||||
use futures::Async;
|
||||
use std::io::Cursor;
|
||||
|
||||
let mut out = vec![0u8; 10];
|
||||
|
||||
{
|
||||
let writable: Cursor<&mut [_]> = Cursor::new(&mut out);
|
||||
|
||||
let mut state = EncoderState::new(::std::u64::MAX);
|
||||
|
||||
assert_eq!(
|
||||
state.write(writable).unwrap(),
|
||||
Async::Ready(WriteState::Done(10))
|
||||
);
|
||||
}
|
||||
|
||||
let result: Result<Option<u32>, _> = FramedRead::new(&out[..], VarintDecoder::new())
|
||||
.into_future()
|
||||
.map(|(out, _)| out)
|
||||
.map_err(|(out, _)| out)
|
||||
.wait();
|
||||
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_decode_basic_biguint() {
|
||||
assert_eq!(
|
||||
BigUint::from(300u16),
|
||||
decode(&[0b10101100, 0b00000010][..]).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_decode_basic_biguint_async() {
|
||||
let result = FramedRead::new(&[0b10101100, 0b00000010][..], VarintDecoder::new())
|
||||
.into_future()
|
||||
.map(|(out, _)| out)
|
||||
.map_err(|(out, _)| out)
|
||||
.wait();
|
||||
|
||||
assert_eq!(result.unwrap(), Some(BigUint::from(300u16)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_decode_trivial_usize_async() {
|
||||
let result = FramedRead::new(&[1][..], VarintDecoder::new())
|
||||
.into_future()
|
||||
.map(|(out, _)| out)
|
||||
.map_err(|(out, _)| out)
|
||||
.wait();
|
||||
|
||||
assert_eq!(result.unwrap(), Some(1usize));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_decode_basic_usize_async() {
|
||||
let result = FramedRead::new(&[0b10101100, 0b00000010][..], VarintDecoder::new())
|
||||
.into_future()
|
||||
.map(|(out, _)| out)
|
||||
.map_err(|(out, _)| out)
|
||||
.wait();
|
||||
|
||||
assert_eq!(result.unwrap(), Some(300usize));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_encode_basic_biguint_async() {
|
||||
use super::WriteState;
|
||||
use futures::Async;
|
||||
use std::io::Cursor;
|
||||
|
||||
let mut out = vec![0u8; 2];
|
||||
|
||||
{
|
||||
let writable: Cursor<&mut [_]> = Cursor::new(&mut out);
|
||||
|
||||
let mut state = EncoderState::new(BigUint::from(300usize));
|
||||
|
||||
assert_eq!(
|
||||
state.write(writable).unwrap(),
|
||||
Async::Ready(WriteState::Done(2))
|
||||
);
|
||||
}
|
||||
|
||||
assert_eq!(out, vec![0b10101100, 0b00000010]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_encode_basic_usize_async() {
|
||||
use super::WriteState;
|
||||
use futures::Async;
|
||||
use std::io::Cursor;
|
||||
|
||||
let mut out = vec![0u8; 2];
|
||||
|
||||
{
|
||||
let writable: Cursor<&mut [_]> = Cursor::new(&mut out);
|
||||
|
||||
let mut state = EncoderState::new(300usize);
|
||||
|
||||
assert_eq!(
|
||||
state.write(writable).unwrap(),
|
||||
Async::Ready(WriteState::Done(2))
|
||||
);
|
||||
}
|
||||
|
||||
assert_eq!(out, vec![0b10101100, 0b00000010]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unexpected_eof_async() {
|
||||
use std::io;
|
||||
|
||||
let result = FramedRead::new(&[0b10101100, 0b10000010][..], VarintDecoder::<usize>::new())
|
||||
.into_future()
|
||||
.map(|(out, _)| out)
|
||||
.map_err(|(out, _)| out)
|
||||
.wait();
|
||||
|
||||
assert_eq!(result.unwrap(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn no_panic_after_eof() {
|
||||
FramedRead::new(&[1, 1][..], VarintDecoder::<usize>::new())
|
||||
.collect()
|
||||
.wait()
|
||||
.unwrap();
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user