mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-04-25 11:02:12 +00:00
Remove some boxed futures. (#718)
This commit is contained in:
parent
a5766fdfac
commit
acfa1c9c79
@ -18,11 +18,19 @@
|
|||||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
// DEALINGS IN THE SOFTWARE.
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
use futures::prelude::*;
|
use futures::{future::Either, prelude::*};
|
||||||
use multiaddr::Multiaddr;
|
use multiaddr::Multiaddr;
|
||||||
use crate::{
|
use crate::{
|
||||||
transport::Transport,
|
transport::Transport,
|
||||||
upgrade::{OutboundUpgrade, InboundUpgrade, UpgradeInfo, apply_inbound, apply_outbound, UpgradeError}
|
upgrade::{
|
||||||
|
OutboundUpgrade,
|
||||||
|
InboundUpgrade,
|
||||||
|
apply_inbound,
|
||||||
|
apply_outbound,
|
||||||
|
UpgradeError,
|
||||||
|
OutboundUpgradeApply,
|
||||||
|
InboundUpgradeApply
|
||||||
|
}
|
||||||
};
|
};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
@ -38,53 +46,32 @@ impl<T, U> Upgrade<T, U> {
|
|||||||
impl<D, U, O, E> Transport for Upgrade<D, U>
|
impl<D, U, O, E> Transport for Upgrade<D, U>
|
||||||
where
|
where
|
||||||
D: Transport,
|
D: Transport,
|
||||||
D::Dial: Send + 'static,
|
D::Output: AsyncRead + AsyncWrite,
|
||||||
D::Listener: Send + 'static,
|
|
||||||
D::ListenerUpgrade: Send + 'static,
|
|
||||||
D::Output: AsyncRead + AsyncWrite + Send + 'static,
|
|
||||||
U: InboundUpgrade<D::Output, Output = O, Error = E>,
|
U: InboundUpgrade<D::Output, Output = O, Error = E>,
|
||||||
U: OutboundUpgrade<D::Output, Output = O, Error = E> + Send + Clone + 'static,
|
U: OutboundUpgrade<D::Output, Output = O, Error = E> + Clone,
|
||||||
<U as UpgradeInfo>::NamesIter: Send,
|
|
||||||
<U as UpgradeInfo>::UpgradeId: Send,
|
|
||||||
<U as InboundUpgrade<D::Output>>::Future: Send,
|
|
||||||
<U as OutboundUpgrade<D::Output>>::Future: Send,
|
|
||||||
E: std::error::Error + Send + Sync + 'static
|
E: std::error::Error + Send + Sync + 'static
|
||||||
{
|
{
|
||||||
type Output = O;
|
type Output = O;
|
||||||
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = std::io::Error> + Send>;
|
type Listener = ListenerStream<D::Listener, U>;
|
||||||
type ListenerUpgrade = Box<Future<Item = Self::Output, Error = std::io::Error> + Send>;
|
type ListenerUpgrade = ListenerUpgradeFuture<D::ListenerUpgrade, U>;
|
||||||
type Dial = Box<Future<Item = Self::Output, Error = std::io::Error> + Send>;
|
type Dial = DialUpgradeFuture<D::Dial, U>;
|
||||||
|
|
||||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||||
let upgrade = self.upgrade;
|
|
||||||
match self.inner.dial(addr.clone()) {
|
match self.inner.dial(addr.clone()) {
|
||||||
Ok(outbound) => {
|
Ok(outbound) => Ok(DialUpgradeFuture {
|
||||||
let future = outbound
|
future: outbound,
|
||||||
.and_then(move |x| {
|
upgrade: Either::A(Some(self.upgrade))
|
||||||
apply_outbound(x, upgrade).map_err(UpgradeError::into_io_error)
|
}),
|
||||||
});
|
Err((dialer, addr)) => Err((Upgrade::new(dialer, self.upgrade), addr))
|
||||||
Ok(Box::new(future))
|
|
||||||
}
|
|
||||||
Err((dialer, addr)) => Err((Upgrade::new(dialer, upgrade), addr))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
let upgrade = self.upgrade;
|
|
||||||
match self.inner.listen_on(addr) {
|
match self.inner.listen_on(addr) {
|
||||||
Ok((inbound, addr)) => {
|
Ok((inbound, addr)) =>
|
||||||
let stream = inbound
|
Ok((ListenerStream { stream: inbound, upgrade: self.upgrade }, addr)),
|
||||||
.map(move |(future, addr)| {
|
Err((listener, addr)) =>
|
||||||
let upgrade = upgrade.clone();
|
Err((Upgrade::new(listener, self.upgrade), addr))
|
||||||
let future = future
|
|
||||||
.and_then(move |x| {
|
|
||||||
apply_inbound(x, upgrade).map_err(UpgradeError::into_io_error)
|
|
||||||
});
|
|
||||||
(Box::new(future) as Box<_>, addr)
|
|
||||||
});
|
|
||||||
Ok((Box::new(stream), addr))
|
|
||||||
}
|
|
||||||
Err((listener, addr)) => Err((Upgrade::new(listener, upgrade), addr)),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,3 +79,103 @@ where
|
|||||||
self.inner.nat_traversal(server, observed)
|
self.inner.nat_traversal(server, observed)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct DialUpgradeFuture<T, U>
|
||||||
|
where
|
||||||
|
T: Future,
|
||||||
|
T::Item: AsyncRead + AsyncWrite,
|
||||||
|
U: OutboundUpgrade<T::Item>
|
||||||
|
{
|
||||||
|
future: T,
|
||||||
|
upgrade: Either<Option<U>, OutboundUpgradeApply<T::Item, U>>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, U> Future for DialUpgradeFuture<T, U>
|
||||||
|
where
|
||||||
|
T: Future<Error = std::io::Error>,
|
||||||
|
T::Item: AsyncRead + AsyncWrite,
|
||||||
|
U: OutboundUpgrade<T::Item>,
|
||||||
|
U::Error: std::error::Error + Send + Sync + 'static
|
||||||
|
{
|
||||||
|
type Item = U::Output;
|
||||||
|
type Error = std::io::Error;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
loop {
|
||||||
|
let next = match self.upgrade {
|
||||||
|
Either::A(ref mut up) => {
|
||||||
|
let x = try_ready!(self.future.poll());
|
||||||
|
let u = up.take().expect("DialUpgradeFuture is constructed with Either::A(Some).");
|
||||||
|
Either::B(apply_outbound(x, u))
|
||||||
|
}
|
||||||
|
Either::B(ref mut up) => return up.poll().map_err(UpgradeError::into_io_error)
|
||||||
|
};
|
||||||
|
self.upgrade = next
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ListenerStream<T, U> {
|
||||||
|
stream: T,
|
||||||
|
upgrade: U
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, U, F> Stream for ListenerStream<T, U>
|
||||||
|
where
|
||||||
|
T: Stream<Item = (F, Multiaddr)>,
|
||||||
|
F: Future,
|
||||||
|
F::Item: AsyncRead + AsyncWrite,
|
||||||
|
U: InboundUpgrade<F::Item> + Clone
|
||||||
|
{
|
||||||
|
type Item = (ListenerUpgradeFuture<F, U>, Multiaddr);
|
||||||
|
type Error = T::Error;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
|
match try_ready!(self.stream.poll()) {
|
||||||
|
Some((x, a)) => {
|
||||||
|
let f = ListenerUpgradeFuture {
|
||||||
|
future: x,
|
||||||
|
upgrade: Either::A(Some(self.upgrade.clone()))
|
||||||
|
};
|
||||||
|
Ok(Async::Ready(Some((f, a))))
|
||||||
|
}
|
||||||
|
None => Ok(Async::Ready(None))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ListenerUpgradeFuture<T, U>
|
||||||
|
where
|
||||||
|
T: Future,
|
||||||
|
T::Item: AsyncRead + AsyncWrite,
|
||||||
|
U: InboundUpgrade<T::Item>
|
||||||
|
{
|
||||||
|
future: T,
|
||||||
|
upgrade: Either<Option<U>, InboundUpgradeApply<T::Item, U>>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, U> Future for ListenerUpgradeFuture<T, U>
|
||||||
|
where
|
||||||
|
T: Future<Error = std::io::Error>,
|
||||||
|
T::Item: AsyncRead + AsyncWrite,
|
||||||
|
U: InboundUpgrade<T::Item>,
|
||||||
|
U::Error: std::error::Error + Send + Sync + 'static
|
||||||
|
{
|
||||||
|
type Item = U::Output;
|
||||||
|
type Error = std::io::Error;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
loop {
|
||||||
|
let next = match self.upgrade {
|
||||||
|
Either::A(ref mut up) => {
|
||||||
|
let x = try_ready!(self.future.poll());
|
||||||
|
let u = up.take().expect("ListenerUpgradeFuture is constructed with Either::A(Some).");
|
||||||
|
Either::B(apply_inbound(x, u))
|
||||||
|
}
|
||||||
|
Either::B(ref mut up) => return up.poll().map_err(UpgradeError::into_io_error)
|
||||||
|
};
|
||||||
|
self.upgrade = next
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ -20,7 +20,7 @@
|
|||||||
|
|
||||||
//! Contains the `IdentifyTransport` type.
|
//! Contains the `IdentifyTransport` type.
|
||||||
|
|
||||||
use futures::prelude::*;
|
use futures::{future, prelude::*, stream, AndThen, MapErr};
|
||||||
use libp2p_core::{
|
use libp2p_core::{
|
||||||
Multiaddr, PeerId, PublicKey, muxing, Transport,
|
Multiaddr, PeerId, PublicKey, muxing, Transport,
|
||||||
upgrade::{self, OutboundUpgradeApply, UpgradeError}
|
upgrade::{self, OutboundUpgradeApply, UpgradeError}
|
||||||
@ -56,21 +56,20 @@ impl<TTrans> IdentifyTransport<TTrans> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: don't use boxes
|
|
||||||
impl<TTrans, TMuxer> Transport for IdentifyTransport<TTrans>
|
impl<TTrans, TMuxer> Transport for IdentifyTransport<TTrans>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = TMuxer>,
|
TTrans: Transport<Output = TMuxer>,
|
||||||
TMuxer: muxing::StreamMuxer + Send + Sync + 'static, // TODO: remove unnecessary bounds
|
TMuxer: muxing::StreamMuxer + Send + Sync + 'static, // TODO: remove unnecessary bounds
|
||||||
TMuxer::Substream: Send + Sync + 'static, // TODO: remove unnecessary bounds
|
TMuxer::Substream: Send + Sync + 'static, // TODO: remove unnecessary bounds
|
||||||
TMuxer::OutboundSubstream: Send + 'static, // TODO: remove unnecessary bounds
|
|
||||||
TTrans::Dial: Send + Sync + 'static,
|
|
||||||
TTrans::Listener: Send + 'static,
|
|
||||||
TTrans::ListenerUpgrade: Send + 'static,
|
|
||||||
{
|
{
|
||||||
type Output = (PeerId, TMuxer);
|
type Output = (PeerId, TMuxer);
|
||||||
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError> + Send>;
|
type Listener = stream::Empty<(Self::ListenerUpgrade, Multiaddr), IoError>;
|
||||||
type ListenerUpgrade = Box<Future<Item = Self::Output, Error = IoError> + Send>;
|
type ListenerUpgrade = future::Empty<Self::Output, IoError>;
|
||||||
type Dial = Box<Future<Item = Self::Output, Error = IoError> + Send>;
|
type Dial = AndThen<
|
||||||
|
TTrans::Dial,
|
||||||
|
MapErr<IdRetriever<TMuxer>, fn(UpgradeError<IoError>) -> IoError>,
|
||||||
|
fn(TMuxer) -> MapErr<IdRetriever<TMuxer>, fn(UpgradeError<IoError>) -> IoError>
|
||||||
|
>;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
@ -90,11 +89,9 @@ where
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let dial = dial.and_then(move |muxer| {
|
Ok(dial.and_then(|muxer| {
|
||||||
IdRetriever::new(muxer, IdentifyProtocolConfig).map_err(|e| e.into_io_error())
|
IdRetriever::new(muxer, IdentifyProtocolConfig).map_err(|e| e.into_io_error())
|
||||||
});
|
}))
|
||||||
|
|
||||||
Ok(Box::new(dial) as Box<_>)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -105,7 +102,7 @@ where
|
|||||||
|
|
||||||
/// Implementation of `Future` that asks the remote of its `PeerId`.
|
/// Implementation of `Future` that asks the remote of its `PeerId`.
|
||||||
// TODO: remove unneeded bounds
|
// TODO: remove unneeded bounds
|
||||||
struct IdRetriever<TMuxer>
|
pub struct IdRetriever<TMuxer>
|
||||||
where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
|
where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
|
||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
{
|
{
|
||||||
|
@ -20,7 +20,7 @@
|
|||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
|
use core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
|
||||||
use futures::prelude::*;
|
use futures::{future::FromErr, prelude::*};
|
||||||
use std::{iter, io::Error as IoError, sync::Arc};
|
use std::{iter, io::Error as IoError, sync::Arc};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
@ -71,18 +71,16 @@ impl<C, F, O> InboundUpgrade<C> for SimpleProtocol<F>
|
|||||||
where
|
where
|
||||||
C: AsyncRead + AsyncWrite,
|
C: AsyncRead + AsyncWrite,
|
||||||
F: Fn(C) -> O,
|
F: Fn(C) -> O,
|
||||||
O: IntoFuture<Error = IoError>,
|
O: IntoFuture<Error = IoError>
|
||||||
O::Future: Send + 'static,
|
|
||||||
{
|
{
|
||||||
type Output = O::Item;
|
type Output = O::Item;
|
||||||
type Error = IoError;
|
type Error = IoError;
|
||||||
type Future = Box<Future<Item = O::Item, Error = Self::Error> + Send>;
|
type Future = FromErr<O::Future, IoError>;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn upgrade_inbound(self, socket: C, _: Self::UpgradeId) -> Self::Future {
|
fn upgrade_inbound(self, socket: C, _: Self::UpgradeId) -> Self::Future {
|
||||||
let upgrade = &self.upgrade;
|
let upgrade = &self.upgrade;
|
||||||
let fut = upgrade(socket).into_future().from_err();
|
upgrade(socket).into_future().from_err()
|
||||||
Box::new(fut) as Box<_>
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -90,17 +88,15 @@ impl<C, F, O> OutboundUpgrade<C> for SimpleProtocol<F>
|
|||||||
where
|
where
|
||||||
C: AsyncRead + AsyncWrite,
|
C: AsyncRead + AsyncWrite,
|
||||||
F: Fn(C) -> O,
|
F: Fn(C) -> O,
|
||||||
O: IntoFuture<Error = IoError>,
|
O: IntoFuture<Error = IoError>
|
||||||
O::Future: Send + 'static,
|
|
||||||
{
|
{
|
||||||
type Output = O::Item;
|
type Output = O::Item;
|
||||||
type Error = IoError;
|
type Error = IoError;
|
||||||
type Future = Box<Future<Item = O::Item, Error = Self::Error> + Send>;
|
type Future = FromErr<O::Future, IoError>;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn upgrade_outbound(self, socket: C, _: Self::UpgradeId) -> Self::Future {
|
fn upgrade_outbound(self, socket: C, _: Self::UpgradeId) -> Self::Future {
|
||||||
let upgrade = &self.upgrade;
|
let upgrade = &self.upgrade;
|
||||||
let fut = upgrade(socket).into_future().from_err();
|
upgrade(socket).into_future().from_err()
|
||||||
Box::new(fut) as Box<_>
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -41,7 +41,7 @@ extern crate multiaddr;
|
|||||||
extern crate tokio_dns;
|
extern crate tokio_dns;
|
||||||
extern crate tokio_io;
|
extern crate tokio_io;
|
||||||
|
|
||||||
use futures::future::{self, Future};
|
use futures::{future::{self, Either, FutureResult, JoinAll}, prelude::*, try_ready};
|
||||||
use log::Level;
|
use log::Level;
|
||||||
use multiaddr::{Protocol, Multiaddr};
|
use multiaddr::{Protocol, Multiaddr};
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
@ -94,13 +94,17 @@ where
|
|||||||
|
|
||||||
impl<T> Transport for DnsConfig<T>
|
impl<T> Transport for DnsConfig<T>
|
||||||
where
|
where
|
||||||
T: Transport + Send + 'static, // TODO: 'static :-/
|
T: Transport
|
||||||
T::Dial: Send,
|
|
||||||
{
|
{
|
||||||
type Output = T::Output;
|
type Output = T::Output;
|
||||||
type Listener = T::Listener;
|
type Listener = T::Listener;
|
||||||
type ListenerUpgrade = T::ListenerUpgrade;
|
type ListenerUpgrade = T::ListenerUpgrade;
|
||||||
type Dial = Box<Future<Item = Self::Output, Error = IoError> + Send>;
|
type Dial = Either<T::Dial,
|
||||||
|
DialFuture<T, JoinFuture<JoinAll<std::vec::IntoIter<Either<
|
||||||
|
ResolveFuture<tokio_dns::IoFuture<Vec<IpAddr>>>,
|
||||||
|
FutureResult<Protocol<'static>, IoError>>>>
|
||||||
|
>>
|
||||||
|
>;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
@ -126,7 +130,7 @@ where
|
|||||||
if !contains_dns {
|
if !contains_dns {
|
||||||
trace!("Pass-through address without DNS: {}", addr);
|
trace!("Pass-through address without DNS: {}", addr);
|
||||||
return match self.inner.dial(addr) {
|
return match self.inner.dial(addr) {
|
||||||
Ok(d) => Ok(Box::new(d) as Box<_>),
|
Ok(d) => Ok(Either::A(d)),
|
||||||
Err((inner, addr)) => Err((
|
Err((inner, addr)) => Err((
|
||||||
DnsConfig {
|
DnsConfig {
|
||||||
inner,
|
inner,
|
||||||
@ -142,33 +146,33 @@ where
|
|||||||
trace!("Dialing address with DNS: {}", addr);
|
trace!("Dialing address with DNS: {}", addr);
|
||||||
let resolve_iters = addr.iter()
|
let resolve_iters = addr.iter()
|
||||||
.map(move |cmp| match cmp {
|
.map(move |cmp| match cmp {
|
||||||
Protocol::Dns4(ref name) => {
|
Protocol::Dns4(ref name) =>
|
||||||
future::Either::A(resolve_dns(name, &resolver, ResolveTy::Dns4))
|
Either::A(ResolveFuture {
|
||||||
}
|
name: if log_enabled!(Level::Trace) {
|
||||||
Protocol::Dns6(ref name) => {
|
Some(name.clone().into_owned())
|
||||||
future::Either::A(resolve_dns(name, &resolver, ResolveTy::Dns6))
|
} else {
|
||||||
}
|
None
|
||||||
cmp => future::Either::B(future::ok(cmp.acquire())),
|
},
|
||||||
|
inner: resolver.resolve(name),
|
||||||
|
ty: ResolveTy::Dns4
|
||||||
|
}),
|
||||||
|
Protocol::Dns6(ref name) =>
|
||||||
|
Either::A(ResolveFuture {
|
||||||
|
name: if log_enabled!(Level::Trace) {
|
||||||
|
Some(name.clone().into_owned())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
},
|
||||||
|
inner: resolver.resolve(name),
|
||||||
|
ty: ResolveTy::Dns6
|
||||||
|
}),
|
||||||
|
cmp => Either::B(future::ok(cmp.acquire()))
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.into_iter();
|
.into_iter();
|
||||||
|
|
||||||
let new_addr = future::join_all(resolve_iters).map(move |outcome| {
|
let new_addr = JoinFuture { addr, future: future::join_all(resolve_iters) };
|
||||||
let outcome: Multiaddr = outcome.into_iter().collect();
|
Ok(Either::B(DialFuture { trans: Some(self.inner), future: Either::A(new_addr) }))
|
||||||
debug!("DNS resolution outcome: {} => {}", addr, outcome);
|
|
||||||
outcome
|
|
||||||
});
|
|
||||||
|
|
||||||
let inner = self.inner;
|
|
||||||
let future = new_addr
|
|
||||||
.and_then(move |addr| {
|
|
||||||
inner
|
|
||||||
.dial(addr)
|
|
||||||
.map_err(|_| IoError::new(IoErrorKind::Other, "multiaddr not supported"))
|
|
||||||
})
|
|
||||||
.flatten();
|
|
||||||
|
|
||||||
Ok(Box::new(future) as Box<_>)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -186,39 +190,91 @@ enum ResolveTy {
|
|||||||
Dns6,
|
Dns6,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resolve a DNS name and returns a future with the result.
|
/// Future, performing DNS resolution.
|
||||||
fn resolve_dns<'a>(
|
#[derive(Debug)]
|
||||||
name: &str,
|
pub struct ResolveFuture<T> {
|
||||||
resolver: &CpuPoolResolver,
|
name: Option<String>,
|
||||||
ty: ResolveTy,
|
inner: T,
|
||||||
) -> impl Future<Item = Protocol<'a>, Error = IoError> {
|
ty: ResolveTy
|
||||||
let debug_name = if log_enabled!(Level::Trace) {
|
}
|
||||||
Some(name.to_owned())
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
resolver.resolve(name).and_then(move |addrs| {
|
impl<T> Future for ResolveFuture<T>
|
||||||
if log_enabled!(Level::Trace) {
|
where
|
||||||
trace!(
|
T: Future<Item = Vec<IpAddr>, Error = IoError>
|
||||||
"DNS component resolution: {} => {:?}",
|
{
|
||||||
debug_name.expect("trace log level was enabled"),
|
type Item = Protocol<'static>;
|
||||||
addrs
|
type Error = IoError;
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
addrs
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
let ty = self.ty;
|
||||||
|
let addrs = try_ready!(self.inner.poll());
|
||||||
|
trace!("DNS component resolution: {:?} => {:?}", self.name, addrs);
|
||||||
|
let mut addrs = addrs
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(move |addr| match (addr, ty) {
|
.filter_map(move |addr| match (addr, ty) {
|
||||||
(IpAddr::V4(addr), ResolveTy::Dns4) => Some(Protocol::Ip4(addr)),
|
(IpAddr::V4(addr), ResolveTy::Dns4) => Some(Protocol::Ip4(addr)),
|
||||||
(IpAddr::V6(addr), ResolveTy::Dns6) => Some(Protocol::Ip6(addr)),
|
(IpAddr::V6(addr), ResolveTy::Dns6) => Some(Protocol::Ip6(addr)),
|
||||||
_ => None,
|
_ => None,
|
||||||
})
|
});
|
||||||
.next()
|
match addrs.next() {
|
||||||
.ok_or_else(|| {
|
Some(a) => Ok(Async::Ready(a)),
|
||||||
IoError::new(IoErrorKind::Other, "couldn't find any relevant IP address")
|
None => Err(IoError::new(IoErrorKind::Other, "couldn't find any relevant IP address"))
|
||||||
})
|
}
|
||||||
})
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build final multi-address from resolving futures.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct JoinFuture<T> {
|
||||||
|
addr: Multiaddr,
|
||||||
|
future: T
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Future for JoinFuture<T>
|
||||||
|
where
|
||||||
|
T: Future<Item = Vec<Protocol<'static>>, Error = IoError>
|
||||||
|
{
|
||||||
|
type Item = Multiaddr;
|
||||||
|
type Error = IoError;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
let outcome = try_ready!(self.future.poll());
|
||||||
|
let outcome: Multiaddr = outcome.into_iter().collect();
|
||||||
|
debug!("DNS resolution outcome: {} => {}", self.addr, outcome);
|
||||||
|
Ok(Async::Ready(outcome))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Future, dialing the resolved multi-address.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct DialFuture<T: Transport, F> {
|
||||||
|
trans: Option<T>,
|
||||||
|
future: Either<F, T::Dial>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, F> Future for DialFuture<T, F>
|
||||||
|
where
|
||||||
|
T: Transport,
|
||||||
|
F: Future<Item = Multiaddr, Error = IoError>
|
||||||
|
{
|
||||||
|
type Item = T::Output;
|
||||||
|
type Error = IoError;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
loop {
|
||||||
|
let next = match self.future {
|
||||||
|
Either::A(ref mut f) => {
|
||||||
|
let addr = try_ready!(f.poll());
|
||||||
|
match self.trans.take().unwrap().dial(addr) {
|
||||||
|
Ok(dial) => Either::B(dial),
|
||||||
|
Err(_) => return Err(IoError::new(IoErrorKind::Other, "multiaddr not supported"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Either::B(ref mut f) => return f.poll()
|
||||||
|
};
|
||||||
|
self.future = next
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -140,8 +140,8 @@ pub struct ListenerUpgrade<T: Transport>(RateLimited<T::ListenerUpgrade>);
|
|||||||
|
|
||||||
impl<T> Future for ListenerUpgrade<T>
|
impl<T> Future for ListenerUpgrade<T>
|
||||||
where
|
where
|
||||||
T: Transport + 'static,
|
T: Transport,
|
||||||
T::Output: AsyncRead + AsyncWrite,
|
T::Output: AsyncRead + AsyncWrite
|
||||||
{
|
{
|
||||||
type Item = Connection<T::Output>;
|
type Item = Connection<T::Output>;
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
@ -156,19 +156,15 @@ where
|
|||||||
|
|
||||||
impl<T> Transport for RateLimited<T>
|
impl<T> Transport for RateLimited<T>
|
||||||
where
|
where
|
||||||
T: Transport + 'static,
|
T: Transport,
|
||||||
T::Dial: Send,
|
T::Output: AsyncRead + AsyncWrite
|
||||||
T::Output: AsyncRead + AsyncWrite + Send,
|
|
||||||
{
|
{
|
||||||
type Output = Connection<T::Output>;
|
type Output = Connection<T::Output>;
|
||||||
type Listener = Listener<T>;
|
type Listener = Listener<T>;
|
||||||
type ListenerUpgrade = ListenerUpgrade<T>;
|
type ListenerUpgrade = ListenerUpgrade<T>;
|
||||||
type Dial = Box<Future<Item = Connection<T::Output>, Error = io::Error> + Send>;
|
type Dial = DialFuture<T::Dial>;
|
||||||
|
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
where
|
|
||||||
Self: Sized,
|
|
||||||
{
|
|
||||||
let r = self.rlimiter;
|
let r = self.rlimiter;
|
||||||
let w = self.wlimiter;
|
let w = self.wlimiter;
|
||||||
self.value
|
self.value
|
||||||
@ -182,26 +178,38 @@ where
|
|||||||
.map_err(|(transport, a)| (RateLimited::from_parts(transport, r, w), a))
|
.map_err(|(transport, a)| (RateLimited::from_parts(transport, r, w), a))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)>
|
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||||
where
|
|
||||||
Self: Sized,
|
|
||||||
{
|
|
||||||
let r = self.rlimiter;
|
let r = self.rlimiter;
|
||||||
let w = self.wlimiter;
|
let w = self.wlimiter;
|
||||||
let r2 = r.clone();
|
match self.value.dial(addr) {
|
||||||
let w2 = w.clone();
|
Ok(dial) => Ok(DialFuture { r, w, f: dial }),
|
||||||
|
Err((t, a)) => Err((RateLimited::from_parts(t, r, w), a))
|
||||||
self.value
|
}
|
||||||
.dial(addr)
|
|
||||||
.map(move |dial| {
|
|
||||||
let future = dial
|
|
||||||
.and_then(move |conn| Ok(Connection::new(conn, r, w)?));
|
|
||||||
Box::new(future) as Box<_>
|
|
||||||
})
|
|
||||||
.map_err(|(transport, a)| (RateLimited::from_parts(transport, r2, w2), a))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
|
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
|
||||||
self.value.nat_traversal(server, observed)
|
self.value.nat_traversal(server, observed)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Future to avoid boxing.
|
||||||
|
pub struct DialFuture<T> {
|
||||||
|
r: Limiter,
|
||||||
|
w: Limiter,
|
||||||
|
f: T
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Future for DialFuture<T>
|
||||||
|
where
|
||||||
|
T: Future,
|
||||||
|
T::Item: AsyncRead + AsyncWrite,
|
||||||
|
T::Error: From<io::Error>
|
||||||
|
{
|
||||||
|
type Item = Connection<T::Item>;
|
||||||
|
type Error = T::Error;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
let item = try_ready!(self.f.poll());
|
||||||
|
Ok(Async::Ready(Connection::new(item, self.r.clone(), self.w.clone())?))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -60,7 +60,7 @@ extern crate tokio_io;
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
extern crate tokio;
|
extern crate tokio;
|
||||||
|
|
||||||
use futures::future::{self, Future, FutureResult};
|
use futures::{future::{self, FutureResult}, prelude::*, try_ready};
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
use multiaddr::{Protocol, Multiaddr};
|
use multiaddr::{Protocol, Multiaddr};
|
||||||
use std::io::Error as IoError;
|
use std::io::Error as IoError;
|
||||||
@ -86,9 +86,9 @@ impl UdsConfig {
|
|||||||
|
|
||||||
impl Transport for UdsConfig {
|
impl Transport for UdsConfig {
|
||||||
type Output = UnixStream;
|
type Output = UnixStream;
|
||||||
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError> + Send + Sync>;
|
type Listener = ListenerStream<tokio_uds::Incoming>;
|
||||||
type ListenerUpgrade = FutureResult<Self::Output, IoError>;
|
type ListenerUpgrade = FutureResult<Self::Output, IoError>;
|
||||||
type Dial = Box<Future<Item = UnixStream, Error = IoError> + Send + Sync>; // TODO: name this type
|
type Dial = tokio_uds::ConnectFuture;
|
||||||
|
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
if let Ok(path) = multiaddr_to_path(&addr) {
|
if let Ok(path) = multiaddr_to_path(&addr) {
|
||||||
@ -96,23 +96,16 @@ impl Transport for UdsConfig {
|
|||||||
// We need to build the `Multiaddr` to return from this function. If an error happened,
|
// We need to build the `Multiaddr` to return from this function. If an error happened,
|
||||||
// just return the original multiaddr.
|
// just return the original multiaddr.
|
||||||
match listener {
|
match listener {
|
||||||
Ok(_) => {},
|
Ok(listener) => {
|
||||||
|
debug!("Now listening on {}", addr);
|
||||||
|
let future = ListenerStream {
|
||||||
|
stream: listener.incoming(),
|
||||||
|
addr: addr.clone()
|
||||||
|
};
|
||||||
|
Ok((future, addr))
|
||||||
|
}
|
||||||
Err(_) => return Err((self, addr)),
|
Err(_) => return Err((self, addr)),
|
||||||
};
|
}
|
||||||
|
|
||||||
debug!("Now listening on {}", addr);
|
|
||||||
let new_addr = addr.clone();
|
|
||||||
|
|
||||||
let future = future::result(listener)
|
|
||||||
.map(move |listener| {
|
|
||||||
// Pull out a stream of sockets for incoming connections
|
|
||||||
listener.incoming().map(move |sock| {
|
|
||||||
debug!("Incoming connection on {}", addr);
|
|
||||||
(future::ok(sock), addr.clone())
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.flatten_stream();
|
|
||||||
Ok((Box::new(future), new_addr))
|
|
||||||
} else {
|
} else {
|
||||||
Err((self, addr))
|
Err((self, addr))
|
||||||
}
|
}
|
||||||
@ -121,8 +114,7 @@ impl Transport for UdsConfig {
|
|||||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||||
if let Ok(path) = multiaddr_to_path(&addr) {
|
if let Ok(path) = multiaddr_to_path(&addr) {
|
||||||
debug!("Dialing {}", addr);
|
debug!("Dialing {}", addr);
|
||||||
let fut = UnixStream::connect(&path);
|
Ok(UnixStream::connect(&path))
|
||||||
Ok(Box::new(fut) as Box<_>)
|
|
||||||
} else {
|
} else {
|
||||||
Err((self, addr))
|
Err((self, addr))
|
||||||
}
|
}
|
||||||
@ -162,6 +154,29 @@ fn multiaddr_to_path(addr: &Multiaddr) -> Result<PathBuf, ()> {
|
|||||||
Ok(out)
|
Ok(out)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct ListenerStream<T> {
|
||||||
|
stream: T,
|
||||||
|
addr: Multiaddr
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Stream for ListenerStream<T>
|
||||||
|
where
|
||||||
|
T: Stream
|
||||||
|
{
|
||||||
|
type Item = (FutureResult<T::Item, T::Error>, Multiaddr);
|
||||||
|
type Error = T::Error;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
|
match try_ready!(self.stream.poll()) {
|
||||||
|
Some(item) => {
|
||||||
|
debug!("incoming connection on {}", self.addr);
|
||||||
|
Ok(Async::Ready(Some((future::ok(item), self.addr.clone()))))
|
||||||
|
}
|
||||||
|
None => Ok(Async::Ready(None))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use tokio::runtime::current_thread::Runtime;
|
use tokio::runtime::current_thread::Runtime;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user