Update soketto and enable deflate extension. (#1169)

* Update soketto and enable deflate extension.

* libp2p-deflate and libp2p-websocket share flate2.

Due to the way feature resolution works in cargo today, the `deflate`
feature of `soketto` will include `flate2` with feature `zlib` which is
then also active for the `flate2` that `libp2p-deflate` depends on. This
leads to compilation failures for WASM targets. This PR therefore moves
libp2p-deflate to the crates which are not available on WASM.
This commit is contained in:
Toralf Wittner
2019-06-20 13:03:35 +02:00
committed by Pierre Krieger
parent 3b4f8d7094
commit acebab07f3
5 changed files with 46 additions and 12 deletions

View File

@ -25,7 +25,6 @@ libp2p-kad = { version = "0.9.0", path = "./protocols/kad" }
libp2p-floodsub = { version = "0.9.0", path = "./protocols/floodsub" } libp2p-floodsub = { version = "0.9.0", path = "./protocols/floodsub" }
libp2p-ping = { version = "0.9.0", path = "./protocols/ping" } libp2p-ping = { version = "0.9.0", path = "./protocols/ping" }
libp2p-plaintext = { version = "0.9.0", path = "./protocols/plaintext" } libp2p-plaintext = { version = "0.9.0", path = "./protocols/plaintext" }
libp2p-deflate = { version = "0.1.0", path = "./protocols/deflate" }
libp2p-ratelimit = { version = "0.9.0", path = "./transports/ratelimit" } libp2p-ratelimit = { version = "0.9.0", path = "./transports/ratelimit" }
libp2p-core = { version = "0.9.1", path = "./core" } libp2p-core = { version = "0.9.1", path = "./core" }
libp2p-core-derive = { version = "0.9.0", path = "./misc/core-derive" } libp2p-core-derive = { version = "0.9.0", path = "./misc/core-derive" }
@ -41,6 +40,7 @@ tokio-io = "0.1"
wasm-timer = "0.1" wasm-timer = "0.1"
[target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies] [target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.1.0", path = "./protocols/deflate" }
libp2p-dns = { version = "0.9.0", path = "./transports/dns" } libp2p-dns = { version = "0.9.0", path = "./transports/dns" }
libp2p-mdns = { version = "0.9.0", path = "./misc/mdns" } libp2p-mdns = { version = "0.9.0", path = "./misc/mdns" }
libp2p-noise = { version = "0.7.0", path = "./protocols/noise" } libp2p-noise = { version = "0.7.0", path = "./protocols/noise" }

View File

@ -163,6 +163,7 @@ pub use tokio_codec;
#[doc(inline)] #[doc(inline)]
pub use libp2p_core as core; pub use libp2p_core as core;
#[cfg(not(any(target_os = "emscripten", target_os = "unknown")))]
#[doc(inline)] #[doc(inline)]
pub use libp2p_deflate as deflate; pub use libp2p_deflate as deflate;
#[cfg(not(any(target_os = "emscripten", target_os = "unknown")))] #[cfg(not(any(target_os = "emscripten", target_os = "unknown")))]

View File

@ -18,7 +18,7 @@ rw-stream-sink = { version = "0.1.1", path = "../../misc/rw-stream-sink" }
tokio-codec = "0.1.1" tokio-codec = "0.1.1"
tokio-io = "0.1.12" tokio-io = "0.1.12"
tokio-rustls = "0.10.0-alpha.3" tokio-rustls = "0.10.0-alpha.3"
soketto = "0.1.0" soketto = { version = "0.2.0", features = ["deflate"] }
url = "1.7.2" url = "1.7.2"
webpki-roots = "0.16.0" webpki-roots = "0.16.0"

View File

@ -29,8 +29,13 @@ use libp2p_core::{
}; };
use log::{debug, trace}; use log::{debug, trace};
use tokio_rustls::{client, server}; use tokio_rustls::{client, server};
use soketto::{base, connection::{Connection, Mode}, handshake::{self, Redirect, Response}}; use soketto::{
use std::io; base,
connection::{Connection, Mode},
extension::deflate::Deflate,
handshake::{self, Redirect, Response}
};
use std::{convert::TryFrom, io};
use tokio_codec::{Framed, FramedParts}; use tokio_codec::{Framed, FramedParts};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_rustls::webpki; use tokio_rustls::webpki;
@ -47,7 +52,8 @@ pub struct WsConfig<T> {
transport: T, transport: T,
max_data_size: u64, max_data_size: u64,
tls_config: tls::Config, tls_config: tls::Config,
max_redirects: u8 max_redirects: u8,
use_deflate: bool
} }
impl<T> WsConfig<T> { impl<T> WsConfig<T> {
@ -57,7 +63,8 @@ impl<T> WsConfig<T> {
transport, transport,
max_data_size: MAX_DATA_SIZE, max_data_size: MAX_DATA_SIZE,
tls_config: tls::Config::client(), tls_config: tls::Config::client(),
max_redirects: 0 max_redirects: 0,
use_deflate: false
} }
} }
@ -88,6 +95,12 @@ impl<T> WsConfig<T> {
self.tls_config = c; self.tls_config = c;
self self
} }
/// Should the deflate extension (RFC 7692) be used if supported?
pub fn use_deflate(&mut self, flag: bool) -> &mut Self {
self.use_deflate = flag;
self
}
} }
impl<T> Transport for WsConfig<T> impl<T> Transport for WsConfig<T>
@ -125,6 +138,7 @@ where
let tls_config = self.tls_config; let tls_config = self.tls_config;
let max_size = self.max_data_size; let max_size = self.max_data_size;
let use_deflate = self.use_deflate;
let listen = self.transport.listen_on(inner_addr) let listen = self.transport.listen_on(inner_addr)
.map_err(|e| e.map(Error::Transport))? .map_err(|e| e.map(Error::Transport))?
.map_err(Error::Transport) .map_err(Error::Transport)
@ -163,7 +177,11 @@ where
}) })
.and_then(move |stream| { .and_then(move |stream| {
trace!("receiving websocket handshake request from {}", remote2); trace!("receiving websocket handshake request from {}", remote2);
Framed::new(stream, handshake::Server::new()) let mut s = handshake::Server::new();
if use_deflate {
s.add_extension(Box::new(Deflate::new(Mode::Server)));
}
Framed::new(stream, s)
.into_future() .into_future()
.map_err(|(e, _framed)| Error::Handshake(Box::new(e))) .map_err(|(e, _framed)| Error::Handshake(Box::new(e)))
.and_then(move |(request, framed)| { .and_then(move |(request, framed)| {
@ -174,7 +192,9 @@ where
.map_err(|e| Error::Base(Box::new(e))) .map_err(|e| Error::Base(Box::new(e)))
.map(move |f| { .map(move |f| {
trace!("websocket handshake with {} successful", remote2); trace!("websocket handshake with {} successful", remote2);
let c = new_connection(f, max_size, Mode::Server); let (mut handshake, mut c) =
new_connection(f, max_size, Mode::Server);
c.add_extensions(handshake.drain_extensions());
BytesConnection { inner: c } BytesConnection { inner: c }
})) }))
} else { } else {
@ -264,6 +284,7 @@ where
let address1 = address.clone(); // used for logging let address1 = address.clone(); // used for logging
let address2 = address.clone(); // used for logging let address2 = address.clone(); // used for logging
let use_deflate = config.use_deflate;
let future = dial.map_err(Error::Transport) let future = dial.map_err(Error::Transport)
.and_then(move |stream| { .and_then(move |stream| {
trace!("connected to {}", address); trace!("connected to {}", address);
@ -283,7 +304,10 @@ where
}) })
.and_then(move |stream| { .and_then(move |stream| {
trace!("sending websocket handshake request to {}", address1); trace!("sending websocket handshake request to {}", address1);
let client = handshake::Client::new(host_port, path); let mut client = handshake::Client::new(host_port, path);
if use_deflate {
client.add_extension(Box::new(Deflate::new(Mode::Client)));
}
Framed::new(stream, client) Framed::new(stream, client)
.send(()) .send(())
.map_err(|e| Error::Handshake(Box::new(e))) .map_err(|e| Error::Handshake(Box::new(e)))
@ -306,7 +330,8 @@ where
trace!("websocket handshake with {} successful", address1) trace!("websocket handshake with {} successful", address1)
} }
} }
let c = new_connection(framed, max_data_size, Mode::Client); let (mut handshake, mut c) = new_connection(framed, max_data_size, Mode::Client);
c.add_extensions(handshake.drain_extensions());
Ok(Either::B(BytesConnection { inner: c })) Ok(Either::B(BytesConnection { inner: c }))
}) })
}); });
@ -372,7 +397,7 @@ fn location_to_multiaddr<T>(location: &str) -> Result<Multiaddr, Error<T>> {
} }
/// Create a `Connection` from an existing `Framed` value. /// Create a `Connection` from an existing `Framed` value.
fn new_connection<T, C>(framed: Framed<T, C>, max_size: u64, mode: Mode) -> Connection<T> fn new_connection<T, C>(framed: Framed<T, C>, max_size: u64, mode: Mode) -> (C, Connection<T>)
where where
T: AsyncRead + AsyncWrite T: AsyncRead + AsyncWrite
{ {
@ -383,7 +408,9 @@ where
new.read_buf = old.read_buf; new.read_buf = old.read_buf;
new.write_buf = old.write_buf; new.write_buf = old.write_buf;
let framed = Framed::from_parts(new); let framed = Framed::from_parts(new);
Connection::from_framed(framed, mode) let mut conn = Connection::from_framed(framed, mode);
conn.set_max_buffer_size(usize::try_from(max_size).unwrap_or(std::usize::MAX));
(old.codec, conn)
} }
// BytesConnection //////////////////////////////////////////////////////////////////////////////// // BytesConnection ////////////////////////////////////////////////////////////////////////////////

View File

@ -75,6 +75,12 @@ impl<T> WsConfig<T> {
self.transport.set_tls_config(c); self.transport.set_tls_config(c);
self self
} }
/// Should the deflate extension (RFC 7692) be used if supported?
pub fn use_deflate(&mut self, flag: bool) -> &mut Self {
self.transport.use_deflate(flag);
self
}
} }
impl<T> From<framed::WsConfig<T>> for WsConfig<T> { impl<T> From<framed::WsConfig<T>> for WsConfig<T> {