diff --git a/Cargo.toml b/Cargo.toml index 0f73f93e..a9722817 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ secp256k1 = ["libp2p-core/secp256k1", "libp2p-secio/secp256k1"] [dependencies] bytes = "0.4" -futures = "0.1" +futures = "0.3.1" multiaddr = { package = "parity-multiaddr", version = "0.5.1", path = "misc/multiaddr" } multihash = { package = "parity-multihash", version = "0.1.4", path = "misc/multihash" } lazy_static = "1.2" @@ -34,10 +34,7 @@ libp2p-wasm-ext = { version = "0.6.0", path = "transports/wasm-ext" } libp2p-yamux = { version = "0.13.0", path = "muxers/yamux" } parking_lot = "0.9.0" smallvec = "1.0" -tokio-codec = "0.1" -tokio-executor = "0.1" -tokio-io = "0.1" -wasm-timer = "0.1" +wasm-timer = "0.2.4" [target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies] libp2p-deflate = { version = "0.5.0", path = "protocols/deflate" } diff --git a/src/bandwidth.rs b/src/bandwidth.rs index 4395d7e7..8e7b882b 100644 --- a/src/bandwidth.rs +++ b/src/bandwidth.rs @@ -19,11 +19,11 @@ // DEALINGS IN THE SOFTWARE. use crate::{Multiaddr, core::{Transport, transport::{ListenerEvent, TransportError}}}; -use futures::{prelude::*, try_ready}; +use futures::{prelude::*, io::{IoSlice, IoSliceMut}, ready}; use lazy_static::lazy_static; use parking_lot::Mutex; use smallvec::{smallvec, SmallVec}; -use std::{cmp, io, io::Read, io::Write, sync::Arc, time::Duration}; +use std::{cmp, io, pin::Pin, sync::Arc, task::{Context, Poll}, time::Duration}; use wasm_timer::Instant; /// Wraps around a `Transport` and logs the bandwidth that goes through all the opened connections. @@ -35,7 +35,6 @@ pub struct BandwidthLogging { impl BandwidthLogging { /// Creates a new `BandwidthLogging` around the transport. - #[inline] pub fn new(inner: TInner, period: Duration) -> (Self, Arc) { let mut period_seconds = cmp::min(period.as_secs(), 86400) as u32; if period.subsec_nanos() > 0 { @@ -58,7 +57,10 @@ impl BandwidthLogging { impl Transport for BandwidthLogging where - TInner: Transport, + TInner: Transport + Unpin, + TInner::Dial: Unpin, + TInner::Listener: Unpin, + TInner::ListenerUpgrade: Unpin { type Output = BandwidthConnecLogging; type Error = TInner::Error; @@ -90,22 +92,23 @@ pub struct BandwidthListener { impl Stream for BandwidthListener where - TInner: Stream>, + TInner: TryStream> + Unpin { - type Item = ListenerEvent>; - type Error = TInner::Error; + type Item = Result>, TInner::Error>; - fn poll(&mut self) -> Poll, Self::Error> { - let event = match try_ready!(self.inner.poll()) { - Some(v) => v, - None => return Ok(Async::Ready(None)) - }; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let event = + if let Some(event) = ready!(self.inner.try_poll_next_unpin(cx)?) { + event + } else { + return Poll::Ready(None) + }; let event = event.map(|inner| { BandwidthFuture { inner, sinks: self.sinks.clone() } }); - Ok(Async::Ready(Some(event))) + Poll::Ready(Some(Ok(event))) } } @@ -116,18 +119,13 @@ pub struct BandwidthFuture { sinks: Arc, } -impl Future for BandwidthFuture - where TInner: Future, -{ - type Item = BandwidthConnecLogging; - type Error = TInner::Error; +impl Future for BandwidthFuture { + type Output = Result, TInner::Error>; - fn poll(&mut self) -> Poll { - let inner = try_ready!(self.inner.poll()); - Ok(Async::Ready(BandwidthConnecLogging { - inner, - sinks: self.sinks.clone(), - })) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = ready!(self.inner.try_poll_unpin(cx)?); + let logged = BandwidthConnecLogging { inner, sinks: self.sinks.clone() }; + Poll::Ready(Ok(logged)) } } @@ -139,13 +137,11 @@ pub struct BandwidthSinks { impl BandwidthSinks { /// Returns the average number of bytes that have been downloaded in the period. - #[inline] pub fn average_download_per_sec(&self) -> u64 { self.download.lock().get() } /// Returns the average number of bytes that have been uploaded in the period. - #[inline] pub fn average_upload_per_sec(&self) -> u64 { self.upload.lock().get() } @@ -157,56 +153,43 @@ pub struct BandwidthConnecLogging { sinks: Arc, } -impl Read for BandwidthConnecLogging - where TInner: Read -{ - #[inline] - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let num_bytes = self.inner.read(buf)?; +impl AsyncRead for BandwidthConnecLogging { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { + let num_bytes = ready!(Pin::new(&mut self.inner).poll_read(cx, buf))?; self.sinks.download.lock().inject(num_bytes); - Ok(num_bytes) + Poll::Ready(Ok(num_bytes)) + } + + fn poll_read_vectored(mut self: Pin<&mut Self>, cx: &mut Context, bufs: &mut [IoSliceMut]) -> Poll> { + let num_bytes = ready!(Pin::new(&mut self.inner).poll_read_vectored(cx, bufs))?; + self.sinks.download.lock().inject(num_bytes); + Poll::Ready(Ok(num_bytes)) } } -impl tokio_io::AsyncRead for BandwidthConnecLogging - where TInner: tokio_io::AsyncRead -{ - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.inner.prepare_uninitialized_buffer(buf) - } - - fn read_buf(&mut self, buf: &mut B) -> Poll { - self.inner.read_buf(buf) - } -} - -impl Write for BandwidthConnecLogging - where TInner: Write -{ - #[inline] - fn write(&mut self, buf: &[u8]) -> io::Result { - let num_bytes = self.inner.write(buf)?; +impl AsyncWrite for BandwidthConnecLogging { + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + let num_bytes = ready!(Pin::new(&mut self.inner).poll_write(cx, buf))?; self.sinks.upload.lock().inject(num_bytes); - Ok(num_bytes) + Poll::Ready(Ok(num_bytes)) } - #[inline] - fn flush(&mut self) -> io::Result<()> { - self.inner.flush() + fn poll_write_vectored(mut self: Pin<&mut Self>, cx: &mut Context, bufs: &[IoSlice]) -> Poll> { + let num_bytes = ready!(Pin::new(&mut self.inner).poll_write_vectored(cx, bufs))?; + self.sinks.upload.lock().inject(num_bytes); + Poll::Ready(Ok(num_bytes)) } -} -impl tokio_io::AsyncWrite for BandwidthConnecLogging - where TInner: tokio_io::AsyncWrite -{ - #[inline] - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.inner.shutdown() + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.inner).poll_close(cx) } } /// Returns the number of seconds that have elapsed between an arbitrary EPOCH and now. -#[inline] fn current_second() -> u32 { lazy_static! { static ref EPOCH: Instant = Instant::now(); @@ -267,7 +250,6 @@ impl BandwidthSink { self.bytes.remove(0); self.bytes.push(0); } - self.latest_update = current_second; } } diff --git a/src/lib.rs b/src/lib.rs index 43c26d41..28cb9dc6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -158,8 +158,6 @@ pub use futures; pub use multiaddr; #[doc(inline)] pub use multihash; -pub use tokio_io; -pub use tokio_codec; #[doc(inline)] pub use libp2p_core as core; @@ -229,7 +227,7 @@ use std::{error, io, time::Duration}; /// > **Note**: This `Transport` is not suitable for production usage, as its implementation /// > reserves the right to support additional protocols or remove deprecated protocols. pub fn build_development_transport(keypair: identity::Keypair) - -> impl Transport> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone + -> io::Result> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone> { build_tcp_ws_secio_mplex_yamux(keypair) } @@ -241,14 +239,14 @@ pub fn build_development_transport(keypair: identity::Keypair) /// /// > **Note**: If you ever need to express the type of this `Transport`. pub fn build_tcp_ws_secio_mplex_yamux(keypair: identity::Keypair) - -> impl Transport> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone + -> io::Result> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone> { - CommonTransport::new() + Ok(CommonTransport::new()? .upgrade(core::upgrade::Version::V1) .authenticate(secio::SecioConfig::new(keypair)) .multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new())) .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) - .timeout(Duration::from_secs(20)) + .timeout(Duration::from_secs(20))) } /// Implementation of `Transport` that supports the most common protocols. @@ -276,27 +274,27 @@ struct CommonTransportInner { impl CommonTransport { /// Initializes the `CommonTransport`. #[cfg(not(any(target_os = "emscripten", target_os = "unknown")))] - pub fn new() -> CommonTransport { + pub fn new() -> io::Result { let tcp = tcp::TcpConfig::new().nodelay(true); - let transport = dns::DnsConfig::new(tcp); + let transport = dns::DnsConfig::new(tcp)?; #[cfg(feature = "libp2p-websocket")] let transport = { let trans_clone = transport.clone(); transport.or_transport(websocket::WsConfig::new(trans_clone)) }; - CommonTransport { + Ok(CommonTransport { inner: CommonTransportInner { inner: transport } - } + }) } /// Initializes the `CommonTransport`. #[cfg(any(target_os = "emscripten", target_os = "unknown"))] - pub fn new() -> CommonTransport { + pub fn new() -> io::Result { let inner = core::transport::dummy::DummyTransport::new(); - CommonTransport { + Ok(CommonTransport { inner: CommonTransportInner { inner } - } + }) } } diff --git a/src/simple.rs b/src/simple.rs index 2395fb37..b61f2e25 100644 --- a/src/simple.rs +++ b/src/simple.rs @@ -20,9 +20,8 @@ use crate::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated}; use bytes::Bytes; -use futures::{future::FromErr, prelude::*}; -use std::{iter, io::Error as IoError, sync::Arc}; -use tokio_io::{AsyncRead, AsyncWrite}; +use futures::prelude::*; +use std::{iter, sync::Arc}; /// Implementation of `ConnectionUpgrade`. Convenient to use with small protocols. #[derive(Debug)] @@ -35,7 +34,6 @@ pub struct SimpleProtocol { impl SimpleProtocol { /// Builds a `SimpleProtocol`. - #[inline] pub fn new(info: N, upgrade: F) -> SimpleProtocol where N: Into, @@ -48,7 +46,6 @@ impl SimpleProtocol { } impl Clone for SimpleProtocol { - #[inline] fn clone(&self) -> Self { SimpleProtocol { info: self.info.clone(), @@ -61,42 +58,39 @@ impl UpgradeInfo for SimpleProtocol { type Info = Bytes; type InfoIter = iter::Once; - #[inline] fn protocol_info(&self) -> Self::InfoIter { iter::once(self.info.clone()) } } -impl InboundUpgrade for SimpleProtocol +impl InboundUpgrade for SimpleProtocol where C: AsyncRead + AsyncWrite, F: Fn(Negotiated) -> O, - O: IntoFuture + O: Future> + Unpin { - type Output = O::Item; - type Error = IoError; - type Future = FromErr; + type Output = A; + type Error = E; + type Future = O; - #[inline] fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { let upgrade = &self.upgrade; - upgrade(socket).into_future().from_err() + upgrade(socket) } } -impl OutboundUpgrade for SimpleProtocol +impl OutboundUpgrade for SimpleProtocol where C: AsyncRead + AsyncWrite, F: Fn(Negotiated) -> O, - O: IntoFuture + O: Future> + Unpin { - type Output = O::Item; - type Error = IoError; - type Future = FromErr; + type Output = A; + type Error = E; + type Future = O; - #[inline] fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { let upgrade = &self.upgrade; - upgrade(socket).into_future().from_err() + upgrade(socket) } } diff --git a/transports/dns/src/lib.rs b/transports/dns/src/lib.rs index 95a1db9e..63d423ea 100644 --- a/transports/dns/src/lib.rs +++ b/transports/dns/src/lib.rs @@ -33,14 +33,14 @@ //! replaced with respectively an `/ip4/` or an `/ip6/` component. //! -use futures::{prelude::*, channel::oneshot}; +use futures::{prelude::*, channel::oneshot, future::BoxFuture}; use libp2p_core::{ Transport, multiaddr::{Protocol, Multiaddr}, transport::{TransportError, ListenerEvent} }; use log::{error, debug, trace}; -use std::{error, fmt, io, net::ToSocketAddrs, pin::Pin}; +use std::{error, fmt, io, net::ToSocketAddrs}; /// Represents the configuration for a DNS transport capability of libp2p. /// @@ -90,8 +90,9 @@ where impl Transport for DnsConfig where - T: Transport + 'static, - T::Error: 'static, + T: Transport + Send + 'static, + T::Error: Send, + T::Dial: Send { type Output = T::Output; type Error = DnsErr; @@ -102,7 +103,7 @@ where type ListenerUpgrade = future::MapErr Self::Error>; type Dial = future::Either< future::MapErr Self::Error>, - Pin>>> + BoxFuture<'static, Result> >; fn listen_on(self, addr: Multiaddr) -> Result> { @@ -166,21 +167,21 @@ where }) .collect::>(); - let inner = self.inner; - Ok(future::Either::Right(Box::pin(async { - let addr = addr; - let outcome: Vec<_> = resolve_futs.collect().await; - let outcome = outcome.into_iter().collect::, _>>()?; - let outcome = outcome.into_iter().collect::(); - debug!("DNS resolution outcome: {} => {}", addr, outcome); + let future = resolve_futs.collect::>() + .then(move |outcome| async move { + let outcome = outcome.into_iter().collect::, _>>()?; + let outcome = outcome.into_iter().collect::(); + debug!("DNS resolution outcome: {} => {}", addr, outcome); - match inner.dial(outcome) { - Ok(d) => d.await.map_err(DnsErr::Underlying), - Err(TransportError::MultiaddrNotSupported(_addr)) => - Err(DnsErr::MultiaddrNotSupported), - Err(TransportError::Other(err)) => Err(DnsErr::Underlying(err)), - } - }) as Pin>)) + match self.inner.dial(outcome) { + Ok(d) => d.await.map_err(DnsErr::Underlying), + Err(TransportError::MultiaddrNotSupported(_addr)) => + Err(DnsErr::MultiaddrNotSupported), + Err(TransportError::Other(err)) => Err(DnsErr::Underlying(err)) + } + }); + + Ok(future.boxed().right_future()) } } @@ -231,14 +232,13 @@ where TErr: error::Error + 'static #[cfg(test)] mod tests { use super::DnsConfig; - use futures::prelude::*; + use futures::{future::BoxFuture, prelude::*, stream::BoxStream}; use libp2p_core::{ Transport, multiaddr::{Protocol, Multiaddr}, transport::ListenerEvent, transport::TransportError, }; - use std::pin::Pin; #[test] fn basic_resolve() { @@ -248,9 +248,9 @@ mod tests { impl Transport for CustomTransport { type Output = (); type Error = std::io::Error; - type Listener = Pin, Self::Error>>>>; - type ListenerUpgrade = Pin>>>; - type Dial = Pin>>>; + type Listener = BoxStream<'static, Result, Self::Error>>; + type ListenerUpgrade = BoxFuture<'static, Result>; + type Dial = BoxFuture<'static, Result>; fn listen_on(self, _: Multiaddr) -> Result> { unreachable!()