diff --git a/core/src/connection_reuse.rs b/core/src/connection_reuse.rs index a1439956..64530441 100644 --- a/core/src/connection_reuse.rs +++ b/core/src/connection_reuse.rs @@ -179,7 +179,7 @@ where debug!(target: "libp2p-core", "No existing connection to {}; dialing", addr); match inner.dial(addr.clone()) { Ok(dial) => { - let future = dial.into_future().and_then(move |(muxer, addr)| { + let future = dial.and_then(move |(muxer, addr)| { muxer.clone().outbound().and_then(move |substream| { if let Some(s) = substream { // Replace the active connection because we are the most recent. diff --git a/core/src/swarm.rs b/core/src/swarm.rs index 260e5612..ffa6cf20 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -123,8 +123,7 @@ where match transport.dial(multiaddr.clone()) { Ok(dial) => { let dial = Box::new( - dial.into_future() - .map(|(d, client_addr)| (d.into(), client_addr)), + dial.map(|(d, client_addr)| (d.into(), client_addr)), ) as Box>; // Ignoring errors if the receiver has been closed, because in that situation // nothing is going to be processed anyway. @@ -156,7 +155,7 @@ where match transport.dial(multiaddr) { Ok(dial) => { - let dial = Box::new(dial.into_future().and_then(|(d, m)| and_then(d, m))) as Box<_>; + let dial = Box::new(dial.and_then(|(d, m)| and_then(d, m))) as Box<_>; // Ignoring errors if the receiver has been closed, because in that situation // nothing is going to be processed anyway. let _ = self.new_toprocess.unbounded_send(dial); diff --git a/core/src/transport/and_then.rs b/core/src/transport/and_then.rs index b87bc012..c1997bcb 100644 --- a/core/src/transport/and_then.rs +++ b/core/src/transport/and_then.rs @@ -85,7 +85,7 @@ where let upgrade = self.upgrade; let dialed_fut = match self.transport.dial(addr.clone()) { - Ok(f) => f.into_future(), + Ok(f) => f, Err((trans, addr)) => { let builder = AndThen { transport: trans, diff --git a/core/src/transport/choice.rs b/core/src/transport/choice.rs index dd981524..69a1265a 100644 --- a/core/src/transport/choice.rs +++ b/core/src/transport/choice.rs @@ -59,12 +59,12 @@ where fn dial(self, addr: Multiaddr) -> Result { let (first, addr) = match self.0.dial(addr) { - Ok(connec) => return Ok(EitherListenUpgrade::First(connec.into_future())), + Ok(connec) => return Ok(EitherListenUpgrade::First(connec)), Err(err) => err, }; match self.1.dial(addr) { - Ok(connec) => Ok(EitherListenUpgrade::Second(connec.into_future())), + Ok(connec) => Ok(EitherListenUpgrade::Second(connec)), Err((second, addr)) => Err((OrTransport(first, second), addr)), } } diff --git a/core/src/transport/mod.rs b/core/src/transport/mod.rs index 6ed9c2ec..af573878 100644 --- a/core/src/transport/mod.rs +++ b/core/src/transport/mod.rs @@ -76,7 +76,7 @@ pub trait Transport { type ListenerUpgrade: Future; /// A future which indicates that we are currently dialing to a peer. - type Dial: IntoFuture; + type Dial: Future; /// Listen on the given multiaddr. Returns a stream of incoming connections, plus a modified /// version of the `Multiaddr`. This new `Multiaddr` is the one that that should be advertised diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 6c8ba0c3..fd1b39d2 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -84,7 +84,7 @@ where let upgrade = self.upgrade; let dialed_fut = match self.transports.dial(addr.clone()) { - Ok(f) => f.into_future(), + Ok(f) => f, Err((trans, addr)) => { let builder = UpgradedNode { transports: trans, diff --git a/dns/src/lib.rs b/dns/src/lib.rs index 17d692b7..313d753c 100644 --- a/dns/src/lib.rs +++ b/dns/src/lib.rs @@ -128,7 +128,7 @@ where if !contains_dns { trace!(target: "libp2p-dns", "Pass-through address without DNS: {}", addr); return match self.inner.dial(addr) { - Ok(d) => Ok(Box::new(d.into_future()) as Box<_>), + Ok(d) => Ok(Box::new(d) as Box<_>), Err((inner, addr)) => Err(( DnsConfig { inner, diff --git a/ratelimit/src/lib.rs b/ratelimit/src/lib.rs index 74512a62..6423afcd 100644 --- a/ratelimit/src/lib.rs +++ b/ratelimit/src/lib.rs @@ -153,28 +153,6 @@ where } } -pub struct Dial(RateLimited); - -impl IntoFuture for Dial -where - T: Transport + 'static, - T::Output: AsyncRead + AsyncWrite, -{ - type Future = Box>; - type Item = (Connection, Multiaddr); - type Error = io::Error; - - fn into_future(self) -> Self::Future { - let r = self.0.rlimiter; - let w = self.0.wlimiter; - let future = self.0 - .value - .into_future() - .and_then(move |(conn, addr)| Ok((Connection::new(conn, r, w)?, addr))); - Box::new(future) - } -} - impl Transport for RateLimited where T: Transport + 'static, @@ -183,7 +161,7 @@ where type Output = Connection; type Listener = Listener; type ListenerUpgrade = ListenerUpgrade; - type Dial = Dial; + type Dial = Box, Multiaddr), Error = io::Error>>; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> where @@ -208,10 +186,17 @@ where { let r = self.rlimiter; let w = self.wlimiter; + let r2 = r.clone(); + let w2 = w.clone(); + self.value .dial(addr) - .map(|dial| Dial(RateLimited::from_parts(dial, r.clone(), w.clone()))) - .map_err(|(transport, a)| (RateLimited::from_parts(transport, r, w), a)) + .map(move |dial| { + let future = dial + .and_then(move |(conn, addr)| Ok((Connection::new(conn, r, w)?, addr))); + Box::new(future) as Box<_> + }) + .map_err(|(transport, a)| (RateLimited::from_parts(transport, r2, w2), a)) } fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option {