Change Transport::Dial to be Future and not IntoFuture (#193)

This commit is contained in:
Pierre Krieger
2018-05-17 13:09:22 +02:00
committed by GitHub
parent 5c1890e66a
commit cb800624f5
8 changed files with 19 additions and 35 deletions

View File

@ -179,7 +179,7 @@ where
debug!(target: "libp2p-core", "No existing connection to {}; dialing", addr); debug!(target: "libp2p-core", "No existing connection to {}; dialing", addr);
match inner.dial(addr.clone()) { match inner.dial(addr.clone()) {
Ok(dial) => { Ok(dial) => {
let future = dial.into_future().and_then(move |(muxer, addr)| { let future = dial.and_then(move |(muxer, addr)| {
muxer.clone().outbound().and_then(move |substream| { muxer.clone().outbound().and_then(move |substream| {
if let Some(s) = substream { if let Some(s) = substream {
// Replace the active connection because we are the most recent. // Replace the active connection because we are the most recent.

View File

@ -123,8 +123,7 @@ where
match transport.dial(multiaddr.clone()) { match transport.dial(multiaddr.clone()) {
Ok(dial) => { Ok(dial) => {
let dial = Box::new( let dial = Box::new(
dial.into_future() dial.map(|(d, client_addr)| (d.into(), client_addr)),
.map(|(d, client_addr)| (d.into(), client_addr)),
) as Box<Future<Item = _, Error = _>>; ) as Box<Future<Item = _, Error = _>>;
// Ignoring errors if the receiver has been closed, because in that situation // Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway. // nothing is going to be processed anyway.
@ -156,7 +155,7 @@ where
match transport.dial(multiaddr) { match transport.dial(multiaddr) {
Ok(dial) => { Ok(dial) => {
let dial = Box::new(dial.into_future().and_then(|(d, m)| and_then(d, m))) as Box<_>; let dial = Box::new(dial.and_then(|(d, m)| and_then(d, m))) as Box<_>;
// Ignoring errors if the receiver has been closed, because in that situation // Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway. // nothing is going to be processed anyway.
let _ = self.new_toprocess.unbounded_send(dial); let _ = self.new_toprocess.unbounded_send(dial);

View File

@ -85,7 +85,7 @@ where
let upgrade = self.upgrade; let upgrade = self.upgrade;
let dialed_fut = match self.transport.dial(addr.clone()) { let dialed_fut = match self.transport.dial(addr.clone()) {
Ok(f) => f.into_future(), Ok(f) => f,
Err((trans, addr)) => { Err((trans, addr)) => {
let builder = AndThen { let builder = AndThen {
transport: trans, transport: trans,

View File

@ -59,12 +59,12 @@ where
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> { fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
let (first, addr) = match self.0.dial(addr) { let (first, addr) = match self.0.dial(addr) {
Ok(connec) => return Ok(EitherListenUpgrade::First(connec.into_future())), Ok(connec) => return Ok(EitherListenUpgrade::First(connec)),
Err(err) => err, Err(err) => err,
}; };
match self.1.dial(addr) { match self.1.dial(addr) {
Ok(connec) => Ok(EitherListenUpgrade::Second(connec.into_future())), Ok(connec) => Ok(EitherListenUpgrade::Second(connec)),
Err((second, addr)) => Err((OrTransport(first, second), addr)), Err((second, addr)) => Err((OrTransport(first, second), addr)),
} }
} }

View File

@ -76,7 +76,7 @@ pub trait Transport {
type ListenerUpgrade: Future<Item = (Self::Output, Multiaddr), Error = IoError>; type ListenerUpgrade: Future<Item = (Self::Output, Multiaddr), Error = IoError>;
/// A future which indicates that we are currently dialing to a peer. /// A future which indicates that we are currently dialing to a peer.
type Dial: IntoFuture<Item = (Self::Output, Multiaddr), Error = IoError>; type Dial: Future<Item = (Self::Output, Multiaddr), Error = IoError>;
/// Listen on the given multiaddr. Returns a stream of incoming connections, plus a modified /// Listen on the given multiaddr. Returns a stream of incoming connections, plus a modified
/// version of the `Multiaddr`. This new `Multiaddr` is the one that that should be advertised /// version of the `Multiaddr`. This new `Multiaddr` is the one that that should be advertised

View File

@ -84,7 +84,7 @@ where
let upgrade = self.upgrade; let upgrade = self.upgrade;
let dialed_fut = match self.transports.dial(addr.clone()) { let dialed_fut = match self.transports.dial(addr.clone()) {
Ok(f) => f.into_future(), Ok(f) => f,
Err((trans, addr)) => { Err((trans, addr)) => {
let builder = UpgradedNode { let builder = UpgradedNode {
transports: trans, transports: trans,

View File

@ -128,7 +128,7 @@ where
if !contains_dns { if !contains_dns {
trace!(target: "libp2p-dns", "Pass-through address without DNS: {}", addr); trace!(target: "libp2p-dns", "Pass-through address without DNS: {}", addr);
return match self.inner.dial(addr) { return match self.inner.dial(addr) {
Ok(d) => Ok(Box::new(d.into_future()) as Box<_>), Ok(d) => Ok(Box::new(d) as Box<_>),
Err((inner, addr)) => Err(( Err((inner, addr)) => Err((
DnsConfig { DnsConfig {
inner, inner,

View File

@ -153,28 +153,6 @@ where
} }
} }
pub struct Dial<T: Transport>(RateLimited<T::Dial>);
impl<T> IntoFuture for Dial<T>
where
T: Transport + 'static,
T::Output: AsyncRead + AsyncWrite,
{
type Future = Box<Future<Item = Self::Item, Error = Self::Error>>;
type Item = (Connection<T::Output>, Multiaddr);
type Error = io::Error;
fn into_future(self) -> Self::Future {
let r = self.0.rlimiter;
let w = self.0.wlimiter;
let future = self.0
.value
.into_future()
.and_then(move |(conn, addr)| Ok((Connection::new(conn, r, w)?, addr)));
Box::new(future)
}
}
impl<T> Transport for RateLimited<T> impl<T> Transport for RateLimited<T>
where where
T: Transport + 'static, T: Transport + 'static,
@ -183,7 +161,7 @@ where
type Output = Connection<T::Output>; type Output = Connection<T::Output>;
type Listener = Listener<T>; type Listener = Listener<T>;
type ListenerUpgrade = ListenerUpgrade<T>; type ListenerUpgrade = ListenerUpgrade<T>;
type Dial = Dial<T>; type Dial = Box<Future<Item = (Connection<T::Output>, Multiaddr), Error = io::Error>>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
where where
@ -208,10 +186,17 @@ where
{ {
let r = self.rlimiter; let r = self.rlimiter;
let w = self.wlimiter; let w = self.wlimiter;
let r2 = r.clone();
let w2 = w.clone();
self.value self.value
.dial(addr) .dial(addr)
.map(|dial| Dial(RateLimited::from_parts(dial, r.clone(), w.clone()))) .map(move |dial| {
.map_err(|(transport, a)| (RateLimited::from_parts(transport, r, w), a)) let future = dial
.and_then(move |(conn, addr)| Ok((Connection::new(conn, r, w)?, addr)));
Box::new(future) as Box<_>
})
.map_err(|(transport, a)| (RateLimited::from_parts(transport, r2, w2), a))
} }
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> { fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {