Merge pull request #93 from tomaka/varint-codec

Add a VarintCodec type
This commit is contained in:
Pierre Krieger 2018-01-15 15:07:58 +01:00 committed by GitHub
commit 0534076d77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 80 additions and 5 deletions

View File

@ -11,6 +11,7 @@ libp2p-swarm = { path = "../libp2p-swarm" }
multiaddr = "0.2.0"
protobuf = "1.4.2"
tokio-io = "0.1.0"
varint = { path = "../varint-rs" }
[dev-dependencies]
libp2p-tcp-transport = { path = "../libp2p-tcp-transport" }

View File

@ -31,6 +31,7 @@ extern crate libp2p_peerstore;
extern crate libp2p_swarm;
extern crate protobuf;
extern crate tokio_io;
extern crate varint;
use bytes::{Bytes, BytesMut};
use futures::{Future, Stream, Sink};
@ -42,7 +43,7 @@ use protobuf::repeated::RepeatedField;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
use varint::VarintCodec;
mod structs_proto;
@ -94,8 +95,7 @@ impl<C> ConnectionUpgrade<C> for IdentifyProtocol
}
fn upgrade(self, socket: C, _: (), ty: Endpoint, remote_addr: &Multiaddr) -> Self::Future {
// TODO: use jack's varint library instead
let socket = length_delimited::Builder::new().length_field_length(1).new_framed(socket);
let socket = socket.framed(VarintCodec::default());
match ty {
Endpoint::Dialer => {

View File

@ -31,14 +31,16 @@ extern crate futures;
#[macro_use]
extern crate error_chain;
use bytes::BytesMut;
use bytes::{BufMut, BytesMut, IntoBuf};
use futures::{Poll, Async};
use num_bigint::BigUint;
use num_traits::ToPrimitive;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::Decoder;
use tokio_io::codec::{Encoder, Decoder};
use std::io;
use std::io::prelude::*;
use std::marker::PhantomData;
use std::mem;
mod errors {
error_chain! {
@ -395,6 +397,78 @@ impl<T: Default + DecoderHelper> Decoder for VarintDecoder<T> {
}
}
#[derive(Debug)]
pub struct VarintCodec<W> {
inner: VarintCodecInner,
marker: PhantomData<W>,
}
impl<T> Default for VarintCodec<T> {
#[inline]
fn default() -> VarintCodec<T> {
VarintCodec {
inner: VarintCodecInner::WaitingForLen(VarintDecoder::default()),
marker: PhantomData,
}
}
}
#[derive(Debug)]
enum VarintCodecInner {
WaitingForLen(VarintDecoder<usize>),
WaitingForData(usize),
Poisonned,
}
impl<T> Decoder for VarintCodec<T> {
type Item = BytesMut;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
loop {
match mem::replace(&mut self.inner, VarintCodecInner::Poisonned) {
VarintCodecInner::WaitingForData(len) => {
if src.len() >= len {
self.inner = VarintCodecInner::WaitingForLen(VarintDecoder::default());
return Ok(Some(src.split_to(len)));
} else {
self.inner = VarintCodecInner::WaitingForData(len);
return Ok(None);
}
},
VarintCodecInner::WaitingForLen(mut decoder) => {
match decoder.decode(src)? {
None => {
self.inner = VarintCodecInner::WaitingForLen(decoder);
return Ok(None);
},
Some(len) => {
self.inner = VarintCodecInner::WaitingForData(len);
},
}
},
VarintCodecInner::Poisonned => {
panic!("varint codec was poisoned")
},
}
}
}
}
impl<D> Encoder for VarintCodec<D>
where D: IntoBuf + AsRef<[u8]>,
{
type Item = D;
type Error = io::Error;
fn encode(&mut self, item: D, dst: &mut BytesMut) -> Result<(), io::Error> {
let encoded_len = encode(item.as_ref().len()); // TODO: can be optimized by not allocating?
dst.put(encoded_len);
dst.put(item);
Ok(())
}
}
/// Syncronously decode a number from a `Read`
pub fn decode<R: Read, T: Default + DecoderHelper>(mut input: R) -> errors::Result<T> {
let mut decoder = DecoderState::default();