Add implementations of prepare_uninitialized_buffer and read_buf where relevant (#1107)

* Fix #1080

* Fix browser WebSockets
This commit is contained in:
Pierre Krieger 2019-05-10 11:26:18 +02:00 committed by GitHub
parent 089e349671
commit c2398adf67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 121 additions and 8 deletions

View File

@ -68,13 +68,19 @@ where
A: AsyncRead, A: AsyncRead,
B: AsyncRead, B: AsyncRead,
{ {
#[inline]
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
match self { match self {
EitherOutput::First(a) => a.prepare_uninitialized_buffer(buf), EitherOutput::First(a) => a.prepare_uninitialized_buffer(buf),
EitherOutput::Second(b) => b.prepare_uninitialized_buffer(buf), EitherOutput::Second(b) => b.prepare_uninitialized_buffer(buf),
} }
} }
fn read_buf<Bu: bytes::BufMut>(&mut self, buf: &mut Bu) -> Poll<usize, IoError> {
match self {
EitherOutput::First(a) => a.read_buf(buf),
EitherOutput::Second(b) => b.read_buf(buf),
}
}
} }
impl<A, B> Read for EitherOutput<A, B> impl<A, B> Read for EitherOutput<A, B>
@ -179,6 +185,13 @@ where
} }
} }
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
match self {
EitherOutput::First(ref inner) => inner.prepare_uninitialized_buffer(buf),
EitherOutput::Second(ref inner) => inner.prepare_uninitialized_buffer(buf),
}
}
fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, Self::Error> { fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, Self::Error> {
match (self, sub) { match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {

View File

@ -132,6 +132,20 @@ pub trait StreamMuxer {
/// happened. /// happened.
fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, Self::Error>; fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, Self::Error>;
/// Mimics the `prepare_uninitialized_buffer` method of the `AsyncRead` trait.
///
/// This function isn't actually unsafe to call but unsafe to implement. The implementer must
/// ensure that either the whole buf has been zeroed or that `read_substream` overwrites the
/// buffer without reading it and returns correct value.
///
/// If this function returns true, then the memory has been zeroed out. This allows
/// implementations of `AsyncRead` which are composed of multiple subimplementations to
/// efficiently implement `prepare_uninitialized_buffer`.
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
for b in buf.iter_mut() { *b = 0; }
true
}
/// Write data to a substream. The behaviour is the same as `tokio_io::AsyncWrite::poll_write`. /// Write data to a substream. The behaviour is the same as `tokio_io::AsyncWrite::poll_write`.
/// ///
/// If `NotReady` is returned, then the current task will be notified once the substream /// If `NotReady` is returned, then the current task will be notified once the substream
@ -369,7 +383,10 @@ where
P: Deref, P: Deref,
P::Target: StreamMuxer, P::Target: StreamMuxer,
{ {
#[inline] unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.muxer.prepare_uninitialized_buffer(buf)
}
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> { fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
let s = self.substream.as_mut().expect("substream was empty"); let s = self.substream.as_mut().expect("substream was empty");
self.muxer.read_substream(s, buf).map_err(|e| e.into()) self.muxer.read_substream(s, buf).map_err(|e| e.into())
@ -488,6 +505,10 @@ impl StreamMuxer for StreamMuxerBox {
self.inner.destroy_outbound(substream) self.inner.destroy_outbound(substream)
} }
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}
#[inline] #[inline]
fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, Self::Error> { fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, Self::Error> {
self.inner.read_substream(s, buf) self.inner.read_substream(s, buf)
@ -579,6 +600,10 @@ where
self.inner.destroy_outbound(list.remove(&substream).unwrap()) self.inner.destroy_outbound(list.remove(&substream).unwrap())
} }
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}
#[inline] #[inline]
fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, Self::Error> { fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, Self::Error> {
let mut list = self.substreams.lock(); let mut list = self.substreams.lock();

View File

@ -101,6 +101,10 @@ where
fn destroy_outbound(&self, _: Self::OutboundSubstream) { fn destroy_outbound(&self, _: Self::OutboundSubstream) {
} }
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.lock().prepare_uninitialized_buffer(buf)
}
fn read_substream(&self, _: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, io::Error> { fn read_substream(&self, _: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, io::Error> {
let res = self.inner.lock().poll_read(buf); let res = self.inner.lock().poll_read(buf);
if let Ok(Async::Ready(_)) = res { if let Ok(Async::Ready(_)) = res {

View File

@ -94,6 +94,9 @@ impl io::Write for DummyStream {
} }
impl tokio_io::AsyncRead for DummyStream { impl tokio_io::AsyncRead for DummyStream {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
} }
impl tokio_io::AsyncWrite for DummyStream { impl tokio_io::AsyncWrite for DummyStream {

View File

@ -97,6 +97,13 @@ impl<TInner> tokio_io::AsyncRead for Negotiated<TInner>
where where
TInner: tokio_io::AsyncRead TInner: tokio_io::AsyncRead
{ {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.0.prepare_uninitialized_buffer(buf)
}
fn read_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.0.read_buf(buf)
}
} }
impl<TInner> io::Write for Negotiated<TInner> impl<TInner> io::Write for Negotiated<TInner>

View File

@ -91,6 +91,9 @@ where
S: Stream<Error = IoError>, S: Stream<Error = IoError>,
S::Item: IntoBuf, S::Item: IntoBuf,
{ {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
} }
impl<S> Write for RwStreamSink<S> impl<S> Write for RwStreamSink<S>

View File

@ -460,6 +460,10 @@ where C: AsyncRead + AsyncWrite
// Nothing to do. // Nothing to do.
} }
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError> { fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError> {
loop { loop {
// First, transfer from `current_data`. // First, transfer from `current_data`.

View File

@ -80,6 +80,10 @@ where
fn destroy_outbound(&self, _: Self::OutboundSubstream) { fn destroy_outbound(&self, _: Self::OutboundSubstream) {
} }
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError> { fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError> {
let result = sub.poll_read(buf); let result = sub.poll_read(buf);
if let Ok(Async::Ready(_)) = result { if let Ok(Async::Ready(_)) = result {

View File

@ -8,6 +8,7 @@ repository = "https://github.com/libp2p/rust-libp2p"
edition = "2018" edition = "2018"
[dependencies] [dependencies]
bytes = "0.4"
curve25519-dalek = "1" curve25519-dalek = "1"
futures = "0.1" futures = "0.1"
lazy_static = "1.2" lazy_static = "1.2"

View File

@ -335,7 +335,11 @@ impl<T: io::Write> io::Write for NoiseOutput<T> {
} }
} }
impl<T: AsyncRead> AsyncRead for NoiseOutput<T> {} impl<T: AsyncRead> AsyncRead for NoiseOutput<T> {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
}
impl<T: AsyncWrite> AsyncWrite for NoiseOutput<T> { impl<T: AsyncWrite> AsyncWrite for NoiseOutput<T> {
fn shutdown(&mut self) -> Poll<(), io::Error> { fn shutdown(&mut self) -> Poll<(), io::Error> {

View File

@ -266,7 +266,14 @@ impl<T: io::Write> io::Write for State<T> {
} }
} }
impl<T: AsyncRead> AsyncRead for State<T> {} impl<T: AsyncRead> AsyncRead for State<T> {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.io.prepare_uninitialized_buffer(buf)
}
fn read_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.io.read_buf(buf)
}
}
impl<T: AsyncWrite> AsyncWrite for State<T> { impl<T: AsyncWrite> AsyncWrite for State<T> {
fn shutdown(&mut self) -> Poll<(), io::Error> { fn shutdown(&mut self) -> Poll<(), io::Error> {

View File

@ -171,6 +171,13 @@ impl<TInner> Read for BandwidthConnecLogging<TInner>
impl<TInner> tokio_io::AsyncRead for BandwidthConnecLogging<TInner> impl<TInner> tokio_io::AsyncRead for BandwidthConnecLogging<TInner>
where TInner: tokio_io::AsyncRead where TInner: tokio_io::AsyncRead
{ {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}
fn read_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.read_buf(buf)
}
} }
impl<TInner> Write for BandwidthConnecLogging<TInner> impl<TInner> Write for BandwidthConnecLogging<TInner>

View File

@ -11,6 +11,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
aio-limited = "0.1" aio-limited = "0.1"
bytes = "0.4"
futures = "0.1" futures = "0.1"
libp2p-core = { version = "0.7.0", path = "../../core" } libp2p-core = { version = "0.7.0", path = "../../core" }
log = "0.4" log = "0.4"

View File

@ -132,7 +132,15 @@ impl<C: AsyncRead + AsyncWrite> io::Write for Connection<C> {
} }
} }
impl<C: AsyncRead + AsyncWrite> AsyncRead for Connection<C> {} impl<C: AsyncRead + AsyncWrite> AsyncRead for Connection<C> {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.reader.prepare_uninitialized_buffer(buf)
}
fn read_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.reader.read_buf(buf)
}
}
impl<C: AsyncRead + AsyncWrite> AsyncWrite for Connection<C> { impl<C: AsyncRead + AsyncWrite> AsyncWrite for Connection<C> {
fn shutdown(&mut self) -> Poll<(), io::Error> { fn shutdown(&mut self) -> Poll<(), io::Error> {

View File

@ -10,6 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
bytes = "0.4"
get_if_addrs = "0.5.3" get_if_addrs = "0.5.3"
libp2p-core = { version = "0.7.0", path = "../../core" } libp2p-core = { version = "0.7.0", path = "../../core" }
log = "0.4.1" log = "0.4.1"

View File

@ -464,7 +464,15 @@ impl Read for TcpTransStream {
} }
} }
impl AsyncRead for TcpTransStream {} impl AsyncRead for TcpTransStream {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}
fn read_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.read_buf(buf)
}
}
impl Write for TcpTransStream { impl Write for TcpTransStream {
#[inline] #[inline]

View File

@ -411,7 +411,11 @@ impl io::Read for Connection {
} }
} }
impl tokio_io::AsyncRead for Connection {} impl tokio_io::AsyncRead for Connection {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
}
impl io::Write for Connection { impl io::Write for Connection {
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> { fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {

View File

@ -20,6 +20,7 @@ tokio-io = "0.1"
websocket = { version = "0.22.2", default-features = false, features = ["async", "async-ssl"] } websocket = { version = "0.22.2", default-features = false, features = ["async", "async-ssl"] }
[target.'cfg(any(target_os = "emscripten", target_os = "unknown"))'.dependencies] [target.'cfg(any(target_os = "emscripten", target_os = "unknown"))'.dependencies]
bytes = "0.4"
stdweb = { version = "0.4", default-features = false } stdweb = { version = "0.4", default-features = false }
wasm-bindgen = "0.2.42" wasm-bindgen = "0.2.42"

View File

@ -232,7 +232,15 @@ impl Drop for BrowserWsConn {
} }
} }
impl AsyncRead for BrowserWsConn {} impl AsyncRead for BrowserWsConn {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.incoming_data.prepare_uninitialized_buffer(buf)
}
fn read_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> Poll<usize, IoError> {
self.incoming_data.read_buf(buf)
}
}
impl Read for BrowserWsConn { impl Read for BrowserWsConn {
#[inline] #[inline]