diff --git a/Cargo.toml b/Cargo.toml index be813708..19617f49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ libp2p-kad = { version = "0.8.0", path = "./protocols/kad" } libp2p-floodsub = { version = "0.8.0", path = "./protocols/floodsub" } libp2p-ping = { version = "0.8.0", path = "./protocols/ping" } libp2p-plaintext = { version = "0.8.0", path = "./protocols/plaintext" } +libp2p-deflate = { version = "0.1.0", path = "./protocols/deflate" } libp2p-ratelimit = { version = "0.8.0", path = "./transports/ratelimit" } libp2p-core = { version = "0.8.1", path = "./core" } libp2p-core-derive = { version = "0.8.0", path = "./misc/core-derive" } diff --git a/protocols/deflate/Cargo.toml b/protocols/deflate/Cargo.toml new file mode 100644 index 00000000..29005515 --- /dev/null +++ b/protocols/deflate/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "libp2p-deflate" +edition = "2018" +description = "Deflate encryption protocol for libp2p" +version = "0.1.0" +authors = ["Parity Technologies "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +futures = "0.1" +libp2p-core = { version = "0.8.1", path = "../../core" } +tokio-io = "0.1.12" +flate2 = { version = "1.0", features = ["tokio"] } + +[dev-dependencies] +env_logger = "0.6" +libp2p-tcp = { version = "0.8.0", path = "../../transports/tcp" } +quickcheck = "0.8" +tokio = "0.1" +log = "0.4" diff --git a/protocols/deflate/src/lib.rs b/protocols/deflate/src/lib.rs new file mode 100644 index 00000000..7dbf03eb --- /dev/null +++ b/protocols/deflate/src/lib.rs @@ -0,0 +1,76 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use flate2::read::DeflateDecoder; +use flate2::write::DeflateEncoder; +use flate2::Compression; +use std::io; + +use futures::future::{self, FutureResult}; +use libp2p_core::{upgrade::Negotiated, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use std::iter; +use tokio_io::{AsyncRead, AsyncWrite}; + +#[derive(Debug, Copy, Clone)] +pub struct DeflateConfig; + +/// Output of the deflate protocol. +pub type DeflateOutput = DeflateDecoder>; + +impl UpgradeInfo for DeflateConfig { + type Info = &'static [u8]; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/deflate/1.0.0") + } +} + +impl InboundUpgrade for DeflateConfig +where + C: AsyncRead + AsyncWrite, +{ + type Output = DeflateOutput>; + type Error = io::Error; + type Future = FutureResult; + + fn upgrade_inbound(self, r: Negotiated, _: Self::Info) -> Self::Future { + future::ok(DeflateDecoder::new(DeflateEncoder::new( + r, + Compression::default(), + ))) + } +} + +impl OutboundUpgrade for DeflateConfig +where + C: AsyncRead + AsyncWrite, +{ + type Output = DeflateOutput>; + type Error = io::Error; + type Future = FutureResult; + + fn upgrade_outbound(self, w: Negotiated, _: Self::Info) -> Self::Future { + future::ok(DeflateDecoder::new(DeflateEncoder::new( + w, + Compression::default(), + ))) + } +} diff --git a/protocols/deflate/tests/test.rs b/protocols/deflate/tests/test.rs new file mode 100644 index 00000000..0abed285 --- /dev/null +++ b/protocols/deflate/tests/test.rs @@ -0,0 +1,98 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::prelude::*; +use libp2p_core::transport::{ListenerEvent, Transport}; +use libp2p_core::upgrade::Negotiated; +use libp2p_deflate::{DeflateConfig, DeflateOutput}; +use libp2p_tcp::{TcpConfig, TcpTransStream}; +use log::info; +use quickcheck::QuickCheck; +use tokio::{self, io}; + +#[test] +fn deflate() { + let _ = env_logger::try_init(); + + fn prop(message: Vec) -> bool { + let server_transport = TcpConfig::new().with_upgrade(DeflateConfig {}); + let client_transport = TcpConfig::new().with_upgrade(DeflateConfig {}); + run(server_transport, client_transport, message); + true + } + + QuickCheck::new() + .max_tests(30) + .quickcheck(prop as fn(Vec) -> bool) +} + +type Output = DeflateOutput>; + +fn run(server_transport: T, client_transport: T, message1: Vec) +where + T: Transport, + T::Dial: Send + 'static, + T::Listener: Send + 'static, + T::ListenerUpgrade: Send + 'static, +{ + let message2 = message1.clone(); + + let mut server = server_transport + .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) + .unwrap(); + let server_address = server + .by_ref() + .wait() + .next() + .expect("some event") + .expect("no error") + .into_new_address() + .expect("listen address"); + let server = server + .take(1) + .filter_map(ListenerEvent::into_upgrade) + .and_then(|(client, _)| client) + .map_err(|e| panic!("server error: {}", e)) + .and_then(|client| { + info!("server: reading message"); + io::read_to_end(client, Vec::new()) + }) + .for_each(move |(_, msg)| { + info!("server: read message: {:?}", msg); + assert_eq!(msg, message1); + Ok(()) + }); + + let client = client_transport + .dial(server_address.clone()) + .unwrap() + .map_err(|e| panic!("client error: {}", e)) + .and_then(move |server| { + io::write_all(server, message2).and_then(|(client, _)| io::flush(client)) + }) + .map(|_| ()); + + let future = client + .join(server) + .map_err(|e| panic!("{:?}", e)) + .map(|_| ()); + + tokio::run(future) +} diff --git a/src/lib.rs b/src/lib.rs index 30678a3d..a2075e99 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -163,6 +163,8 @@ pub use tokio_codec; #[doc(inline)] pub use libp2p_core as core; +#[doc(inline)] +pub use libp2p_deflate as deflate; #[cfg(not(any(target_os = "emscripten", target_os = "unknown")))] #[doc(inline)] pub use libp2p_dns as dns;