diff --git a/Cargo.toml b/Cargo.toml index 5d4a6e76..80c6e8eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,6 @@ libp2p-kad = { version = "0.9.0", path = "./protocols/kad" } libp2p-floodsub = { version = "0.9.0", path = "./protocols/floodsub" } libp2p-ping = { version = "0.9.0", path = "./protocols/ping" } libp2p-plaintext = { version = "0.9.0", path = "./protocols/plaintext" } -libp2p-deflate = { version = "0.1.0", path = "./protocols/deflate" } libp2p-ratelimit = { version = "0.9.0", path = "./transports/ratelimit" } libp2p-core = { version = "0.9.1", path = "./core" } libp2p-core-derive = { version = "0.9.0", path = "./misc/core-derive" } @@ -41,6 +40,7 @@ tokio-io = "0.1" wasm-timer = "0.1" [target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies] +libp2p-deflate = { version = "0.1.0", path = "./protocols/deflate" } libp2p-dns = { version = "0.9.0", path = "./transports/dns" } libp2p-mdns = { version = "0.9.0", path = "./misc/mdns" } libp2p-noise = { version = "0.7.0", path = "./protocols/noise" } diff --git a/src/lib.rs b/src/lib.rs index ca00d2f3..be744b91 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -163,6 +163,7 @@ pub use tokio_codec; #[doc(inline)] pub use libp2p_core as core; +#[cfg(not(any(target_os = "emscripten", target_os = "unknown")))] #[doc(inline)] pub use libp2p_deflate as deflate; #[cfg(not(any(target_os = "emscripten", target_os = "unknown")))] diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index 57f0d3d9..a01f7000 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -18,7 +18,7 @@ rw-stream-sink = { version = "0.1.1", path = "../../misc/rw-stream-sink" } tokio-codec = "0.1.1" tokio-io = "0.1.12" tokio-rustls = "0.10.0-alpha.3" -soketto = "0.1.0" +soketto = { version = "0.2.0", features = ["deflate"] } url = "1.7.2" webpki-roots = "0.16.0" diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index 4a5a3960..ad0aca35 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -29,8 +29,13 @@ use libp2p_core::{ }; use log::{debug, trace}; use tokio_rustls::{client, server}; -use soketto::{base, connection::{Connection, Mode}, handshake::{self, Redirect, Response}}; -use std::io; +use soketto::{ + base, + connection::{Connection, Mode}, + extension::deflate::Deflate, + handshake::{self, Redirect, Response} +}; +use std::{convert::TryFrom, io}; use tokio_codec::{Framed, FramedParts}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_rustls::webpki; @@ -47,7 +52,8 @@ pub struct WsConfig { transport: T, max_data_size: u64, tls_config: tls::Config, - max_redirects: u8 + max_redirects: u8, + use_deflate: bool } impl WsConfig { @@ -57,7 +63,8 @@ impl WsConfig { transport, max_data_size: MAX_DATA_SIZE, tls_config: tls::Config::client(), - max_redirects: 0 + max_redirects: 0, + use_deflate: false } } @@ -88,6 +95,12 @@ impl WsConfig { self.tls_config = c; self } + + /// Should the deflate extension (RFC 7692) be used if supported? + pub fn use_deflate(&mut self, flag: bool) -> &mut Self { + self.use_deflate = flag; + self + } } impl Transport for WsConfig @@ -125,6 +138,7 @@ where let tls_config = self.tls_config; let max_size = self.max_data_size; + let use_deflate = self.use_deflate; let listen = self.transport.listen_on(inner_addr) .map_err(|e| e.map(Error::Transport))? .map_err(Error::Transport) @@ -163,7 +177,11 @@ where }) .and_then(move |stream| { trace!("receiving websocket handshake request from {}", remote2); - Framed::new(stream, handshake::Server::new()) + let mut s = handshake::Server::new(); + if use_deflate { + s.add_extension(Box::new(Deflate::new(Mode::Server))); + } + Framed::new(stream, s) .into_future() .map_err(|(e, _framed)| Error::Handshake(Box::new(e))) .and_then(move |(request, framed)| { @@ -174,7 +192,9 @@ where .map_err(|e| Error::Base(Box::new(e))) .map(move |f| { trace!("websocket handshake with {} successful", remote2); - let c = new_connection(f, max_size, Mode::Server); + let (mut handshake, mut c) = + new_connection(f, max_size, Mode::Server); + c.add_extensions(handshake.drain_extensions()); BytesConnection { inner: c } })) } else { @@ -264,6 +284,7 @@ where let address1 = address.clone(); // used for logging let address2 = address.clone(); // used for logging + let use_deflate = config.use_deflate; let future = dial.map_err(Error::Transport) .and_then(move |stream| { trace!("connected to {}", address); @@ -283,7 +304,10 @@ where }) .and_then(move |stream| { trace!("sending websocket handshake request to {}", address1); - let client = handshake::Client::new(host_port, path); + let mut client = handshake::Client::new(host_port, path); + if use_deflate { + client.add_extension(Box::new(Deflate::new(Mode::Client))); + } Framed::new(stream, client) .send(()) .map_err(|e| Error::Handshake(Box::new(e))) @@ -306,7 +330,8 @@ where trace!("websocket handshake with {} successful", address1) } } - let c = new_connection(framed, max_data_size, Mode::Client); + let (mut handshake, mut c) = new_connection(framed, max_data_size, Mode::Client); + c.add_extensions(handshake.drain_extensions()); Ok(Either::B(BytesConnection { inner: c })) }) }); @@ -372,7 +397,7 @@ fn location_to_multiaddr(location: &str) -> Result> { } /// Create a `Connection` from an existing `Framed` value. -fn new_connection(framed: Framed, max_size: u64, mode: Mode) -> Connection +fn new_connection(framed: Framed, max_size: u64, mode: Mode) -> (C, Connection) where T: AsyncRead + AsyncWrite { @@ -383,7 +408,9 @@ where new.read_buf = old.read_buf; new.write_buf = old.write_buf; let framed = Framed::from_parts(new); - Connection::from_framed(framed, mode) + let mut conn = Connection::from_framed(framed, mode); + conn.set_max_buffer_size(usize::try_from(max_size).unwrap_or(std::usize::MAX)); + (old.codec, conn) } // BytesConnection //////////////////////////////////////////////////////////////////////////////// diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index 00f56da8..533e1b78 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -75,6 +75,12 @@ impl WsConfig { self.transport.set_tls_config(c); self } + + /// Should the deflate extension (RFC 7692) be used if supported? + pub fn use_deflate(&mut self, flag: bool) -> &mut Self { + self.transport.use_deflate(flag); + self + } } impl From> for WsConfig {