From b423e9fe8cab3efd2f251a84f1c52e6122616606 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Wed, 18 Jul 2018 10:30:56 +0200 Subject: [PATCH] Use tokio-codec in mplex. (#315) --- mplex/Cargo.toml | 1 + mplex/src/lib.rs | 9 +++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/mplex/Cargo.toml b/mplex/Cargo.toml index c3a870b4..e3473838 100644 --- a/mplex/Cargo.toml +++ b/mplex/Cargo.toml @@ -17,6 +17,7 @@ num-bigint = { version = "0.1.40", default-features = false } num-traits = "0.1.40" parking_lot = "0.4.8" rand = "0.3.17" +tokio-codec = "0.1" tokio-io = "0.1" varint = { path = "../varint-rs" } diff --git a/mplex/src/lib.rs b/mplex/src/lib.rs index 5b0224fe..78e45cb2 100644 --- a/mplex/src/lib.rs +++ b/mplex/src/lib.rs @@ -23,9 +23,9 @@ extern crate fnv; #[macro_use] extern crate futures; extern crate libp2p_core as core; -#[macro_use] extern crate log; extern crate parking_lot; +extern crate tokio_codec; extern crate tokio_io; extern crate varint; @@ -41,7 +41,8 @@ use parking_lot::Mutex; use fnv::FnvHashSet; use futures::prelude::*; use futures::{future, stream::Fuse, task}; -use tokio_io::{AsyncRead, AsyncWrite, codec::Framed}; +use tokio_codec::Framed; +use tokio_io::{AsyncRead, AsyncWrite}; // Maximum number of simultaneously-open substreams. const MAX_SUBSTREAMS: usize = 1024; @@ -74,7 +75,7 @@ where fn upgrade(self, i: C, _: (), endpoint: Endpoint, remote_addr: Maf) -> Self::Future { let out = Multiplex { inner: Arc::new(Mutex::new(MultiplexInner { - inner: i.framed(codec::Codec::new()).fuse(), + inner: Framed::new(i, codec::Codec::new()).fuse(), buffer: Vec::with_capacity(32), opened_substreams: Default::default(), next_outbound_stream_id: if endpoint == Endpoint::Dialer { 0 } else { 1 }, @@ -152,7 +153,7 @@ where C: AsyncRead + AsyncWrite, if inner.buffer.len() >= MAX_BUFFER_LEN { return Err(IoError::new(IoErrorKind::InvalidData, "reached maximum buffer length")); } - + if inner.opened_substreams.contains(&elem.substream_id()) { inner.buffer.push(elem); for task in inner.to_notify.drain(..) {