Rework the StreamMuxer trait (#437)

* Stronger typing for the swarm handler future
* The swarm future is now a swarm stream of events
* Rewrite StreamMuxer in core
* Update libp2p_mplex for the new stream muxer
* Update yamux for new stream muxer
This commit is contained in:
Pierre Krieger
2018-08-31 10:31:34 +02:00
committed by Benjamin Kampmann
parent c02dea8128
commit 0e1483f02e
8 changed files with 573 additions and 301 deletions

View File

@ -45,7 +45,7 @@ use futures::{Async, Future, Poll, Stream};
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::sync::mpsc; use futures::sync::mpsc;
use multiaddr::Multiaddr; use multiaddr::Multiaddr;
use muxing::StreamMuxer; use muxing::{self, StreamMuxer};
use parking_lot::Mutex; use parking_lot::Mutex;
use std::io::{self, Error as IoError}; use std::io::{self, Error as IoError};
use std::sync::Arc; use std::sync::Arc;
@ -58,7 +58,6 @@ use upgrade::ConnectionUpgrade;
/// Can be created from an `UpgradedNode` through the `From` trait. /// Can be created from an `UpgradedNode` through the `From` trait.
/// ///
/// Implements the `Transport` trait. /// Implements the `Transport` trait.
#[derive(Clone)]
pub struct ConnectionReuse<T, C> pub struct ConnectionReuse<T, C>
where where
T: Transport, T: Transport,
@ -73,23 +72,36 @@ where
shared: Arc<Mutex<Shared<C::Output>>>, shared: Arc<Mutex<Shared<C::Output>>>,
} }
struct Shared<M> impl<T, C> Clone for ConnectionReuse<T, C>
where where
M: StreamMuxer, T: Transport + Clone,
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + Clone,
C::Output: StreamMuxer
{ {
#[inline]
fn clone(&self) -> Self {
ConnectionReuse {
inner: self.inner.clone(),
shared: self.shared.clone(),
}
}
}
struct Shared<M> {
// List of active muxers. // List of active muxers.
active_connections: FnvHashMap<Multiaddr, M>, active_connections: FnvHashMap<Multiaddr, Arc<M>>,
// List of pending inbound substreams from dialed nodes. // List of pending inbound substreams from dialed nodes.
// Only add to this list elements received through `add_to_next_rx`. // Only add to this list elements received through `add_to_next_rx`.
next_incoming: Vec<(M, M::InboundSubstream, Multiaddr)>, next_incoming: Vec<(Arc<M>, Multiaddr)>,
// New elements are not directly added to `next_incoming`. Instead they are sent to this // New elements are not directly added to `next_incoming`. Instead they are sent to this
// channel. This is done so that we can wake up tasks whenever a new element is added. // channel. This is done so that we can wake up tasks whenever a new element is added.
add_to_next_rx: mpsc::UnboundedReceiver<(M, M::InboundSubstream, Multiaddr)>, add_to_next_rx: mpsc::UnboundedReceiver<(Arc<M>, Multiaddr)>,
// Other side of `add_to_next_rx`. // Other side of `add_to_next_rx`.
add_to_next_tx: mpsc::UnboundedSender<(M, M::InboundSubstream, Multiaddr)>, add_to_next_tx: mpsc::UnboundedSender<(Arc<M>, Multiaddr)>,
} }
impl<T, C> From<UpgradedNode<T, C>> for ConnectionReuse<T, C> impl<T, C> From<UpgradedNode<T, C>> for ConnectionReuse<T, C>
@ -120,11 +132,11 @@ where
T: Transport + 'static, // TODO: 'static :( T: Transport + 'static, // TODO: 'static :(
T::Output: AsyncRead + AsyncWrite, T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + Clone + 'static, // TODO: 'static :( C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + Clone + 'static, // TODO: 'static :(
C::Output: StreamMuxer + Clone, C::Output: StreamMuxer,
C::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError>, C::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError>,
C::NamesIter: Clone, // TODO: not elegant C::NamesIter: Clone, // TODO: not elegant
{ {
type Output = <C::Output as StreamMuxer>::Substream; type Output = muxing::SubstreamRef<Arc<C::Output>>;
type MultiaddrFuture = future::FutureResult<Multiaddr, IoError>; type MultiaddrFuture = future::FutureResult<Multiaddr, IoError>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>; type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
@ -171,7 +183,7 @@ where
.map(|muxer| muxer.clone()) .map(|muxer| muxer.clone())
{ {
let a = addr.clone(); let a = addr.clone();
Either::A(muxer.outbound().map(move |s| s.map(move |s| (s, future::ok(a))))) Either::A(muxing::outbound_from_ref_and_wrap(muxer).map(|o| o.map(move |s| (s, future::ok(a)))))
} else { } else {
Either::B(future::ok(None)) Either::B(future::ok(None))
}; };
@ -190,10 +202,10 @@ where
let future = dial let future = dial
.and_then(move |(muxer, addr_fut)| { .and_then(move |(muxer, addr_fut)| {
trace!("Waiting for remote's address"); trace!("Waiting for remote's address");
addr_fut.map(move |addr| (muxer, addr)) addr_fut.map(move |addr| (Arc::new(muxer), addr))
}) })
.and_then(move |(muxer, addr)| { .and_then(move |(muxer, addr)| {
muxer.clone().outbound().and_then(move |substream| { muxing::outbound_from_ref(muxer.clone()).and_then(move |substream| {
if let Some(s) = substream { if let Some(s) = substream {
// Replace the active connection because we are the most recent. // Replace the active connection because we are the most recent.
let mut lock = shared.lock(); let mut lock = shared.lock();
@ -201,9 +213,9 @@ where
// TODO: doesn't need locking ; the sender could be extracted // TODO: doesn't need locking ; the sender could be extracted
let _ = lock.add_to_next_tx.unbounded_send(( let _ = lock.add_to_next_tx.unbounded_send((
muxer.clone(), muxer.clone(),
muxer.inbound(),
addr.clone(), addr.clone(),
)); ));
let s = muxing::substream_from_ref(muxer, s);
Ok((s, future::ok(addr))) Ok((s, future::ok(addr)))
} else { } else {
error!("failed to dial to {}", addr); error!("failed to dial to {}", addr);
@ -235,13 +247,13 @@ where
T: Transport + 'static, // TODO: 'static :( T: Transport + 'static, // TODO: 'static :(
T::Output: AsyncRead + AsyncWrite, T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + Clone + 'static, // TODO: 'static :( C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + Clone + 'static, // TODO: 'static :(
C::Output: StreamMuxer + Clone, C::Output: StreamMuxer,
C::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError>, C::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError>,
C::NamesIter: Clone, // TODO: not elegant C::NamesIter: Clone, // TODO: not elegant
{ {
type Incoming = ConnectionReuseIncoming<C::Output>; type Incoming = ConnectionReuseIncoming<C::Output>;
type IncomingUpgrade = type IncomingUpgrade =
future::FutureResult<(<C::Output as StreamMuxer>::Substream, Self::MultiaddrFuture), IoError>; future::FutureResult<(muxing::SubstreamRef<Arc<C::Output>>, Self::MultiaddrFuture), IoError>;
#[inline] #[inline]
fn next_incoming(self) -> Self::Incoming { fn next_incoming(self) -> Self::Incoming {
@ -259,7 +271,7 @@ where
// The main listener. `S` is from the underlying transport. // The main listener. `S` is from the underlying transport.
listener: S, listener: S,
current_upgrades: FuturesUnordered<F>, current_upgrades: FuturesUnordered<F>,
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>, connections: Vec<(Arc<M>, Multiaddr)>,
// Shared between the whole connection reuse mechanism. // Shared between the whole connection reuse mechanism.
shared: Arc<Mutex<Shared<M>>>, shared: Arc<Mutex<Shared<M>>>,
@ -269,9 +281,9 @@ impl<S, F, M> Stream for ConnectionReuseListener<S, F, M>
where where
S: Stream<Item = F, Error = IoError>, S: Stream<Item = F, Error = IoError>,
F: Future<Item = (M, Multiaddr), Error = IoError>, F: Future<Item = (M, Multiaddr), Error = IoError>,
M: StreamMuxer + Clone + 'static, // TODO: 'static :( M: StreamMuxer + 'static, // TODO: 'static :(
{ {
type Item = FutureResult<(M::Substream, FutureResult<Multiaddr, IoError>), IoError>; type Item = FutureResult<(muxing::SubstreamRef<Arc<M>>, FutureResult<Multiaddr, IoError>), IoError>;
type Error = IoError; type Error = IoError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@ -304,9 +316,7 @@ where
loop { loop {
match self.current_upgrades.poll() { match self.current_upgrades.poll() {
Ok(Async::Ready(Some((muxer, client_addr)))) => { Ok(Async::Ready(Some((muxer, client_addr)))) => {
let next_incoming = muxer.clone().inbound(); self.connections.push((Arc::new(muxer), client_addr.clone()));
self.connections
.push((muxer.clone(), next_incoming, client_addr.clone()));
} }
Err(err) => { Err(err) => {
debug!("error while upgrading listener connection: {:?}", err); debug!("error while upgrading listener connection: {:?}", err);
@ -318,8 +328,8 @@ where
// Check whether any incoming substream is ready. // Check whether any incoming substream is ready.
for n in (0..self.connections.len()).rev() { for n in (0..self.connections.len()).rev() {
let (muxer, mut next_incoming, client_addr) = self.connections.swap_remove(n); let (muxer, client_addr) = self.connections.swap_remove(n);
match next_incoming.poll() { match muxer.poll_inbound() {
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
// stream muxer gave us a `None` => connection should be considered closed // stream muxer gave us a `None` => connection should be considered closed
debug!("no more inbound substreams on {}", client_addr); debug!("no more inbound substreams on {}", client_addr);
@ -333,15 +343,15 @@ where
.active_connections .active_connections
.insert(client_addr.clone(), muxer.clone()); .insert(client_addr.clone(), muxer.clone());
// A new substream is ready. // A new substream is ready.
let mut new_next = muxer.clone().inbound();
self.connections self.connections
.push((muxer, new_next, client_addr.clone())); .push((muxer.clone(), client_addr.clone()));
let incoming = muxing::substream_from_ref(muxer, incoming);
return Ok(Async::Ready(Some( return Ok(Async::Ready(Some(
future::ok((incoming, future::ok(client_addr))), future::ok((incoming, future::ok(client_addr))),
))); )));
} }
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
self.connections.push((muxer, next_incoming, client_addr)); self.connections.push((muxer, client_addr));
} }
Err(err) => { Err(err) => {
debug!("error while upgrading the multiplexed incoming connection: {:?}", err); debug!("error while upgrading the multiplexed incoming connection: {:?}", err);
@ -367,9 +377,9 @@ where
impl<M> Future for ConnectionReuseIncoming<M> impl<M> Future for ConnectionReuseIncoming<M>
where where
M: Clone + StreamMuxer, M: StreamMuxer,
{ {
type Item = future::FutureResult<(M::Substream, future::FutureResult<Multiaddr, IoError>), IoError>; type Item = future::FutureResult<(muxing::SubstreamRef<Arc<M>>, future::FutureResult<Multiaddr, IoError>), IoError>;
type Error = IoError; type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -393,8 +403,8 @@ where
// Check whether any incoming substream is ready. // Check whether any incoming substream is ready.
for n in (0..lock.next_incoming.len()).rev() { for n in (0..lock.next_incoming.len()).rev() {
let (muxer, mut future, addr) = lock.next_incoming.swap_remove(n); let (muxer, addr) = lock.next_incoming.swap_remove(n);
match future.poll() { match muxer.poll_inbound() {
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
debug!("no inbound substream for {}", addr); debug!("no inbound substream for {}", addr);
lock.active_connections.remove(&addr); lock.active_connections.remove(&addr);
@ -403,12 +413,12 @@ where
// A substream is ready ; push back the muxer for the next time this function // A substream is ready ; push back the muxer for the next time this function
// is called, then return. // is called, then return.
debug!("New incoming substream"); debug!("New incoming substream");
let next = muxer.clone().inbound(); lock.next_incoming.push((muxer.clone(), addr.clone()));
lock.next_incoming.push((muxer, next, addr.clone())); let substream = muxing::substream_from_ref(muxer, value);
return Ok(Async::Ready(future::ok((value, future::ok(addr))))); return Ok(Async::Ready(future::ok((substream, future::ok(addr)))));
} }
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
lock.next_incoming.push((muxer, future, addr)); lock.next_incoming.push((muxer, addr));
} }
Err(err) => { Err(err) => {
// In case of error, we just not push back the element, which drops it. // In case of error, we just not push back the element, which drops it.

View File

@ -101,51 +101,113 @@ where
B: StreamMuxer, B: StreamMuxer,
{ {
type Substream = EitherOutput<A::Substream, B::Substream>; type Substream = EitherOutput<A::Substream, B::Substream>;
type InboundSubstream = EitherInbound<A, B>;
type OutboundSubstream = EitherOutbound<A, B>; type OutboundSubstream = EitherOutbound<A, B>;
#[inline] fn poll_inbound(&self) -> Poll<Option<Self::Substream>, IoError> {
fn inbound(self) -> Self::InboundSubstream {
match self {
EitherOutput::First(a) => EitherInbound::A(a.inbound()),
EitherOutput::Second(b) => EitherInbound::B(b.inbound()),
}
}
#[inline]
fn outbound(self) -> Self::OutboundSubstream {
match self {
EitherOutput::First(a) => EitherOutbound::A(a.outbound()),
EitherOutput::Second(b) => EitherOutbound::B(b.outbound()),
}
}
}
#[derive(Debug, Copy, Clone)]
pub enum EitherInbound<A: StreamMuxer, B: StreamMuxer> {
A(A::InboundSubstream),
B(B::InboundSubstream),
}
impl<A, B> Future for EitherInbound<A, B>
where
A: StreamMuxer,
B: StreamMuxer,
{
type Item = Option<EitherOutput<A::Substream, B::Substream>>;
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match *self { match *self {
EitherInbound::A(ref mut a) => { EitherOutput::First(ref inner) => inner.poll_inbound().map(|p| p.map(|o| o.map(EitherOutput::First))),
let item = try_ready!(a.poll()); EitherOutput::Second(ref inner) => inner.poll_inbound().map(|p| p.map(|o| o.map(EitherOutput::Second))),
Ok(Async::Ready(item.map(EitherOutput::First))) }
} }
EitherInbound::B(ref mut b) => {
let item = try_ready!(b.poll()); fn open_outbound(&self) -> Self::OutboundSubstream {
Ok(Async::Ready(item.map(EitherOutput::Second))) match *self {
} EitherOutput::First(ref inner) => EitherOutbound::A(inner.open_outbound()),
EitherOutput::Second(ref inner) => EitherOutbound::B(inner.open_outbound()),
}
}
fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll<Option<Self::Substream>, IoError> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => {
inner.poll_outbound(substream).map(|p| p.map(|o| o.map(EitherOutput::First)))
},
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => {
inner.poll_outbound(substream).map(|p| p.map(|o| o.map(EitherOutput::Second)))
},
_ => panic!("Wrong API usage")
}
}
fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
match *self {
EitherOutput::First(ref inner) => {
match substream {
EitherOutbound::A(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage")
}
},
EitherOutput::Second(ref inner) => {
match substream {
EitherOutbound::B(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage")
}
},
}
}
fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Result<usize, IoError> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => {
inner.read_substream(substream, buf)
},
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => {
inner.read_substream(substream, buf)
},
_ => panic!("Wrong API usage")
}
}
fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Result<usize, IoError> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => {
inner.write_substream(substream, buf)
},
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => {
inner.write_substream(substream, buf)
},
_ => panic!("Wrong API usage")
}
}
fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => {
inner.flush_substream(substream)
},
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => {
inner.flush_substream(substream)
},
_ => panic!("Wrong API usage")
}
}
fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => {
inner.shutdown_substream(substream)
},
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => {
inner.shutdown_substream(substream)
},
_ => panic!("Wrong API usage")
}
}
fn destroy_substream(&self, substream: Self::Substream) {
match *self {
EitherOutput::First(ref inner) => {
match substream {
EitherOutput::First(substream) => inner.destroy_substream(substream),
_ => panic!("Wrong API usage")
}
},
EitherOutput::Second(ref inner) => {
match substream {
EitherOutput::Second(substream) => inner.destroy_substream(substream),
_ => panic!("Wrong API usage")
}
},
} }
} }
} }
@ -156,29 +218,6 @@ pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
B(B::OutboundSubstream), B(B::OutboundSubstream),
} }
impl<A, B> Future for EitherOutbound<A, B>
where
A: StreamMuxer,
B: StreamMuxer,
{
type Item = Option<EitherOutput<A::Substream, B::Substream>>;
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match *self {
EitherOutbound::A(ref mut a) => {
let item = try_ready!(a.poll());
Ok(Async::Ready(item.map(EitherOutput::First)))
}
EitherOutbound::B(ref mut b) => {
let item = try_ready!(b.poll());
Ok(Async::Ready(item.map(EitherOutput::Second)))
}
}
}
}
/// Implements `Stream` and dispatches all method calls to either `First` or `Second`. /// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
pub enum EitherListenStream<A, B> { pub enum EitherListenStream<A, B> {

View File

@ -1,4 +1,4 @@
// Copyright 2017 Parity Technologies (UK) Ltd. // Copyright 2018 Parity Technologies (UK) Ltd.
// //
// Permission is hereby granted, free of charge, to any person obtaining a // Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"), // copy of this software and associated documentation files (the "Software"),
@ -18,34 +18,257 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::future::Future; use futures::{future, prelude::*};
use std::io::Error as IoError; use std::io::{Error as IoError, Read, Write};
use std::ops::Deref;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
/// Implemented on objects that can be turned into a substream. /// Implemented on objects that can open and manage substreams.
///
/// > **Note**: The methods of this trait consume the object, but if the object implements `Clone`
/// > then you can clone it and keep the original in order to open additional substreams.
pub trait StreamMuxer { pub trait StreamMuxer {
/// Type of the object that represents the raw substream where data can be read and written. /// Type of the object that represents the raw substream where data can be read and written.
type Substream: AsyncRead + AsyncWrite; type Substream;
/// Future that will be resolved when a new incoming substream is open.
///
/// A `None` item signals that the underlying resource has been exhausted and
/// no more substreams can be created.
type InboundSubstream: Future<Item = Option<Self::Substream>, Error = IoError>;
/// Future that will be resolved when the outgoing substream is open. /// Future that will be resolved when the outgoing substream is open.
/// type OutboundSubstream;
/// A `None` item signals that the underlying resource has been exhausted and
/// no more substreams can be created.
type OutboundSubstream: Future<Item = Option<Self::Substream>, Error = IoError>;
/// Produces a future that will be resolved when a new incoming substream arrives. /// Polls for an inbound substream.
fn inbound(self) -> Self::InboundSubstream; ///
/// This function behaves the same as a `Stream`.
fn poll_inbound(&self) -> Poll<Option<Self::Substream>, IoError>;
/// Opens a new outgoing substream, and produces a future that will be resolved when it becomes /// Opens a new outgoing substream, and produces a future that will be resolved when it becomes
/// available. /// available.
fn outbound(self) -> Self::OutboundSubstream; fn open_outbound(&self) -> Self::OutboundSubstream;
/// Polls the outbound substream.
///
/// May panic or produce an undefined result if an earlier polling returned `Ready` or `Err`.
fn poll_outbound(
&self,
substream: &mut Self::OutboundSubstream,
) -> Poll<Option<Self::Substream>, IoError>;
/// Destroys an outbound substream. Use this after the outbound substream has finished, or if
/// you want to interrupt it.
fn destroy_outbound(&self, substream: Self::OutboundSubstream);
/// Reads data from a substream. The behaviour is the same as `std::io::Read::read`.
fn read_substream(
&self,
substream: &mut Self::Substream,
buf: &mut [u8],
) -> Result<usize, IoError>;
/// Write data to a substream. The behaviour is the same as `std::io::Write::write`.
fn write_substream(
&self,
substream: &mut Self::Substream,
buf: &[u8],
) -> Result<usize, IoError>;
/// Flushes a substream. The behaviour is the same as `std::io::Write::flush`.
fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError>;
/// Attempts to shut down a substream. The behaviour is the same as
/// `tokio_io::AsyncWrite::shutdown`.
fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError>;
/// Destroys a substream.
fn destroy_substream(&self, substream: Self::Substream);
}
/// Polls for an inbound from the muxer but wraps the output in an object that
/// implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`.
#[inline]
pub fn inbound_from_ref_and_wrap<P>(
muxer: P,
) -> impl Future<Item = Option<SubstreamRef<P>>, Error = IoError>
where
P: Deref + Clone,
P::Target: StreamMuxer,
{
let muxer2 = muxer.clone();
future::poll_fn(move || muxer.poll_inbound())
.map(|substream| substream.map(move |s| substream_from_ref(muxer2, s)))
}
/// Same as `outbound_from_ref`, but wraps the output in an object that
/// implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`.
#[inline]
pub fn outbound_from_ref_and_wrap<P>(muxer: P) -> OutboundSubstreamRefWrapFuture<P>
where
P: Deref + Clone,
P::Target: StreamMuxer,
{
let inner = outbound_from_ref(muxer);
OutboundSubstreamRefWrapFuture { inner }
}
/// Future returned by `outbound_from_ref_and_wrap`.
pub struct OutboundSubstreamRefWrapFuture<P>
where
P: Deref + Clone,
P::Target: StreamMuxer,
{
inner: OutboundSubstreamRefFuture<P>,
}
impl<P> Future for OutboundSubstreamRefWrapFuture<P>
where
P: Deref + Clone,
P::Target: StreamMuxer,
{
type Item = Option<SubstreamRef<P>>;
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner.poll() {
Ok(Async::Ready(Some(substream))) => {
let out = substream_from_ref(self.inner.muxer.clone(), substream);
Ok(Async::Ready(Some(out)))
}
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err),
}
}
}
/// Builds a new future for an outbound substream, where the muxer is a reference.
#[inline]
pub fn outbound_from_ref<P>(muxer: P) -> OutboundSubstreamRefFuture<P>
where
P: Deref,
P::Target: StreamMuxer,
{
let outbound = muxer.open_outbound();
OutboundSubstreamRefFuture {
muxer,
outbound: Some(outbound),
}
}
/// Future returned by `outbound_from_ref`.
pub struct OutboundSubstreamRefFuture<P>
where
P: Deref,
P::Target: StreamMuxer,
{
muxer: P,
outbound: Option<<P::Target as StreamMuxer>::OutboundSubstream>,
}
impl<P> Future for OutboundSubstreamRefFuture<P>
where
P: Deref,
P::Target: StreamMuxer,
{
type Item = Option<<P::Target as StreamMuxer>::Substream>;
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.muxer
.poll_outbound(self.outbound.as_mut().expect("outbound was empty"))
}
}
impl<P> Drop for OutboundSubstreamRefFuture<P>
where
P: Deref,
P::Target: StreamMuxer,
{
#[inline]
fn drop(&mut self) {
self.muxer
.destroy_outbound(self.outbound.take().expect("outbound was empty"))
}
}
/// Builds an implementation of `Read`/`Write`/`AsyncRead`/`AsyncWrite` from an `Arc` to the
/// muxer and a substream.
#[inline]
pub fn substream_from_ref<P>(
muxer: P,
substream: <P::Target as StreamMuxer>::Substream,
) -> SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
SubstreamRef {
muxer,
substream: Some(substream),
}
}
/// Stream returned by `substream_from_ref`.
pub struct SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
muxer: P,
substream: Option<<P::Target as StreamMuxer>::Substream>,
}
impl<P> Read for SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
#[inline]
fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
self.muxer
.read_substream(self.substream.as_mut().expect("substream was empty"), buf)
}
}
impl<P> AsyncRead for SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
}
impl<P> Write for SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
#[inline]
fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
self.muxer
.write_substream(self.substream.as_mut().expect("substream was empty"), buf)
}
#[inline]
fn flush(&mut self) -> Result<(), IoError> {
self.muxer
.flush_substream(self.substream.as_mut().expect("substream was empty"))
}
}
impl<P> AsyncWrite for SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
#[inline]
fn shutdown(&mut self) -> Poll<(), IoError> {
self.muxer
.shutdown_substream(self.substream.as_mut().expect("substream was empty"))
}
}
impl<P> Drop for SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
#[inline]
fn drop(&mut self) {
self.muxer
.destroy_substream(self.substream.take().expect("substream was empty"))
}
} }

View File

@ -29,8 +29,8 @@ extern crate tokio_io;
use bytes::BytesMut; use bytes::BytesMut;
use futures::future::Future; use futures::future::Future;
use futures::{Sink, Stream}; use futures::{Sink, Stream};
use libp2p_core::{Multiaddr, MuxedTransport, StreamMuxer, Transport, transport}; use libp2p_core::{muxing, Multiaddr, MuxedTransport, Transport, transport};
use std::sync::atomic; use std::sync::{atomic, Arc};
use std::thread; use std::thread;
use tokio_io::codec::length_delimited::Framed; use tokio_io::codec::length_delimited::Framed;
@ -105,7 +105,7 @@ fn client_to_server_outbound() {
.with_upgrade(multiplex::MplexConfig::new()) .with_upgrade(multiplex::MplexConfig::new())
.dial("/memory".parse().unwrap()) .dial("/memory".parse().unwrap())
.unwrap_or_else(|_| panic!()) .unwrap_or_else(|_| panic!())
.and_then(|client| client.0.outbound()) .and_then(|client| muxing::outbound_from_ref_and_wrap(Arc::new(client.0)))
.map(|server| Framed::<_, BytesMut>::new(server.unwrap())) .map(|server| Framed::<_, BytesMut>::new(server.unwrap()))
.and_then(|server| server.send("hello world".into())) .and_then(|server| server.send("hello world".into()))
.map(|_| ()); .map(|_| ());
@ -199,10 +199,10 @@ fn use_opened_listen_to_dial() {
.into_future() .into_future()
.map_err(|(err, _)| err) .map_err(|(err, _)| err)
.and_then(|(client, _)| client.unwrap()) .and_then(|(client, _)| client.unwrap())
.map(|client| client.0) .map(|client| Arc::new(client.0))
.and_then(|c| { .and_then(|c| {
let c2 = c.clone(); let c2 = c.clone();
c.clone().inbound().map(move |i| (c2, i)) muxing::inbound_from_ref_and_wrap(c.clone()).map(move |i| (c2, i))
}) })
.map(|(muxer, client)| (muxer, Framed::<_, BytesMut>::new(client.unwrap()))) .map(|(muxer, client)| (muxer, Framed::<_, BytesMut>::new(client.unwrap())))
.and_then(|(muxer, client)| { .and_then(|(muxer, client)| {
@ -214,7 +214,7 @@ fn use_opened_listen_to_dial() {
.and_then(|(muxer, (msg, _))| { .and_then(|(muxer, (msg, _))| {
let msg = msg.unwrap(); let msg = msg.unwrap();
assert_eq!(msg, "hello world"); assert_eq!(msg, "hello world");
muxer.outbound() muxing::outbound_from_ref_and_wrap(muxer)
}) })
.map(|client| Framed::<_, BytesMut>::new(client.unwrap())) .map(|client| Framed::<_, BytesMut>::new(client.unwrap()))
.and_then(|client| client.into_future().map_err(|(err, _)| err)) .and_then(|client| client.into_future().map_err(|(err, _)| err))

View File

@ -33,8 +33,8 @@ extern crate unsigned_varint;
mod codec; mod codec;
use std::{cmp, iter}; use std::{cmp, iter};
use std::io::{Read, Write, Error as IoError, ErrorKind as IoErrorKind}; use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering}; use std::sync::{atomic::AtomicUsize, atomic::Ordering};
use bytes::Bytes; use bytes::Bytes;
use core::{ConnectionUpgrade, Endpoint, StreamMuxer}; use core::{ConnectionUpgrade, Endpoint, StreamMuxer};
use parking_lot::Mutex; use parking_lot::Mutex;
@ -129,7 +129,7 @@ where
let max_buffer_len = self.max_buffer_len; let max_buffer_len = self.max_buffer_len;
let out = Multiplex { let out = Multiplex {
inner: Arc::new(Mutex::new(MultiplexInner { inner: Mutex::new(MultiplexInner {
error: Ok(()), error: Ok(()),
inner: Framed::new(i, codec::Codec::new()).fuse(), inner: Framed::new(i, codec::Codec::new()).fuse(),
config: self, config: self,
@ -137,7 +137,7 @@ where
opened_substreams: Default::default(), opened_substreams: Default::default(),
next_outbound_stream_id: if endpoint == Endpoint::Dialer { 0 } else { 1 }, next_outbound_stream_id: if endpoint == Endpoint::Dialer { 0 } else { 1 },
to_notify: Default::default(), to_notify: Default::default(),
})) })
}; };
future::ok((out, remote_addr)) future::ok((out, remote_addr))
@ -151,16 +151,7 @@ where
/// Multiplexer. Implements the `StreamMuxer` trait. /// Multiplexer. Implements the `StreamMuxer` trait.
pub struct Multiplex<C> { pub struct Multiplex<C> {
inner: Arc<Mutex<MultiplexInner<C>>>, inner: Mutex<MultiplexInner<C>>,
}
impl<C> Clone for Multiplex<C> {
#[inline]
fn clone(&self) -> Self {
Multiplex {
inner: self.inner.clone(),
}
}
} }
// Struct shared throughout the implementation. // Struct shared throughout the implementation.
@ -308,81 +299,12 @@ where C: AsyncRead + AsyncWrite
} }
impl<C> StreamMuxer for Multiplex<C> impl<C> StreamMuxer for Multiplex<C>
where C: AsyncRead + AsyncWrite + 'static // TODO: 'static :-/
{
type Substream = Substream<C>;
type InboundSubstream = InboundSubstream<C>;
type OutboundSubstream = Box<Future<Item = Option<Self::Substream>, Error = IoError> + 'static>;
#[inline]
fn inbound(self) -> Self::InboundSubstream {
InboundSubstream { inner: self.inner }
}
#[inline]
fn outbound(self) -> Self::OutboundSubstream {
let mut inner = self.inner.lock();
// Assign a substream ID now.
let substream_id = {
let n = inner.next_outbound_stream_id;
inner.next_outbound_stream_id += 2;
n
};
// We use an RAII guard, so that we close the substream in case of an error.
struct OpenedSubstreamGuard<C>(Option<Arc<Mutex<MultiplexInner<C>>>>, u32);
impl<C> Drop for OpenedSubstreamGuard<C> {
fn drop(&mut self) {
if let Some(inner) = self.0.take() {
debug!("Failed to open outbound substream {}", self.1);
inner.lock().buffer.retain(|elem| elem.substream_id() != self.1);
}
}
}
inner.opened_substreams.insert(substream_id);
let mut guard = OpenedSubstreamGuard(Some(self.inner.clone()), substream_id);
// We send `Open { substream_id }`, then flush, then only produce the substream.
let future = {
future::poll_fn({
let inner = self.inner.clone();
move || {
let elem = codec::Elem::Open { substream_id };
poll_send(&mut inner.lock(), elem)
}
}).and_then({
let inner = self.inner.clone();
move |()| {
future::poll_fn(move || inner.lock().inner.poll_complete())
}
}).map(move |()| {
debug!("Successfully opened outbound substream {}", substream_id);
Some(Substream {
inner: guard.0.take().unwrap(),
num: substream_id,
current_data: Bytes::new(),
endpoint: Endpoint::Dialer,
})
})
};
Box::new(future) as Box<_>
}
}
/// Future to the next incoming substream.
pub struct InboundSubstream<C> {
inner: Arc<Mutex<MultiplexInner<C>>>,
}
impl<C> Future for InboundSubstream<C>
where C: AsyncRead + AsyncWrite where C: AsyncRead + AsyncWrite
{ {
type Item = Option<Substream<C>>; type Substream = Substream;
type Error = IoError; type OutboundSubstream = OutboundSubstream;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll_inbound(&self) -> Poll<Option<Self::Substream>, IoError> {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
if inner.opened_substreams.len() >= inner.config.max_substreams { if inner.opened_substreams.len() >= inner.config.max_substreams {
@ -401,7 +323,6 @@ where C: AsyncRead + AsyncWrite
if let Some(num) = num { if let Some(num) = num {
debug!("Successfully opened inbound substream {}", num); debug!("Successfully opened inbound substream {}", num);
Ok(Async::Ready(Some(Substream { Ok(Async::Ready(Some(Substream {
inner: self.inner.clone(),
current_data: Bytes::new(), current_data: Bytes::new(),
num, num,
endpoint: Endpoint::Listener, endpoint: Endpoint::Listener,
@ -410,28 +331,81 @@ where C: AsyncRead + AsyncWrite
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} }
} }
}
/// Active substream to the remote. Implements `AsyncRead` and `AsyncWrite`. fn open_outbound(&self) -> Self::OutboundSubstream {
pub struct Substream<C> let mut inner = self.inner.lock();
where C: AsyncRead + AsyncWrite
{
inner: Arc<Mutex<MultiplexInner<C>>>,
num: u32,
// Read buffer. Contains data read from `inner` but not yet dispatched by a call to `read()`.
current_data: Bytes,
endpoint: Endpoint,
}
impl<C> Read for Substream<C> // Assign a substream ID now.
where C: AsyncRead + AsyncWrite let substream_id = {
{ let n = inner.next_outbound_stream_id;
fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> { inner.next_outbound_stream_id += 2;
n
};
inner.opened_substreams.insert(substream_id);
OutboundSubstream {
num: substream_id,
state: OutboundSubstreamState::SendElem(codec::Elem::Open { substream_id }),
}
}
fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll<Option<Self::Substream>, IoError> {
loop {
let polling = match substream.state {
OutboundSubstreamState::SendElem(ref elem) => {
let mut inner = self.inner.lock();
poll_send(&mut inner, elem.clone())
},
OutboundSubstreamState::Flush => {
let mut inner = self.inner.lock();
inner.inner.poll_complete()
},
OutboundSubstreamState::Done => {
panic!("Polling outbound substream after it's been succesfully open");
},
};
match polling {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
debug!("Failed to open outbound substream {}", substream.num);
self.inner.lock().buffer.retain(|elem| elem.substream_id() != substream.num);
return Err(err)
},
};
// Going to next step.
match substream.state {
OutboundSubstreamState::SendElem(_) => {
substream.state = OutboundSubstreamState::Flush;
},
OutboundSubstreamState::Flush => {
debug!("Successfully opened outbound substream {}", substream.num);
substream.state = OutboundSubstreamState::Done;
return Ok(Async::Ready(Some(Substream {
num: substream.num,
current_data: Bytes::new(),
endpoint: Endpoint::Dialer,
})));
},
OutboundSubstreamState::Done => unreachable!(),
}
}
}
#[inline]
fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
// Nothing to do.
}
fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Result<usize, IoError> {
loop { loop {
// First, transfer from `current_data`. // First, transfer from `current_data`.
if self.current_data.len() != 0 { if substream.current_data.len() != 0 {
let len = cmp::min(self.current_data.len(), buf.len()); let len = cmp::min(substream.current_data.len(), buf.len());
buf[..len].copy_from_slice(&self.current_data.split_to(len)); buf[..len].copy_from_slice(&substream.current_data.split_to(len));
return Ok(len); return Ok(len);
} }
@ -439,22 +413,22 @@ where C: AsyncRead + AsyncWrite
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
let next_data_poll = next_match(&mut inner, |elem| { let next_data_poll = next_match(&mut inner, |elem| {
match elem { match elem {
&codec::Elem::Data { ref substream_id, ref data, .. } if *substream_id == self.num => { // TODO: check endpoint? &codec::Elem::Data { ref substream_id, ref data, .. } if *substream_id == substream.num => { // TODO: check endpoint?
Some(data.clone()) Some(data.clone())
}, },
_ => None, _ => None,
} }
}); });
// We're in a loop, so all we need to do is set `self.current_data` to the data we // We're in a loop, so all we need to do is set `substream.current_data` to the data we
// just read and wait for the next iteration. // just read and wait for the next iteration.
match next_data_poll { match next_data_poll {
Ok(Async::Ready(Some(data))) => self.current_data = data.freeze(), Ok(Async::Ready(Some(data))) => substream.current_data = data.freeze(),
Ok(Async::Ready(None)) => return Ok(0), Ok(Async::Ready(None)) => return Ok(0),
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
// There was no data packet in the buffer about this substream ; maybe it's // There was no data packet in the buffer about this substream ; maybe it's
// because it has been closed. // because it has been closed.
if inner.opened_substreams.contains(&self.num) { if inner.opened_substreams.contains(&substream.num) {
return Err(IoErrorKind::WouldBlock.into()); return Err(IoErrorKind::WouldBlock.into());
} else { } else {
return Ok(0); return Ok(0);
@ -464,21 +438,12 @@ where C: AsyncRead + AsyncWrite
} }
} }
} }
}
impl<C> AsyncRead for Substream<C> fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Result<usize, IoError> {
where C: AsyncRead + AsyncWrite
{
}
impl<C> Write for Substream<C>
where C: AsyncRead + AsyncWrite
{
fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
let elem = codec::Elem::Data { let elem = codec::Elem::Data {
substream_id: self.num, substream_id: substream.num,
data: From::from(buf), data: From::from(buf),
endpoint: self.endpoint, endpoint: substream.endpoint,
}; };
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
@ -489,7 +454,7 @@ where C: AsyncRead + AsyncWrite
} }
} }
fn flush(&mut self) -> Result<(), IoError> { fn flush_substream(&self, _substream: &mut Self::Substream) -> Result<(), IoError> {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
match inner.inner.poll_complete() { match inner.inner.poll_complete() {
Ok(Async::Ready(())) => Ok(()), Ok(Async::Ready(())) => Ok(()),
@ -497,27 +462,44 @@ where C: AsyncRead + AsyncWrite
Err(err) => Err(err), Err(err) => Err(err),
} }
} }
}
impl<C> AsyncWrite for Substream<C> fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> {
where C: AsyncRead + AsyncWrite
{
fn shutdown(&mut self) -> Poll<(), IoError> {
let elem = codec::Elem::Reset { let elem = codec::Elem::Reset {
substream_id: self.num, substream_id: substream.num,
endpoint: self.endpoint, endpoint: substream.endpoint,
}; };
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
poll_send(&mut inner, elem) poll_send(&mut inner, elem)
} }
}
impl<C> Drop for Substream<C> fn destroy_substream(&self, mut substream: Self::Substream) {
where C: AsyncRead + AsyncWrite let _ = self.shutdown_substream(&mut substream); // TODO: this doesn't necessarily send the close message
{ self.inner.lock().buffer.retain(|elem| elem.substream_id() != substream.num);
fn drop(&mut self) {
let _ = self.shutdown(); // TODO: this doesn't necessarily send the close message
self.inner.lock().buffer.retain(|elem| elem.substream_id() != self.num);
} }
} }
/// Active attempt to open an outbound substream.
pub struct OutboundSubstream {
/// Substream number.
num: u32,
state: OutboundSubstreamState,
}
enum OutboundSubstreamState {
/// We need to send `Elem` on the underlying stream.
SendElem(codec::Elem),
/// We need to flush the underlying stream.
Flush,
/// The substream is open and the `OutboundSubstream` is now useless.
Done,
}
/// Active substream to the remote.
pub struct Substream {
/// Substream number.
num: u32,
// Read buffer. Contains data read from `inner` but not yet dispatched by a call to `read()`.
current_data: Bytes,
endpoint: Endpoint,
}

View File

@ -28,9 +28,9 @@ extern crate tokio_io;
use futures::future::Future; use futures::future::Future;
use futures::{Sink, Stream}; use futures::{Sink, Stream};
use std::sync::mpsc; use std::sync::{Arc, mpsc};
use std::thread; use std::thread;
use swarm::{StreamMuxer, Transport}; use swarm::{muxing, Transport};
use tcp::TcpConfig; use tcp::TcpConfig;
use tokio_io::codec::length_delimited::Framed; use tokio_io::codec::length_delimited::Framed;
@ -52,8 +52,8 @@ fn client_to_server_outbound() {
let future = listener let future = listener
.into_future() .into_future()
.map_err(|(err, _)| err) .map_err(|(err, _)| err)
.and_then(|(client, _)| client.unwrap().map(|v| v.0)) .and_then(|(client, _)| client.unwrap().map(|v| Arc::new(v.0)))
.and_then(|client| client.outbound()) .and_then(|client| muxing::outbound_from_ref_and_wrap(client))
.map(|client| Framed::<_, bytes::BytesMut>::new(client.unwrap())) .map(|client| Framed::<_, bytes::BytesMut>::new(client.unwrap()))
.and_then(|client| { .and_then(|client| {
client client
@ -75,7 +75,7 @@ fn client_to_server_outbound() {
let future = transport let future = transport
.dial(rx.recv().unwrap()) .dial(rx.recv().unwrap())
.unwrap() .unwrap()
.and_then(|client| client.0.inbound()) .and_then(|client| muxing::inbound_from_ref_and_wrap(Arc::new(client.0)))
.map(|server| Framed::<_, bytes::BytesMut>::new(server.unwrap())) .map(|server| Framed::<_, bytes::BytesMut>::new(server.unwrap()))
.and_then(|server| server.send("hello world".into())) .and_then(|server| server.send("hello world".into()))
.map(|_| ()); .map(|_| ());
@ -103,7 +103,7 @@ fn client_to_server_inbound() {
.into_future() .into_future()
.map_err(|(err, _)| err) .map_err(|(err, _)| err)
.and_then(|(client, _)| client.unwrap().map(|v| v.0)) .and_then(|(client, _)| client.unwrap().map(|v| v.0))
.and_then(|client| client.inbound()) .and_then(|client| muxing::inbound_from_ref_and_wrap(Arc::new(client)))
.map(|client| Framed::<_, bytes::BytesMut>::new(client.unwrap())) .map(|client| Framed::<_, bytes::BytesMut>::new(client.unwrap()))
.and_then(|client| { .and_then(|client| {
client client
@ -125,7 +125,7 @@ fn client_to_server_inbound() {
let future = transport let future = transport
.dial(rx.recv().unwrap()) .dial(rx.recv().unwrap())
.unwrap() .unwrap()
.and_then(|(client, _)| client.outbound()) .and_then(|(client, _)| muxing::outbound_from_ref_and_wrap(Arc::new(client)))
.map(|server| Framed::<_, bytes::BytesMut>::new(server.unwrap())) .map(|server| Framed::<_, bytes::BytesMut>::new(server.unwrap()))
.and_then(|server| server.send("hello world".into())) .and_then(|server| server.send("hello world".into()))
.map(|_| ()); .map(|_| ());

View File

@ -9,5 +9,6 @@ bytes = "0.4"
futures = "0.1" futures = "0.1"
libp2p-core = { path = "../../core" } libp2p-core = { path = "../../core" }
log = "0.4" log = "0.4"
parking_lot = "0.6"
tokio-io = "0.1" tokio-io = "0.1"
yamux = { git = "https://github.com/paritytech/yamux" } yamux = { git = "https://github.com/paritytech/yamux" }

View File

@ -23,30 +23,27 @@ extern crate futures;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate libp2p_core as core; extern crate libp2p_core as core;
extern crate parking_lot;
extern crate tokio_io; extern crate tokio_io;
extern crate yamux; extern crate yamux;
use bytes::Bytes; use bytes::Bytes;
use core::Endpoint; use core::Endpoint;
use futures::{future::{self, FutureResult}, prelude::*}; use futures::{future::{self, FutureResult}, prelude::*};
use parking_lot::Mutex;
use std::{io, iter}; use std::{io, iter};
use std::io::{Read, Write, Error as IoError};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
pub struct Yamux<C>(yamux::Connection<C>); pub struct Yamux<C>(Mutex<yamux::Connection<C>>);
impl<C> Clone for Yamux<C> {
fn clone(&self) -> Self {
Yamux(self.0.clone())
}
}
impl<C> Yamux<C> impl<C> Yamux<C>
where where
C: AsyncRead + AsyncWrite + 'static C: AsyncRead + AsyncWrite + 'static
{ {
pub fn new(c: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { pub fn new(c: C, cfg: yamux::Config, mode: yamux::Mode) -> Self {
Yamux(yamux::Connection::new(c, cfg, mode)) Yamux(Mutex::new(yamux::Connection::new(c, cfg, mode)))
} }
} }
@ -55,31 +52,11 @@ where
C: AsyncRead + AsyncWrite + 'static C: AsyncRead + AsyncWrite + 'static
{ {
type Substream = yamux::StreamHandle<C>; type Substream = yamux::StreamHandle<C>;
type InboundSubstream = InboundFuture<C>;
type OutboundSubstream = FutureResult<Option<Self::Substream>, io::Error>; type OutboundSubstream = FutureResult<Option<Self::Substream>, io::Error>;
fn inbound(self) -> Self::InboundSubstream { #[inline]
InboundFuture(self.0) fn poll_inbound(&self) -> Poll<Option<Self::Substream>, IoError> {
} match self.0.lock().poll() {
fn outbound(self) -> Self::OutboundSubstream {
let stream = self.0.open_stream().map_err(|e| io::Error::new(io::ErrorKind::Other, e));
future::result(stream)
}
}
pub struct InboundFuture<C>(yamux::Connection<C>);
impl<C> Future for InboundFuture<C>
where
C: AsyncRead + AsyncWrite + 'static
{
type Item = Option<yamux::StreamHandle<C>>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0.poll() {
Err(e) => { Err(e) => {
error!("connection error: {}", e); error!("connection error: {}", e);
Err(io::Error::new(io::ErrorKind::Other, e)) Err(io::Error::new(io::ErrorKind::Other, e))
@ -89,9 +66,49 @@ where
Ok(Async::Ready(Some(stream))) => Ok(Async::Ready(Some(stream))) Ok(Async::Ready(Some(stream))) => Ok(Async::Ready(Some(stream)))
} }
} }
#[inline]
fn open_outbound(&self) -> Self::OutboundSubstream {
let stream = self.0.lock().open_stream().map_err(|e| io::Error::new(io::ErrorKind::Other, e));
future::result(stream)
}
#[inline]
fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll<Option<Self::Substream>, IoError> {
substream.poll()
}
#[inline]
fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
}
#[inline]
fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Result<usize, IoError> {
substream.read(buf)
}
#[inline]
fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Result<usize, IoError> {
substream.write(buf)
}
#[inline]
fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError> {
substream.flush()
}
#[inline]
fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> {
substream.shutdown()
}
#[inline]
fn destroy_substream(&self, _substream: Self::Substream) {
}
} }
#[derive(Clone)] #[derive(Clone)]
pub struct Config(yamux::Config); pub struct Config(yamux::Config);