The Multiaddr of the remote is now a Future (#249)

* The Multiaddr of the remote is now a Future

* The multiaddress future in swarm is now a Box
This commit is contained in:
Pierre Krieger
2018-06-19 14:38:55 +02:00
committed by GitHub
parent 8fb919321b
commit 7b375798a5
40 changed files with 558 additions and 485 deletions

View File

@ -40,9 +40,8 @@
//! `MuxedTransport` trait.
use fnv::FnvHashMap;
use futures::future::{self, Either, FutureResult, IntoFuture};
use futures::future::{self, Either, FutureResult};
use futures::{Async, Future, Poll, Stream};
use futures::stream::Fuse as StreamFuse;
use futures::stream::FuturesUnordered;
use futures::sync::mpsc;
use multiaddr::Multiaddr;
@ -64,7 +63,7 @@ pub struct ConnectionReuse<T, C>
where
T: Transport,
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output>,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture>,
C::Output: StreamMuxer,
{
// Underlying transport and connection upgrade for when we need to dial or listen.
@ -97,7 +96,7 @@ impl<T, C> From<UpgradedNode<T, C>> for ConnectionReuse<T, C>
where
T: Transport,
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output>,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture>,
C::Output: StreamMuxer,
{
#[inline]
@ -120,15 +119,16 @@ impl<T, C> Transport for ConnectionReuse<T, C>
where
T: Transport + 'static, // TODO: 'static :(
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output> + 'static, // TODO: 'static :(
C: Clone,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + Clone + 'static, // TODO: 'static :(
C::Output: StreamMuxer + Clone,
C::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError>,
C::NamesIter: Clone, // TODO: not elegant
{
type Output = <C::Output as StreamMuxer>::Substream;
type MultiaddrFuture = future::FutureResult<Multiaddr, IoError>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = FutureResult<(Self::Output, Multiaddr), IoError>;
type Dial = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let (listener, new_addr) = match self.inner.listen_on(addr.clone()) {
@ -144,9 +144,17 @@ where
}
};
let listener = listener
.fuse()
.map(|upgr| {
upgr.and_then(|(out, addr)| {
addr.map(move |addr| (out, addr))
})
});
let listener = ConnectionReuseListener {
shared: self.shared.clone(),
listener: listener.fuse(),
listener: listener,
current_upgrades: FuturesUnordered::new(),
connections: Vec::new(),
};
@ -163,7 +171,7 @@ where
.map(|muxer| muxer.clone())
{
let a = addr.clone();
Either::A(muxer.outbound().map(move |s| s.map(move |s| (s, a))))
Either::A(muxer.outbound().map(move |s| s.map(move |s| (s, future::ok(a)))))
} else {
Either::B(future::ok(None))
};
@ -179,7 +187,12 @@ where
debug!("No existing connection to {}; dialing", addr);
match inner.dial(addr.clone()) {
Ok(dial) => {
let future = dial.and_then(move |(muxer, addr)| {
let future = dial
.and_then(move |(muxer, addr_fut)| {
trace!("Waiting for remote's address");
addr_fut.map(move |addr| (muxer, addr))
})
.and_then(move |(muxer, addr)| {
muxer.clone().outbound().and_then(move |substream| {
if let Some(s) = substream {
// Replace the active connection because we are the most recent.
@ -191,7 +204,7 @@ where
muxer.inbound(),
addr.clone(),
));
Ok((s, addr))
Ok((s, future::ok(addr)))
} else {
error!("failed to dial to {}", addr);
shared.lock().active_connections.remove(&addr);
@ -221,14 +234,14 @@ impl<T, C> MuxedTransport for ConnectionReuse<T, C>
where
T: Transport + 'static, // TODO: 'static :(
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output> + 'static, // TODO: 'static :(
C: Clone,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + Clone + 'static, // TODO: 'static :(
C::Output: StreamMuxer + Clone,
C::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError>,
C::NamesIter: Clone, // TODO: not elegant
{
type Incoming = ConnectionReuseIncoming<C::Output>;
type IncomingUpgrade =
future::FutureResult<(<C::Output as StreamMuxer>::Substream, Multiaddr), IoError>;
future::FutureResult<(<C::Output as StreamMuxer>::Substream, Self::MultiaddrFuture), IoError>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
@ -241,12 +254,10 @@ where
/// Implementation of `Stream` for the connections incoming from listening on a specific address.
pub struct ConnectionReuseListener<S, F, M>
where
S: Stream<Item = F, Error = IoError>,
F: Future<Item = (M, Multiaddr), Error = IoError>,
M: StreamMuxer,
{
// The main listener. `S` is from the underlying transport.
listener: StreamFuse<S>,
listener: S,
current_upgrades: FuturesUnordered<F>,
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,
@ -260,7 +271,7 @@ where
F: Future<Item = (M, Multiaddr), Error = IoError>,
M: StreamMuxer + Clone + 'static, // TODO: 'static :(
{
type Item = FutureResult<(M::Substream, Multiaddr), IoError>;
type Item = FutureResult<(M::Substream, FutureResult<Multiaddr, IoError>), IoError>;
type Error = IoError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@ -323,7 +334,7 @@ where
self.connections
.push((muxer, new_next, client_addr.clone()));
return Ok(Async::Ready(Some(
Ok((incoming, client_addr)).into_future(),
future::ok((incoming, future::ok(client_addr))),
)));
}
Ok(Async::NotReady) => {
@ -355,7 +366,7 @@ impl<M> Future for ConnectionReuseIncoming<M>
where
M: Clone + StreamMuxer,
{
type Item = future::FutureResult<(M::Substream, Multiaddr), IoError>;
type Item = future::FutureResult<(M::Substream, future::FutureResult<Multiaddr, IoError>), IoError>;
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -391,7 +402,7 @@ where
debug!("New incoming substream");
let next = muxer.clone().inbound();
lock.next_incoming.push((muxer, next, addr.clone()));
return Ok(Async::Ready(future::ok((value, addr))));
return Ok(Async::Ready(future::ok((value, future::ok(addr)))));
}
Ok(Async::NotReady) => {
lock.next_incoming.push((muxer, future, addr));

View File

@ -18,8 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use multiaddr::Multiaddr;
use futures::{prelude::*, future};
use muxing::StreamMuxer;
use std::io::{Error as IoError, Read, Write};
use tokio_io::{AsyncRead, AsyncWrite};
@ -215,12 +214,12 @@ pub enum EitherListenUpgrade<A, B> {
Second(B),
}
impl<A, B, Ao, Bo> Future for EitherListenUpgrade<A, B>
impl<A, B, Ao, Bo, Af, Bf> Future for EitherListenUpgrade<A, B>
where
A: Future<Item = (Ao, Multiaddr), Error = IoError>,
B: Future<Item = (Bo, Multiaddr), Error = IoError>,
A: Future<Item = (Ao, Af), Error = IoError>,
B: Future<Item = (Bo, Bf), Error = IoError>,
{
type Item = (EitherOutput<Ao, Bo>, Multiaddr);
type Item = (EitherOutput<Ao, Bo>, future::Either<Af, Bf>);
type Error = IoError;
#[inline]
@ -228,11 +227,11 @@ where
match self {
&mut EitherListenUpgrade::First(ref mut a) => {
let (item, addr) = try_ready!(a.poll());
Ok(Async::Ready((EitherOutput::First(item), addr)))
Ok(Async::Ready((EitherOutput::First(item), future::Either::A(addr))))
}
&mut EitherListenUpgrade::Second(ref mut b) => {
let (item, addr) = try_ready!(b.poll());
Ok(Async::Ready((EitherOutput::Second(item), addr)))
Ok(Async::Ready((EitherOutput::Second(item), future::Either::B(addr))))
}
}
}

View File

@ -39,7 +39,7 @@ pub fn swarm<T, H, F>(
) -> (SwarmController<T>, SwarmFuture<T, H, F::Future>)
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
H: FnMut(T::Output, Multiaddr) -> F,
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError>>) -> F,
F: IntoFuture<Item = (), Error = IoError>,
{
let (new_dialers_tx, new_dialers_rx) = mpsc::unbounded();
@ -76,7 +76,7 @@ where
{
transport: T,
new_listeners: mpsc::UnboundedSender<T::Listener>,
new_dialers: mpsc::UnboundedSender<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
new_dialers: mpsc::UnboundedSender<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>>,
new_toprocess: mpsc::UnboundedSender<Box<Future<Item = (), Error = IoError>>>,
}
@ -123,7 +123,7 @@ where
match transport.dial(multiaddr.clone()) {
Ok(dial) => {
let dial = Box::new(
dial.map(|(d, client_addr)| (d.into(), client_addr)),
dial.map(|(d, client_addr)| (d.into(), Box::new(client_addr) as Box<Future<Item = _, Error = _>>)),
) as Box<Future<Item = _, Error = _>>;
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
@ -163,17 +163,17 @@ where
StreamFuture<
Box<
Stream<
Item = Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>,
Item = Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>,
Error = IoError,
>,
>,
>,
>,
listeners_upgrade:
FuturesUnordered<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
dialers: FuturesUnordered<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
FuturesUnordered<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>>,
dialers: FuturesUnordered<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>>,
new_dialers:
mpsc::UnboundedReceiver<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
mpsc::UnboundedReceiver<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>>,
to_process: FuturesUnordered<future::Either<F, Box<Future<Item = (), Error = IoError>>>>,
new_toprocess: mpsc::UnboundedReceiver<Box<Future<Item = (), Error = IoError>>>,
}
@ -181,7 +181,7 @@ where
impl<T, H, If, F> Future for SwarmFuture<T, H, F>
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/,
H: FnMut(T::Output, Multiaddr) -> If,
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError>>) -> If,
If: IntoFuture<Future = F, Item = (), Error = IoError>,
F: Future<Item = (), Error = IoError>,
{
@ -195,6 +195,9 @@ where
Ok(Async::Ready(connec)) => {
debug!("Swarm received new multiplexed incoming connection");
self.next_incoming = self.transport.clone().next_incoming();
let connec = connec.map(|(out, maf)| {
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError>>)
});
self.listeners_upgrade.push(Box::new(connec) as Box<_>);
}
Ok(Async::NotReady) => {}
@ -207,7 +210,13 @@ where
match self.new_listeners.poll() {
Ok(Async::Ready(Some(new_listener))) => {
let new_listener = Box::new(
new_listener.map(|f| Box::new(f) as Box<Future<Item = _, Error = _>>),
new_listener.map(|f| {
let f = f.map(|(out, maf)| {
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError>>)
});
Box::new(f) as Box<Future<Item = _, Error = _>>
}),
) as Box<Stream<Item = _, Error = _>>;
self.listeners.push(new_listener.into_future());
}
@ -254,10 +263,7 @@ where
match self.listeners_upgrade.poll() {
Ok(Async::Ready(Some((output, client_addr)))) => {
debug!(
"Successfully upgraded incoming connection with {}",
client_addr
);
debug!("Successfully upgraded incoming connection");
self.to_process.push(future::Either::A(
handler(output, client_addr).into_future(),
));
@ -270,7 +276,7 @@ where
match self.dialers.poll() {
Ok(Async::Ready(Some((output, addr)))) => {
trace!("Successfully upgraded dialed connection with {}", addr);
trace!("Successfully upgraded dialed connection");
self.to_process
.push(future::Either::A(handler(output, addr).into_future()));
}

View File

@ -37,16 +37,18 @@ pub struct AndThen<T, C> {
upgrade: C,
}
impl<T, C, F, O> Transport for AndThen<T, C>
impl<T, C, F, O, Maf> Transport for AndThen<T, C>
where
T: Transport + 'static,
C: FnOnce(T::Output, Endpoint, Multiaddr) -> F + Clone + 'static,
F: Future<Item = O, Error = IoError> + 'static,
C: FnOnce(T::Output, Endpoint, T::MultiaddrFuture) -> F + Clone + 'static,
F: Future<Item = (O, Maf), Error = IoError> + 'static,
Maf: Future<Item = Multiaddr, Error = IoError> + 'static,
{
type Output = O;
type MultiaddrFuture = Maf;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (O, Multiaddr), Error = IoError>>;
type Dial = Box<Future<Item = (O, Multiaddr), Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError>>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
@ -71,7 +73,7 @@ where
let stream = listening_stream.map(move |connection| {
let upgrade = upgrade.clone();
let future = connection.and_then(move |(stream, client_addr)| {
upgrade(stream, Endpoint::Listener, client_addr.clone()).map(|o| (o, client_addr))
upgrade(stream, Endpoint::Listener, client_addr)
});
Box::new(future) as Box<_>
@ -99,8 +101,7 @@ where
let future = dialed_fut
// Try to negotiate the protocol.
.and_then(move |(connection, client_addr)| {
upgrade(connection, Endpoint::Dialer, client_addr.clone())
.map(|o| (o, client_addr))
upgrade(connection, Endpoint::Listener, client_addr)
});
Ok(Box::new(future))
@ -112,14 +113,15 @@ where
}
}
impl<T, C, F, O> MuxedTransport for AndThen<T, C>
impl<T, C, F, O, Maf> MuxedTransport for AndThen<T, C>
where
T: MuxedTransport + 'static,
C: FnOnce(T::Output, Endpoint, Multiaddr) -> F + Clone + 'static,
F: Future<Item = O, Error = IoError> + 'static,
C: FnOnce(T::Output, Endpoint, T::MultiaddrFuture) -> F + Clone + 'static,
F: Future<Item = (O, Maf), Error = IoError> + 'static,
Maf: Future<Item = Multiaddr, Error = IoError> + 'static,
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (O, Multiaddr), Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError>>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
@ -129,8 +131,7 @@ where
// Try to negotiate the protocol.
let future = future.and_then(move |(connection, client_addr)| {
let upgrade = upgrade.clone();
upgrade(connection, Endpoint::Listener, client_addr.clone())
.map(|o| (o, client_addr))
upgrade(connection, Endpoint::Listener, client_addr)
});
Box::new(future) as Box<Future<Item = _, Error = _>>

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use either::{EitherListenStream, EitherListenUpgrade, EitherOutput};
use futures::prelude::*;
use futures::{prelude::*, future};
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use transport::{MuxedTransport, Transport};
@ -42,6 +42,7 @@ where
type Output = EitherOutput<A::Output, B::Output>;
type Listener = EitherListenStream<A::Listener, B::Listener>;
type ListenerUpgrade = EitherListenUpgrade<A::ListenerUpgrade, B::ListenerUpgrade>;
type MultiaddrFuture = future::Either<A::MultiaddrFuture, B::MultiaddrFuture>;
type Dial =
EitherListenUpgrade<<A::Dial as IntoFuture>::Future, <B::Dial as IntoFuture>::Future>;
@ -93,16 +94,16 @@ where
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade =
Box<Future<Item = (EitherOutput<A::Output, B::Output>, Multiaddr), Error = IoError>>;
Box<Future<Item = (EitherOutput<A::Output, B::Output>, Self::MultiaddrFuture), Error = IoError>>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
let first = self.0.next_incoming().map(|out| {
let fut = out.map(move |(v, addr)| (EitherOutput::First(v), addr));
let fut = out.map(move |(v, addr)| (EitherOutput::First(v), future::Either::A(addr)));
Box::new(fut) as Box<Future<Item = _, Error = _>>
});
let second = self.1.next_incoming().map(|out| {
let fut = out.map(move |(v, addr)| (EitherOutput::Second(v), addr));
let fut = out.map(move |(v, addr)| (EitherOutput::Second(v), future::Either::B(addr)));
Box::new(fut) as Box<Future<Item = _, Error = _>>
});
let future = first.select(second).map(|(i, _)| i).map_err(|(e, _)| e);

View File

@ -32,9 +32,10 @@ pub struct DeniedTransport;
impl Transport for DeniedTransport {
// TODO: could use `!` for associated types once stable
type Output = Cursor<Vec<u8>>;
type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = io::Error>>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = io::Error>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Multiaddr), Error = io::Error>>;
type Dial = Box<Future<Item = (Self::Output, Multiaddr), Error = io::Error>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = io::Error>>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = io::Error>>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
@ -54,7 +55,7 @@ impl Transport for DeniedTransport {
impl MuxedTransport for DeniedTransport {
type Incoming = future::Empty<Self::IncomingUpgrade, io::Error>;
type IncomingUpgrade = future::Empty<(Self::Output, Multiaddr), io::Error>;
type IncomingUpgrade = future::Empty<(Self::Output, Self::MultiaddrFuture), io::Error>;
#[inline]
fn next_incoming(self) -> Self::Incoming {

View File

@ -40,7 +40,7 @@ where
T: Transport,
{
type Incoming = future::Empty<Self::IncomingUpgrade, IoError>;
type IncomingUpgrade = future::Empty<(T::Output, Multiaddr), IoError>;
type IncomingUpgrade = future::Empty<(T::Output, Self::MultiaddrFuture), IoError>;
fn next_incoming(self) -> Self::Incoming
where
@ -55,6 +55,7 @@ where
T: Transport,
{
type Output = T::Output;
type MultiaddrFuture = T::MultiaddrFuture;
type Listener = T::Listener;
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = T::Dial;

View File

@ -42,12 +42,13 @@ impl<T, F> Map<T, F> {
impl<T, F, D> Transport for Map<T, F>
where
T: Transport + 'static, // TODO: 'static :-/
F: FnOnce(T::Output, Endpoint, Multiaddr) -> D + Clone + 'static, // TODO: 'static :-/
F: FnOnce(T::Output, Endpoint) -> D + Clone + 'static, // TODO: 'static :-/
{
type Output = D;
type MultiaddrFuture = T::MultiaddrFuture;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let map = self.map;
@ -58,7 +59,7 @@ where
let map = map.clone();
let future = future
.into_future()
.map(move |(output, addr)| (map(output, Endpoint::Listener, addr.clone()), addr));
.map(move |(output, addr)| (map(output, Endpoint::Listener), addr));
Box::new(future) as Box<_>
});
Ok((Box::new(stream), listen_addr))
@ -74,7 +75,7 @@ where
Ok(future) => {
let future = future
.into_future()
.map(move |(output, addr)| (map(output, Endpoint::Dialer, addr.clone()), addr));
.map(move |(output, addr)| (map(output, Endpoint::Dialer), addr));
Ok(Box::new(future))
}
Err((transport, addr)) => Err((Map { transport, map }, addr)),
@ -90,16 +91,16 @@ where
impl<T, F, D> MuxedTransport for Map<T, F>
where
T: MuxedTransport + 'static, // TODO: 'static :-/
F: FnOnce(T::Output, Endpoint, Multiaddr) -> D + Clone + 'static, // TODO: 'static :-/
F: FnOnce(T::Output, Endpoint) -> D + Clone + 'static, // TODO: 'static :-/
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
fn next_incoming(self) -> Self::Incoming {
let map = self.map;
let future = self.transport.next_incoming().map(move |upgrade| {
let future = upgrade.map(move |(output, addr)| {
(map(output, Endpoint::Listener, addr.clone()), addr)
(map(output, Endpoint::Listener), addr)
});
Box::new(future) as Box<_>
});

View File

@ -70,13 +70,17 @@ pub trait Transport {
/// taken place, and that connection has been upgraded to the wanted protocols.
type Listener: Stream<Item = Self::ListenerUpgrade, Error = IoError>;
/// Future that produces the multiaddress of the remote.
type MultiaddrFuture: Future<Item = Multiaddr, Error = IoError>;
/// After a connection has been received, we may need to do some asynchronous pre-processing
/// on it (eg. an intermediary protocol negotiation). While this pre-processing takes place, we
/// want to be able to continue polling on the listener.
type ListenerUpgrade: Future<Item = (Self::Output, Multiaddr), Error = IoError>;
// TODO: we could move the `MultiaddrFuture` to the `Listener` trait
type ListenerUpgrade: Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>;
/// A future which indicates that we are currently dialing to a peer.
type Dial: Future<Item = (Self::Output, Multiaddr), Error = IoError>;
type Dial: Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>;
/// Listen on the given multiaddr. Returns a stream of incoming connections, plus a modified
/// version of the `Multiaddr`. This new `Multiaddr` is the one that that should be advertised
@ -115,7 +119,7 @@ pub trait Transport {
fn map<F, O>(self, map: F) -> map::Map<Self, F>
where
Self: Sized,
F: FnOnce(Self::Output, Endpoint, Multiaddr) -> O + Clone + 'static, // TODO: 'static :-/
F: FnOnce(Self::Output, Endpoint) -> O + Clone + 'static, // TODO: 'static :-/
{
map::Map::new(self, map)
}
@ -142,7 +146,7 @@ pub trait Transport {
where
Self: Sized,
Self::Output: AsyncRead + AsyncWrite,
U: ConnectionUpgrade<Self::Output>,
U: ConnectionUpgrade<Self::Output, Self::MultiaddrFuture>,
{
UpgradedNode::new(self, upgrade)
}
@ -153,11 +157,12 @@ pub trait Transport {
/// > **Note**: The concept of an *upgrade* for example includes middlewares such *secio*
/// > (communication encryption), *multiplex*, but also a protocol handler.
#[inline]
fn and_then<C, F>(self, upgrade: C) -> and_then::AndThen<Self, C>
fn and_then<C, F, O, Maf>(self, upgrade: C) -> and_then::AndThen<Self, C>
where
Self: Sized,
C: FnOnce(Self::Output, Endpoint, Multiaddr) -> F + Clone + 'static,
F: Future<Error = IoError> + 'static,
C: FnOnce(Self::Output, Endpoint, Self::MultiaddrFuture) -> F + Clone + 'static,
F: Future<Item = (O, Maf), Error = IoError> + 'static,
Maf: Future<Item = Multiaddr, Error = IoError> + 'static,
{
and_then::and_then(self, upgrade)
}

View File

@ -20,7 +20,6 @@
use futures::prelude::*;
use futures::stream;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use transport::Transport;
@ -30,7 +29,7 @@ pub trait MuxedTransport: Transport {
/// Future resolving to a future that will resolve to an incoming connection.
type Incoming: Future<Item = Self::IncomingUpgrade, Error = IoError>;
/// Future resolving to an incoming connection.
type IncomingUpgrade: Future<Item = (Self::Output, Multiaddr), Error = IoError>;
type IncomingUpgrade: Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>;
/// Returns the next incoming substream opened by a node that we dialed ourselves.
///

View File

@ -50,7 +50,7 @@ impl<'a, T, C> UpgradedNode<T, C>
where
T: Transport + 'a,
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output> + 'a,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + 'a,
{
/// Turns this upgraded node into a `ConnectionReuse`. If the `Output` implements the
/// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`.
@ -77,7 +77,7 @@ where
pub fn dial(
self,
addr: Multiaddr,
) -> Result<Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>, (Self, Multiaddr)>
) -> Result<Box<Future<Item = (C::Output, C::MultiaddrFuture), Error = IoError> + 'a>, (Self, Multiaddr)>
where
C::NamesIter: Clone, // TODO: not elegant
{
@ -113,7 +113,7 @@ where
self,
) -> Box<
Future<
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>,
Item = Box<Future<Item = (C::Output, C::MultiaddrFuture), Error = IoError> + 'a>,
Error = IoError,
>
+ 'a,
@ -150,7 +150,7 @@ where
(
Box<
Stream<
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>,
Item = Box<Future<Item = (C::Output, C::MultiaddrFuture), Error = IoError> + 'a>,
Error = IoError,
>
+ 'a,
@ -200,14 +200,16 @@ impl<T, C> Transport for UpgradedNode<T, C>
where
T: Transport + 'static,
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output> + 'static,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + 'static,
C::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError>,
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
{
type Output = C::Output;
type MultiaddrFuture = C::MultiaddrFuture;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
type Dial = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (C::Output, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (C::Output, Self::MultiaddrFuture), Error = IoError>>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
@ -229,12 +231,13 @@ impl<T, C> MuxedTransport for UpgradedNode<T, C>
where
T: MuxedTransport + 'static,
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output> + 'static,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + 'static,
C::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError>,
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (C::Output, Self::MultiaddrFuture), Error = IoError>>;
#[inline]
fn next_incoming(self) -> Self::Incoming {

View File

@ -20,7 +20,6 @@
use bytes::Bytes;
use futures::prelude::*;
use multiaddr::Multiaddr;
use multistream_select;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use tokio_io::{AsyncRead, AsyncWrite};
@ -30,21 +29,21 @@ use upgrade::{ConnectionUpgrade, Endpoint};
///
/// Returns a `Future` that returns the outcome of the connection upgrade.
#[inline]
pub fn apply<'a, C, U>(
pub fn apply<'a, C, U, Maf>(
connection: C,
upgrade: U,
endpoint: Endpoint,
remote_addr: Multiaddr,
) -> Box<Future<Item = (U::Output, Multiaddr), Error = IoError> + 'a>
remote_addr: Maf,
) -> Box<Future<Item = (U::Output, U::MultiaddrFuture), Error = IoError> + 'a>
where
U: ConnectionUpgrade<C> + 'a,
U: ConnectionUpgrade<C, Maf> + 'a,
U::NamesIter: Clone, // TODO: not elegant
C: AsyncRead + AsyncWrite + 'a,
Maf: 'a,
{
let iter = upgrade
.protocol_names()
.map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
let remote_addr2 = remote_addr.clone();
debug!("Starting protocol negotiation");
let negotiation = match endpoint {
@ -56,14 +55,13 @@ where
.map_err(|err| IoError::new(IoErrorKind::Other, err))
.then(move |negotiated| {
match negotiated {
Ok(_) => debug!("Successfully negotiated protocol upgrade with {}", remote_addr2),
Ok(_) => debug!("Successfully negotiated protocol upgrade"),
Err(ref err) => debug!("Error while negotiated protocol upgrade: {:?}", err),
};
negotiated
})
.and_then(move |(upgrade_id, connection)| {
let fut = upgrade.upgrade(connection, upgrade_id, endpoint, &remote_addr);
fut.map(move |c| (c, remote_addr))
upgrade.upgrade(connection, upgrade_id, endpoint, remote_addr)
})
.into_future()
.then(|val| {

View File

@ -19,8 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use futures::prelude::*;
use multiaddr::Multiaddr;
use futures::{future, prelude::*};
use std::io::Error as IoError;
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
@ -38,11 +37,11 @@ pub fn or<A, B>(me: A, other: B) -> OrUpgrade<A, B> {
#[derive(Debug, Copy, Clone)]
pub struct OrUpgrade<A, B>(A, B);
impl<C, A, B, O> ConnectionUpgrade<C> for OrUpgrade<A, B>
impl<C, A, B, O, Maf> ConnectionUpgrade<C, Maf> for OrUpgrade<A, B>
where
C: AsyncRead + AsyncWrite,
A: ConnectionUpgrade<C, Output = O>,
B: ConnectionUpgrade<C, Output = O>,
A: ConnectionUpgrade<C, Maf, Output = O>,
B: ConnectionUpgrade<C, Maf, Output = O>,
{
type NamesIter = NamesIterChain<A::NamesIter, B::NamesIter>;
type UpgradeIdentifier = EitherUpgradeIdentifier<A::UpgradeIdentifier, B::UpgradeIdentifier>;
@ -56,6 +55,7 @@ where
}
type Output = O;
type MultiaddrFuture = future::Either<A::MultiaddrFuture, B::MultiaddrFuture>;
type Future = EitherConnUpgrFuture<A::Future, B::Future>;
#[inline]
@ -64,7 +64,7 @@ where
socket: C,
id: Self::UpgradeIdentifier,
ty: Endpoint,
remote_addr: &Multiaddr,
remote_addr: Maf,
) -> Self::Future {
match id {
EitherUpgradeIdentifier::First(id) => {
@ -97,24 +97,24 @@ pub enum EitherConnUpgrFuture<A, B> {
Second(B),
}
impl<A, B, O> Future for EitherConnUpgrFuture<A, B>
impl<A, B, O, Ma, Mb> Future for EitherConnUpgrFuture<A, B>
where
A: Future<Error = IoError, Item = O>,
B: Future<Error = IoError, Item = O>,
A: Future<Error = IoError, Item = (O, Ma)>,
B: Future<Error = IoError, Item = (O, Mb)>,
{
type Item = O;
type Item = (O, future::Either<Ma, Mb>);
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
&mut EitherConnUpgrFuture::First(ref mut a) => {
let item = try_ready!(a.poll());
Ok(Async::Ready(item))
let (item, fut) = try_ready!(a.poll());
Ok(Async::Ready((item, future::Either::A(fut))))
}
&mut EitherConnUpgrFuture::Second(ref mut b) => {
let item = try_ready!(b.poll());
Ok(Async::Ready(item))
let (item, fut) = try_ready!(b.poll());
Ok(Async::Ready((item, future::Either::B(fut))))
}
}
}

View File

@ -29,14 +29,15 @@ use upgrade::{ConnectionUpgrade, Endpoint};
#[derive(Debug, Copy, Clone)]
pub struct DeniedConnectionUpgrade;
impl<C> ConnectionUpgrade<C> for DeniedConnectionUpgrade
impl<C, Maf> ConnectionUpgrade<C, Maf> for DeniedConnectionUpgrade
where
C: AsyncRead + AsyncWrite,
{
type NamesIter = iter::Empty<(Bytes, ())>;
type UpgradeIdentifier = (); // TODO: could use `!`
type Output = (); // TODO: could use `!`
type Future = Box<Future<Item = (), Error = io::Error>>; // TODO: could use `!`
type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = io::Error>>; // TODO: could use `!`
type Future = Box<Future<Item = ((), Self::MultiaddrFuture), Error = io::Error>>; // TODO: could use `!`
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
@ -44,7 +45,7 @@ where
}
#[inline]
fn upgrade(self, _: C, _: Self::UpgradeIdentifier, _: Endpoint, _: &Multiaddr) -> Self::Future {
fn upgrade(self, _: C, _: Self::UpgradeIdentifier, _: Endpoint, _: Maf) -> Self::Future {
unreachable!("the denied connection upgrade always fails to negotiate")
}
}

View File

@ -18,8 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{future, prelude::*};
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use futures::prelude::*;
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
@ -36,11 +36,12 @@ pub struct Map<U, F> {
map: F,
}
impl<C, U, F, O> ConnectionUpgrade<C> for Map<U, F>
impl<C, U, F, O, Maf> ConnectionUpgrade<C, Maf> for Map<U, F>
where
U: ConnectionUpgrade<C>,
U: ConnectionUpgrade<C, Maf>,
U::Future: 'static, // TODO: 'static :(
C: AsyncRead + AsyncWrite,
F: FnOnce(U::Output) -> O,
F: FnOnce(U::Output) -> O + 'static, // TODO: 'static :(
{
type NamesIter = U::NamesIter;
type UpgradeIdentifier = U::UpgradeIdentifier;
@ -50,17 +51,20 @@ where
}
type Output = O;
type Future = future::Map<U::Future, F>;
type MultiaddrFuture = U::MultiaddrFuture;
type Future = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError>>;
fn upgrade(
self,
socket: C,
id: Self::UpgradeIdentifier,
ty: Endpoint,
remote_addr: &Multiaddr,
remote_addr: Maf,
) -> Self::Future {
self.upgrade
let map = self.map;
let fut = self.upgrade
.upgrade(socket, id, ty, remote_addr)
.map(self.map)
.map(move |(out, maf)| (map(out), maf));
Box::new(fut) as Box<_>
}
}

View File

@ -20,7 +20,6 @@
use bytes::Bytes;
use futures::future::{self, FutureResult};
use multiaddr::Multiaddr;
use std::{iter, io::Error as IoError};
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
@ -33,18 +32,19 @@ use upgrade::{ConnectionUpgrade, Endpoint};
#[derive(Debug, Copy, Clone)]
pub struct PlainTextConfig;
impl<C> ConnectionUpgrade<C> for PlainTextConfig
impl<C, F> ConnectionUpgrade<C, F> for PlainTextConfig
where
C: AsyncRead + AsyncWrite,
{
type Output = C;
type Future = FutureResult<C, IoError>;
type Future = FutureResult<(C, F), IoError>;
type UpgradeIdentifier = ();
type MultiaddrFuture = F;
type NamesIter = iter::Once<(Bytes, ())>;
#[inline]
fn upgrade(self, i: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future {
future::ok(i)
fn upgrade(self, i: C, _: (), _: Endpoint, remote_addr: F) -> Self::Future {
future::ok((i, remote_addr))
}
#[inline]

View File

@ -20,7 +20,6 @@
use bytes::Bytes;
use futures::future::Future;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use tokio_io::{AsyncRead, AsyncWrite};
@ -40,7 +39,8 @@ pub enum Endpoint {
/// > **Note**: The `upgrade` method of this trait uses `self` and not `&self` or `&mut self`.
/// > This has been designed so that you would implement this trait on `&Foo` or
/// > `&mut Foo` instead of directly on `Foo`.
pub trait ConnectionUpgrade<C: AsyncRead + AsyncWrite> {
// TODO: remove AsyncRead + AsyncWrite bounds here
pub trait ConnectionUpgrade<C: AsyncRead + AsyncWrite, TAddrFut> {
/// Iterator returned by `protocol_names`.
type NamesIter: Iterator<Item = (Bytes, Self::UpgradeIdentifier)>;
/// Type that serves as an identifier for the protocol. This type only exists to be returned
@ -58,8 +58,10 @@ pub trait ConnectionUpgrade<C: AsyncRead + AsyncWrite> {
/// > **Note**: For upgrades that add an intermediary layer (such as `secio` or `multiplex`),
/// > this associated type must implement `AsyncRead + AsyncWrite`.
type Output;
/// Type of the future that will resolve to the remote's multiaddr.
type MultiaddrFuture;
/// Type of the future that will resolve to `Self::Output`.
type Future: Future<Item = Self::Output, Error = IoError>;
type Future: Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>;
/// This method is called after protocol negotiation has been performed.
///
@ -70,6 +72,6 @@ pub trait ConnectionUpgrade<C: AsyncRead + AsyncWrite> {
socket: C,
id: Self::UpgradeIdentifier,
ty: Endpoint,
remote_addr: &Multiaddr,
remote_addr: TAddrFut,
) -> Self::Future;
}

View File

@ -54,6 +54,7 @@ impl<T: Clone> Clone for OnlyOnce<T> {
}
impl<T: Transport> Transport for OnlyOnce<T> {
type Output = T::Output;
type MultiaddrFuture = T::MultiaddrFuture;
type Listener = T::Listener;
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = T::Dial;

View File

@ -100,9 +100,10 @@ where
T: Transport + 'static, // TODO: 'static :-/
{
type Output = T::Output;
type MultiaddrFuture = T::MultiaddrFuture;
type Listener = T::Listener;
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
@ -237,9 +238,10 @@ mod tests {
struct CustomTransport;
impl Transport for CustomTransport {
type Output = <TcpConfig as Transport>::Output;
type MultiaddrFuture = <TcpConfig as Transport>::MultiaddrFuture;
type Listener = <TcpConfig as Transport>::Listener;
type ListenerUpgrade = <TcpConfig as Transport>::ListenerUpgrade;
type Dial = future::Empty<(Self::Output, Multiaddr), IoError>;
type Dial = future::Empty<(Self::Output, Self::MultiaddrFuture), IoError>;
#[inline]
fn listen_on(

View File

@ -87,9 +87,10 @@ impl FloodSubUpgrade {
}
}
impl<C> ConnectionUpgrade<C> for FloodSubUpgrade
impl<C, Maf> ConnectionUpgrade<C, Maf> for FloodSubUpgrade
where
C: AsyncRead + AsyncWrite + 'static,
Maf: Future<Item = Multiaddr, Error = IoError> + 'static,
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = ();
@ -100,7 +101,8 @@ where
}
type Output = FloodSubFuture;
type Future = future::FutureResult<Self::Output, IoError>;
type MultiaddrFuture = future::FutureResult<Multiaddr, IoError>;
type Future = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
#[inline]
fn upgrade(
@ -108,10 +110,11 @@ where
socket: C,
_: Self::UpgradeIdentifier,
_: Endpoint,
remote_addr: &Multiaddr,
remote_addr: Maf,
) -> Self::Future {
debug!("Upgrading connection to {} as floodsub", remote_addr);
debug!("Upgrading connection as floodsub");
let future = remote_addr.and_then(move |remote_addr| {
// Whenever a new node connects, we send to it a message containing the topics we are
// already subscribed to.
let init_msg: Vec<u8> = {
@ -164,7 +167,7 @@ where
}
let inner = self.inner.clone();
let remote_addr = remote_addr.clone();
let remote_addr_ret = future::ok(remote_addr.clone());
let future = future::loop_fn(
(floodsub_sink, messages),
move |(floodsub_sink, messages)| {
@ -211,9 +214,12 @@ where
},
);
future::ok(FloodSubFuture {
future::ok((FloodSubFuture {
inner: Box::new(future) as Box<_>,
})
}, remote_addr_ret))
});
Box::new(future) as Box<_>
}
}

View File

@ -21,7 +21,6 @@
use bytes::{Bytes, BytesMut};
use futures::{future, Future, Sink, Stream};
use libp2p_core::{ConnectionUpgrade, Endpoint, PublicKeyBytes};
use log::Level;
use multiaddr::Multiaddr;
use protobuf::Message as ProtobufMessage;
use protobuf::parse_from_bytes as protobuf_parse_from_bytes;
@ -113,29 +112,26 @@ pub struct IdentifyInfo {
pub protocols: Vec<String>,
}
impl<C> ConnectionUpgrade<C> for IdentifyProtocolConfig
impl<C, Maf> ConnectionUpgrade<C, Maf> for IdentifyProtocolConfig
where
C: AsyncRead + AsyncWrite + 'static,
Maf: Future<Item = Multiaddr, Error = IoError> + 'static,
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = ();
type Output = IdentifyOutput<C>;
type Future = Box<Future<Item = Self::Output, Error = IoError>>;
type MultiaddrFuture = future::FutureResult<Multiaddr, IoError>;
type Future = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
iter::once((Bytes::from("/ipfs/id/1.0.0"), ()))
}
fn upgrade(self, socket: C, _: (), ty: Endpoint, observed_addr: &Multiaddr) -> Self::Future {
trace!("Upgrading connection with {:?} as {:?}", observed_addr, ty);
fn upgrade(self, socket: C, _: (), ty: Endpoint, observed_addr: Maf) -> Self::Future {
trace!("Upgrading connection as {:?}", ty);
let socket = socket.framed(VarintCodec::default());
let observed_addr_log = if log_enabled!(Level::Debug) {
Some(observed_addr.clone())
} else {
None
};
match ty {
Endpoint::Dialer => {
@ -144,12 +140,7 @@ where
.map(|(msg, _)| msg)
.map_err(|(err, _)| err)
.and_then(|msg| {
if log_enabled!(Level::Debug) {
debug!("Received identify message from {:?}",
observed_addr_log
.expect("Programmer error: expected `observed_addr_log' to be \
non-None since debug log level is enabled"));
}
debug!("Received identify message");
if let Some(msg) = msg {
let (info, observed_addr) = match parse_proto_msg(msg) {
@ -163,10 +154,12 @@ where
trace!("Remote observes us as {:?}", observed_addr);
trace!("Information received: {:?}", info);
Ok(IdentifyOutput::RemoteInfo {
let out = IdentifyOutput::RemoteInfo {
info,
observed_addr,
})
observed_addr: observed_addr.clone(),
};
Ok((out, future::ok(observed_addr)))
} else {
debug!("Identify protocol stream closed before receiving info");
Err(IoErrorKind::InvalidData.into())
@ -179,9 +172,13 @@ where
Endpoint::Listener => {
let sender = IdentifySender { inner: socket };
let future = future::ok(IdentifyOutput::Sender {
let future = observed_addr.map(move |addr| {
let io = IdentifyOutput::Sender {
sender,
observed_addr: observed_addr.clone(),
observed_addr: addr.clone(),
};
(io, future::ok(addr))
});
Box::new(future) as Box<_>

View File

@ -65,9 +65,10 @@ where
for<'r> &'r PStore: Peerstore,
{
type Output = IdentifyTransportOutput<Trans::Output>;
type MultiaddrFuture = future::FutureResult<Multiaddr, IoError>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
@ -93,7 +94,11 @@ where
let listener = listener.map(move |connec| {
let peerstore = peerstore.clone();
let identify_upgrade = identify_upgrade.clone();
let fut = connec.and_then(move |(connec, client_addr)| {
let fut = connec
.and_then(move |(connec, client_addr)| {
client_addr.map(move |a| (connec, a))
})
.and_then(move |(connec, client_addr)| {
for peer_id in peerstore.peers() {
let peer = match peerstore.peer(&peer_id) {
Some(p) => p,
@ -103,7 +108,7 @@ where
if peer.addrs().any(|addr| addr == client_addr) {
debug!("Incoming substream from {} identified as {:?}", client_addr, peer_id);
let out = IdentifyTransportOutput { socket: connec, observed_addr: None };
let ret = (out, AddrComponent::P2P(peer_id.into_bytes()).into());
let ret = (out, future::ok(AddrComponent::P2P(peer_id.into_bytes()).into()));
return future::Either::A(future::ok(ret));
}
}
@ -121,6 +126,9 @@ where
.into_future()
.and_then(move |(dial, connec)| dial.map(move |dial| (dial, connec)))
.and_then(move |((identify, original_addr), connec)| {
original_addr.map(move |a| (identify, a, connec))
})
.and_then(move |(identify, original_addr, connec)| {
// Compute the "real" address of the node (in the form `/p2p/...`) and
// add it to the peerstore.
let (observed, real_addr);
@ -142,7 +150,7 @@ where
debug!("Identified {} as {}", original_addr, real_addr);
let out = IdentifyTransportOutput { socket: connec, observed_addr: Some(observed) };
Ok((out, real_addr))
Ok((out, future::ok(real_addr)))
})
.map_err(move |err| {
debug!("Failed to identify incoming {}", client_addr);
@ -194,7 +202,7 @@ where
.and_then(move |(val, _)| {
match val {
Some((connec, inner_addr)) => {
debug!("Successfully dialed peer {:?} through {}", peer_id, inner_addr);
debug!("Successfully dialed peer {:?}", peer_id);
let out = IdentifyTransportOutput { socket: connec, observed_addr: None };
Ok((out, inner_addr))
},
@ -206,7 +214,7 @@ where
}
})
// Replace the multiaddress with the one of the form `/p2p/...` or `/ipfs/...`.
.map(move |(socket, _inner_client_addr)| (socket, addr));
.map(move |(socket, _inner_client_addr)| (socket, future::ok(addr)));
Ok(Box::new(future) as Box<_>)
}
@ -238,19 +246,20 @@ where
let future = dial.and_then(move |identify| {
// On success, store the information in the peerstore and compute the
// "real" address of the node (of the form `/p2p/...`).
let (real_addr, old_addr, observed);
match identify {
(IdentifyOutput::RemoteInfo { info, observed_addr }, a) => {
old_addr = a.clone();
observed = observed_addr;
real_addr = process_identify_info(&info, &*peerstore, a, addr_ttl)?;
a.and_then(move |old_addr| {
let real_addr = process_identify_info(&info, &*peerstore, old_addr.clone(), addr_ttl)?;
Ok((real_addr, old_addr, observed_addr))
})
}
_ => unreachable!(
"the identify protocol guarantees that we receive \
remote information when we dial a node"
),
};
}
})
.and_then(move |(real_addr, old_addr, observed)| {
// Then dial the same node again.
Ok(transport
.dial(old_addr)
@ -260,7 +269,7 @@ where
.into_future()
.map(move |(dial, _wrong_addr)| {
let out = IdentifyTransportOutput { socket: dial, observed_addr: Some(observed) };
(out, real_addr)
(out, future::ok(real_addr))
}))
}).flatten();
@ -283,7 +292,7 @@ where
for<'r> &'r PStore: Peerstore,
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
@ -293,7 +302,11 @@ where
let future = self.transport.next_incoming().map(move |incoming| {
let peerstore = peerstore.clone();
let future = incoming.and_then(move |(connec, client_addr)| {
let future = incoming
.and_then(move |(connec, client_addr)| {
client_addr.map(move |out| (connec, out))
})
.and_then(move |(connec, client_addr)| {
for peer_id in peerstore.peers() {
let peer = match peerstore.peer(&peer_id) {
Some(p) => p,
@ -303,7 +316,7 @@ where
if peer.addrs().any(|addr| addr == client_addr) {
debug!("Incoming substream from {} identified as {:?}", client_addr, peer_id);
let out = IdentifyTransportOutput { socket: connec, observed_addr: None };
let ret = (out, AddrComponent::P2P(peer_id.into_bytes()).into());
let ret = (out, future::ok(AddrComponent::P2P(peer_id.into_bytes()).into()));
return future::Either::A(future::ok(ret));
}
}
@ -322,20 +335,22 @@ where
.and_then(move |(identify, connec)| {
// Add the info to the peerstore and compute the "real" address of the
// node (in the form `/p2p/...`).
let (real_addr, observed);
match identify {
(IdentifyOutput::RemoteInfo { info, observed_addr }, old_addr) => {
observed = observed_addr;
real_addr = process_identify_info(&info, &*peerstore, old_addr, addr_ttl)?;
old_addr.and_then(move |out| {
let real_addr = process_identify_info(&info, &*peerstore, out, addr_ttl)?;
Ok((real_addr, observed_addr, connec))
})
}
_ => unreachable!(
"the identify protocol guarantees that we receive remote \
information when we dial a node"
),
};
}
})
.map(move |(real_addr, observed, connec)| {
let out = IdentifyTransportOutput { socket: connec, observed_addr: Some(observed) };
Ok((out, real_addr))
(out, future::ok(real_addr))
});
future::Either::B(future)
});
@ -426,8 +441,9 @@ mod tests {
}
impl Transport for UnderlyingTrans {
type Output = <TcpConfig as Transport>::Output;
type MultiaddrFuture = <TcpConfig as Transport>::MultiaddrFuture;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Dial = <TcpConfig as Transport>::Dial;
#[inline]
fn listen_on(

View File

@ -247,22 +247,25 @@ impl KademliaUpgrade {
}
}
impl<C> ConnectionUpgrade<C> for KademliaUpgrade
impl<C, Maf> ConnectionUpgrade<C, Maf> for KademliaUpgrade
where
C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
Maf: Future<Item = Multiaddr, Error = IoError> + 'static, // TODO: 'static :(
{
type Output = KademliaPeerReqStream;
type Future = Box<Future<Item = Self::Output, Error = IoError>>;
type MultiaddrFuture = future::FutureResult<Multiaddr, IoError>;
type Future = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
ConnectionUpgrade::<C>::protocol_names(&self.upgrade)
ConnectionUpgrade::<C, Maf>::protocol_names(&self.upgrade)
}
#[inline]
fn upgrade(self, incoming: C, id: (), endpoint: Endpoint, addr: &Multiaddr) -> Self::Future {
fn upgrade(self, incoming: C, id: (), endpoint: Endpoint, addr: Maf) -> Self::Future {
let future = addr.and_then(move |addr| {
let inner = self.inner;
let client_addr = addr.clone();
@ -279,7 +282,7 @@ where
IoErrorKind::InvalidData,
"invalid peer ID sent by remote identification",
);
return Box::new(future::err(err));
return Box::new(future::err(err)) as Box<Future<Item = _, Error = _>>;
}
}
}
@ -291,9 +294,9 @@ where
}
};
let future = self.upgrade.upgrade(incoming, id, endpoint, addr).map(
move |(controller, stream)| {
match inner.connections.lock().entry(client_addr) {
let future = self.upgrade.upgrade(incoming, id, endpoint, future::ok::<_, IoError>(addr)).map(
move |((controller, stream), _)| {
match inner.connections.lock().entry(client_addr.clone()) {
Entry::Occupied(mut entry) => {
match entry.insert(Connection::Active(controller)) {
// If there was already an active connection to this remote, it gets
@ -356,10 +359,13 @@ where
}
}).filter_map(|val| val);
KademliaPeerReqStream { inner: Box::new(stream) }
(KademliaPeerReqStream { inner: Box::new(stream) }, future::ok(client_addr))
},
);
Box::new(future) as Box<_>
});
Box::new(future) as Box<_>
}
}
@ -486,7 +492,7 @@ where
// Need to open a connection.
let map = self.map.clone();
match self.swarm_controller
.dial(addr, self.kademlia_transport.clone().map(move |out, _, _| map(out)))
.dial(addr, self.kademlia_transport.clone().map(move |out, _| map(out)))
{
Ok(()) => (),
Err(_addr) => {

View File

@ -37,7 +37,6 @@ use bytes::Bytes;
use futures::sync::{mpsc, oneshot};
use futures::{future, Future, Sink, stream, Stream};
use libp2p_core::{ConnectionUpgrade, Endpoint, PeerId};
use multiaddr::Multiaddr;
use protocol::{self, KadMsg, KademliaProtocolConfig, Peer};
use std::collections::VecDeque;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
@ -65,7 +64,7 @@ impl KademliaServerConfig {
}
}
impl<C> ConnectionUpgrade<C> for KademliaServerConfig
impl<C, Maf> ConnectionUpgrade<C, Maf> for KademliaServerConfig
where
C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
{
@ -73,20 +72,23 @@ where
KademliaServerController,
Box<Stream<Item = KademliaIncomingRequest, Error = IoError>>,
);
type Future = future::Map<<KademliaProtocolConfig as ConnectionUpgrade<C>>::Future, fn(<KademliaProtocolConfig as ConnectionUpgrade<C>>::Output) -> Self::Output>;
type MultiaddrFuture = Maf;
type Future = future::Map<<KademliaProtocolConfig as ConnectionUpgrade<C, Maf>>::Future, fn((<KademliaProtocolConfig as ConnectionUpgrade<C, Maf>>::Output, Maf)) -> (Self::Output, Maf)>;
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
ConnectionUpgrade::<C>::protocol_names(&self.raw_proto)
ConnectionUpgrade::<C, Maf>::protocol_names(&self.raw_proto)
}
#[inline]
fn upgrade(self, incoming: C, id: (), endpoint: Endpoint, addr: &Multiaddr) -> Self::Future {
fn upgrade(self, incoming: C, id: (), endpoint: Endpoint, addr: Maf) -> Self::Future {
self.raw_proto
.upgrade(incoming, id, endpoint, addr)
.map(build_from_sink_stream)
.map::<fn(_) -> _, _>(move |(connec, addr)| {
(build_from_sink_stream(connec), addr)
})
}
}

View File

@ -126,12 +126,13 @@ impl Into<protobuf_structs::dht::Message_Peer> for Peer {
#[derive(Debug, Default, Copy, Clone)]
pub struct KademliaProtocolConfig;
impl<C> ConnectionUpgrade<C> for KademliaProtocolConfig
impl<C, Maf> ConnectionUpgrade<C, Maf> for KademliaProtocolConfig
where
C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
{
type Output = KadStreamSink<C>;
type Future = future::FutureResult<Self::Output, IoError>;
type MultiaddrFuture = Maf;
type Future = future::FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
@ -141,8 +142,8 @@ where
}
#[inline]
fn upgrade(self, incoming: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future {
future::ok(kademlia_protocol(incoming))
fn upgrade(self, incoming: C, _: (), _: Endpoint, addr: Maf) -> Self::Future {
future::ok((kademlia_protocol(incoming), addr))
}
}

View File

@ -101,14 +101,13 @@ fn main() {
// outgoing connections for us.
let (swarm_controller, swarm_future) = libp2p::core::swarm(
transport.clone().with_upgrade(proto),
|socket, client_addr| {
println!("Successfully negotiated protocol with {}", client_addr);
|socket, _client_addr| {
println!("Successfully negotiated protocol");
// The type of `socket` is exactly what the closure of `SimpleProtocol` returns.
// We loop forever in order to handle all the messages sent by the client.
loop_fn(socket, move |socket| {
let client_addr = client_addr.clone();
socket
.into_future()
.map_err(|(e, _)| e)
@ -116,15 +115,14 @@ fn main() {
if let Some(msg) = msg {
// One message has been received. We send it back to the client.
println!(
"Received a message from {}: {:?}\n => Sending back \
identical message to remote",
client_addr, msg
"Received a message: {:?}\n => Sending back \
identical message to remote", msg
);
Box::new(rest.send(msg.freeze()).map(|m| Loop::Continue(m)))
as Box<Future<Item = _, Error = _>>
} else {
// End of stream. Connection closed. Breaking the loop.
println!("Received EOF from {}\n => Dropping connection", client_addr);
println!("Received EOF\n => Dropping connection");
Box::new(Ok(Loop::Break(())).into_future())
as Box<Future<Item = _, Error = _>>
}

View File

@ -100,8 +100,8 @@ fn main() {
// outgoing connections for us.
let (swarm_controller, swarm_future) = libp2p::core::swarm(
transport.clone().with_upgrade(floodsub_upgrade.clone()),
|socket, client_addr| {
println!("Successfully negotiated protocol with {}", client_addr);
|socket, _| {
println!("Successfully negotiated protocol");
socket
},
);

View File

@ -89,7 +89,7 @@ fn main() {
.into_connection_reuse();
let transport = libp2p::identify::IdentifyTransport::new(transport, peer_store.clone())
.map(|id_out, _, _| {
.map(|id_out, _| {
id_out.socket
});

View File

@ -172,10 +172,11 @@ fn run_listener(opts: ListenerOpts) -> Result<(), Box<Error>> {
.and_then(|out, endpoint, addr| {
match out {
libp2p::relay::Output::Sealed(future) => {
Either::A(future.map(Either::A))
Either::A(future.map(|out| (Either::A(out), Either::A(addr))))
}
libp2p::relay::Output::Stream(socket) => {
Either::B(upgrade::apply(socket, echo, endpoint, addr).map(Either::B))
Either::B(upgrade::apply(socket, echo, endpoint, addr)
.map(|(out, addr)| (Either::B(out), Either::B(addr))))
}
}
});
@ -183,7 +184,7 @@ fn run_listener(opts: ListenerOpts) -> Result<(), Box<Error>> {
let (control, future) = libp2p::core::swarm(upgraded, |out, _| {
match out {
Either::A(()) => Either::A(future::ok(())),
Either::B((socket, _)) => Either::B(loop_fn(socket, move |socket| {
Either::B(socket) => Either::B(loop_fn(socket, move |socket| {
socket.into_future()
.map_err(|(e, _)| e)
.and_then(move |(msg, socket)| {

View File

@ -97,6 +97,7 @@ impl CommonTransport {
impl Transport for CommonTransport {
type Output = <InnerImplementation as Transport>::Output;
type MultiaddrFuture = <InnerImplementation as Transport>::MultiaddrFuture;
type Listener = <InnerImplementation as Transport>::Listener;
type ListenerUpgrade = <InnerImplementation as Transport>::ListenerUpgrade;
type Dial = <InnerImplementation as Transport>::Dial;

View File

@ -20,8 +20,7 @@
use bytes::Bytes;
use core::upgrade::{ConnectionUpgrade, Endpoint};
use futures::{future::FromErr, prelude::*};
use multiaddr::Multiaddr;
use futures::prelude::*;
use std::{iter, io::Error as IoError, sync::Arc};
use tokio_io::{AsyncRead, AsyncWrite};
@ -58,11 +57,13 @@ impl<F> Clone for SimpleProtocol<F> {
}
}
impl<C, F, O> ConnectionUpgrade<C> for SimpleProtocol<F>
impl<C, F, O, Maf> ConnectionUpgrade<C, Maf> for SimpleProtocol<F>
where
C: AsyncRead + AsyncWrite,
F: Fn(C) -> O,
O: IntoFuture<Error = IoError>,
O::Future: 'static,
Maf: 'static,
{
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
@ -73,11 +74,13 @@ where
}
type Output = O::Item;
type Future = FromErr<O::Future, IoError>;
type MultiaddrFuture = Maf;
type Future = Box<Future<Item = (O::Item, Self::MultiaddrFuture), Error = IoError>>;
#[inline]
fn upgrade(self, socket: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future {
fn upgrade(self, socket: C, _: (), _: Endpoint, client_addr: Maf) -> Self::Future {
let upgrade = &self.upgrade;
upgrade(socket).into_future().from_err()
let fut = upgrade(socket).into_future().from_err().map(move |out| (out, client_addr));
Box::new(fut) as Box<_>
}
}

View File

@ -53,7 +53,7 @@ use std::iter;
use std::sync::Arc;
use std::sync::atomic::{self, AtomicUsize};
use swarm::muxing::StreamMuxer;
use swarm::{ConnectionUpgrade, Endpoint, Multiaddr};
use swarm::{ConnectionUpgrade, Endpoint};
use tokio_io::{AsyncRead, AsyncWrite};
use write::write_stream;
@ -366,18 +366,19 @@ impl<Buf: Array> BufferedMultiplexConfig<Buf> {
}
}
impl<C, Buf: Array> ConnectionUpgrade<C> for BufferedMultiplexConfig<Buf>
impl<C, Maf, Buf: Array> ConnectionUpgrade<C, Maf> for BufferedMultiplexConfig<Buf>
where
C: AsyncRead + AsyncWrite,
{
type Output = BufferedMultiplex<C, Buf>;
type Future = FutureResult<BufferedMultiplex<C, Buf>, io::Error>;
type MultiaddrFuture = Maf;
type Future = FutureResult<(BufferedMultiplex<C, Buf>, Maf), io::Error>;
type UpgradeIdentifier = ();
type NamesIter = iter::Once<(Bytes, ())>;
#[inline]
fn upgrade(self, i: C, _: (), end: Endpoint, _: &Multiaddr) -> Self::Future {
future::ok(BufferedMultiplex::new(i, end))
fn upgrade(self, i: C, _: (), end: Endpoint, remote_addr: Maf) -> Self::Future {
future::ok((BufferedMultiplex::new(i, end), remote_addr))
}
#[inline]

View File

@ -92,8 +92,7 @@ use bytes::{BufMut, Bytes, BytesMut};
use futures::future::{loop_fn, FutureResult, IntoFuture, Loop};
use futures::sync::{mpsc, oneshot};
use futures::{Future, Sink, Stream};
use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr};
use log::Level;
use libp2p_core::{ConnectionUpgrade, Endpoint};
use parking_lot::Mutex;
use rand::Rand;
use rand::os::OsRng;
@ -112,7 +111,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
#[derive(Debug, Copy, Clone, Default)]
pub struct Ping;
impl<C> ConnectionUpgrade<C> for Ping
impl<C, Maf> ConnectionUpgrade<C, Maf> for Ping
where
C: AsyncRead + AsyncWrite + 'static,
{
@ -125,7 +124,8 @@ where
}
type Output = (Pinger, Box<Future<Item = (), Error = IoError>>);
type Future = FutureResult<Self::Output, IoError>;
type MultiaddrFuture = Maf;
type Future = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
#[inline]
fn upgrade(
@ -133,7 +133,7 @@ where
socket: C,
_: Self::UpgradeIdentifier,
_: Endpoint,
remote_addr: &Multiaddr,
remote_addr: Maf,
) -> Self::Future {
// # How does it work?
//
@ -165,15 +165,8 @@ where
.map(|msg| Message::Received(msg.freeze()));
let (sink, stream) = sink_stream.split();
let remote_addr = if log_enabled!(Level::Debug) {
Some(remote_addr.clone())
} else {
None
};
let future = loop_fn((sink, stream.select(rx)), move |(sink, stream)| {
let expected_pongs = expected_pongs.clone();
let remote_addr = remote_addr.clone();
stream
.into_future()
@ -185,11 +178,7 @@ where
match message {
Message::Ping(payload, finished) => {
// Ping requested by the user through the `Pinger`.
if log_enabled!(Level::Debug) {
debug!("Sending ping to {:?} with payload {:?}",
remote_addr.expect("debug log level is enabled"),
payload);
}
debug!("Sending ping with payload {:?}", payload);
expected_pongs.insert(payload.clone(), finished);
Box::new(
@ -204,15 +193,13 @@ where
// Payload was ours. Signalling future.
// Errors can happen if the user closed the receiving end of
// the future, which is fine to ignore.
debug!("Received pong from {:?} (payload={:?}) ; ping fufilled",
remote_addr.expect("debug log level is enabled"), payload);
debug!("Received pong (payload={:?}) ; ping fufilled", payload);
let _ = fut.send(());
Box::new(Ok(Loop::Continue((sink, stream))).into_future())
as Box<Future<Item = _, Error = _>>
} else {
// Payload was not ours. Sending it back.
debug!("Received ping from {:?} (payload={:?}) ; sending back",
remote_addr.expect("debug log level is enabled"), payload);
debug!("Received ping (payload={:?}) ; sending back", payload);
Box::new(
sink.send(payload)
.map(|sink| Loop::Continue((sink, stream))),
@ -228,7 +215,7 @@ where
})
});
Ok((pinger, Box::new(future) as Box<_>)).into_future()
Ok(((pinger, Box::new(future) as Box<_>), remote_addr)).into_future()
}
}
@ -305,8 +292,9 @@ mod tests {
use super::Ping;
use futures::Future;
use futures::Stream;
use futures::future::join_all;
use libp2p_core::{ConnectionUpgrade, Endpoint};
use futures::future::{self, join_all};
use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr};
use std::io::Error as IoError;
#[test]
fn ping_pong() {
@ -324,10 +312,10 @@ mod tests {
c.unwrap().0,
(),
Endpoint::Listener,
&"/ip4/127.0.0.1/tcp/10000".parse().unwrap(),
future::ok::<Multiaddr, IoError>("/ip4/127.0.0.1/tcp/10000".parse().unwrap()),
)
})
.and_then(|(mut pinger, service)| {
.and_then(|((mut pinger, service), _)| {
pinger
.ping()
.map_err(|_| panic!())
@ -342,10 +330,10 @@ mod tests {
c,
(),
Endpoint::Dialer,
&"/ip4/127.0.0.1/tcp/10000".parse().unwrap(),
future::ok::<Multiaddr, IoError>("/ip4/127.0.0.1/tcp/10000".parse().unwrap()),
)
})
.and_then(|(mut pinger, service)| {
.and_then(|((mut pinger, service), _)| {
pinger
.ping()
.map_err(|_| panic!())
@ -373,10 +361,10 @@ mod tests {
c.unwrap().0,
(),
Endpoint::Listener,
&"/ip4/127.0.0.1/tcp/10000".parse().unwrap(),
future::ok::<Multiaddr, IoError>("/ip4/127.0.0.1/tcp/10000".parse().unwrap()),
)
})
.and_then(|(_, service)| service.map_err(|_| panic!()));
.and_then(|((_, service), _)| service.map_err(|_| panic!()));
let client = TcpStream::connect(&listener_addr, &core.handle())
.map_err(|e| e.into())
@ -385,10 +373,10 @@ mod tests {
c,
(),
Endpoint::Dialer,
&"/ip4/127.0.0.1/tcp/1000".parse().unwrap(),
future::ok::<Multiaddr, IoError>("/ip4/127.0.0.1/tcp/10000".parse().unwrap()),
)
})
.and_then(|(mut pinger, service)| {
.and_then(|((mut pinger, service), _)| {
let pings = (0..20).map(move |_| pinger.ping().map_err(|_| ()));
join_all(pings)

View File

@ -142,7 +142,7 @@ where
T: Transport + 'static,
T::Output: AsyncRead + AsyncWrite,
{
type Item = (Connection<T::Output>, Multiaddr);
type Item = (Connection<T::Output>, T::MultiaddrFuture);
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -159,9 +159,10 @@ where
T::Output: AsyncRead + AsyncWrite,
{
type Output = Connection<T::Output>;
type MultiaddrFuture = T::MultiaddrFuture;
type Listener = Listener<T>;
type ListenerUpgrade = ListenerUpgrade<T>;
type Dial = Box<Future<Item = (Connection<T::Output>, Multiaddr), Error = io::Error>>;
type Dial = Box<Future<Item = (Connection<T::Output>, Self::MultiaddrFuture), Error = io::Error>>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
where

View File

@ -22,7 +22,6 @@ use bytes::Bytes;
use core::{ConnectionUpgrade, Endpoint, Transport};
use futures::{stream, future::{self, Either::{A, B}, FutureResult}, prelude::*};
use message::{CircuitRelay, CircuitRelay_Peer, CircuitRelay_Status, CircuitRelay_Type};
use multiaddr::Multiaddr;
use peerstore::{PeerAccess, PeerId, Peerstore};
use std::{io, iter, ops::Deref};
use tokio_io::{io as aio, AsyncRead, AsyncWrite};
@ -49,13 +48,14 @@ pub enum Output<C> {
Sealed(Box<Future<Item=(), Error=io::Error>>)
}
impl<C, T, P, S> ConnectionUpgrade<C> for RelayConfig<T, P>
impl<C, T, P, S, Maf> ConnectionUpgrade<C, Maf> for RelayConfig<T, P>
where
C: AsyncRead + AsyncWrite + 'static,
T: Transport + Clone + 'static,
T::Output: AsyncRead + AsyncWrite,
P: Deref<Target=S> + Clone + 'static,
S: 'static,
Maf: 'static,
for<'a> &'a S: Peerstore
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
@ -66,9 +66,10 @@ where
}
type Output = Output<C>;
type Future = Box<Future<Item=Self::Output, Error=io::Error>>;
type MultiaddrFuture = Maf;
type Future = Box<Future<Item=(Self::Output, Maf), Error=io::Error>>;
fn upgrade(self, conn: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future {
fn upgrade(self, conn: C, _: (), _: Endpoint, remote_addr: Maf) -> Self::Future {
let future = Io::new(conn).recv().and_then(move |(message, io)| {
let msg = if let Some(m) = message {
m
@ -89,7 +90,7 @@ where
}
}
});
Box::new(future)
Box::new(future.map(move |out| (out, remote_addr)))
}
}
@ -241,7 +242,7 @@ fn stop_message(from: &Peer, dest: &Peer) -> CircuitRelay {
#[derive(Debug, Clone)]
struct TrivialUpgrade;
impl<C> ConnectionUpgrade<C> for TrivialUpgrade
impl<C, Maf> ConnectionUpgrade<C, Maf> for TrivialUpgrade
where
C: AsyncRead + AsyncWrite + 'static
{
@ -253,19 +254,21 @@ where
}
type Output = C;
type Future = FutureResult<Self::Output, io::Error>;
type MultiaddrFuture = Maf;
type Future = FutureResult<(Self::Output, Maf), io::Error>;
fn upgrade(self, conn: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future {
future::ok(conn)
fn upgrade(self, conn: C, _: (), _: Endpoint, remote_addr: Maf) -> Self::Future {
future::ok((conn, remote_addr))
}
}
#[derive(Debug, Clone)]
pub(crate) struct Source(pub(crate) CircuitRelay);
impl<C> ConnectionUpgrade<C> for Source
impl<C, Maf> ConnectionUpgrade<C, Maf> for Source
where
C: AsyncRead + AsyncWrite + 'static,
Maf: 'static,
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = ();
@ -275,9 +278,10 @@ where
}
type Output = C;
type Future = Box<Future<Item=Self::Output, Error=io::Error>>;
type MultiaddrFuture = Maf;
type Future = Box<Future<Item=(Self::Output, Maf), Error=io::Error>>;
fn upgrade(self, conn: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future {
fn upgrade(self, conn: C, _: (), _: Endpoint, remote_addr: Maf) -> Self::Future {
let future = Io::new(conn)
.send(self.0)
.and_then(Io::recv)
@ -292,7 +296,7 @@ where
Err(io_err("no success response from relay"))
}
});
Box::new(future)
Box::new(future.map(move |out| (out, remote_addr)))
}
}

View File

@ -46,9 +46,10 @@ where
for<'a> &'a S: Peerstore
{
type Output = T::Output;
type MultiaddrFuture = T::MultiaddrFuture;
type Listener = Box<Stream<Item=Self::ListenerUpgrade, Error=io::Error>>;
type ListenerUpgrade = Box<Future<Item=(Self::Output, Multiaddr), Error=io::Error>>;
type Dial = Box<Future<Item=(Self::Output, Multiaddr), Error=io::Error>>;
type ListenerUpgrade = Box<Future<Item=(Self::Output, Self::MultiaddrFuture), Error=io::Error>>;
type Dial = Box<Future<Item=(Self::Output, Self::MultiaddrFuture), Error=io::Error>>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
Err((self, addr))
@ -105,7 +106,7 @@ where
}
// Relay to destination over any available relay node.
fn relay_to(self, destination: &Peer) -> Result<impl Future<Item=(T::Output, Multiaddr), Error=io::Error>, Self> {
fn relay_to(self, destination: &Peer) -> Result<impl Future<Item=(T::Output, T::MultiaddrFuture), Error=io::Error>, Self> {
trace!("relay_to {:?}", destination.id);
let mut dials = Vec::new();
for relay in &*self.relays {
@ -143,7 +144,7 @@ where
}
// Relay to destination via the given peer.
fn relay_via(self, relay: &Peer, destination: &Peer) -> Result<impl Future<Item=(T::Output, Multiaddr), Error=io::Error>, Self> {
fn relay_via(self, relay: &Peer, destination: &Peer) -> Result<impl Future<Item=(T::Output, T::MultiaddrFuture), Error=io::Error>, Self> {
trace!("relay_via {:?} to {:?}", relay.id, destination.id);
let mut addresses = Vec::new();
@ -175,7 +176,7 @@ where
.map_err(|(err, _stream)| err)
.and_then(move |(ok, _stream)| match ok {
Some((out, addr)) => {
debug!("connected to {:?}", addr);
debug!("connected");
Ok((out, addr))
}
None => {

View File

@ -99,7 +99,7 @@ pub use self::error::SecioError;
use bytes::{Bytes, BytesMut};
use futures::stream::MapErr as StreamMapErr;
use futures::{Future, Poll, Sink, StartSend, Stream};
use libp2p_core::{Multiaddr, PeerId, PublicKeyBytes, PublicKeyBytesSlice};
use libp2p_core::{PeerId, PublicKeyBytes, PublicKeyBytesSlice};
use ring::signature::{Ed25519KeyPair, RSAKeyPair};
use ring::rand::SystemRandom;
use rw_stream_sink::RwStreamSink;
@ -285,15 +285,17 @@ impl From<SecioPublicKey> for PublicKeyBytes {
}
}
impl<S> libp2p_core::ConnectionUpgrade<S> for SecioConfig
impl<S, Maf> libp2p_core::ConnectionUpgrade<S, Maf> for SecioConfig
where
S: AsyncRead + AsyncWrite + 'static,
S: AsyncRead + AsyncWrite + 'static, // TODO: 'static :(
Maf: 'static, // TODO: 'static :(
{
type Output = (
RwStreamSink<StreamMapErr<SecioMiddleware<S>, fn(SecioError) -> IoError>>,
SecioPublicKey,
);
type Future = Box<Future<Item = Self::Output, Error = IoError>>;
type MultiaddrFuture = Maf;
type Future = Box<Future<Item = (Self::Output, Maf), Error = IoError>>;
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
@ -308,16 +310,16 @@ where
incoming: S,
_: (),
_: libp2p_core::Endpoint,
remote_addr: &Multiaddr,
remote_addr: Maf,
) -> Self::Future {
debug!("Starting secio upgrade with {:?}", remote_addr);
debug!("Starting secio upgrade");
let fut = SecioMiddleware::handshake(incoming, self.key);
let wrapped = fut.map(|(stream_sink, pubkey)| {
let mapped = stream_sink.map_err(map_err as fn(_) -> _);
(RwStreamSink::new(mapped), pubkey)
}).map_err(map_err);
Box::new(wrapped)
Box::new(wrapped.map(move |out| (out, remote_addr)))
}
}

View File

@ -57,7 +57,7 @@ extern crate multiaddr;
extern crate tokio_core;
extern crate tokio_io;
use futures::future::{self, Future, FutureResult, IntoFuture};
use futures::future::{self, Future, FutureResult};
use futures::stream::Stream;
use multiaddr::{AddrComponent, Multiaddr, ToMultiaddr};
use std::io::Error as IoError;
@ -89,8 +89,9 @@ impl TcpConfig {
impl Transport for TcpConfig {
type Output = TcpStream;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = FutureResult<(Self::Output, Multiaddr), IoError>;
type Dial = Box<Future<Item = (TcpStream, Multiaddr), Error = IoError>>;
type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
type MultiaddrFuture = FutureResult<Multiaddr, IoError>;
type Dial = Box<Future<Item = (TcpStream, Self::MultiaddrFuture), Error = IoError>>;
/// Listen on the given multi-addr.
/// Returns the address back if it isn't supported.
@ -120,7 +121,7 @@ impl Transport for TcpConfig {
let addr = addr.to_multiaddr()
.expect("generating a multiaddr from a socket addr never fails");
debug!("Incoming connection from {}", addr);
Ok((sock, addr)).into_future()
future::ok((sock, future::ok(addr)))
})
})
.flatten_stream();
@ -136,7 +137,7 @@ impl Transport for TcpConfig {
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
debug!("Dialing {}", addr);
let fut = TcpStream::connect(&socket_addr, &self.event_loop).map(|t| (t, addr));
let fut = TcpStream::connect(&socket_addr, &self.event_loop).map(|t| (t, future::ok(addr)));
Ok(Box::new(fut) as Box<_>)
} else {
Err((self, addr))

View File

@ -20,7 +20,7 @@
use futures::stream::Then as StreamThen;
use futures::sync::{mpsc, oneshot};
use futures::{Async, Future, Poll, Stream};
use futures::{Async, future, Future, Poll, Stream, future::FutureResult};
use multiaddr::{AddrComponent, Multiaddr};
use rw_stream_sink::RwStreamSink;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
@ -53,9 +53,10 @@ impl BrowserWsConfig {
impl Transport for BrowserWsConfig {
type Output = BrowserWsConn;
type MultiaddrFuture = FutureResult<Multiaddr, IoError>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>; // TODO: use `!`
type ListenerUpgrade = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>; // TODO: use `!`
type Dial = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>; // TODO: use `!`
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
#[inline]
fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
@ -194,7 +195,7 @@ impl Transport for BrowserWsConfig {
Ok(Box::new(open_rx.then(|result| {
match result {
Ok(Ok(r)) => Ok((r, original_addr)),
Ok(Ok(r)) => Ok((r, future::ok(original_addr))),
Ok(Err(e)) => Err(e),
// `Err` would happen here if `open_tx` is destroyed. `open_tx` is captured by
// the `WebSocket`, and the `WebSocket` is captured by `open_cb`, which is itself

View File

@ -63,10 +63,11 @@ where
T::Output: AsyncRead + AsyncWrite + Send,
{
type Output = Box<AsyncStream>;
type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = IoError>>;
type Listener =
stream::Map<T::Listener, fn(<T as Transport>::ListenerUpgrade) -> Self::ListenerUpgrade>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
fn listen_on(
self,
@ -98,10 +99,13 @@ where
let listen = inner_listen.map::<_, fn(_) -> _>(|stream| {
// Upgrade the listener to websockets like the websockets library requires us to do.
let upgraded = stream.and_then(|(stream, mut client_addr)| {
let upgraded = stream.and_then(|(stream, client_addr)| {
// Need to suffix `/ws` to each client address.
client_addr.append(AddrComponent::WS);
debug!("Incoming connection from {}", client_addr);
let client_addr = client_addr.map(|mut addr| {
addr.append(AddrComponent::WS);
addr
});
debug!("Incoming connection");
stream
.into_ws()
@ -140,7 +144,7 @@ where
.map(|s| Box::new(Ok(s).into_future()) as Box<Future<Item = _, Error = _>>)
.into_future()
.flatten()
.map(move |v| (v, client_addr))
.map(move |v| (v, Box::new(client_addr) as Box<Future<Item = _, Error = _>>))
});
Box::new(upgraded) as Box<Future<Item = _, Error = _>>
@ -183,6 +187,15 @@ where
let dial = inner_dial
.into_future()
.and_then(move |(connec, client_addr)| {
let client_addr = Box::new(client_addr.map(move |mut addr| {
if is_wss {
addr.append(AddrComponent::WSS);
} else {
addr.append(AddrComponent::WS);
};
addr
})) as Box<Future<Item = _, Error = _>>;
ClientBuilder::new(&ws_addr)
.expect("generated ws address is always valid")
.async_connect_on(connec)
@ -209,13 +222,7 @@ where
Box::new(read_write) as Box<AsyncStream>
})
.map(move |c| {
let mut actual_addr = client_addr;
if is_wss {
actual_addr.append(AddrComponent::WSS);
} else {
actual_addr.append(AddrComponent::WS);
};
(c, actual_addr)
(c, client_addr)
})
});