Add a VarintCodec type

This commit is contained in:
Pierre Krieger
2018-01-09 15:44:05 +01:00
parent 047e33023d
commit ab209edfb3
3 changed files with 80 additions and 5 deletions

View File

@ -31,14 +31,16 @@ extern crate futures;
#[macro_use]
extern crate error_chain;
use bytes::BytesMut;
use bytes::{BufMut, BytesMut, IntoBuf};
use futures::{Poll, Async};
use num_bigint::BigUint;
use num_traits::ToPrimitive;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::Decoder;
use tokio_io::codec::{Encoder, Decoder};
use std::io;
use std::io::prelude::*;
use std::marker::PhantomData;
use std::mem;
mod errors {
error_chain! {
@ -395,6 +397,78 @@ impl<T: Default + DecoderHelper> Decoder for VarintDecoder<T> {
}
}
#[derive(Debug)]
pub struct VarintCodec<W> {
inner: VarintCodecInner,
marker: PhantomData<W>,
}
impl<T> Default for VarintCodec<T> {
#[inline]
fn default() -> VarintCodec<T> {
VarintCodec {
inner: VarintCodecInner::WaitingForLen(VarintDecoder::default()),
marker: PhantomData,
}
}
}
#[derive(Debug)]
enum VarintCodecInner {
WaitingForLen(VarintDecoder<usize>),
WaitingForData(usize),
Poisonned,
}
impl<T> Decoder for VarintCodec<T> {
type Item = BytesMut;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
loop {
match mem::replace(&mut self.inner, VarintCodecInner::Poisonned) {
VarintCodecInner::WaitingForData(len) => {
if src.len() >= len {
self.inner = VarintCodecInner::WaitingForLen(VarintDecoder::default());
return Ok(Some(src.split_to(len)));
} else {
self.inner = VarintCodecInner::WaitingForData(len);
return Ok(None);
}
},
VarintCodecInner::WaitingForLen(mut decoder) => {
match decoder.decode(src)? {
None => {
self.inner = VarintCodecInner::WaitingForLen(decoder);
return Ok(None);
},
Some(len) => {
self.inner = VarintCodecInner::WaitingForData(len);
},
}
},
VarintCodecInner::Poisonned => {
panic!("Poisonned decoder")
},
}
}
}
}
impl<D> Encoder for VarintCodec<D>
where D: IntoBuf + AsRef<[u8]>,
{
type Item = D;
type Error = io::Error;
fn encode(&mut self, item: D, dst: &mut BytesMut) -> Result<(), io::Error> {
let encoded_len = encode(item.as_ref().len()); // TODO: can be optimized by not allocating?
dst.put(encoded_len);
dst.put(item);
Ok(())
}
}
/// Syncronously decode a number from a `Read`
pub fn decode<R: Read, T: Default + DecoderHelper>(mut input: R) -> errors::Result<T> {
let mut decoder = DecoderState::default();