Use tokio-codec. (#271)

This commit is contained in:
Toralf Wittner
2018-06-22 16:12:23 +02:00
committed by GitHub
parent 75df40010b
commit 16967abda7
21 changed files with 40 additions and 26 deletions

View File

@ -15,5 +15,6 @@ multiaddr = { path = "../multiaddr" }
parking_lot = "0.5.3" parking_lot = "0.5.3"
protobuf = "1" protobuf = "1"
smallvec = "0.6.0" smallvec = "0.6.0"
tokio-codec = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
varint = { path = "../varint-rs" } varint = { path = "../varint-rs" }

View File

@ -30,6 +30,7 @@ extern crate multiaddr;
extern crate parking_lot; extern crate parking_lot;
extern crate protobuf; extern crate protobuf;
extern crate smallvec; extern crate smallvec;
extern crate tokio_codec;
extern crate tokio_io; extern crate tokio_io;
extern crate varint; extern crate varint;
@ -55,6 +56,7 @@ use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter; use std::iter;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use tokio_codec::Framed;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use varint::VarintCodec; use varint::VarintCodec;
@ -134,8 +136,7 @@ where
}; };
// Split the socket into writing and reading parts. // Split the socket into writing and reading parts.
let (floodsub_sink, floodsub_stream) = socket let (floodsub_sink, floodsub_stream) = Framed::new(socket, VarintCodec::default())
.framed(VarintCodec::default())
.sink_map_err(|err| IoError::new(IoErrorKind::InvalidData, err)) .sink_map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err)) .map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
.split(); .split();

View File

@ -11,6 +11,7 @@ libp2p-core = { path = "../core" }
log = "0.4.1" log = "0.4.1"
multiaddr = { path = "../multiaddr" } multiaddr = { path = "../multiaddr" }
protobuf = "1" protobuf = "1"
tokio-codec = "0.1"
tokio-io = "0.1.0" tokio-io = "0.1.0"
varint = { path = "../varint-rs" } varint = { path = "../varint-rs" }

View File

@ -73,6 +73,7 @@ extern crate libp2p_core;
extern crate log; extern crate log;
extern crate multiaddr; extern crate multiaddr;
extern crate protobuf; extern crate protobuf;
extern crate tokio_codec;
extern crate tokio_io; extern crate tokio_io;
extern crate varint; extern crate varint;

View File

@ -28,7 +28,7 @@ use protobuf::RepeatedField;
use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter; use std::iter;
use structs_proto; use structs_proto;
use tokio_io::codec::Framed; use tokio_codec::Framed;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use varint::VarintCodec; use varint::VarintCodec;
@ -129,7 +129,7 @@ where
fn upgrade(self, socket: C, _: (), ty: Endpoint, remote_addr: Maf) -> Self::Future { fn upgrade(self, socket: C, _: (), ty: Endpoint, remote_addr: Maf) -> Self::Future {
trace!("Upgrading connection as {:?}", ty); trace!("Upgrading connection as {:?}", ty);
let socket = socket.framed(VarintCodec::default()); let socket = Framed::new(socket, VarintCodec::default());
match ty { match ty {
Endpoint::Dialer => { Endpoint::Dialer => {

View File

@ -20,6 +20,7 @@ parking_lot = "0.5.1"
protobuf = "1" protobuf = "1"
rand = "0.4.2" rand = "0.4.2"
smallvec = "0.5" smallvec = "0.5"
tokio-codec = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
tokio-timer = "0.1.2" tokio-timer = "0.1.2"
varint = { path = "../varint-rs" } varint = { path = "../varint-rs" }

View File

@ -41,7 +41,6 @@ use protocol::{self, KadMsg, KademliaProtocolConfig, Peer};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter; use std::iter;
use std::sync::{Arc, atomic};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
/// Configuration for a Kademlia server. /// Configuration for a Kademlia server.

View File

@ -78,6 +78,7 @@ extern crate parking_lot;
extern crate protobuf; extern crate protobuf;
extern crate rand; extern crate rand;
extern crate smallvec; extern crate smallvec;
extern crate tokio_codec;
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_timer; extern crate tokio_timer;
extern crate varint; extern crate varint;

View File

@ -32,7 +32,8 @@ use protobuf::{self, Message};
use protobuf_structs; use protobuf_structs;
use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter; use std::iter;
use tokio_io::{AsyncRead, AsyncWrite, codec::Framed}; use tokio_codec::Framed;
use tokio_io::{AsyncRead, AsyncWrite};
use varint::VarintCodec; use varint::VarintCodec;
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)] #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
@ -156,8 +157,7 @@ fn kademlia_protocol<S>(
where where
S: AsyncRead + AsyncWrite, S: AsyncRead + AsyncWrite,
{ {
socket Framed::new(socket, VarintCodec::default())
.framed(VarintCodec::default())
.from_err::<IoError>() .from_err::<IoError>()
.with::<_, fn(_) -> _, _>(|request| -> Result<_, IoError> { .with::<_, fn(_) -> _, _>(|request| -> Result<_, IoError> {
let proto_struct = msg_to_proto(request); let proto_struct = msg_to_proto(request);

View File

@ -21,6 +21,7 @@ libp2p-ratelimit = { path = "../ratelimit" }
libp2p-relay = { path = "../relay" } libp2p-relay = { path = "../relay" }
libp2p-core = { path = "../core" } libp2p-core = { path = "../core" }
libp2p-websocket = { path = "../websocket" } libp2p-websocket = { path = "../websocket" }
tokio-codec = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
[target.'cfg(not(target_os = "emscripten"))'.dependencies] [target.'cfg(not(target_os = "emscripten"))'.dependencies]

View File

@ -22,8 +22,8 @@ extern crate bytes;
extern crate env_logger; extern crate env_logger;
extern crate futures; extern crate futures;
extern crate libp2p; extern crate libp2p;
extern crate tokio_codec;
extern crate tokio_core; extern crate tokio_core;
extern crate tokio_io;
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::{Future, Sink, Stream}; use futures::{Future, Sink, Stream};
@ -33,8 +33,7 @@ use libp2p::core::Transport;
use libp2p::core::{upgrade, either::EitherOutput}; use libp2p::core::{upgrade, either::EitherOutput};
use libp2p::tcp::TcpConfig; use libp2p::tcp::TcpConfig;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use tokio_io::AsyncRead; use tokio_codec::{BytesCodec, Framed};
use tokio_io::codec::BytesCodec;
use libp2p::websocket::WsConfig; use libp2p::websocket::WsConfig;
fn main() { fn main() {
@ -90,7 +89,7 @@ fn main() {
// successfully negotiated. The parameter is the raw socket (implements the AsyncRead // successfully negotiated. The parameter is the raw socket (implements the AsyncRead
// and AsyncWrite traits), and the closure must return an implementation of // and AsyncWrite traits), and the closure must return an implementation of
// `IntoFuture` that can yield any type of object. // `IntoFuture` that can yield any type of object.
Ok(AsyncRead::framed(socket, BytesCodec::new())) Ok(Framed::new(socket, BytesCodec::new()))
}); });
let (finished_tx, finished_rx) = oneshot::channel(); let (finished_tx, finished_rx) = oneshot::channel();

View File

@ -22,8 +22,8 @@ extern crate bytes;
extern crate env_logger; extern crate env_logger;
extern crate futures; extern crate futures;
extern crate libp2p; extern crate libp2p;
extern crate tokio_codec;
extern crate tokio_core; extern crate tokio_core;
extern crate tokio_io;
use futures::future::{loop_fn, Future, IntoFuture, Loop}; use futures::future::{loop_fn, Future, IntoFuture, Loop};
use futures::{Sink, Stream}; use futures::{Sink, Stream};
@ -33,8 +33,7 @@ use libp2p::core::Transport;
use libp2p::core::{upgrade, either::EitherOutput}; use libp2p::core::{upgrade, either::EitherOutput};
use libp2p::tcp::TcpConfig; use libp2p::tcp::TcpConfig;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use tokio_io::AsyncRead; use tokio_codec::{BytesCodec, Framed};
use tokio_io::codec::BytesCodec;
use libp2p::websocket::WsConfig; use libp2p::websocket::WsConfig;
fn main() { fn main() {
@ -94,7 +93,7 @@ fn main() {
// successfully negotiated. The parameter is the raw socket (implements the AsyncRead // successfully negotiated. The parameter is the raw socket (implements the AsyncRead
// and AsyncWrite traits), and the closure must return an implementation of // and AsyncWrite traits), and the closure must return an implementation of
// `IntoFuture` that can yield any type of object. // `IntoFuture` that can yield any type of object.
Ok(AsyncRead::framed(socket, BytesCodec::new())) Ok(Framed::new(socket, BytesCodec::new()))
}); });
// Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and

View File

@ -53,8 +53,8 @@ extern crate libp2p;
extern crate rand; extern crate rand;
#[macro_use] #[macro_use]
extern crate structopt; extern crate structopt;
extern crate tokio_codec;
extern crate tokio_core; extern crate tokio_core;
extern crate tokio_io;
use libp2p::SimpleProtocol; use libp2p::SimpleProtocol;
use libp2p::core::Multiaddr; use libp2p::core::Multiaddr;
@ -67,7 +67,7 @@ use std::{error::Error, iter, str::FromStr, sync::Arc, time::Duration};
use structopt::StructOpt; use structopt::StructOpt;
use libp2p::tcp::TcpConfig; use libp2p::tcp::TcpConfig;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use tokio_io::{AsyncRead, codec::BytesCodec}; use tokio_codec::{BytesCodec, Framed};
fn main() -> Result<(), Box<Error>> { fn main() -> Result<(), Box<Error>> {
env_logger::init(); env_logger::init();
@ -133,7 +133,7 @@ fn run_dialer(opts: DialerOpts) -> Result<(), Box<Error>> {
}; };
let echo = SimpleProtocol::new("/echo/1.0.0", |socket| { let echo = SimpleProtocol::new("/echo/1.0.0", |socket| {
Ok(AsyncRead::framed(socket, BytesCodec::new())) Ok(Framed::new(socket, BytesCodec::new()))
}); });
let (control, future) = libp2p::core::swarm(transport.clone().with_upgrade(echo.clone()), |socket, _| { let (control, future) = libp2p::core::swarm(transport.clone().with_upgrade(echo.clone()), |socket, _| {
@ -165,7 +165,7 @@ fn run_listener(opts: ListenerOpts) -> Result<(), Box<Error>> {
let relay = RelayConfig::new(opts.me, transport.clone(), store); let relay = RelayConfig::new(opts.me, transport.clone(), store);
let echo = SimpleProtocol::new("/echo/1.0.0", |socket| { let echo = SimpleProtocol::new("/echo/1.0.0", |socket| {
Ok(AsyncRead::framed(socket, BytesCodec::new())) Ok(Framed::new(socket, BytesCodec::new()))
}); });
let upgraded = transport.with_upgrade(relay) let upgraded = transport.with_upgrade(relay)

View File

@ -24,6 +24,7 @@ pub extern crate futures;
pub extern crate tokio_core; pub extern crate tokio_core;
pub extern crate multiaddr; pub extern crate multiaddr;
pub extern crate tokio_io; pub extern crate tokio_io;
pub extern crate tokio_codec;
pub extern crate libp2p_core as core; pub extern crate libp2p_core as core;
#[cfg(not(target_os = "emscripten"))] #[cfg(not(target_os = "emscripten"))]

View File

@ -12,6 +12,7 @@ multistream-select = { path = "../multistream-select" }
futures = "0.1" futures = "0.1"
parking_lot = "0.5" parking_lot = "0.5"
rand = "0.3" rand = "0.3"
tokio-codec = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
[dev-dependencies] [dev-dependencies]

View File

@ -86,6 +86,7 @@ extern crate log;
extern crate multistream_select; extern crate multistream_select;
extern crate parking_lot; extern crate parking_lot;
extern crate rand; extern crate rand;
extern crate tokio_codec;
extern crate tokio_io; extern crate tokio_io;
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
@ -101,7 +102,7 @@ use std::error::Error;
use std::io::Error as IoError; use std::io::Error as IoError;
use std::iter; use std::iter;
use std::sync::Arc; use std::sync::Arc;
use tokio_io::codec::{Decoder, Encoder}; use tokio_codec::{Decoder, Encoder, Framed};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
/// Represents a prototype for an upgrade to handle the ping protocol. /// Represents a prototype for an upgrade to handle the ping protocol.
@ -160,8 +161,7 @@ where
// TODO: can't figure out how to make it work without using an Arc/Mutex // TODO: can't figure out how to make it work without using an Arc/Mutex
let expected_pongs = Arc::new(Mutex::new(HashMap::with_capacity(4))); let expected_pongs = Arc::new(Mutex::new(HashMap::with_capacity(4)));
let sink_stream = socket let sink_stream = Framed::new(socket, Codec)
.framed(Codec)
.map(|msg| Message::Received(msg.freeze())); .map(|msg| Message::Received(msg.freeze()));
let (sink, stream) = sink_stream.split(); let (sink, stream) = sink_stream.split();

View File

@ -12,5 +12,6 @@ log = "0.4"
multiaddr = { path = "../multiaddr" } multiaddr = { path = "../multiaddr" }
protobuf = "2" protobuf = "2"
rand = "0.4" rand = "0.4"
tokio-codec = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
varint = { path = "../varint-rs" } varint = { path = "../varint-rs" }

View File

@ -27,6 +27,7 @@ extern crate log;
extern crate multiaddr; extern crate multiaddr;
extern crate protobuf; extern crate protobuf;
extern crate rand; extern crate rand;
extern crate tokio_codec;
extern crate tokio_io; extern crate tokio_io;
extern crate varint; extern crate varint;

View File

@ -24,7 +24,7 @@ use multiaddr::{AddrComponent, Multiaddr, Protocol};
use peerstore::PeerId; use peerstore::PeerId;
use protobuf::{self, Message}; use protobuf::{self, Message};
use std::{io, error::Error, iter::FromIterator}; use std::{io, error::Error, iter::FromIterator};
use tokio_io::codec::Framed; use tokio_codec::Framed;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use varint::VarintCodec; use varint::VarintCodec;
@ -47,7 +47,7 @@ pub(crate) struct Io<T> {
impl<T: AsyncRead + AsyncWrite> Io<T> { impl<T: AsyncRead + AsyncWrite> Io<T> {
pub(crate) fn new(c: T) -> Io<T> { pub(crate) fn new(c: T) -> Io<T> {
Io { Io {
codec: c.framed(VarintCodec::default()), codec: Framed::new(c, VarintCodec::default()),
} }
} }

View File

@ -10,3 +10,6 @@ bytes = "0.4.5"
tokio-io = "0.1" tokio-io = "0.1"
futures = "0.1" futures = "0.1"
error-chain = "0.11.0" error-chain = "0.11.0"
[dev-dependencies]
tokio-codec = "0.1"

View File

@ -31,6 +31,9 @@ extern crate num_bigint;
extern crate num_traits; extern crate num_traits;
extern crate tokio_io; extern crate tokio_io;
#[cfg(test)]
extern crate tokio_codec;
use bytes::{BufMut, Bytes, BytesMut, IntoBuf}; use bytes::{BufMut, Bytes, BytesMut, IntoBuf};
use futures::{Async, Poll}; use futures::{Async, Poll};
use num_bigint::BigUint; use num_bigint::BigUint;
@ -518,7 +521,7 @@ mod tests {
use super::{decode, EncoderState, VarintDecoder}; use super::{decode, EncoderState, VarintDecoder};
use futures::{Future, Stream}; use futures::{Future, Stream};
use num_bigint::BigUint; use num_bigint::BigUint;
use tokio_io::codec::FramedRead; use tokio_codec::FramedRead;
#[test] #[test]
fn large_number_fails() { fn large_number_fails() {