diff --git a/floodsub/Cargo.toml b/floodsub/Cargo.toml index 973671f5..b25bb785 100644 --- a/floodsub/Cargo.toml +++ b/floodsub/Cargo.toml @@ -15,5 +15,6 @@ multiaddr = { path = "../multiaddr" } parking_lot = "0.5.3" protobuf = "1" smallvec = "0.6.0" +tokio-codec = "0.1" tokio-io = "0.1" varint = { path = "../varint-rs" } diff --git a/floodsub/src/lib.rs b/floodsub/src/lib.rs index d7466b72..b4637aca 100644 --- a/floodsub/src/lib.rs +++ b/floodsub/src/lib.rs @@ -30,6 +30,7 @@ extern crate multiaddr; extern crate parking_lot; extern crate protobuf; extern crate smallvec; +extern crate tokio_codec; extern crate tokio_io; extern crate varint; @@ -55,6 +56,7 @@ use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::iter; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio_codec::Framed; use tokio_io::{AsyncRead, AsyncWrite}; use varint::VarintCodec; @@ -134,8 +136,7 @@ where }; // Split the socket into writing and reading parts. - let (floodsub_sink, floodsub_stream) = socket - .framed(VarintCodec::default()) + let (floodsub_sink, floodsub_stream) = Framed::new(socket, VarintCodec::default()) .sink_map_err(|err| IoError::new(IoErrorKind::InvalidData, err)) .map_err(|err| IoError::new(IoErrorKind::InvalidData, err)) .split(); diff --git a/identify/Cargo.toml b/identify/Cargo.toml index 71c5baa9..dc0415a1 100644 --- a/identify/Cargo.toml +++ b/identify/Cargo.toml @@ -11,6 +11,7 @@ libp2p-core = { path = "../core" } log = "0.4.1" multiaddr = { path = "../multiaddr" } protobuf = "1" +tokio-codec = "0.1" tokio-io = "0.1.0" varint = { path = "../varint-rs" } diff --git a/identify/src/lib.rs b/identify/src/lib.rs index a1362619..7ff22db1 100644 --- a/identify/src/lib.rs +++ b/identify/src/lib.rs @@ -73,6 +73,7 @@ extern crate libp2p_core; extern crate log; extern crate multiaddr; extern crate protobuf; +extern crate tokio_codec; extern crate tokio_io; extern crate varint; diff --git a/identify/src/protocol.rs b/identify/src/protocol.rs index 80950d53..5c70bc71 100644 --- a/identify/src/protocol.rs +++ b/identify/src/protocol.rs @@ -28,7 +28,7 @@ use protobuf::RepeatedField; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::iter; use structs_proto; -use tokio_io::codec::Framed; +use tokio_codec::Framed; use tokio_io::{AsyncRead, AsyncWrite}; use varint::VarintCodec; @@ -129,7 +129,7 @@ where fn upgrade(self, socket: C, _: (), ty: Endpoint, remote_addr: Maf) -> Self::Future { trace!("Upgrading connection as {:?}", ty); - let socket = socket.framed(VarintCodec::default()); + let socket = Framed::new(socket, VarintCodec::default()); match ty { Endpoint::Dialer => { diff --git a/kad/Cargo.toml b/kad/Cargo.toml index 8ca08822..02230bb9 100644 --- a/kad/Cargo.toml +++ b/kad/Cargo.toml @@ -20,6 +20,7 @@ parking_lot = "0.5.1" protobuf = "1" rand = "0.4.2" smallvec = "0.5" +tokio-codec = "0.1" tokio-io = "0.1" tokio-timer = "0.1.2" varint = { path = "../varint-rs" } diff --git a/kad/src/kad_server.rs b/kad/src/kad_server.rs index 3e5e048e..69bd370a 100644 --- a/kad/src/kad_server.rs +++ b/kad/src/kad_server.rs @@ -41,7 +41,6 @@ use protocol::{self, KadMsg, KademliaProtocolConfig, Peer}; use std::collections::VecDeque; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::iter; -use std::sync::{Arc, atomic}; use tokio_io::{AsyncRead, AsyncWrite}; /// Configuration for a Kademlia server. diff --git a/kad/src/lib.rs b/kad/src/lib.rs index 8b48c3db..b94d69b1 100644 --- a/kad/src/lib.rs +++ b/kad/src/lib.rs @@ -78,6 +78,7 @@ extern crate parking_lot; extern crate protobuf; extern crate rand; extern crate smallvec; +extern crate tokio_codec; extern crate tokio_io; extern crate tokio_timer; extern crate varint; diff --git a/kad/src/protocol.rs b/kad/src/protocol.rs index b20c9e68..421c4ca8 100644 --- a/kad/src/protocol.rs +++ b/kad/src/protocol.rs @@ -32,7 +32,8 @@ use protobuf::{self, Message}; use protobuf_structs; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::iter; -use tokio_io::{AsyncRead, AsyncWrite, codec::Framed}; +use tokio_codec::Framed; +use tokio_io::{AsyncRead, AsyncWrite}; use varint::VarintCodec; #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)] @@ -156,8 +157,7 @@ fn kademlia_protocol( where S: AsyncRead + AsyncWrite, { - socket - .framed(VarintCodec::default()) + Framed::new(socket, VarintCodec::default()) .from_err::() .with::<_, fn(_) -> _, _>(|request| -> Result<_, IoError> { let proto_struct = msg_to_proto(request); diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index 4e952ade..544ac6b1 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -21,6 +21,7 @@ libp2p-ratelimit = { path = "../ratelimit" } libp2p-relay = { path = "../relay" } libp2p-core = { path = "../core" } libp2p-websocket = { path = "../websocket" } +tokio-codec = "0.1" tokio-io = "0.1" [target.'cfg(not(target_os = "emscripten"))'.dependencies] diff --git a/libp2p/examples/echo-dialer.rs b/libp2p/examples/echo-dialer.rs index 3cca46e1..40251df7 100644 --- a/libp2p/examples/echo-dialer.rs +++ b/libp2p/examples/echo-dialer.rs @@ -22,8 +22,8 @@ extern crate bytes; extern crate env_logger; extern crate futures; extern crate libp2p; +extern crate tokio_codec; extern crate tokio_core; -extern crate tokio_io; use futures::sync::oneshot; use futures::{Future, Sink, Stream}; @@ -33,8 +33,7 @@ use libp2p::core::Transport; use libp2p::core::{upgrade, either::EitherOutput}; use libp2p::tcp::TcpConfig; use tokio_core::reactor::Core; -use tokio_io::AsyncRead; -use tokio_io::codec::BytesCodec; +use tokio_codec::{BytesCodec, Framed}; use libp2p::websocket::WsConfig; fn main() { @@ -90,7 +89,7 @@ fn main() { // successfully negotiated. The parameter is the raw socket (implements the AsyncRead // and AsyncWrite traits), and the closure must return an implementation of // `IntoFuture` that can yield any type of object. - Ok(AsyncRead::framed(socket, BytesCodec::new())) + Ok(Framed::new(socket, BytesCodec::new())) }); let (finished_tx, finished_rx) = oneshot::channel(); diff --git a/libp2p/examples/echo-server.rs b/libp2p/examples/echo-server.rs index 4f698f48..c898c4aa 100644 --- a/libp2p/examples/echo-server.rs +++ b/libp2p/examples/echo-server.rs @@ -22,8 +22,8 @@ extern crate bytes; extern crate env_logger; extern crate futures; extern crate libp2p; +extern crate tokio_codec; extern crate tokio_core; -extern crate tokio_io; use futures::future::{loop_fn, Future, IntoFuture, Loop}; use futures::{Sink, Stream}; @@ -33,8 +33,7 @@ use libp2p::core::Transport; use libp2p::core::{upgrade, either::EitherOutput}; use libp2p::tcp::TcpConfig; use tokio_core::reactor::Core; -use tokio_io::AsyncRead; -use tokio_io::codec::BytesCodec; +use tokio_codec::{BytesCodec, Framed}; use libp2p::websocket::WsConfig; fn main() { @@ -94,7 +93,7 @@ fn main() { // successfully negotiated. The parameter is the raw socket (implements the AsyncRead // and AsyncWrite traits), and the closure must return an implementation of // `IntoFuture` that can yield any type of object. - Ok(AsyncRead::framed(socket, BytesCodec::new())) + Ok(Framed::new(socket, BytesCodec::new())) }); // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and diff --git a/libp2p/examples/relay.rs b/libp2p/examples/relay.rs index f7edec01..90a91a9a 100644 --- a/libp2p/examples/relay.rs +++ b/libp2p/examples/relay.rs @@ -53,8 +53,8 @@ extern crate libp2p; extern crate rand; #[macro_use] extern crate structopt; +extern crate tokio_codec; extern crate tokio_core; -extern crate tokio_io; use libp2p::SimpleProtocol; use libp2p::core::Multiaddr; @@ -67,7 +67,7 @@ use std::{error::Error, iter, str::FromStr, sync::Arc, time::Duration}; use structopt::StructOpt; use libp2p::tcp::TcpConfig; use tokio_core::reactor::Core; -use tokio_io::{AsyncRead, codec::BytesCodec}; +use tokio_codec::{BytesCodec, Framed}; fn main() -> Result<(), Box> { env_logger::init(); @@ -133,7 +133,7 @@ fn run_dialer(opts: DialerOpts) -> Result<(), Box> { }; let echo = SimpleProtocol::new("/echo/1.0.0", |socket| { - Ok(AsyncRead::framed(socket, BytesCodec::new())) + Ok(Framed::new(socket, BytesCodec::new())) }); let (control, future) = libp2p::core::swarm(transport.clone().with_upgrade(echo.clone()), |socket, _| { @@ -165,7 +165,7 @@ fn run_listener(opts: ListenerOpts) -> Result<(), Box> { let relay = RelayConfig::new(opts.me, transport.clone(), store); let echo = SimpleProtocol::new("/echo/1.0.0", |socket| { - Ok(AsyncRead::framed(socket, BytesCodec::new())) + Ok(Framed::new(socket, BytesCodec::new())) }); let upgraded = transport.with_upgrade(relay) diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index 412f8aff..6db48604 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -24,6 +24,7 @@ pub extern crate futures; pub extern crate tokio_core; pub extern crate multiaddr; pub extern crate tokio_io; +pub extern crate tokio_codec; pub extern crate libp2p_core as core; #[cfg(not(target_os = "emscripten"))] diff --git a/ping/Cargo.toml b/ping/Cargo.toml index 78d1dd0e..b49f637e 100644 --- a/ping/Cargo.toml +++ b/ping/Cargo.toml @@ -12,6 +12,7 @@ multistream-select = { path = "../multistream-select" } futures = "0.1" parking_lot = "0.5" rand = "0.3" +tokio-codec = "0.1" tokio-io = "0.1" [dev-dependencies] diff --git a/ping/src/lib.rs b/ping/src/lib.rs index cb8b59c7..3ce4f7b6 100644 --- a/ping/src/lib.rs +++ b/ping/src/lib.rs @@ -86,6 +86,7 @@ extern crate log; extern crate multistream_select; extern crate parking_lot; extern crate rand; +extern crate tokio_codec; extern crate tokio_io; use bytes::{BufMut, Bytes, BytesMut}; @@ -101,7 +102,7 @@ use std::error::Error; use std::io::Error as IoError; use std::iter; use std::sync::Arc; -use tokio_io::codec::{Decoder, Encoder}; +use tokio_codec::{Decoder, Encoder, Framed}; use tokio_io::{AsyncRead, AsyncWrite}; /// Represents a prototype for an upgrade to handle the ping protocol. @@ -160,8 +161,7 @@ where // TODO: can't figure out how to make it work without using an Arc/Mutex let expected_pongs = Arc::new(Mutex::new(HashMap::with_capacity(4))); - let sink_stream = socket - .framed(Codec) + let sink_stream = Framed::new(socket, Codec) .map(|msg| Message::Received(msg.freeze())); let (sink, stream) = sink_stream.split(); diff --git a/relay/Cargo.toml b/relay/Cargo.toml index cbb3cd7a..7d70a535 100644 --- a/relay/Cargo.toml +++ b/relay/Cargo.toml @@ -12,5 +12,6 @@ log = "0.4" multiaddr = { path = "../multiaddr" } protobuf = "2" rand = "0.4" +tokio-codec = "0.1" tokio-io = "0.1" varint = { path = "../varint-rs" } diff --git a/relay/src/lib.rs b/relay/src/lib.rs index f1ca0242..f77c59de 100644 --- a/relay/src/lib.rs +++ b/relay/src/lib.rs @@ -27,6 +27,7 @@ extern crate log; extern crate multiaddr; extern crate protobuf; extern crate rand; +extern crate tokio_codec; extern crate tokio_io; extern crate varint; diff --git a/relay/src/utility.rs b/relay/src/utility.rs index 4decad4e..0b518762 100644 --- a/relay/src/utility.rs +++ b/relay/src/utility.rs @@ -24,7 +24,7 @@ use multiaddr::{AddrComponent, Multiaddr, Protocol}; use peerstore::PeerId; use protobuf::{self, Message}; use std::{io, error::Error, iter::FromIterator}; -use tokio_io::codec::Framed; +use tokio_codec::Framed; use tokio_io::{AsyncRead, AsyncWrite}; use varint::VarintCodec; @@ -47,7 +47,7 @@ pub(crate) struct Io { impl Io { pub(crate) fn new(c: T) -> Io { Io { - codec: c.framed(VarintCodec::default()), + codec: Framed::new(c, VarintCodec::default()), } } diff --git a/varint-rs/Cargo.toml b/varint-rs/Cargo.toml index ea870e5d..d0d7a80f 100644 --- a/varint-rs/Cargo.toml +++ b/varint-rs/Cargo.toml @@ -10,3 +10,6 @@ bytes = "0.4.5" tokio-io = "0.1" futures = "0.1" error-chain = "0.11.0" + +[dev-dependencies] +tokio-codec = "0.1" diff --git a/varint-rs/src/lib.rs b/varint-rs/src/lib.rs index 93b1f08b..1f6eca31 100644 --- a/varint-rs/src/lib.rs +++ b/varint-rs/src/lib.rs @@ -31,6 +31,9 @@ extern crate num_bigint; extern crate num_traits; extern crate tokio_io; +#[cfg(test)] +extern crate tokio_codec; + use bytes::{BufMut, Bytes, BytesMut, IntoBuf}; use futures::{Async, Poll}; use num_bigint::BigUint; @@ -518,7 +521,7 @@ mod tests { use super::{decode, EncoderState, VarintDecoder}; use futures::{Future, Stream}; use num_bigint::BigUint; - use tokio_io::codec::FramedRead; + use tokio_codec::FramedRead; #[test] fn large_number_fails() {