diff --git a/core/src/either.rs b/core/src/either.rs index 1230da44..5a40c624 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -68,13 +68,19 @@ where A: AsyncRead, B: AsyncRead, { - #[inline] unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { match self { EitherOutput::First(a) => a.prepare_uninitialized_buffer(buf), EitherOutput::Second(b) => b.prepare_uninitialized_buffer(buf), } } + + fn read_buf(&mut self, buf: &mut Bu) -> Poll { + match self { + EitherOutput::First(a) => a.read_buf(buf), + EitherOutput::Second(b) => b.read_buf(buf), + } + } } impl Read for EitherOutput @@ -179,6 +185,13 @@ where } } + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + match self { + EitherOutput::First(ref inner) => inner.prepare_uninitialized_buffer(buf), + EitherOutput::Second(ref inner) => inner.prepare_uninitialized_buffer(buf), + } + } + fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll { match (self, sub) { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 4f19c664..4e3c8957 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -132,6 +132,20 @@ pub trait StreamMuxer { /// happened. fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll; + /// Mimics the `prepare_uninitialized_buffer` method of the `AsyncRead` trait. + /// + /// This function isn't actually unsafe to call but unsafe to implement. The implementer must + /// ensure that either the whole buf has been zeroed or that `read_substream` overwrites the + /// buffer without reading it and returns correct value. + /// + /// If this function returns true, then the memory has been zeroed out. This allows + /// implementations of `AsyncRead` which are composed of multiple subimplementations to + /// efficiently implement `prepare_uninitialized_buffer`. + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + for b in buf.iter_mut() { *b = 0; } + true + } + /// Write data to a substream. The behaviour is the same as `tokio_io::AsyncWrite::poll_write`. /// /// If `NotReady` is returned, then the current task will be notified once the substream @@ -369,7 +383,10 @@ where P: Deref, P::Target: StreamMuxer, { - #[inline] + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.muxer.prepare_uninitialized_buffer(buf) + } + fn poll_read(&mut self, buf: &mut [u8]) -> Poll { let s = self.substream.as_mut().expect("substream was empty"); self.muxer.read_substream(s, buf).map_err(|e| e.into()) @@ -488,6 +505,10 @@ impl StreamMuxer for StreamMuxerBox { self.inner.destroy_outbound(substream) } + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.inner.prepare_uninitialized_buffer(buf) + } + #[inline] fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll { self.inner.read_substream(s, buf) @@ -579,6 +600,10 @@ where self.inner.destroy_outbound(list.remove(&substream).unwrap()) } + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.inner.prepare_uninitialized_buffer(buf) + } + #[inline] fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll { let mut list = self.substreams.lock(); diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index a2a8d01d..7bec14ed 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -101,6 +101,10 @@ where fn destroy_outbound(&self, _: Self::OutboundSubstream) { } + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.inner.lock().prepare_uninitialized_buffer(buf) + } + fn read_substream(&self, _: &mut Self::Substream, buf: &mut [u8]) -> Poll { let res = self.inner.lock().poll_read(buf); if let Ok(Async::Ready(_)) = res { diff --git a/core/src/transport/dummy.rs b/core/src/transport/dummy.rs index e4d7c6de..4d478016 100644 --- a/core/src/transport/dummy.rs +++ b/core/src/transport/dummy.rs @@ -94,6 +94,9 @@ impl io::Write for DummyStream { } impl tokio_io::AsyncRead for DummyStream { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } } impl tokio_io::AsyncWrite for DummyStream { diff --git a/misc/multistream-select/src/lib.rs b/misc/multistream-select/src/lib.rs index 7d50555d..746f1676 100644 --- a/misc/multistream-select/src/lib.rs +++ b/misc/multistream-select/src/lib.rs @@ -97,6 +97,13 @@ impl tokio_io::AsyncRead for Negotiated where TInner: tokio_io::AsyncRead { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.0.prepare_uninitialized_buffer(buf) + } + + fn read_buf(&mut self, buf: &mut B) -> Poll { + self.0.read_buf(buf) + } } impl io::Write for Negotiated diff --git a/misc/rw-stream-sink/src/lib.rs b/misc/rw-stream-sink/src/lib.rs index 69485961..1d06458c 100644 --- a/misc/rw-stream-sink/src/lib.rs +++ b/misc/rw-stream-sink/src/lib.rs @@ -91,6 +91,9 @@ where S: Stream, S::Item: IntoBuf, { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } } impl Write for RwStreamSink diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 9dba50cc..550a9609 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -460,6 +460,10 @@ where C: AsyncRead + AsyncWrite // Nothing to do. } + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Poll { loop { // First, transfer from `current_data`. diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index c19f12f0..dd062a6d 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -80,6 +80,10 @@ where fn destroy_outbound(&self, _: Self::OutboundSubstream) { } + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll { let result = sub.poll_read(buf); if let Ok(Async::Ready(_)) = result { diff --git a/protocols/noise/Cargo.toml b/protocols/noise/Cargo.toml index 20dcf81c..a26b39b7 100644 --- a/protocols/noise/Cargo.toml +++ b/protocols/noise/Cargo.toml @@ -8,6 +8,7 @@ repository = "https://github.com/libp2p/rust-libp2p" edition = "2018" [dependencies] +bytes = "0.4" curve25519-dalek = "1" futures = "0.1" lazy_static = "1.2" diff --git a/protocols/noise/src/io.rs b/protocols/noise/src/io.rs index 47fef9c8..67c1aeb4 100644 --- a/protocols/noise/src/io.rs +++ b/protocols/noise/src/io.rs @@ -335,7 +335,11 @@ impl io::Write for NoiseOutput { } } -impl AsyncRead for NoiseOutput {} +impl AsyncRead for NoiseOutput { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } +} impl AsyncWrite for NoiseOutput { fn shutdown(&mut self) -> Poll<(), io::Error> { diff --git a/protocols/noise/src/io/handshake.rs b/protocols/noise/src/io/handshake.rs index 6b993da8..93a1f206 100644 --- a/protocols/noise/src/io/handshake.rs +++ b/protocols/noise/src/io/handshake.rs @@ -266,7 +266,14 @@ impl io::Write for State { } } -impl AsyncRead for State {} +impl AsyncRead for State { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.io.prepare_uninitialized_buffer(buf) + } + fn read_buf(&mut self, buf: &mut B) -> Poll { + self.io.read_buf(buf) + } +} impl AsyncWrite for State { fn shutdown(&mut self) -> Poll<(), io::Error> { diff --git a/src/bandwidth.rs b/src/bandwidth.rs index 76b889cf..4395d7e7 100644 --- a/src/bandwidth.rs +++ b/src/bandwidth.rs @@ -171,6 +171,13 @@ impl Read for BandwidthConnecLogging impl tokio_io::AsyncRead for BandwidthConnecLogging where TInner: tokio_io::AsyncRead { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.inner.prepare_uninitialized_buffer(buf) + } + + fn read_buf(&mut self, buf: &mut B) -> Poll { + self.inner.read_buf(buf) + } } impl Write for BandwidthConnecLogging diff --git a/transports/ratelimit/Cargo.toml b/transports/ratelimit/Cargo.toml index d20c83bb..f0e92439 100644 --- a/transports/ratelimit/Cargo.toml +++ b/transports/ratelimit/Cargo.toml @@ -11,6 +11,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] aio-limited = "0.1" +bytes = "0.4" futures = "0.1" libp2p-core = { version = "0.7.0", path = "../../core" } log = "0.4" diff --git a/transports/ratelimit/src/lib.rs b/transports/ratelimit/src/lib.rs index 21a18d74..aecaca77 100644 --- a/transports/ratelimit/src/lib.rs +++ b/transports/ratelimit/src/lib.rs @@ -132,7 +132,15 @@ impl io::Write for Connection { } } -impl AsyncRead for Connection {} +impl AsyncRead for Connection { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.reader.prepare_uninitialized_buffer(buf) + } + + fn read_buf(&mut self, buf: &mut B) -> Poll { + self.reader.read_buf(buf) + } +} impl AsyncWrite for Connection { fn shutdown(&mut self) -> Poll<(), io::Error> { diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index 82324e66..4998fd59 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -10,6 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] +bytes = "0.4" get_if_addrs = "0.5.3" libp2p-core = { version = "0.7.0", path = "../../core" } log = "0.4.1" diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index b16fcec8..6441d2dc 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -464,7 +464,15 @@ impl Read for TcpTransStream { } } -impl AsyncRead for TcpTransStream {} +impl AsyncRead for TcpTransStream { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.inner.prepare_uninitialized_buffer(buf) + } + + fn read_buf(&mut self, buf: &mut B) -> Poll { + self.inner.read_buf(buf) + } +} impl Write for TcpTransStream { #[inline] diff --git a/transports/wasm-ext/src/lib.rs b/transports/wasm-ext/src/lib.rs index 8577590d..ee0abc91 100644 --- a/transports/wasm-ext/src/lib.rs +++ b/transports/wasm-ext/src/lib.rs @@ -411,7 +411,11 @@ impl io::Read for Connection { } } -impl tokio_io::AsyncRead for Connection {} +impl tokio_io::AsyncRead for Connection { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } +} impl io::Write for Connection { fn write(&mut self, buf: &[u8]) -> Result { diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index 8c37e1db..499db057 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -20,6 +20,7 @@ tokio-io = "0.1" websocket = { version = "0.22.2", default-features = false, features = ["async", "async-ssl"] } [target.'cfg(any(target_os = "emscripten", target_os = "unknown"))'.dependencies] +bytes = "0.4" stdweb = { version = "0.4", default-features = false } wasm-bindgen = "0.2.42" diff --git a/transports/websocket/src/browser.rs b/transports/websocket/src/browser.rs index 2c8324e5..0822262c 100644 --- a/transports/websocket/src/browser.rs +++ b/transports/websocket/src/browser.rs @@ -232,7 +232,15 @@ impl Drop for BrowserWsConn { } } -impl AsyncRead for BrowserWsConn {} +impl AsyncRead for BrowserWsConn { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.incoming_data.prepare_uninitialized_buffer(buf) + } + + fn read_buf(&mut self, buf: &mut B) -> Poll { + self.incoming_data.read_buf(buf) + } +} impl Read for BrowserWsConn { #[inline]