diff --git a/core/src/connection_reuse.rs b/core/src/connection_reuse.rs index 67bf9469..ab07bb97 100644 --- a/core/src/connection_reuse.rs +++ b/core/src/connection_reuse.rs @@ -45,7 +45,7 @@ use futures::{Async, Future, Poll, Stream}; use futures::stream::FuturesUnordered; use futures::sync::mpsc; use multiaddr::Multiaddr; -use muxing::StreamMuxer; +use muxing::{self, StreamMuxer}; use parking_lot::Mutex; use std::io::{self, Error as IoError}; use std::sync::Arc; @@ -58,7 +58,6 @@ use upgrade::ConnectionUpgrade; /// Can be created from an `UpgradedNode` through the `From` trait. /// /// Implements the `Transport` trait. -#[derive(Clone)] pub struct ConnectionReuse where T: Transport, @@ -73,23 +72,36 @@ where shared: Arc>>, } -struct Shared +impl Clone for ConnectionReuse where - M: StreamMuxer, + T: Transport + Clone, + T::Output: AsyncRead + AsyncWrite, + C: ConnectionUpgrade + Clone, + C::Output: StreamMuxer { + #[inline] + fn clone(&self) -> Self { + ConnectionReuse { + inner: self.inner.clone(), + shared: self.shared.clone(), + } + } +} + +struct Shared { // List of active muxers. - active_connections: FnvHashMap, + active_connections: FnvHashMap>, // List of pending inbound substreams from dialed nodes. // Only add to this list elements received through `add_to_next_rx`. - next_incoming: Vec<(M, M::InboundSubstream, Multiaddr)>, + next_incoming: Vec<(Arc, Multiaddr)>, // New elements are not directly added to `next_incoming`. Instead they are sent to this // channel. This is done so that we can wake up tasks whenever a new element is added. - add_to_next_rx: mpsc::UnboundedReceiver<(M, M::InboundSubstream, Multiaddr)>, + add_to_next_rx: mpsc::UnboundedReceiver<(Arc, Multiaddr)>, // Other side of `add_to_next_rx`. - add_to_next_tx: mpsc::UnboundedSender<(M, M::InboundSubstream, Multiaddr)>, + add_to_next_tx: mpsc::UnboundedSender<(Arc, Multiaddr)>, } impl From> for ConnectionReuse @@ -120,11 +132,11 @@ where T: Transport + 'static, // TODO: 'static :( T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :( - C::Output: StreamMuxer + Clone, + C::Output: StreamMuxer, C::MultiaddrFuture: Future, C::NamesIter: Clone, // TODO: not elegant { - type Output = ::Substream; + type Output = muxing::SubstreamRef>; type MultiaddrFuture = future::FutureResult; type Listener = Box>; type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; @@ -171,7 +183,7 @@ where .map(|muxer| muxer.clone()) { let a = addr.clone(); - Either::A(muxer.outbound().map(move |s| s.map(move |s| (s, future::ok(a))))) + Either::A(muxing::outbound_from_ref_and_wrap(muxer).map(|o| o.map(move |s| (s, future::ok(a))))) } else { Either::B(future::ok(None)) }; @@ -190,10 +202,10 @@ where let future = dial .and_then(move |(muxer, addr_fut)| { trace!("Waiting for remote's address"); - addr_fut.map(move |addr| (muxer, addr)) + addr_fut.map(move |addr| (Arc::new(muxer), addr)) }) .and_then(move |(muxer, addr)| { - muxer.clone().outbound().and_then(move |substream| { + muxing::outbound_from_ref(muxer.clone()).and_then(move |substream| { if let Some(s) = substream { // Replace the active connection because we are the most recent. let mut lock = shared.lock(); @@ -201,9 +213,9 @@ where // TODO: doesn't need locking ; the sender could be extracted let _ = lock.add_to_next_tx.unbounded_send(( muxer.clone(), - muxer.inbound(), addr.clone(), )); + let s = muxing::substream_from_ref(muxer, s); Ok((s, future::ok(addr))) } else { error!("failed to dial to {}", addr); @@ -235,13 +247,13 @@ where T: Transport + 'static, // TODO: 'static :( T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :( - C::Output: StreamMuxer + Clone, + C::Output: StreamMuxer, C::MultiaddrFuture: Future, C::NamesIter: Clone, // TODO: not elegant { type Incoming = ConnectionReuseIncoming; type IncomingUpgrade = - future::FutureResult<(::Substream, Self::MultiaddrFuture), IoError>; + future::FutureResult<(muxing::SubstreamRef>, Self::MultiaddrFuture), IoError>; #[inline] fn next_incoming(self) -> Self::Incoming { @@ -259,7 +271,7 @@ where // The main listener. `S` is from the underlying transport. listener: S, current_upgrades: FuturesUnordered, - connections: Vec<(M, ::InboundSubstream, Multiaddr)>, + connections: Vec<(Arc, Multiaddr)>, // Shared between the whole connection reuse mechanism. shared: Arc>>, @@ -269,9 +281,9 @@ impl Stream for ConnectionReuseListener where S: Stream, F: Future, - M: StreamMuxer + Clone + 'static, // TODO: 'static :( + M: StreamMuxer + 'static, // TODO: 'static :( { - type Item = FutureResult<(M::Substream, FutureResult), IoError>; + type Item = FutureResult<(muxing::SubstreamRef>, FutureResult), IoError>; type Error = IoError; fn poll(&mut self) -> Poll, Self::Error> { @@ -304,9 +316,7 @@ where loop { match self.current_upgrades.poll() { Ok(Async::Ready(Some((muxer, client_addr)))) => { - let next_incoming = muxer.clone().inbound(); - self.connections - .push((muxer.clone(), next_incoming, client_addr.clone())); + self.connections.push((Arc::new(muxer), client_addr.clone())); } Err(err) => { debug!("error while upgrading listener connection: {:?}", err); @@ -318,8 +328,8 @@ where // Check whether any incoming substream is ready. for n in (0..self.connections.len()).rev() { - let (muxer, mut next_incoming, client_addr) = self.connections.swap_remove(n); - match next_incoming.poll() { + let (muxer, client_addr) = self.connections.swap_remove(n); + match muxer.poll_inbound() { Ok(Async::Ready(None)) => { // stream muxer gave us a `None` => connection should be considered closed debug!("no more inbound substreams on {}", client_addr); @@ -333,15 +343,15 @@ where .active_connections .insert(client_addr.clone(), muxer.clone()); // A new substream is ready. - let mut new_next = muxer.clone().inbound(); self.connections - .push((muxer, new_next, client_addr.clone())); + .push((muxer.clone(), client_addr.clone())); + let incoming = muxing::substream_from_ref(muxer, incoming); return Ok(Async::Ready(Some( future::ok((incoming, future::ok(client_addr))), ))); } Ok(Async::NotReady) => { - self.connections.push((muxer, next_incoming, client_addr)); + self.connections.push((muxer, client_addr)); } Err(err) => { debug!("error while upgrading the multiplexed incoming connection: {:?}", err); @@ -367,9 +377,9 @@ where impl Future for ConnectionReuseIncoming where - M: Clone + StreamMuxer, + M: StreamMuxer, { - type Item = future::FutureResult<(M::Substream, future::FutureResult), IoError>; + type Item = future::FutureResult<(muxing::SubstreamRef>, future::FutureResult), IoError>; type Error = IoError; fn poll(&mut self) -> Poll { @@ -393,8 +403,8 @@ where // Check whether any incoming substream is ready. for n in (0..lock.next_incoming.len()).rev() { - let (muxer, mut future, addr) = lock.next_incoming.swap_remove(n); - match future.poll() { + let (muxer, addr) = lock.next_incoming.swap_remove(n); + match muxer.poll_inbound() { Ok(Async::Ready(None)) => { debug!("no inbound substream for {}", addr); lock.active_connections.remove(&addr); @@ -403,12 +413,12 @@ where // A substream is ready ; push back the muxer for the next time this function // is called, then return. debug!("New incoming substream"); - let next = muxer.clone().inbound(); - lock.next_incoming.push((muxer, next, addr.clone())); - return Ok(Async::Ready(future::ok((value, future::ok(addr))))); + lock.next_incoming.push((muxer.clone(), addr.clone())); + let substream = muxing::substream_from_ref(muxer, value); + return Ok(Async::Ready(future::ok((substream, future::ok(addr))))); } Ok(Async::NotReady) => { - lock.next_incoming.push((muxer, future, addr)); + lock.next_incoming.push((muxer, addr)); } Err(err) => { // In case of error, we just not push back the element, which drops it. diff --git a/core/src/either.rs b/core/src/either.rs index 1773a1f8..9c85d427 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -101,51 +101,113 @@ where B: StreamMuxer, { type Substream = EitherOutput; - type InboundSubstream = EitherInbound; type OutboundSubstream = EitherOutbound; - #[inline] - fn inbound(self) -> Self::InboundSubstream { - match self { - EitherOutput::First(a) => EitherInbound::A(a.inbound()), - EitherOutput::Second(b) => EitherInbound::B(b.inbound()), - } - } - - #[inline] - fn outbound(self) -> Self::OutboundSubstream { - match self { - EitherOutput::First(a) => EitherOutbound::A(a.outbound()), - EitherOutput::Second(b) => EitherOutbound::B(b.outbound()), - } - } -} - -#[derive(Debug, Copy, Clone)] -pub enum EitherInbound { - A(A::InboundSubstream), - B(B::InboundSubstream), -} - -impl Future for EitherInbound -where - A: StreamMuxer, - B: StreamMuxer, -{ - type Item = Option>; - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll { + fn poll_inbound(&self) -> Poll, IoError> { match *self { - EitherInbound::A(ref mut a) => { - let item = try_ready!(a.poll()); - Ok(Async::Ready(item.map(EitherOutput::First))) - } - EitherInbound::B(ref mut b) => { - let item = try_ready!(b.poll()); - Ok(Async::Ready(item.map(EitherOutput::Second))) - } + EitherOutput::First(ref inner) => inner.poll_inbound().map(|p| p.map(|o| o.map(EitherOutput::First))), + EitherOutput::Second(ref inner) => inner.poll_inbound().map(|p| p.map(|o| o.map(EitherOutput::Second))), + } + } + + fn open_outbound(&self) -> Self::OutboundSubstream { + match *self { + EitherOutput::First(ref inner) => EitherOutbound::A(inner.open_outbound()), + EitherOutput::Second(ref inner) => EitherOutbound::B(inner.open_outbound()), + } + } + + fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll, IoError> { + match (self, substream) { + (EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => { + inner.poll_outbound(substream).map(|p| p.map(|o| o.map(EitherOutput::First))) + }, + (EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => { + inner.poll_outbound(substream).map(|p| p.map(|o| o.map(EitherOutput::Second))) + }, + _ => panic!("Wrong API usage") + } + } + + fn destroy_outbound(&self, substream: Self::OutboundSubstream) { + match *self { + EitherOutput::First(ref inner) => { + match substream { + EitherOutbound::A(substream) => inner.destroy_outbound(substream), + _ => panic!("Wrong API usage") + } + }, + EitherOutput::Second(ref inner) => { + match substream { + EitherOutbound::B(substream) => inner.destroy_outbound(substream), + _ => panic!("Wrong API usage") + } + }, + } + } + + fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Result { + match (self, substream) { + (EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => { + inner.read_substream(substream, buf) + }, + (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => { + inner.read_substream(substream, buf) + }, + _ => panic!("Wrong API usage") + } + } + + fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Result { + match (self, substream) { + (EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => { + inner.write_substream(substream, buf) + }, + (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => { + inner.write_substream(substream, buf) + }, + _ => panic!("Wrong API usage") + } + } + + fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError> { + match (self, substream) { + (EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => { + inner.flush_substream(substream) + }, + (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => { + inner.flush_substream(substream) + }, + _ => panic!("Wrong API usage") + } + } + + fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> { + match (self, substream) { + (EitherOutput::First(ref inner), EitherOutput::First(ref mut substream)) => { + inner.shutdown_substream(substream) + }, + (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut substream)) => { + inner.shutdown_substream(substream) + }, + _ => panic!("Wrong API usage") + } + } + + fn destroy_substream(&self, substream: Self::Substream) { + match *self { + EitherOutput::First(ref inner) => { + match substream { + EitherOutput::First(substream) => inner.destroy_substream(substream), + _ => panic!("Wrong API usage") + } + }, + EitherOutput::Second(ref inner) => { + match substream { + EitherOutput::Second(substream) => inner.destroy_substream(substream), + _ => panic!("Wrong API usage") + } + }, } } } @@ -156,29 +218,6 @@ pub enum EitherOutbound { B(B::OutboundSubstream), } -impl Future for EitherOutbound -where - A: StreamMuxer, - B: StreamMuxer, -{ - type Item = Option>; - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll { - match *self { - EitherOutbound::A(ref mut a) => { - let item = try_ready!(a.poll()); - Ok(Async::Ready(item.map(EitherOutput::First))) - } - EitherOutbound::B(ref mut b) => { - let item = try_ready!(b.poll()); - Ok(Async::Ready(item.map(EitherOutput::Second))) - } - } - } -} - /// Implements `Stream` and dispatches all method calls to either `First` or `Second`. #[derive(Debug, Copy, Clone)] pub enum EitherListenStream { diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 7b87ac93..b673b8a8 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. +// Copyright 2018 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -18,34 +18,257 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::future::Future; -use std::io::Error as IoError; +use futures::{future, prelude::*}; +use std::io::{Error as IoError, Read, Write}; +use std::ops::Deref; use tokio_io::{AsyncRead, AsyncWrite}; -/// Implemented on objects that can be turned into a substream. -/// -/// > **Note**: The methods of this trait consume the object, but if the object implements `Clone` -/// > then you can clone it and keep the original in order to open additional substreams. +/// Implemented on objects that can open and manage substreams. pub trait StreamMuxer { /// Type of the object that represents the raw substream where data can be read and written. - type Substream: AsyncRead + AsyncWrite; - - /// Future that will be resolved when a new incoming substream is open. - /// - /// A `None` item signals that the underlying resource has been exhausted and - /// no more substreams can be created. - type InboundSubstream: Future, Error = IoError>; + type Substream; /// Future that will be resolved when the outgoing substream is open. - /// - /// A `None` item signals that the underlying resource has been exhausted and - /// no more substreams can be created. - type OutboundSubstream: Future, Error = IoError>; + type OutboundSubstream; - /// Produces a future that will be resolved when a new incoming substream arrives. - fn inbound(self) -> Self::InboundSubstream; + /// Polls for an inbound substream. + /// + /// This function behaves the same as a `Stream`. + fn poll_inbound(&self) -> Poll, IoError>; /// Opens a new outgoing substream, and produces a future that will be resolved when it becomes /// available. - fn outbound(self) -> Self::OutboundSubstream; + fn open_outbound(&self) -> Self::OutboundSubstream; + + /// Polls the outbound substream. + /// + /// May panic or produce an undefined result if an earlier polling returned `Ready` or `Err`. + fn poll_outbound( + &self, + substream: &mut Self::OutboundSubstream, + ) -> Poll, IoError>; + + /// Destroys an outbound substream. Use this after the outbound substream has finished, or if + /// you want to interrupt it. + fn destroy_outbound(&self, substream: Self::OutboundSubstream); + + /// Reads data from a substream. The behaviour is the same as `std::io::Read::read`. + fn read_substream( + &self, + substream: &mut Self::Substream, + buf: &mut [u8], + ) -> Result; + + /// Write data to a substream. The behaviour is the same as `std::io::Write::write`. + fn write_substream( + &self, + substream: &mut Self::Substream, + buf: &[u8], + ) -> Result; + + /// Flushes a substream. The behaviour is the same as `std::io::Write::flush`. + fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError>; + + /// Attempts to shut down a substream. The behaviour is the same as + /// `tokio_io::AsyncWrite::shutdown`. + fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError>; + + /// Destroys a substream. + fn destroy_substream(&self, substream: Self::Substream); +} + +/// Polls for an inbound from the muxer but wraps the output in an object that +/// implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`. +#[inline] +pub fn inbound_from_ref_and_wrap

( + muxer: P, +) -> impl Future>, Error = IoError> +where + P: Deref + Clone, + P::Target: StreamMuxer, +{ + let muxer2 = muxer.clone(); + future::poll_fn(move || muxer.poll_inbound()) + .map(|substream| substream.map(move |s| substream_from_ref(muxer2, s))) +} + +/// Same as `outbound_from_ref`, but wraps the output in an object that +/// implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`. +#[inline] +pub fn outbound_from_ref_and_wrap

(muxer: P) -> OutboundSubstreamRefWrapFuture

+where + P: Deref + Clone, + P::Target: StreamMuxer, +{ + let inner = outbound_from_ref(muxer); + OutboundSubstreamRefWrapFuture { inner } +} + +/// Future returned by `outbound_from_ref_and_wrap`. +pub struct OutboundSubstreamRefWrapFuture

+where + P: Deref + Clone, + P::Target: StreamMuxer, +{ + inner: OutboundSubstreamRefFuture

, +} + +impl

Future for OutboundSubstreamRefWrapFuture

+where + P: Deref + Clone, + P::Target: StreamMuxer, +{ + type Item = Option>; + type Error = IoError; + + fn poll(&mut self) -> Poll { + match self.inner.poll() { + Ok(Async::Ready(Some(substream))) => { + let out = substream_from_ref(self.inner.muxer.clone(), substream); + Ok(Async::Ready(Some(out))) + } + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => Err(err), + } + } +} + +/// Builds a new future for an outbound substream, where the muxer is a reference. +#[inline] +pub fn outbound_from_ref

(muxer: P) -> OutboundSubstreamRefFuture

+where + P: Deref, + P::Target: StreamMuxer, +{ + let outbound = muxer.open_outbound(); + OutboundSubstreamRefFuture { + muxer, + outbound: Some(outbound), + } +} + +/// Future returned by `outbound_from_ref`. +pub struct OutboundSubstreamRefFuture

+where + P: Deref, + P::Target: StreamMuxer, +{ + muxer: P, + outbound: Option<::OutboundSubstream>, +} + +impl

Future for OutboundSubstreamRefFuture

+where + P: Deref, + P::Target: StreamMuxer, +{ + type Item = Option<::Substream>; + type Error = IoError; + + #[inline] + fn poll(&mut self) -> Poll { + self.muxer + .poll_outbound(self.outbound.as_mut().expect("outbound was empty")) + } +} + +impl

Drop for OutboundSubstreamRefFuture

+where + P: Deref, + P::Target: StreamMuxer, +{ + #[inline] + fn drop(&mut self) { + self.muxer + .destroy_outbound(self.outbound.take().expect("outbound was empty")) + } +} + +/// Builds an implementation of `Read`/`Write`/`AsyncRead`/`AsyncWrite` from an `Arc` to the +/// muxer and a substream. +#[inline] +pub fn substream_from_ref

( + muxer: P, + substream: ::Substream, +) -> SubstreamRef

+where + P: Deref, + P::Target: StreamMuxer, +{ + SubstreamRef { + muxer, + substream: Some(substream), + } +} + +/// Stream returned by `substream_from_ref`. +pub struct SubstreamRef

+where + P: Deref, + P::Target: StreamMuxer, +{ + muxer: P, + substream: Option<::Substream>, +} + +impl

Read for SubstreamRef

+where + P: Deref, + P::Target: StreamMuxer, +{ + #[inline] + fn read(&mut self, buf: &mut [u8]) -> Result { + self.muxer + .read_substream(self.substream.as_mut().expect("substream was empty"), buf) + } +} + +impl

AsyncRead for SubstreamRef

+where + P: Deref, + P::Target: StreamMuxer, +{ +} + +impl

Write for SubstreamRef

+where + P: Deref, + P::Target: StreamMuxer, +{ + #[inline] + fn write(&mut self, buf: &[u8]) -> Result { + self.muxer + .write_substream(self.substream.as_mut().expect("substream was empty"), buf) + } + + #[inline] + fn flush(&mut self) -> Result<(), IoError> { + self.muxer + .flush_substream(self.substream.as_mut().expect("substream was empty")) + } +} + +impl

AsyncWrite for SubstreamRef

+where + P: Deref, + P::Target: StreamMuxer, +{ + #[inline] + fn shutdown(&mut self) -> Poll<(), IoError> { + self.muxer + .shutdown_substream(self.substream.as_mut().expect("substream was empty")) + } +} + +impl

Drop for SubstreamRef

+where + P: Deref, + P::Target: StreamMuxer, +{ + #[inline] + fn drop(&mut self) { + self.muxer + .destroy_substream(self.substream.take().expect("substream was empty")) + } } diff --git a/core/tests/multiplex.rs b/core/tests/multiplex.rs index 4c0c9210..2cdc0580 100644 --- a/core/tests/multiplex.rs +++ b/core/tests/multiplex.rs @@ -29,8 +29,8 @@ extern crate tokio_io; use bytes::BytesMut; use futures::future::Future; use futures::{Sink, Stream}; -use libp2p_core::{Multiaddr, MuxedTransport, StreamMuxer, Transport, transport}; -use std::sync::atomic; +use libp2p_core::{muxing, Multiaddr, MuxedTransport, Transport, transport}; +use std::sync::{atomic, Arc}; use std::thread; use tokio_io::codec::length_delimited::Framed; @@ -105,7 +105,7 @@ fn client_to_server_outbound() { .with_upgrade(multiplex::MplexConfig::new()) .dial("/memory".parse().unwrap()) .unwrap_or_else(|_| panic!()) - .and_then(|client| client.0.outbound()) + .and_then(|client| muxing::outbound_from_ref_and_wrap(Arc::new(client.0))) .map(|server| Framed::<_, BytesMut>::new(server.unwrap())) .and_then(|server| server.send("hello world".into())) .map(|_| ()); @@ -199,10 +199,10 @@ fn use_opened_listen_to_dial() { .into_future() .map_err(|(err, _)| err) .and_then(|(client, _)| client.unwrap()) - .map(|client| client.0) + .map(|client| Arc::new(client.0)) .and_then(|c| { let c2 = c.clone(); - c.clone().inbound().map(move |i| (c2, i)) + muxing::inbound_from_ref_and_wrap(c.clone()).map(move |i| (c2, i)) }) .map(|(muxer, client)| (muxer, Framed::<_, BytesMut>::new(client.unwrap()))) .and_then(|(muxer, client)| { @@ -214,7 +214,7 @@ fn use_opened_listen_to_dial() { .and_then(|(muxer, (msg, _))| { let msg = msg.unwrap(); assert_eq!(msg, "hello world"); - muxer.outbound() + muxing::outbound_from_ref_and_wrap(muxer) }) .map(|client| Framed::<_, BytesMut>::new(client.unwrap())) .and_then(|client| client.into_future().map_err(|(err, _)| err)) diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 3028a5f3..3664adcb 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -33,8 +33,8 @@ extern crate unsigned_varint; mod codec; use std::{cmp, iter}; -use std::io::{Read, Write, Error as IoError, ErrorKind as IoErrorKind}; -use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering}; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::sync::{atomic::AtomicUsize, atomic::Ordering}; use bytes::Bytes; use core::{ConnectionUpgrade, Endpoint, StreamMuxer}; use parking_lot::Mutex; @@ -129,7 +129,7 @@ where let max_buffer_len = self.max_buffer_len; let out = Multiplex { - inner: Arc::new(Mutex::new(MultiplexInner { + inner: Mutex::new(MultiplexInner { error: Ok(()), inner: Framed::new(i, codec::Codec::new()).fuse(), config: self, @@ -137,7 +137,7 @@ where opened_substreams: Default::default(), next_outbound_stream_id: if endpoint == Endpoint::Dialer { 0 } else { 1 }, to_notify: Default::default(), - })) + }) }; future::ok((out, remote_addr)) @@ -151,16 +151,7 @@ where /// Multiplexer. Implements the `StreamMuxer` trait. pub struct Multiplex { - inner: Arc>>, -} - -impl Clone for Multiplex { - #[inline] - fn clone(&self) -> Self { - Multiplex { - inner: self.inner.clone(), - } - } + inner: Mutex>, } // Struct shared throughout the implementation. @@ -308,81 +299,12 @@ where C: AsyncRead + AsyncWrite } impl StreamMuxer for Multiplex -where C: AsyncRead + AsyncWrite + 'static // TODO: 'static :-/ -{ - type Substream = Substream; - type InboundSubstream = InboundSubstream; - type OutboundSubstream = Box, Error = IoError> + 'static>; - - #[inline] - fn inbound(self) -> Self::InboundSubstream { - InboundSubstream { inner: self.inner } - } - - #[inline] - fn outbound(self) -> Self::OutboundSubstream { - let mut inner = self.inner.lock(); - - // Assign a substream ID now. - let substream_id = { - let n = inner.next_outbound_stream_id; - inner.next_outbound_stream_id += 2; - n - }; - - // We use an RAII guard, so that we close the substream in case of an error. - struct OpenedSubstreamGuard(Option>>>, u32); - impl Drop for OpenedSubstreamGuard { - fn drop(&mut self) { - if let Some(inner) = self.0.take() { - debug!("Failed to open outbound substream {}", self.1); - inner.lock().buffer.retain(|elem| elem.substream_id() != self.1); - } - } - } - inner.opened_substreams.insert(substream_id); - let mut guard = OpenedSubstreamGuard(Some(self.inner.clone()), substream_id); - - // We send `Open { substream_id }`, then flush, then only produce the substream. - let future = { - future::poll_fn({ - let inner = self.inner.clone(); - move || { - let elem = codec::Elem::Open { substream_id }; - poll_send(&mut inner.lock(), elem) - } - }).and_then({ - let inner = self.inner.clone(); - move |()| { - future::poll_fn(move || inner.lock().inner.poll_complete()) - } - }).map(move |()| { - debug!("Successfully opened outbound substream {}", substream_id); - Some(Substream { - inner: guard.0.take().unwrap(), - num: substream_id, - current_data: Bytes::new(), - endpoint: Endpoint::Dialer, - }) - }) - }; - - Box::new(future) as Box<_> - } -} - -/// Future to the next incoming substream. -pub struct InboundSubstream { - inner: Arc>>, -} - -impl Future for InboundSubstream where C: AsyncRead + AsyncWrite { - type Item = Option>; - type Error = IoError; + type Substream = Substream; + type OutboundSubstream = OutboundSubstream; - fn poll(&mut self) -> Poll { + fn poll_inbound(&self) -> Poll, IoError> { let mut inner = self.inner.lock(); if inner.opened_substreams.len() >= inner.config.max_substreams { @@ -401,7 +323,6 @@ where C: AsyncRead + AsyncWrite if let Some(num) = num { debug!("Successfully opened inbound substream {}", num); Ok(Async::Ready(Some(Substream { - inner: self.inner.clone(), current_data: Bytes::new(), num, endpoint: Endpoint::Listener, @@ -410,28 +331,81 @@ where C: AsyncRead + AsyncWrite Ok(Async::Ready(None)) } } -} -/// Active substream to the remote. Implements `AsyncRead` and `AsyncWrite`. -pub struct Substream -where C: AsyncRead + AsyncWrite -{ - inner: Arc>>, - num: u32, - // Read buffer. Contains data read from `inner` but not yet dispatched by a call to `read()`. - current_data: Bytes, - endpoint: Endpoint, -} + fn open_outbound(&self) -> Self::OutboundSubstream { + let mut inner = self.inner.lock(); -impl Read for Substream -where C: AsyncRead + AsyncWrite -{ - fn read(&mut self, buf: &mut [u8]) -> Result { + // Assign a substream ID now. + let substream_id = { + let n = inner.next_outbound_stream_id; + inner.next_outbound_stream_id += 2; + n + }; + + inner.opened_substreams.insert(substream_id); + + OutboundSubstream { + num: substream_id, + state: OutboundSubstreamState::SendElem(codec::Elem::Open { substream_id }), + } + } + + fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll, IoError> { + loop { + let polling = match substream.state { + OutboundSubstreamState::SendElem(ref elem) => { + let mut inner = self.inner.lock(); + poll_send(&mut inner, elem.clone()) + }, + OutboundSubstreamState::Flush => { + let mut inner = self.inner.lock(); + inner.inner.poll_complete() + }, + OutboundSubstreamState::Done => { + panic!("Polling outbound substream after it's been succesfully open"); + }, + }; + + match polling { + Ok(Async::Ready(())) => (), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(err) => { + debug!("Failed to open outbound substream {}", substream.num); + self.inner.lock().buffer.retain(|elem| elem.substream_id() != substream.num); + return Err(err) + }, + }; + + // Going to next step. + match substream.state { + OutboundSubstreamState::SendElem(_) => { + substream.state = OutboundSubstreamState::Flush; + }, + OutboundSubstreamState::Flush => { + debug!("Successfully opened outbound substream {}", substream.num); + substream.state = OutboundSubstreamState::Done; + return Ok(Async::Ready(Some(Substream { + num: substream.num, + current_data: Bytes::new(), + endpoint: Endpoint::Dialer, + }))); + }, + OutboundSubstreamState::Done => unreachable!(), + } + } + } + + #[inline] + fn destroy_outbound(&self, _substream: Self::OutboundSubstream) { + // Nothing to do. + } + + fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Result { loop { // First, transfer from `current_data`. - if self.current_data.len() != 0 { - let len = cmp::min(self.current_data.len(), buf.len()); - buf[..len].copy_from_slice(&self.current_data.split_to(len)); + if substream.current_data.len() != 0 { + let len = cmp::min(substream.current_data.len(), buf.len()); + buf[..len].copy_from_slice(&substream.current_data.split_to(len)); return Ok(len); } @@ -439,22 +413,22 @@ where C: AsyncRead + AsyncWrite let mut inner = self.inner.lock(); let next_data_poll = next_match(&mut inner, |elem| { match elem { - &codec::Elem::Data { ref substream_id, ref data, .. } if *substream_id == self.num => { // TODO: check endpoint? + &codec::Elem::Data { ref substream_id, ref data, .. } if *substream_id == substream.num => { // TODO: check endpoint? Some(data.clone()) }, _ => None, } }); - // We're in a loop, so all we need to do is set `self.current_data` to the data we + // We're in a loop, so all we need to do is set `substream.current_data` to the data we // just read and wait for the next iteration. match next_data_poll { - Ok(Async::Ready(Some(data))) => self.current_data = data.freeze(), + Ok(Async::Ready(Some(data))) => substream.current_data = data.freeze(), Ok(Async::Ready(None)) => return Ok(0), Ok(Async::NotReady) => { // There was no data packet in the buffer about this substream ; maybe it's // because it has been closed. - if inner.opened_substreams.contains(&self.num) { + if inner.opened_substreams.contains(&substream.num) { return Err(IoErrorKind::WouldBlock.into()); } else { return Ok(0); @@ -464,21 +438,12 @@ where C: AsyncRead + AsyncWrite } } } -} -impl AsyncRead for Substream -where C: AsyncRead + AsyncWrite -{ -} - -impl Write for Substream -where C: AsyncRead + AsyncWrite -{ - fn write(&mut self, buf: &[u8]) -> Result { + fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Result { let elem = codec::Elem::Data { - substream_id: self.num, + substream_id: substream.num, data: From::from(buf), - endpoint: self.endpoint, + endpoint: substream.endpoint, }; let mut inner = self.inner.lock(); @@ -489,7 +454,7 @@ where C: AsyncRead + AsyncWrite } } - fn flush(&mut self) -> Result<(), IoError> { + fn flush_substream(&self, _substream: &mut Self::Substream) -> Result<(), IoError> { let mut inner = self.inner.lock(); match inner.inner.poll_complete() { Ok(Async::Ready(())) => Ok(()), @@ -497,27 +462,44 @@ where C: AsyncRead + AsyncWrite Err(err) => Err(err), } } -} -impl AsyncWrite for Substream -where C: AsyncRead + AsyncWrite -{ - fn shutdown(&mut self) -> Poll<(), IoError> { + fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> { let elem = codec::Elem::Reset { - substream_id: self.num, - endpoint: self.endpoint, + substream_id: substream.num, + endpoint: substream.endpoint, }; let mut inner = self.inner.lock(); poll_send(&mut inner, elem) } -} -impl Drop for Substream -where C: AsyncRead + AsyncWrite -{ - fn drop(&mut self) { - let _ = self.shutdown(); // TODO: this doesn't necessarily send the close message - self.inner.lock().buffer.retain(|elem| elem.substream_id() != self.num); + fn destroy_substream(&self, mut substream: Self::Substream) { + let _ = self.shutdown_substream(&mut substream); // TODO: this doesn't necessarily send the close message + self.inner.lock().buffer.retain(|elem| elem.substream_id() != substream.num); } } + +/// Active attempt to open an outbound substream. +pub struct OutboundSubstream { + /// Substream number. + num: u32, + state: OutboundSubstreamState, +} + +enum OutboundSubstreamState { + /// We need to send `Elem` on the underlying stream. + SendElem(codec::Elem), + /// We need to flush the underlying stream. + Flush, + /// The substream is open and the `OutboundSubstream` is now useless. + Done, +} + +/// Active substream to the remote. +pub struct Substream { + /// Substream number. + num: u32, + // Read buffer. Contains data read from `inner` but not yet dispatched by a call to `read()`. + current_data: Bytes, + endpoint: Endpoint, +} diff --git a/muxers/mplex/tests/two_peers.rs b/muxers/mplex/tests/two_peers.rs index 95a21f0c..7c030fa9 100644 --- a/muxers/mplex/tests/two_peers.rs +++ b/muxers/mplex/tests/two_peers.rs @@ -28,9 +28,9 @@ extern crate tokio_io; use futures::future::Future; use futures::{Sink, Stream}; -use std::sync::mpsc; +use std::sync::{Arc, mpsc}; use std::thread; -use swarm::{StreamMuxer, Transport}; +use swarm::{muxing, Transport}; use tcp::TcpConfig; use tokio_io::codec::length_delimited::Framed; @@ -52,8 +52,8 @@ fn client_to_server_outbound() { let future = listener .into_future() .map_err(|(err, _)| err) - .and_then(|(client, _)| client.unwrap().map(|v| v.0)) - .and_then(|client| client.outbound()) + .and_then(|(client, _)| client.unwrap().map(|v| Arc::new(v.0))) + .and_then(|client| muxing::outbound_from_ref_and_wrap(client)) .map(|client| Framed::<_, bytes::BytesMut>::new(client.unwrap())) .and_then(|client| { client @@ -75,7 +75,7 @@ fn client_to_server_outbound() { let future = transport .dial(rx.recv().unwrap()) .unwrap() - .and_then(|client| client.0.inbound()) + .and_then(|client| muxing::inbound_from_ref_and_wrap(Arc::new(client.0))) .map(|server| Framed::<_, bytes::BytesMut>::new(server.unwrap())) .and_then(|server| server.send("hello world".into())) .map(|_| ()); @@ -103,7 +103,7 @@ fn client_to_server_inbound() { .into_future() .map_err(|(err, _)| err) .and_then(|(client, _)| client.unwrap().map(|v| v.0)) - .and_then(|client| client.inbound()) + .and_then(|client| muxing::inbound_from_ref_and_wrap(Arc::new(client))) .map(|client| Framed::<_, bytes::BytesMut>::new(client.unwrap())) .and_then(|client| { client @@ -125,7 +125,7 @@ fn client_to_server_inbound() { let future = transport .dial(rx.recv().unwrap()) .unwrap() - .and_then(|(client, _)| client.outbound()) + .and_then(|(client, _)| muxing::outbound_from_ref_and_wrap(Arc::new(client))) .map(|server| Framed::<_, bytes::BytesMut>::new(server.unwrap())) .and_then(|server| server.send("hello world".into())) .map(|_| ()); diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index e1dd08fc..1e43a0da 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -9,5 +9,6 @@ bytes = "0.4" futures = "0.1" libp2p-core = { path = "../../core" } log = "0.4" +parking_lot = "0.6" tokio-io = "0.1" yamux = { git = "https://github.com/paritytech/yamux" } diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 2c3cc2ee..46c1c993 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -23,30 +23,27 @@ extern crate futures; #[macro_use] extern crate log; extern crate libp2p_core as core; +extern crate parking_lot; extern crate tokio_io; extern crate yamux; use bytes::Bytes; use core::Endpoint; use futures::{future::{self, FutureResult}, prelude::*}; +use parking_lot::Mutex; use std::{io, iter}; +use std::io::{Read, Write, Error as IoError}; use tokio_io::{AsyncRead, AsyncWrite}; -pub struct Yamux(yamux::Connection); - -impl Clone for Yamux { - fn clone(&self) -> Self { - Yamux(self.0.clone()) - } -} +pub struct Yamux(Mutex>); impl Yamux where C: AsyncRead + AsyncWrite + 'static { pub fn new(c: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { - Yamux(yamux::Connection::new(c, cfg, mode)) + Yamux(Mutex::new(yamux::Connection::new(c, cfg, mode))) } } @@ -55,31 +52,11 @@ where C: AsyncRead + AsyncWrite + 'static { type Substream = yamux::StreamHandle; - type InboundSubstream = InboundFuture; type OutboundSubstream = FutureResult, io::Error>; - fn inbound(self) -> Self::InboundSubstream { - InboundFuture(self.0) - } - - fn outbound(self) -> Self::OutboundSubstream { - let stream = self.0.open_stream().map_err(|e| io::Error::new(io::ErrorKind::Other, e)); - future::result(stream) - } -} - - -pub struct InboundFuture(yamux::Connection); - -impl Future for InboundFuture -where - C: AsyncRead + AsyncWrite + 'static -{ - type Item = Option>; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - match self.0.poll() { + #[inline] + fn poll_inbound(&self) -> Poll, IoError> { + match self.0.lock().poll() { Err(e) => { error!("connection error: {}", e); Err(io::Error::new(io::ErrorKind::Other, e)) @@ -89,9 +66,49 @@ where Ok(Async::Ready(Some(stream))) => Ok(Async::Ready(Some(stream))) } } + + #[inline] + fn open_outbound(&self) -> Self::OutboundSubstream { + let stream = self.0.lock().open_stream().map_err(|e| io::Error::new(io::ErrorKind::Other, e)); + future::result(stream) + } + + #[inline] + fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll, IoError> { + substream.poll() + } + + #[inline] + fn destroy_outbound(&self, _substream: Self::OutboundSubstream) { + } + + #[inline] + fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Result { + substream.read(buf) + } + + #[inline] + fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Result { + substream.write(buf) + } + + #[inline] + fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError> { + substream.flush() + } + + #[inline] + fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> { + substream.shutdown() + } + + #[inline] + fn destroy_substream(&self, _substream: Self::Substream) { + } } + #[derive(Clone)] pub struct Config(yamux::Config);