Update root crate to use futures-0.3. (#1315)

Mostly mechanical. Creating a `CommonTransport` yields an
`io::Result<_>` now since creating the `DnsConfig` may fail with an
`io::Error` when creating the `ThreadPool`.

The `DnsConfig` `Transport` impl had to be changed slightly:

(a) PR [[1311]] requires some `Send` bounds.
(b) The async block had to be changed to work around lifetime inference
issues which resulted in an "one type is more general than the other"
error.

[1311]: https://github.com/libp2p/rust-libp2p/pull/1311
This commit is contained in:
Toralf Wittner
2019-11-22 14:30:21 +01:00
committed by GitHub
parent 1597b026cb
commit df71d4a861
5 changed files with 97 additions and 126 deletions

View File

@ -15,7 +15,7 @@ secp256k1 = ["libp2p-core/secp256k1", "libp2p-secio/secp256k1"]
[dependencies]
bytes = "0.4"
futures = "0.1"
futures = "0.3.1"
multiaddr = { package = "parity-multiaddr", version = "0.5.1", path = "misc/multiaddr" }
multihash = { package = "parity-multihash", version = "0.1.4", path = "misc/multihash" }
lazy_static = "1.2"
@ -34,10 +34,7 @@ libp2p-wasm-ext = { version = "0.6.0", path = "transports/wasm-ext" }
libp2p-yamux = { version = "0.13.0", path = "muxers/yamux" }
parking_lot = "0.9.0"
smallvec = "1.0"
tokio-codec = "0.1"
tokio-executor = "0.1"
tokio-io = "0.1"
wasm-timer = "0.1"
wasm-timer = "0.2.4"
[target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.5.0", path = "protocols/deflate" }

View File

@ -19,11 +19,11 @@
// DEALINGS IN THE SOFTWARE.
use crate::{Multiaddr, core::{Transport, transport::{ListenerEvent, TransportError}}};
use futures::{prelude::*, try_ready};
use futures::{prelude::*, io::{IoSlice, IoSliceMut}, ready};
use lazy_static::lazy_static;
use parking_lot::Mutex;
use smallvec::{smallvec, SmallVec};
use std::{cmp, io, io::Read, io::Write, sync::Arc, time::Duration};
use std::{cmp, io, pin::Pin, sync::Arc, task::{Context, Poll}, time::Duration};
use wasm_timer::Instant;
/// Wraps around a `Transport` and logs the bandwidth that goes through all the opened connections.
@ -35,7 +35,6 @@ pub struct BandwidthLogging<TInner> {
impl<TInner> BandwidthLogging<TInner> {
/// Creates a new `BandwidthLogging` around the transport.
#[inline]
pub fn new(inner: TInner, period: Duration) -> (Self, Arc<BandwidthSinks>) {
let mut period_seconds = cmp::min(period.as_secs(), 86400) as u32;
if period.subsec_nanos() > 0 {
@ -58,7 +57,10 @@ impl<TInner> BandwidthLogging<TInner> {
impl<TInner> Transport for BandwidthLogging<TInner>
where
TInner: Transport,
TInner: Transport + Unpin,
TInner::Dial: Unpin,
TInner::Listener: Unpin,
TInner::ListenerUpgrade: Unpin
{
type Output = BandwidthConnecLogging<TInner::Output>;
type Error = TInner::Error;
@ -90,22 +92,23 @@ pub struct BandwidthListener<TInner> {
impl<TInner, TConn> Stream for BandwidthListener<TInner>
where
TInner: Stream<Item = ListenerEvent<TConn>>,
TInner: TryStream<Ok = ListenerEvent<TConn>> + Unpin
{
type Item = ListenerEvent<BandwidthFuture<TConn>>;
type Error = TInner::Error;
type Item = Result<ListenerEvent<BandwidthFuture<TConn>>, TInner::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let event = match try_ready!(self.inner.poll()) {
Some(v) => v,
None => return Ok(Async::Ready(None))
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let event =
if let Some(event) = ready!(self.inner.try_poll_next_unpin(cx)?) {
event
} else {
return Poll::Ready(None)
};
let event = event.map(|inner| {
BandwidthFuture { inner, sinks: self.sinks.clone() }
});
Ok(Async::Ready(Some(event)))
Poll::Ready(Some(Ok(event)))
}
}
@ -116,18 +119,13 @@ pub struct BandwidthFuture<TInner> {
sinks: Arc<BandwidthSinks>,
}
impl<TInner> Future for BandwidthFuture<TInner>
where TInner: Future,
{
type Item = BandwidthConnecLogging<TInner::Item>;
type Error = TInner::Error;
impl<TInner: TryFuture + Unpin> Future for BandwidthFuture<TInner> {
type Output = Result<BandwidthConnecLogging<TInner::Ok>, TInner::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let inner = try_ready!(self.inner.poll());
Ok(Async::Ready(BandwidthConnecLogging {
inner,
sinks: self.sinks.clone(),
}))
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = ready!(self.inner.try_poll_unpin(cx)?);
let logged = BandwidthConnecLogging { inner, sinks: self.sinks.clone() };
Poll::Ready(Ok(logged))
}
}
@ -139,13 +137,11 @@ pub struct BandwidthSinks {
impl BandwidthSinks {
/// Returns the average number of bytes that have been downloaded in the period.
#[inline]
pub fn average_download_per_sec(&self) -> u64 {
self.download.lock().get()
}
/// Returns the average number of bytes that have been uploaded in the period.
#[inline]
pub fn average_upload_per_sec(&self) -> u64 {
self.upload.lock().get()
}
@ -157,56 +153,43 @@ pub struct BandwidthConnecLogging<TInner> {
sinks: Arc<BandwidthSinks>,
}
impl<TInner> Read for BandwidthConnecLogging<TInner>
where TInner: Read
{
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let num_bytes = self.inner.read(buf)?;
impl<TInner: AsyncRead + Unpin> AsyncRead for BandwidthConnecLogging<TInner> {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<io::Result<usize>> {
let num_bytes = ready!(Pin::new(&mut self.inner).poll_read(cx, buf))?;
self.sinks.download.lock().inject(num_bytes);
Ok(num_bytes)
Poll::Ready(Ok(num_bytes))
}
fn poll_read_vectored(mut self: Pin<&mut Self>, cx: &mut Context, bufs: &mut [IoSliceMut]) -> Poll<io::Result<usize>> {
let num_bytes = ready!(Pin::new(&mut self.inner).poll_read_vectored(cx, bufs))?;
self.sinks.download.lock().inject(num_bytes);
Poll::Ready(Ok(num_bytes))
}
}
impl<TInner> tokio_io::AsyncRead for BandwidthConnecLogging<TInner>
where TInner: tokio_io::AsyncRead
{
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}
fn read_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.read_buf(buf)
}
}
impl<TInner> Write for BandwidthConnecLogging<TInner>
where TInner: Write
{
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let num_bytes = self.inner.write(buf)?;
impl<TInner: AsyncWrite + Unpin> AsyncWrite for BandwidthConnecLogging<TInner> {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
let num_bytes = ready!(Pin::new(&mut self.inner).poll_write(cx, buf))?;
self.sinks.upload.lock().inject(num_bytes);
Ok(num_bytes)
Poll::Ready(Ok(num_bytes))
}
#[inline]
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
fn poll_write_vectored(mut self: Pin<&mut Self>, cx: &mut Context, bufs: &[IoSlice]) -> Poll<io::Result<usize>> {
let num_bytes = ready!(Pin::new(&mut self.inner).poll_write_vectored(cx, bufs))?;
self.sinks.upload.lock().inject(num_bytes);
Poll::Ready(Ok(num_bytes))
}
}
impl<TInner> tokio_io::AsyncWrite for BandwidthConnecLogging<TInner>
where TInner: tokio_io::AsyncWrite
{
#[inline]
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.inner.shutdown()
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_close(cx)
}
}
/// Returns the number of seconds that have elapsed between an arbitrary EPOCH and now.
#[inline]
fn current_second() -> u32 {
lazy_static! {
static ref EPOCH: Instant = Instant::now();
@ -267,7 +250,6 @@ impl BandwidthSink {
self.bytes.remove(0);
self.bytes.push(0);
}
self.latest_update = current_second;
}
}

View File

@ -158,8 +158,6 @@ pub use futures;
pub use multiaddr;
#[doc(inline)]
pub use multihash;
pub use tokio_io;
pub use tokio_codec;
#[doc(inline)]
pub use libp2p_core as core;
@ -229,7 +227,7 @@ use std::{error, io, time::Duration};
/// > **Note**: This `Transport` is not suitable for production usage, as its implementation
/// > reserves the right to support additional protocols or remove deprecated protocols.
pub fn build_development_transport(keypair: identity::Keypair)
-> impl Transport<Output = (PeerId, impl core::muxing::StreamMuxer<OutboundSubstream = impl Send, Substream = impl Send, Error = impl Into<io::Error>> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone
-> io::Result<impl Transport<Output = (PeerId, impl core::muxing::StreamMuxer<OutboundSubstream = impl Send, Substream = impl Send, Error = impl Into<io::Error>> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone>
{
build_tcp_ws_secio_mplex_yamux(keypair)
}
@ -241,14 +239,14 @@ pub fn build_development_transport(keypair: identity::Keypair)
///
/// > **Note**: If you ever need to express the type of this `Transport`.
pub fn build_tcp_ws_secio_mplex_yamux(keypair: identity::Keypair)
-> impl Transport<Output = (PeerId, impl core::muxing::StreamMuxer<OutboundSubstream = impl Send, Substream = impl Send, Error = impl Into<io::Error>> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone
-> io::Result<impl Transport<Output = (PeerId, impl core::muxing::StreamMuxer<OutboundSubstream = impl Send, Substream = impl Send, Error = impl Into<io::Error>> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone>
{
CommonTransport::new()
Ok(CommonTransport::new()?
.upgrade(core::upgrade::Version::V1)
.authenticate(secio::SecioConfig::new(keypair))
.multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new()))
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))
.timeout(Duration::from_secs(20))
.timeout(Duration::from_secs(20)))
}
/// Implementation of `Transport` that supports the most common protocols.
@ -276,27 +274,27 @@ struct CommonTransportInner {
impl CommonTransport {
/// Initializes the `CommonTransport`.
#[cfg(not(any(target_os = "emscripten", target_os = "unknown")))]
pub fn new() -> CommonTransport {
pub fn new() -> io::Result<CommonTransport> {
let tcp = tcp::TcpConfig::new().nodelay(true);
let transport = dns::DnsConfig::new(tcp);
let transport = dns::DnsConfig::new(tcp)?;
#[cfg(feature = "libp2p-websocket")]
let transport = {
let trans_clone = transport.clone();
transport.or_transport(websocket::WsConfig::new(trans_clone))
};
CommonTransport {
Ok(CommonTransport {
inner: CommonTransportInner { inner: transport }
}
})
}
/// Initializes the `CommonTransport`.
#[cfg(any(target_os = "emscripten", target_os = "unknown"))]
pub fn new() -> CommonTransport {
pub fn new() -> io::Result<CommonTransport> {
let inner = core::transport::dummy::DummyTransport::new();
CommonTransport {
Ok(CommonTransport {
inner: CommonTransportInner { inner }
}
})
}
}

View File

@ -20,9 +20,8 @@
use crate::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated};
use bytes::Bytes;
use futures::{future::FromErr, prelude::*};
use std::{iter, io::Error as IoError, sync::Arc};
use tokio_io::{AsyncRead, AsyncWrite};
use futures::prelude::*;
use std::{iter, sync::Arc};
/// Implementation of `ConnectionUpgrade`. Convenient to use with small protocols.
#[derive(Debug)]
@ -35,7 +34,6 @@ pub struct SimpleProtocol<F> {
impl<F> SimpleProtocol<F> {
/// Builds a `SimpleProtocol`.
#[inline]
pub fn new<N>(info: N, upgrade: F) -> SimpleProtocol<F>
where
N: Into<Bytes>,
@ -48,7 +46,6 @@ impl<F> SimpleProtocol<F> {
}
impl<F> Clone for SimpleProtocol<F> {
#[inline]
fn clone(&self) -> Self {
SimpleProtocol {
info: self.info.clone(),
@ -61,42 +58,39 @@ impl<F> UpgradeInfo for SimpleProtocol<F> {
type Info = Bytes;
type InfoIter = iter::Once<Self::Info>;
#[inline]
fn protocol_info(&self) -> Self::InfoIter {
iter::once(self.info.clone())
}
}
impl<C, F, O> InboundUpgrade<C> for SimpleProtocol<F>
impl<C, F, O, A, E> InboundUpgrade<C> for SimpleProtocol<F>
where
C: AsyncRead + AsyncWrite,
F: Fn(Negotiated<C>) -> O,
O: IntoFuture<Error = IoError>
O: Future<Output = Result<A, E>> + Unpin
{
type Output = O::Item;
type Error = IoError;
type Future = FromErr<O::Future, IoError>;
type Output = A;
type Error = E;
type Future = O;
#[inline]
fn upgrade_inbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
let upgrade = &self.upgrade;
upgrade(socket).into_future().from_err()
upgrade(socket)
}
}
impl<C, F, O> OutboundUpgrade<C> for SimpleProtocol<F>
impl<C, F, O, A, E> OutboundUpgrade<C> for SimpleProtocol<F>
where
C: AsyncRead + AsyncWrite,
F: Fn(Negotiated<C>) -> O,
O: IntoFuture<Error = IoError>
O: Future<Output = Result<A, E>> + Unpin
{
type Output = O::Item;
type Error = IoError;
type Future = FromErr<O::Future, IoError>;
type Output = A;
type Error = E;
type Future = O;
#[inline]
fn upgrade_outbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
let upgrade = &self.upgrade;
upgrade(socket).into_future().from_err()
upgrade(socket)
}
}

View File

@ -33,14 +33,14 @@
//! replaced with respectively an `/ip4/` or an `/ip6/` component.
//!
use futures::{prelude::*, channel::oneshot};
use futures::{prelude::*, channel::oneshot, future::BoxFuture};
use libp2p_core::{
Transport,
multiaddr::{Protocol, Multiaddr},
transport::{TransportError, ListenerEvent}
};
use log::{error, debug, trace};
use std::{error, fmt, io, net::ToSocketAddrs, pin::Pin};
use std::{error, fmt, io, net::ToSocketAddrs};
/// Represents the configuration for a DNS transport capability of libp2p.
///
@ -90,8 +90,9 @@ where
impl<T> Transport for DnsConfig<T>
where
T: Transport + 'static,
T::Error: 'static,
T: Transport + Send + 'static,
T::Error: Send,
T::Dial: Send
{
type Output = T::Output;
type Error = DnsErr<T::Error>;
@ -102,7 +103,7 @@ where
type ListenerUpgrade = future::MapErr<T::ListenerUpgrade, fn(T::Error) -> Self::Error>;
type Dial = future::Either<
future::MapErr<T::Dial, fn(T::Error) -> Self::Error>,
Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>
BoxFuture<'static, Result<Self::Output, Self::Error>>
>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
@ -166,21 +167,21 @@ where
})
.collect::<stream::FuturesOrdered<_>>();
let inner = self.inner;
Ok(future::Either::Right(Box::pin(async {
let addr = addr;
let outcome: Vec<_> = resolve_futs.collect().await;
let future = resolve_futs.collect::<Vec<_>>()
.then(move |outcome| async move {
let outcome = outcome.into_iter().collect::<Result<Vec<_>, _>>()?;
let outcome = outcome.into_iter().collect::<Multiaddr>();
debug!("DNS resolution outcome: {} => {}", addr, outcome);
match inner.dial(outcome) {
match self.inner.dial(outcome) {
Ok(d) => d.await.map_err(DnsErr::Underlying),
Err(TransportError::MultiaddrNotSupported(_addr)) =>
Err(DnsErr::MultiaddrNotSupported),
Err(TransportError::Other(err)) => Err(DnsErr::Underlying(err)),
Err(TransportError::Other(err)) => Err(DnsErr::Underlying(err))
}
}) as Pin<Box<_>>))
});
Ok(future.boxed().right_future())
}
}
@ -231,14 +232,13 @@ where TErr: error::Error + 'static
#[cfg(test)]
mod tests {
use super::DnsConfig;
use futures::prelude::*;
use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
use libp2p_core::{
Transport,
multiaddr::{Protocol, Multiaddr},
transport::ListenerEvent,
transport::TransportError,
};
use std::pin::Pin;
#[test]
fn basic_resolve() {
@ -248,9 +248,9 @@ mod tests {
impl Transport for CustomTransport {
type Output = ();
type Error = std::io::Error;
type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade>, Self::Error>>>>;
type ListenerUpgrade = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
type Listener = BoxStream<'static, Result<ListenerEvent<Self::ListenerUpgrade>, Self::Error>>;
type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn listen_on(self, _: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
unreachable!()