From acfa1c9c7976bd6c05f5de48bc3687b4612e9756 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Tue, 4 Dec 2018 11:24:59 +0100 Subject: [PATCH] Remove some boxed futures. (#718) --- core/src/transport/upgrade.rs | 161 ++++++++++++++++++------ protocols/identify/src/id_transport.rs | 25 ++-- src/simple.rs | 18 ++- transports/dns/src/lib.rs | 164 +++++++++++++++++-------- transports/ratelimit/src/lib.rs | 58 +++++---- transports/uds/src/lib.rs | 57 +++++---- 6 files changed, 321 insertions(+), 162 deletions(-) diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 2193c0ec..d990304f 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -18,11 +18,19 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::prelude::*; +use futures::{future::Either, prelude::*}; use multiaddr::Multiaddr; use crate::{ transport::Transport, - upgrade::{OutboundUpgrade, InboundUpgrade, UpgradeInfo, apply_inbound, apply_outbound, UpgradeError} + upgrade::{ + OutboundUpgrade, + InboundUpgrade, + apply_inbound, + apply_outbound, + UpgradeError, + OutboundUpgradeApply, + InboundUpgradeApply + } }; use tokio_io::{AsyncRead, AsyncWrite}; @@ -38,53 +46,32 @@ impl Upgrade { impl Transport for Upgrade where D: Transport, - D::Dial: Send + 'static, - D::Listener: Send + 'static, - D::ListenerUpgrade: Send + 'static, - D::Output: AsyncRead + AsyncWrite + Send + 'static, + D::Output: AsyncRead + AsyncWrite, U: InboundUpgrade, - U: OutboundUpgrade + Send + Clone + 'static, - ::NamesIter: Send, - ::UpgradeId: Send, - >::Future: Send, - >::Future: Send, + U: OutboundUpgrade + Clone, E: std::error::Error + Send + Sync + 'static { type Output = O; - type Listener = Box + Send>; - type ListenerUpgrade = Box + Send>; - type Dial = Box + Send>; + type Listener = ListenerStream; + type ListenerUpgrade = ListenerUpgradeFuture; + type Dial = DialUpgradeFuture; fn dial(self, addr: Multiaddr) -> Result { - let upgrade = self.upgrade; match self.inner.dial(addr.clone()) { - Ok(outbound) => { - let future = outbound - .and_then(move |x| { - apply_outbound(x, upgrade).map_err(UpgradeError::into_io_error) - }); - Ok(Box::new(future)) - } - Err((dialer, addr)) => Err((Upgrade::new(dialer, upgrade), addr)) + Ok(outbound) => Ok(DialUpgradeFuture { + future: outbound, + upgrade: Either::A(Some(self.upgrade)) + }), + Err((dialer, addr)) => Err((Upgrade::new(dialer, self.upgrade), addr)) } } fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - let upgrade = self.upgrade; match self.inner.listen_on(addr) { - Ok((inbound, addr)) => { - let stream = inbound - .map(move |(future, addr)| { - let upgrade = upgrade.clone(); - let future = future - .and_then(move |x| { - apply_inbound(x, upgrade).map_err(UpgradeError::into_io_error) - }); - (Box::new(future) as Box<_>, addr) - }); - Ok((Box::new(stream), addr)) - } - Err((listener, addr)) => Err((Upgrade::new(listener, upgrade), addr)), + Ok((inbound, addr)) => + Ok((ListenerStream { stream: inbound, upgrade: self.upgrade }, addr)), + Err((listener, addr)) => + Err((Upgrade::new(listener, self.upgrade), addr)) } } @@ -92,3 +79,103 @@ where self.inner.nat_traversal(server, observed) } } + +pub struct DialUpgradeFuture +where + T: Future, + T::Item: AsyncRead + AsyncWrite, + U: OutboundUpgrade +{ + future: T, + upgrade: Either, OutboundUpgradeApply> +} + +impl Future for DialUpgradeFuture +where + T: Future, + T::Item: AsyncRead + AsyncWrite, + U: OutboundUpgrade, + U::Error: std::error::Error + Send + Sync + 'static +{ + type Item = U::Output; + type Error = std::io::Error; + + fn poll(&mut self) -> Poll { + loop { + let next = match self.upgrade { + Either::A(ref mut up) => { + let x = try_ready!(self.future.poll()); + let u = up.take().expect("DialUpgradeFuture is constructed with Either::A(Some)."); + Either::B(apply_outbound(x, u)) + } + Either::B(ref mut up) => return up.poll().map_err(UpgradeError::into_io_error) + }; + self.upgrade = next + } + } +} + +pub struct ListenerStream { + stream: T, + upgrade: U +} + +impl Stream for ListenerStream +where + T: Stream, + F: Future, + F::Item: AsyncRead + AsyncWrite, + U: InboundUpgrade + Clone +{ + type Item = (ListenerUpgradeFuture, Multiaddr); + type Error = T::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + match try_ready!(self.stream.poll()) { + Some((x, a)) => { + let f = ListenerUpgradeFuture { + future: x, + upgrade: Either::A(Some(self.upgrade.clone())) + }; + Ok(Async::Ready(Some((f, a)))) + } + None => Ok(Async::Ready(None)) + } + } +} + +pub struct ListenerUpgradeFuture +where + T: Future, + T::Item: AsyncRead + AsyncWrite, + U: InboundUpgrade +{ + future: T, + upgrade: Either, InboundUpgradeApply> +} + +impl Future for ListenerUpgradeFuture +where + T: Future, + T::Item: AsyncRead + AsyncWrite, + U: InboundUpgrade, + U::Error: std::error::Error + Send + Sync + 'static +{ + type Item = U::Output; + type Error = std::io::Error; + + fn poll(&mut self) -> Poll { + loop { + let next = match self.upgrade { + Either::A(ref mut up) => { + let x = try_ready!(self.future.poll()); + let u = up.take().expect("ListenerUpgradeFuture is constructed with Either::A(Some)."); + Either::B(apply_inbound(x, u)) + } + Either::B(ref mut up) => return up.poll().map_err(UpgradeError::into_io_error) + }; + self.upgrade = next + } + } +} + diff --git a/protocols/identify/src/id_transport.rs b/protocols/identify/src/id_transport.rs index dbbeeab1..82066ab1 100644 --- a/protocols/identify/src/id_transport.rs +++ b/protocols/identify/src/id_transport.rs @@ -20,7 +20,7 @@ //! Contains the `IdentifyTransport` type. -use futures::prelude::*; +use futures::{future, prelude::*, stream, AndThen, MapErr}; use libp2p_core::{ Multiaddr, PeerId, PublicKey, muxing, Transport, upgrade::{self, OutboundUpgradeApply, UpgradeError} @@ -56,21 +56,20 @@ impl IdentifyTransport { } } -// TODO: don't use boxes impl Transport for IdentifyTransport where TTrans: Transport, TMuxer: muxing::StreamMuxer + Send + Sync + 'static, // TODO: remove unnecessary bounds TMuxer::Substream: Send + Sync + 'static, // TODO: remove unnecessary bounds - TMuxer::OutboundSubstream: Send + 'static, // TODO: remove unnecessary bounds - TTrans::Dial: Send + Sync + 'static, - TTrans::Listener: Send + 'static, - TTrans::ListenerUpgrade: Send + 'static, { type Output = (PeerId, TMuxer); - type Listener = Box + Send>; - type ListenerUpgrade = Box + Send>; - type Dial = Box + Send>; + type Listener = stream::Empty<(Self::ListenerUpgrade, Multiaddr), IoError>; + type ListenerUpgrade = future::Empty; + type Dial = AndThen< + TTrans::Dial, + MapErr, fn(UpgradeError) -> IoError>, + fn(TMuxer) -> MapErr, fn(UpgradeError) -> IoError> + >; #[inline] fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { @@ -90,11 +89,9 @@ where } }; - let dial = dial.and_then(move |muxer| { + Ok(dial.and_then(|muxer| { IdRetriever::new(muxer, IdentifyProtocolConfig).map_err(|e| e.into_io_error()) - }); - - Ok(Box::new(dial) as Box<_>) + })) } #[inline] @@ -105,7 +102,7 @@ where /// Implementation of `Future` that asks the remote of its `PeerId`. // TODO: remove unneeded bounds -struct IdRetriever +pub struct IdRetriever where TMuxer: muxing::StreamMuxer + Send + Sync + 'static, TMuxer::Substream: Send, { diff --git a/src/simple.rs b/src/simple.rs index 5a99f38e..94332c93 100644 --- a/src/simple.rs +++ b/src/simple.rs @@ -20,7 +20,7 @@ use bytes::Bytes; use core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use futures::prelude::*; +use futures::{future::FromErr, prelude::*}; use std::{iter, io::Error as IoError, sync::Arc}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -71,18 +71,16 @@ impl InboundUpgrade for SimpleProtocol where C: AsyncRead + AsyncWrite, F: Fn(C) -> O, - O: IntoFuture, - O::Future: Send + 'static, + O: IntoFuture { type Output = O::Item; type Error = IoError; - type Future = Box + Send>; + type Future = FromErr; #[inline] fn upgrade_inbound(self, socket: C, _: Self::UpgradeId) -> Self::Future { let upgrade = &self.upgrade; - let fut = upgrade(socket).into_future().from_err(); - Box::new(fut) as Box<_> + upgrade(socket).into_future().from_err() } } @@ -90,17 +88,15 @@ impl OutboundUpgrade for SimpleProtocol where C: AsyncRead + AsyncWrite, F: Fn(C) -> O, - O: IntoFuture, - O::Future: Send + 'static, + O: IntoFuture { type Output = O::Item; type Error = IoError; - type Future = Box + Send>; + type Future = FromErr; #[inline] fn upgrade_outbound(self, socket: C, _: Self::UpgradeId) -> Self::Future { let upgrade = &self.upgrade; - let fut = upgrade(socket).into_future().from_err(); - Box::new(fut) as Box<_> + upgrade(socket).into_future().from_err() } } diff --git a/transports/dns/src/lib.rs b/transports/dns/src/lib.rs index b3ab7591..97696155 100644 --- a/transports/dns/src/lib.rs +++ b/transports/dns/src/lib.rs @@ -41,7 +41,7 @@ extern crate multiaddr; extern crate tokio_dns; extern crate tokio_io; -use futures::future::{self, Future}; +use futures::{future::{self, Either, FutureResult, JoinAll}, prelude::*, try_ready}; use log::Level; use multiaddr::{Protocol, Multiaddr}; use std::fmt; @@ -94,13 +94,17 @@ where impl Transport for DnsConfig where - T: Transport + Send + 'static, // TODO: 'static :-/ - T::Dial: Send, + T: Transport { type Output = T::Output; type Listener = T::Listener; type ListenerUpgrade = T::ListenerUpgrade; - type Dial = Box + Send>; + type Dial = Either>>, + FutureResult, IoError>>>> + >> + >; #[inline] fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { @@ -126,7 +130,7 @@ where if !contains_dns { trace!("Pass-through address without DNS: {}", addr); return match self.inner.dial(addr) { - Ok(d) => Ok(Box::new(d) as Box<_>), + Ok(d) => Ok(Either::A(d)), Err((inner, addr)) => Err(( DnsConfig { inner, @@ -142,33 +146,33 @@ where trace!("Dialing address with DNS: {}", addr); let resolve_iters = addr.iter() .map(move |cmp| match cmp { - Protocol::Dns4(ref name) => { - future::Either::A(resolve_dns(name, &resolver, ResolveTy::Dns4)) - } - Protocol::Dns6(ref name) => { - future::Either::A(resolve_dns(name, &resolver, ResolveTy::Dns6)) - } - cmp => future::Either::B(future::ok(cmp.acquire())), + Protocol::Dns4(ref name) => + Either::A(ResolveFuture { + name: if log_enabled!(Level::Trace) { + Some(name.clone().into_owned()) + } else { + None + }, + inner: resolver.resolve(name), + ty: ResolveTy::Dns4 + }), + Protocol::Dns6(ref name) => + Either::A(ResolveFuture { + name: if log_enabled!(Level::Trace) { + Some(name.clone().into_owned()) + } else { + None + }, + inner: resolver.resolve(name), + ty: ResolveTy::Dns6 + }), + cmp => Either::B(future::ok(cmp.acquire())) }) .collect::>() .into_iter(); - let new_addr = future::join_all(resolve_iters).map(move |outcome| { - let outcome: Multiaddr = outcome.into_iter().collect(); - debug!("DNS resolution outcome: {} => {}", addr, outcome); - outcome - }); - - let inner = self.inner; - let future = new_addr - .and_then(move |addr| { - inner - .dial(addr) - .map_err(|_| IoError::new(IoErrorKind::Other, "multiaddr not supported")) - }) - .flatten(); - - Ok(Box::new(future) as Box<_>) + let new_addr = JoinFuture { addr, future: future::join_all(resolve_iters) }; + Ok(Either::B(DialFuture { trans: Some(self.inner), future: Either::A(new_addr) })) } #[inline] @@ -186,39 +190,91 @@ enum ResolveTy { Dns6, } -// Resolve a DNS name and returns a future with the result. -fn resolve_dns<'a>( - name: &str, - resolver: &CpuPoolResolver, - ty: ResolveTy, -) -> impl Future, Error = IoError> { - let debug_name = if log_enabled!(Level::Trace) { - Some(name.to_owned()) - } else { - None - }; +/// Future, performing DNS resolution. +#[derive(Debug)] +pub struct ResolveFuture { + name: Option, + inner: T, + ty: ResolveTy +} - resolver.resolve(name).and_then(move |addrs| { - if log_enabled!(Level::Trace) { - trace!( - "DNS component resolution: {} => {:?}", - debug_name.expect("trace log level was enabled"), - addrs - ); - } +impl Future for ResolveFuture +where + T: Future, Error = IoError> +{ + type Item = Protocol<'static>; + type Error = IoError; - addrs + fn poll(&mut self) -> Poll { + let ty = self.ty; + let addrs = try_ready!(self.inner.poll()); + trace!("DNS component resolution: {:?} => {:?}", self.name, addrs); + let mut addrs = addrs .into_iter() .filter_map(move |addr| match (addr, ty) { (IpAddr::V4(addr), ResolveTy::Dns4) => Some(Protocol::Ip4(addr)), (IpAddr::V6(addr), ResolveTy::Dns6) => Some(Protocol::Ip6(addr)), _ => None, - }) - .next() - .ok_or_else(|| { - IoError::new(IoErrorKind::Other, "couldn't find any relevant IP address") - }) - }) + }); + match addrs.next() { + Some(a) => Ok(Async::Ready(a)), + None => Err(IoError::new(IoErrorKind::Other, "couldn't find any relevant IP address")) + } + } +} + +/// Build final multi-address from resolving futures. +#[derive(Debug)] +pub struct JoinFuture { + addr: Multiaddr, + future: T +} + +impl Future for JoinFuture +where + T: Future>, Error = IoError> +{ + type Item = Multiaddr; + type Error = IoError; + + fn poll(&mut self) -> Poll { + let outcome = try_ready!(self.future.poll()); + let outcome: Multiaddr = outcome.into_iter().collect(); + debug!("DNS resolution outcome: {} => {}", self.addr, outcome); + Ok(Async::Ready(outcome)) + } +} + +/// Future, dialing the resolved multi-address. +#[derive(Debug)] +pub struct DialFuture { + trans: Option, + future: Either, +} + +impl Future for DialFuture +where + T: Transport, + F: Future +{ + type Item = T::Output; + type Error = IoError; + + fn poll(&mut self) -> Poll { + loop { + let next = match self.future { + Either::A(ref mut f) => { + let addr = try_ready!(f.poll()); + match self.trans.take().unwrap().dial(addr) { + Ok(dial) => Either::B(dial), + Err(_) => return Err(IoError::new(IoErrorKind::Other, "multiaddr not supported")) + } + } + Either::B(ref mut f) => return f.poll() + }; + self.future = next + } + } } #[cfg(test)] diff --git a/transports/ratelimit/src/lib.rs b/transports/ratelimit/src/lib.rs index 215806a5..a83be87f 100644 --- a/transports/ratelimit/src/lib.rs +++ b/transports/ratelimit/src/lib.rs @@ -140,8 +140,8 @@ pub struct ListenerUpgrade(RateLimited); impl Future for ListenerUpgrade where - T: Transport + 'static, - T::Output: AsyncRead + AsyncWrite, + T: Transport, + T::Output: AsyncRead + AsyncWrite { type Item = Connection; type Error = io::Error; @@ -156,19 +156,15 @@ where impl Transport for RateLimited where - T: Transport + 'static, - T::Dial: Send, - T::Output: AsyncRead + AsyncWrite + Send, + T: Transport, + T::Output: AsyncRead + AsyncWrite { type Output = Connection; type Listener = Listener; type ListenerUpgrade = ListenerUpgrade; - type Dial = Box, Error = io::Error> + Send>; + type Dial = DialFuture; - fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> - where - Self: Sized, - { + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { let r = self.rlimiter; let w = self.wlimiter; self.value @@ -182,26 +178,38 @@ where .map_err(|(transport, a)| (RateLimited::from_parts(transport, r, w), a)) } - fn dial(self, addr: Multiaddr) -> Result - where - Self: Sized, - { + fn dial(self, addr: Multiaddr) -> Result { let r = self.rlimiter; let w = self.wlimiter; - let r2 = r.clone(); - let w2 = w.clone(); - - self.value - .dial(addr) - .map(move |dial| { - let future = dial - .and_then(move |conn| Ok(Connection::new(conn, r, w)?)); - Box::new(future) as Box<_> - }) - .map_err(|(transport, a)| (RateLimited::from_parts(transport, r2, w2), a)) + match self.value.dial(addr) { + Ok(dial) => Ok(DialFuture { r, w, f: dial }), + Err((t, a)) => Err((RateLimited::from_parts(t, r, w), a)) + } } fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.value.nat_traversal(server, observed) } } + +/// Future to avoid boxing. +pub struct DialFuture { + r: Limiter, + w: Limiter, + f: T +} + +impl Future for DialFuture +where + T: Future, + T::Item: AsyncRead + AsyncWrite, + T::Error: From +{ + type Item = Connection; + type Error = T::Error; + + fn poll(&mut self) -> Poll { + let item = try_ready!(self.f.poll()); + Ok(Async::Ready(Connection::new(item, self.r.clone(), self.w.clone())?)) + } +} diff --git a/transports/uds/src/lib.rs b/transports/uds/src/lib.rs index 9f6cb8f7..7cae5ddb 100644 --- a/transports/uds/src/lib.rs +++ b/transports/uds/src/lib.rs @@ -60,7 +60,7 @@ extern crate tokio_io; #[cfg(test)] extern crate tokio; -use futures::future::{self, Future, FutureResult}; +use futures::{future::{self, FutureResult}, prelude::*, try_ready}; use futures::stream::Stream; use multiaddr::{Protocol, Multiaddr}; use std::io::Error as IoError; @@ -86,9 +86,9 @@ impl UdsConfig { impl Transport for UdsConfig { type Output = UnixStream; - type Listener = Box + Send + Sync>; + type Listener = ListenerStream; type ListenerUpgrade = FutureResult; - type Dial = Box + Send + Sync>; // TODO: name this type + type Dial = tokio_uds::ConnectFuture; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { if let Ok(path) = multiaddr_to_path(&addr) { @@ -96,23 +96,16 @@ impl Transport for UdsConfig { // We need to build the `Multiaddr` to return from this function. If an error happened, // just return the original multiaddr. match listener { - Ok(_) => {}, + Ok(listener) => { + debug!("Now listening on {}", addr); + let future = ListenerStream { + stream: listener.incoming(), + addr: addr.clone() + }; + Ok((future, addr)) + } Err(_) => return Err((self, addr)), - }; - - debug!("Now listening on {}", addr); - let new_addr = addr.clone(); - - let future = future::result(listener) - .map(move |listener| { - // Pull out a stream of sockets for incoming connections - listener.incoming().map(move |sock| { - debug!("Incoming connection on {}", addr); - (future::ok(sock), addr.clone()) - }) - }) - .flatten_stream(); - Ok((Box::new(future), new_addr)) + } } else { Err((self, addr)) } @@ -121,8 +114,7 @@ impl Transport for UdsConfig { fn dial(self, addr: Multiaddr) -> Result { if let Ok(path) = multiaddr_to_path(&addr) { debug!("Dialing {}", addr); - let fut = UnixStream::connect(&path); - Ok(Box::new(fut) as Box<_>) + Ok(UnixStream::connect(&path)) } else { Err((self, addr)) } @@ -162,6 +154,29 @@ fn multiaddr_to_path(addr: &Multiaddr) -> Result { Ok(out) } +pub struct ListenerStream { + stream: T, + addr: Multiaddr +} + +impl Stream for ListenerStream +where + T: Stream +{ + type Item = (FutureResult, Multiaddr); + type Error = T::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + match try_ready!(self.stream.poll()) { + Some(item) => { + debug!("incoming connection on {}", self.addr); + Ok(Async::Ready(Some((future::ok(item), self.addr.clone())))) + } + None => Ok(Async::Ready(None)) + } + } +} + #[cfg(test)] mod tests { use tokio::runtime::current_thread::Runtime;