Merge pull request #81 from tomaka/muxed-transport-change

Change the MuxedTransport trait
This commit is contained in:
Fredrik Harrysson 2018-01-03 17:05:47 +01:00 committed by GitHub
commit cba24ea39e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 171 additions and 194 deletions

View File

@ -30,7 +30,7 @@ extern crate tokio_io;
use bytes::BytesMut;
use futures::{Future, Sink, Stream};
use std::env;
use swarm::{UpgradeExt, SimpleProtocol, Transport};
use swarm::{UpgradeExt, SimpleProtocol, Transport, MuxedTransport};
use tcp::TcpConfig;
use tokio_core::reactor::Core;
use tokio_io::codec::length_delimited;
@ -68,8 +68,10 @@ fn main() {
// `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`.
.into_connection_reuse()
.into_connection_reuse();
let transport_with_echo = transport
.clone()
// On top of plaintext or secio, we use the "echo" protocol, which is a custom protocol
// just for this example.
// For this purpose, we create a `SimpleProtocol` struct.
@ -86,34 +88,23 @@ fn main() {
// of any opened stream.
// We use it to dial the address.
let dialer = transport
.dial_and_listen(swarm::Multiaddr::new(&target_addr).expect("invalid multiaddr"))
let dialer = transport_with_echo
.dial(swarm::Multiaddr::new(&target_addr).expect("invalid multiaddr"))
// If the multiaddr protocol exists but is not supported, then we get an error containing
// the transport and the original multiaddress. Therefore we cannot directly use `unwrap()`
// or `expect()`, but have to add a `map_err()` beforehand.
.map_err(|(_, addr)| addr).expect("unsupported multiaddr")
.and_then(|(incoming, echo)| {
.and_then(|echo| {
// `echo` is what the closure used when initializing "echo" returns.
// Consequently, please note that the `send` method is available only because the type
// `length_delimited::Framed` has a `send` method.
println!("Sending \"hello world\" to listener");
echo.and_then(|echo| echo.send("hello world".into()).map(Option::Some))
.select(
incoming
.for_each(|_| {
println!("opened");
Ok(())
})
.map(|()| None),
)
.map(|(n, _)| n)
.map_err(|(e, _)| e)
echo.send("hello world".into())
})
.and_then(|echo| {
// The message has been successfully sent. Now wait for an answer.
echo.unwrap()
.into_future()
echo.into_future()
.map(|(msg, rest)| {
println!("Received message from listener: {:?}", msg);
rest
@ -124,5 +115,6 @@ fn main() {
// `dialer` is a future that contains all the behaviour that we want, but nothing has actually
// started yet. Because we created the `TcpConfig` with tokio, we need to run the future
// through the tokio core.
core.run(dialer).unwrap();
core.run(dialer.map(|_| ()).select(transport.incoming().for_each(|_| Ok(()))))
.unwrap_or_else(|_| panic!());
}

View File

@ -8,6 +8,7 @@ bytes = "0.4"
multiaddr = "0.2.0"
multistream-select = { path = "../multistream-select" }
futures = { version = "0.1", features = ["use_std"] }
parking_lot = "0.5.3"
smallvec = "0.5"
tokio-io = "0.1"

View File

@ -37,20 +37,23 @@
//!
//! When called on a `ConnectionReuse`, the `dial` method will try to use a connection that has
//! already been opened earlier, and open an outgoing substream on it. If none is available, it
//! will dial the given multiaddress.
//! will dial the given multiaddress. Dialed node can also spontaneously open new substreams with
//! us. In order to handle these new substreams you should use the `next_incoming` method of the
//! `MuxedTransport` trait.
//! TODO: this raises several questions ^
//!
//! TODO: this whole code is a dummy and should be rewritten after the design has been properly
//! figured out.
use futures::{Async, Future, Poll, Stream};
use futures::future::{IntoFuture, FutureResult};
use futures::future::{self, IntoFuture, FutureResult};
use futures::{stream, Async, Future, Poll, Stream, task};
use futures::stream::Fuse as StreamFuse;
use futures::stream;
use multiaddr::Multiaddr;
use muxing::StreamMuxer;
use parking_lot::Mutex;
use smallvec::SmallVec;
use std::io::Error as IoError;
use std::sync::Arc;
use transport::{ConnectionUpgrade, MuxedTransport, Transport, UpgradedNode};
/// Allows reusing the same muxed connection multiple times.
@ -66,6 +69,14 @@ where
{
// Underlying transport and connection upgrade for when we need to dial or listen.
inner: UpgradedNode<T, C>,
shared: Arc<Mutex<Shared<C::Output>>>,
}
struct Shared<O> {
// List of futures to dialed connections.
incoming: Vec<Box<Stream<Item = (O, Multiaddr), Error = future::SharedError<Mutex<Option<IoError>>>>>>,
// Tasks to signal when an element is added to `incoming`. Only used when `incoming` is empty.
to_signal: Vec<task::Task>,
}
impl<T, C> From<UpgradedNode<T, C>> for ConnectionReuse<T, C>
@ -75,7 +86,13 @@ where
{
#[inline]
fn from(node: UpgradedNode<T, C>) -> ConnectionReuse<T, C> {
ConnectionReuse { inner: node }
ConnectionReuse {
inner: node,
shared: Arc::new(Mutex::new(Shared {
incoming: Vec::new(),
to_signal: Vec::new(),
})),
}
}
}
@ -96,7 +113,7 @@ where
let (listener, new_addr) = match self.inner.listen_on(addr.clone()) {
Ok((l, a)) => (l, a),
Err((inner, addr)) => {
return Err((ConnectionReuse { inner: inner }, addr));
return Err((ConnectionReuse { inner: inner, shared: self.shared }, addr));
}
};
@ -110,14 +127,30 @@ where
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
let dial = match self.inner.dial(addr) {
let dial = match self.inner.dial(addr.clone()) {
Ok(l) => l,
Err((inner, addr)) => {
return Err((ConnectionReuse { inner: inner }, addr));
return Err((ConnectionReuse { inner: inner, shared: self.shared }, addr));
}
};
let future = dial.and_then(|dial| dial.outbound());
let dial = dial
.map_err::<fn(IoError) -> Mutex<Option<IoError>>, _>(|err| Mutex::new(Some(err)))
.shared();
let ingoing = dial.clone()
.map(|muxer| stream::repeat(muxer))
.flatten_stream()
.map(move |muxer| ((&*muxer).clone(), addr.clone()));
let mut lock = self.shared.lock();
lock.incoming.push(Box::new(ingoing) as Box<_>);
for task in lock.to_signal.drain(..) { task.notify(); }
drop(lock);
let future = dial
.map_err(|err| err.lock().take().expect("error can only be extracted once"))
.and_then(|dial| (&*dial).clone().outbound());
Ok(Box::new(future) as Box<_>)
}
}
@ -130,29 +163,15 @@ where
C::Output: StreamMuxer + Clone,
C::NamesIter: Clone, // TODO: not elegant
{
type Incoming = stream::AndThen<
stream::Repeat<C::Output, IoError>,
fn(C::Output)
-> <<C as ConnectionUpgrade<T::RawConn>>::Output as StreamMuxer>::InboundSubstream,
<<C as ConnectionUpgrade<T::RawConn>>::Output as StreamMuxer>::InboundSubstream,
>;
type Outgoing =
<<C as ConnectionUpgrade<T::RawConn>>::Output as StreamMuxer>::OutboundSubstream;
type DialAndListen = Box<Future<Item = (Self::Incoming, Self::Outgoing), Error = IoError>>;
type Incoming = Box<Future<Item = (<C::Output as StreamMuxer>::Substream, Multiaddr), Error = IoError>>;
fn dial_and_listen(self, addr: Multiaddr) -> Result<Self::DialAndListen, (Self, Multiaddr)> {
self.inner
.dial(addr)
.map_err(|(inner, addr)| (ConnectionReuse { inner: inner }, addr))
.map(|fut| {
fut.map(|muxer| {
(
stream::repeat(muxer.clone()).and_then(StreamMuxer::inbound as fn(_) -> _),
muxer.outbound(),
)
})
})
.map(|fut| Box::new(fut) as _)
#[inline]
fn next_incoming(self) -> Self::Incoming {
let future = ConnectionReuseIncoming { shared: self.shared.clone() }
.and_then(|(out, addr)| {
out.inbound().map(|o| (o, addr))
});
Box::new(future) as Box<_>
}
}
@ -253,3 +272,50 @@ where S: Stream<Item = (F, Multiaddr), Error = IoError>,
Ok(Async::NotReady)
}
}
/// Implementation of `Future<Item = (impl AsyncRead + AsyncWrite, Multiaddr)` for the
/// `ConnectionReuse` struct.
pub struct ConnectionReuseIncoming<O> {
shared: Arc<Mutex<Shared<O>>>,
}
impl<O> Future for ConnectionReuseIncoming<O>
where O: Clone
{
type Item = (O, Multiaddr);
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut lock = self.shared.lock();
let mut to_remove = SmallVec::<[_; 8]>::new();
let mut ret_value = None;
for (offset, future) in lock.incoming.iter_mut().enumerate() {
match future.poll() {
Ok(Async::Ready(Some((value, addr)))) => {
ret_value = Some((value.clone(), addr));
break;
},
Ok(Async::Ready(None)) => {
to_remove.push(offset);
},
Ok(Async::NotReady) => {},
Err(_) => {
to_remove.push(offset);
},
}
}
for offset in to_remove.into_iter().rev() {
lock.incoming.swap_remove(offset);
}
if let Some(ret_value) = ret_value {
Ok(Async::Ready(ret_value))
} else {
lock.to_signal.push(task::current());
Ok(Async::NotReady)
}
}
}

View File

@ -48,11 +48,12 @@
//! The `MuxedTransport` trait is an extension to the `Transport` trait, and is implemented on
//! transports that can receive incoming connections on streams that have been opened with `dial()`.
//!
//! The trait provides the `dial_and_listen()` method, which returns both a dialer and a stream of
//! incoming connections.
//! The trait provides the `next_incoming()` method, which returns a future that will resolve to
//! the next substream that arrives from a dialed node.
//!
//! > **Note**: This trait is mainly implemented for transports that provide stream muxing
//! > capabilities.
//! > capabilities, but it can also be implemented in a dummy way by returning an empty
//! > iterator.
//!
//! # Connection upgrades
//!
@ -78,7 +79,7 @@
//! `Transport` trait. The return value of this method also implements the `Transport` trait, which
//! means that you can call `dial()` and `listen_on()` on it in order to directly obtain an
//! upgraded connection or a listener that will yield upgraded connections. Similarly, the
//! `dial_and_listen()` method will automatically apply the upgrade on both the dialer and the
//! `next_incoming()` method will automatically apply the upgrade on both the dialer and the
//! listener. An error is produced if the remote doesn't support the protocol corresponding to the
//! connection upgrade.
//!
@ -123,7 +124,7 @@
//! transport.
//!
//! However the `UpgradedNode` struct returned by `with_upgrade` still provides methods named
//! `dial`, `listen_on`, and `dial_and_listen`, which will yield you a `Future` or a `Stream`,
//! `dial`, `listen_on`, and `next_incoming`, which will yield you a `Future` or a `Stream`,
//! which you can use to obtain the `Output`. This `Output` can then be used in a protocol-specific
//! way to use the protocol.
//!
@ -167,6 +168,7 @@ extern crate bytes;
#[macro_use]
extern crate futures;
extern crate multistream_select;
extern crate parking_lot;
extern crate smallvec;
extern crate tokio_io;

View File

@ -31,7 +31,7 @@
use bytes::Bytes;
use connection_reuse::ConnectionReuse;
use futures::{Async, Poll, Stream};
use futures::{Async, Poll, stream, Stream};
use futures::future::{self, FromErr, Future, FutureResult, IntoFuture};
use multiaddr::Multiaddr;
use multistream_select;
@ -123,21 +123,24 @@ pub trait Transport {
/// Extension trait for `Transport`. Implemented on structs that provide a `Transport` on which
/// the dialed node can dial you back.
pub trait MuxedTransport: Transport {
/// Produces substreams on the dialed connection.
type Incoming: Stream<Item = Self::RawConn, Error = IoError>;
/// Future resolving to an incoming connection.
type Incoming: Future<Item = (Self::RawConn, Multiaddr), Error = IoError>;
/// Future resolving to an outgoing connection
type Outgoing: Future<Item = Self::RawConn, Error = IoError>;
/// Future resolving to a tuple of `(Incoming, Outgoing)`
type DialAndListen: Future<Item = (Self::Incoming, Self::Outgoing), Error = IoError>;
/// Dial to the given multi-addr, and listen to incoming substreams on the dialed connection.
/// Returns the next incoming substream opened by a node that we dialed ourselves.
///
/// Returns either a future which may resolve to a connection, or gives back the multiaddress.
fn dial_and_listen(self, addr: Multiaddr) -> Result<Self::DialAndListen, (Self, Multiaddr)>
where
Self: Sized;
/// > **Note**: Doesn't produce incoming substreams coming from addresses we are listening on.
/// > This only concerns nodes that we dialed with `dial()`.
fn next_incoming(self) -> Self::Incoming
where Self: Sized;
/// Returns a stream of incoming connections.
#[inline]
fn incoming(self) -> stream::AndThen<stream::Repeat<Self, IoError>, fn(Self) -> Self::Incoming,
Self::Incoming>
where Self: Sized + Clone
{
stream::repeat(self).and_then(|me| me.next_incoming())
}
}
/// Dummy implementation of `Transport` that just denies every single attempt.
@ -163,14 +166,11 @@ impl Transport for DeniedTransport {
}
impl MuxedTransport for DeniedTransport {
// TODO: could use `!` once stable
type Incoming = Box<Stream<Item = Self::RawConn, Error = IoError>>;
type Outgoing = Box<Future<Item = Self::RawConn, Error = IoError>>;
type DialAndListen = Box<Future<Item = (Self::Incoming, Self::Outgoing), Error = IoError>>;
type Incoming = future::Empty<(Self::RawConn, Multiaddr), IoError>;
#[inline]
fn dial_and_listen(self, addr: Multiaddr) -> Result<Self::DialAndListen, (Self, Multiaddr)> {
Err((DeniedTransport, addr))
fn next_incoming(self) -> Self::Incoming {
future::empty()
}
}
@ -251,38 +251,19 @@ impl<A, B> MuxedTransport for OrTransport<A, B>
where
A: MuxedTransport,
B: MuxedTransport,
A::DialAndListen: 'static,
B::DialAndListen: 'static,
A::Incoming: 'static, // TODO: meh :-/
B::Incoming: 'static, // TODO: meh :-/
{
type Incoming = EitherIncomingStream<A::Incoming, B::Incoming>;
type Outgoing = future::Either<
future::Map<A::Outgoing, fn(A::RawConn) -> Self::RawConn>,
future::Map<B::Outgoing, fn(B::RawConn) -> Self::RawConn>,
>;
type DialAndListen = Box<Future<Item = (Self::Incoming, Self::Outgoing), Error = IoError>>;
type Incoming = Box<Future<Item = (EitherSocket<A::RawConn, B::RawConn>, Multiaddr), Error = IoError>>;
fn dial_and_listen(self, addr: Multiaddr) -> Result<Self::DialAndListen, (Self, Multiaddr)> {
let (first, addr) = match self.0.dial_and_listen(addr) {
Ok(connec) => {
return Ok(Box::new(connec.map(|(inc, out)| {
(
EitherIncomingStream::First(inc),
future::Either::A(out.map(EitherSocket::First as fn(_) -> _)),
)
})));
}
Err(err) => err,
};
match self.1.dial_and_listen(addr) {
Ok(connec) => Ok(Box::new(connec.map(|(inc, out)| {
(
EitherIncomingStream::Second(inc),
future::Either::B(out.map(EitherSocket::Second as fn(_) -> _)),
)
}))),
Err((second, addr)) => Err((OrTransport(first, second), addr)),
}
#[inline]
fn next_incoming(self) -> Self::Incoming {
let first = self.0.next_incoming().map(|(out, addr)| (EitherSocket::First(out), addr));
let second = self.1.next_incoming().map(|(out, addr)| (EitherSocket::Second(out), addr));
let future = first.select(second)
.map(|(i, _)| i)
.map_err(|(e, _)| e);
Box::new(future) as Box<_>
}
}
@ -765,93 +746,31 @@ where
Ok(Box::new(future))
}
/// Tries to dial on the `Multiaddr` using the transport that was passed to `new`, then upgrade
/// the connection. Also listens to incoming substream requires on that dialed connection, and
/// automatically upgrades the incoming substreams.
/// If the underlying transport is a `MuxedTransport`, then after calling `dial` we may receive
/// substreams opened by the dialed nodes.
///
/// Note that this does the same as `MuxedTransport::dial_and_listen`, but with less
/// restrictions on the trait requirements.
pub fn dial_and_listen(
self,
addr: Multiaddr,
) -> Result<
Box<
Future<
Item = (
Box<Stream<Item = C::Output, Error = IoError> + 'a>,
Box<Future<Item = C::Output, Error = IoError> + 'a>,
),
Error = IoError,
>
+ 'a,
>,
(Self, Multiaddr),
>
where
T: MuxedTransport,
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
/// This function returns the next incoming substream. You are strongly encouraged to call it
/// if you have a muxed transport.
pub fn next_incoming(self) -> Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>
where T: MuxedTransport
{
let upgrade = self.upgrade;
let upgrade2 = upgrade.clone();
self.transports
.dial_and_listen(addr)
.map_err(move |(trans, addr)| {
let builder = UpgradedNode {
transports: trans,
upgrade: upgrade,
};
let future = self.transports.next_incoming()
// Try to negotiate the protocol.
.and_then(move |(connection, addr)| {
let iter = upgrade.protocol_names()
.map(|(name, id)| (name, <Bytes as PartialEq>::eq, id));
let negotiated = multistream_select::dialer_select_proto(connection, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err));
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr))
})
.and_then(|(upgrade_id, connection, upgrade, addr)| {
upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer)
.map(|u| (u, addr))
});
(builder, addr)
})
.map(move |dialed_fut| {
let dialed_fut = dialed_fut
// Try to negotiate the protocol.
.map(move |(in_stream, dialer)| {
let upgrade = upgrade2.clone();
let dialer = {
let iter = upgrade2.protocol_names()
.map(|(name, id)| (name, <Bytes as PartialEq>::eq, id));
let negotiated = dialer.and_then(|dialer| {
multistream_select::dialer_select_proto(dialer, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err))
});
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade2))
}
.and_then(|(upgrade_id, connection, upgrade)| {
upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer)
});
let in_stream = in_stream
// Try to negotiate the protocol.
.and_then(move |connection| {
let upgrade = upgrade.clone();
let iter = upgrade.protocol_names()
.map((|(n, t)| {
(n, <Bytes as PartialEq>::eq, t)
}) as fn(_) -> _);
let negotiated = multistream_select::listener_select_proto(
connection,
iter,
);
negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade))
.map_err(|err| IoError::new(IoErrorKind::Other, err))
})
.and_then(|(upgrade_id, connection, upgrade)| {
upgrade.upgrade(connection, upgrade_id, Endpoint::Listener)
});
(
Box::new(in_stream) as Box<Stream<Item = _, Error = _>>,
Box::new(dialer) as Box<Future<Item = _, Error = _>>,
)
});
Box::new(dialed_fut) as _
})
Box::new(future) as Box<_>
}
/// Start listening on the multiaddr using the transport that was passed to `new`.
@ -944,13 +863,10 @@ where
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
{
type Incoming = Box<Stream<Item = C::Output, Error = IoError>>;
type Outgoing = Box<Future<Item = Self::RawConn, Error = IoError>>;
type DialAndListen = Box<Future<Item = (Self::Incoming, Self::Outgoing), Error = IoError>>;
type Incoming = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
#[inline]
fn dial_and_listen(self, addr: Multiaddr) -> Result<Self::DialAndListen, (Self, Multiaddr)> {
// Calls an inherent function above
self.dial_and_listen(addr)
fn next_incoming(self) -> Self::Incoming {
self.next_incoming()
}
}