*: Remove usage of custom buffer initialization usage (#1263)

* *: Remove usage of custom buffer initialization usage

With version `0.3.0-alpha.19` the futures-preview crate makes the
`AsyncRead::initializer` API unstable.

In order to improve interoperability with e.g. both a library depending
on alpha.18 as well as a library depending on alpha.19 and in order for
rust-libp2p to become stable again, this commit removes all usages of
the unstable `initializer` API.

* protocols/noise: Remove NoiseOutput Asyncread initializer

* transports/tcp: Remove TcpTransStream AsyncRead initializer

* *: Remove version pinning of futures-preview to 0.3.0-alpha.18

With version 0.3.0-alpha.19 the futures-preview crate makes the
AsyncRead::initializer API unstable. Given that the previous commits
removed usage of the initializer API, the version pinning is not needed
any longer.
This commit is contained in:
Max Inden
2019-11-01 16:53:11 +01:00
committed by Toralf Wittner
parent 0eeddac86f
commit 8944899fe0
14 changed files with 10 additions and 79 deletions

View File

@ -22,7 +22,7 @@ log = "0.4"
multiaddr = { package = "parity-multiaddr", version = "0.5.0", path = "../misc/multiaddr" }
multihash = { package = "parity-multihash", version = "0.1.0", path = "../misc/multihash" }
multistream-select = { version = "0.5.0", path = "../misc/multistream-select" }
futures-preview = { version = "= 0.3.0-alpha.18", features = ["compat", "io-compat"] }
futures-preview = { version = "0.3.0-alpha.18", features = ["compat", "io-compat"] }
parking_lot = "0.8"
protobuf = "2.8"
quick-error = "1.2"

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use crate::{muxing::StreamMuxer, ProtocolName, transport::ListenerEvent};
use futures::{prelude::*, io::Initializer};
use futures::prelude::*;
use std::{fmt, io::{Error as IoError, Read, Write}, pin::Pin, task::Context, task::Poll};
#[derive(Debug, Copy, Clone)]
@ -67,13 +67,6 @@ where
A: AsyncRead + Unpin,
B: AsyncRead + Unpin,
{
unsafe fn initializer(&self) -> Initializer {
match self {
EitherOutput::First(a) => a.initializer(),
EitherOutput::Second(b) => b.initializer(),
}
}
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, IoError>> {
match &mut *self {
EitherOutput::First(a) => AsyncRead::poll_read(Pin::new(a), cx, buf),
@ -249,13 +242,6 @@ where
}
}
unsafe fn initializer(&self) -> Initializer {
match self {
EitherOutput::First(ref inner) => inner.initializer(),
EitherOutput::Second(ref inner) => inner.initializer(),
}
}
fn read_substream(&self, cx: &mut Context, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll<Result<usize, Self::Error>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {

View File

@ -52,7 +52,7 @@
//! implementation of `StreamMuxer` to control everything that happens on the wire.
use fnv::FnvHashMap;
use futures::{future, prelude::*, io::Initializer, task::Context, task::Poll};
use futures::{future, prelude::*, task::Context, task::Poll};
use parking_lot::Mutex;
use std::{io, ops::Deref, fmt, pin::Pin, sync::atomic::{AtomicUsize, Ordering}};
@ -130,11 +130,6 @@ pub trait StreamMuxer {
fn read_substream(&self, cx: &mut Context, s: &mut Self::Substream, buf: &mut [u8])
-> Poll<Result<usize, Self::Error>>;
/// Mimics the `initializer` method of the `AsyncRead` trait.
unsafe fn initializer(&self) -> Initializer {
Initializer::zeroing()
}
/// Write data to a substream. The behaviour is the same as `futures::AsyncWrite::poll_write`.
///
/// If `Pending` is returned, then the current task will be notified once the substream
@ -381,10 +376,6 @@ where
P: Deref,
P::Target: StreamMuxer,
{
unsafe fn initializer(&self) -> Initializer {
self.muxer.initializer()
}
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
// We use a `this` because the compiler isn't smart enough to allow mutably borrowing
// multiple different fields from the `Pin` at the same time.
@ -511,10 +502,6 @@ impl StreamMuxer for StreamMuxerBox {
self.inner.destroy_outbound(substream)
}
unsafe fn initializer(&self) -> Initializer {
self.inner.initializer()
}
#[inline]
fn read_substream(&self, cx: &mut Context, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<Result<usize, Self::Error>> {
self.inner.read_substream(cx, s, buf)
@ -616,10 +603,6 @@ where
self.inner.destroy_outbound(list.remove(&substream).unwrap())
}
unsafe fn initializer(&self) -> Initializer {
self.inner.initializer()
}
#[inline]
fn read_substream(&self, cx: &mut Context, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<Result<usize, Self::Error>> {
let mut list = self.substreams.lock();

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use crate::{Endpoint, muxing::StreamMuxer};
use futures::{prelude::*, io::Initializer};
use futures::prelude::*;
use parking_lot::Mutex;
use std::{io, pin::Pin, sync::atomic::{AtomicBool, Ordering}, task::Context, task::Poll};
@ -100,10 +100,6 @@ where
fn destroy_outbound(&self, _: Self::OutboundSubstream) {
}
unsafe fn initializer(&self) -> Initializer {
self.inner.lock().initializer()
}
fn read_substream(&self, cx: &mut Context, _: &mut Self::Substream, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
let res = AsyncRead::poll_read(Pin::new(&mut *self.inner.lock()), cx, buf);
if let Poll::Ready(Ok(_)) = res {

View File

@ -10,4 +10,4 @@ keywords = ["networking"]
categories = ["network-programming", "asynchronous"]
[dependencies]
futures-preview = "= 0.3.0-alpha.18"
futures-preview = "0.3.0-alpha.18"

View File

@ -27,7 +27,7 @@
//! > **Note**: Although this crate is hosted in the libp2p repo, it is purely a utility crate and
//! > not at all specific to libp2p.
use futures::{prelude::*, io::Initializer};
use futures::prelude::*;
use std::{cmp, io, pin::Pin, task::Context, task::Poll};
/// Wraps around a `Stream + Sink` whose items are buffers. Implements `AsyncRead` and `AsyncWrite`.
@ -74,10 +74,6 @@ where
for _ in 0..to_copy { current_item.remove(0); }
Poll::Ready(Ok(to_copy))
}
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
}
impl<S> AsyncWrite for RwStreamSink<S>

View File

@ -33,7 +33,7 @@ use libp2p_core::{
use log::{debug, trace};
use parking_lot::Mutex;
use fnv::FnvHashSet;
use futures::{prelude::*, future, io::Initializer, ready, stream::Fuse};
use futures::{prelude::*, future, ready, stream::Fuse};
use futures::task::{ArcWake, waker_ref};
use futures_codec::Framed;
@ -470,10 +470,6 @@ where C: AsyncRead + AsyncWrite + Unpin
// Nothing to do.
}
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
fn read_substream(&self, cx: &mut Context, substream: &mut Self::Substream, buf: &mut [u8]) -> Poll<Result<usize, IoError>> {
loop {
// First, transfer from `current_data`.

View File

@ -80,10 +80,6 @@ where
fn destroy_outbound(&self, _: Self::OutboundSubstream) {
}
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError> {
let result = sub.poll_read(buf);
if let Ok(Async::Ready(_)) = result {

View File

@ -135,7 +135,6 @@ impl<S> AsyncRead for DeflateOutput<S>
unsafe {
this.read_interm.reserve(256);
this.read_interm.set_len(this.read_interm.capacity());
this.inner.initializer().initialize(&mut this.read_interm);
}
match AsyncRead::poll_read(Pin::new(&mut this.inner), cx, &mut this.read_interm) {
@ -172,10 +171,6 @@ impl<S> AsyncRead for DeflateOutput<S>
}
}
}
unsafe fn initializer(&self) -> futures::io::Initializer {
futures::io::Initializer::nop()
}
}
impl<S> AsyncWrite for DeflateOutput<S>

View File

@ -252,10 +252,6 @@ impl<T: AsyncRead + Unpin> AsyncRead for NoiseOutput<T> {
}
}
}
unsafe fn initializer(&self) -> futures::io::Initializer {
futures::io::Initializer::nop()
}
}
impl<T: AsyncWrite + Unpin> AsyncWrite for NoiseOutput<T> {

View File

@ -58,7 +58,7 @@
pub use self::error::SecioError;
use futures::stream::MapErr as StreamMapErr;
use futures::{prelude::*, io::Initializer};
use futures::prelude::*;
use libp2p_core::{PeerId, PublicKey, identity, upgrade::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, Negotiated}};
use log::debug;
use rw_stream_sink::RwStreamSink;
@ -199,10 +199,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for SecioOutput<S> {
{
AsyncRead::poll_read(Pin::new(&mut self.stream), cx, buf)
}
unsafe fn initializer(&self) -> Initializer {
self.stream.initializer()
}
}
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for SecioOutput<S> {

View File

@ -39,7 +39,6 @@
use async_std::net::TcpStream;
use futures::{
future::{self, Ready},
io::Initializer,
prelude::*,
};
use futures_timer::Delay;
@ -420,10 +419,6 @@ impl AsyncRead for TcpTransStream {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
AsyncRead::poll_read(Pin::new(&mut self.inner), cx, buf)
}
unsafe fn initializer(&self) -> Initializer {
self.inner.initializer()
}
}
impl AsyncWrite for TcpTransStream {

View File

@ -32,7 +32,7 @@
//! module.
//!
use futures::{prelude::*, future::Ready, io::Initializer};
use futures::{prelude::*, future::Ready};
use libp2p_core::{transport::ListenerEvent, transport::TransportError, Multiaddr, Transport};
use parity_send_wrapper::SendWrapper;
use std::{collections::VecDeque, error, fmt, io, mem, pin::Pin, task::Context, task::Poll};
@ -356,10 +356,6 @@ impl fmt::Debug for Connection {
}
impl AsyncRead for Connection {
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
loop {
match mem::replace(&mut self.read_state, ConnectionReadState::Finished) {

View File

@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
bytes = "0.4.12"
either = "1.5.3"
futures-preview = "= 0.3.0-alpha.18"
futures-preview = "0.3.0-alpha.18"
#futures-rustls = "0.12.0-alpha" # TODO: https://github.com/quininer/tokio-rustls/issues/51
libp2p-core = { version = "0.12.0", path = "../../core" }
log = "0.4.8"