diff --git a/libp2p-swarm/src/connection_reuse.rs b/libp2p-swarm/src/connection_reuse.rs index 1a6ee44f..05b43300 100644 --- a/libp2p-swarm/src/connection_reuse.rs +++ b/libp2p-swarm/src/connection_reuse.rs @@ -46,9 +46,9 @@ use futures::{Future, Stream, Async, Poll}; use futures::stream::Fuse as StreamFuse; use multiaddr::Multiaddr; +use muxing::StreamMuxer; use smallvec::SmallVec; use std::io::Error as IoError; -use muxing::StreamMuxer; use transport::{Transport, ConnectionUpgrade, UpgradedNode}; /// Allows reusing the same muxed connection multiple times. @@ -58,130 +58,129 @@ use transport::{Transport, ConnectionUpgrade, UpgradedNode}; /// Implements the `Transport` trait. #[derive(Clone)] pub struct ConnectionReuse - where T: Transport, - C: ConnectionUpgrade, + where T: Transport, + C: ConnectionUpgrade { - // Underlying transport and connection upgrade for when we need to dial or listen. - inner: UpgradedNode, + // Underlying transport and connection upgrade for when we need to dial or listen. + inner: UpgradedNode, } impl From> for ConnectionReuse - where T: Transport, - C: ConnectionUpgrade, + where T: Transport, + C: ConnectionUpgrade { - #[inline] - fn from(node: UpgradedNode) -> ConnectionReuse { - ConnectionReuse { - inner: node, - } - } + #[inline] + fn from(node: UpgradedNode) -> ConnectionReuse { + ConnectionReuse { inner: node } + } } impl Transport for ConnectionReuse - where T: Transport + 'static, // TODO: 'static :( - C: ConnectionUpgrade + 'static, // TODO: 'static :( - C: Clone, - C::Output: StreamMuxer + Clone, - C::NamesIter: Clone, // TODO: not elegant + where T: Transport + 'static, // TODO: 'static :( + C: ConnectionUpgrade + 'static, // TODO: 'static :( + C: Clone, + C::Output: StreamMuxer + Clone, + C::NamesIter: Clone // TODO: not elegant { type RawConn = ::Substream; - type Listener = ConnectionReuseListener>, C::Output>; + type Listener = ConnectionReuseListener< + Box< + Stream< + Item = (C::Output, Multiaddr), + Error = IoError, + >, + >, + C::Output, + >; type Dial = Box>; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - let (listener, new_addr) = match self.inner.listen_on(addr.clone()) { - Ok((l, a)) => (l, a), - Err((inner, addr)) => { - return Err((ConnectionReuse { - inner: inner, - }, addr)); - } - }; + let (listener, new_addr) = match self.inner.listen_on(addr.clone()) { + Ok((l, a)) => (l, a), + Err((inner, addr)) => { + return Err((ConnectionReuse { inner: inner }, addr)); + } + }; - let listener = ConnectionReuseListener { - listener: listener.fuse(), - connections: Vec::new(), - }; + let listener = ConnectionReuseListener { + listener: listener.fuse(), + connections: Vec::new(), + }; - Ok((listener, new_addr)) - } + Ok((listener, new_addr)) + } fn dial(self, addr: Multiaddr) -> Result { - let dial = match self.inner.dial(addr) { - Ok(l) => l, - Err((inner, addr)) => { - return Err((ConnectionReuse { - inner: inner, - }, addr)); - } - }; + let dial = match self.inner.dial(addr) { + Ok(l) => l, + Err((inner, addr)) => { + return Err((ConnectionReuse { inner: inner }, addr)); + } + }; - let future = dial - .and_then(|dial| { - dial.outbound() - }); - Ok(Box::new(future) as Box<_>) - } + let future = dial.and_then(|dial| dial.outbound()); + Ok(Box::new(future) as Box<_>) + } } /// Implementation of `Stream - where S: Stream, - M: StreamMuxer, + where S: Stream, + M: StreamMuxer { - listener: StreamFuse, - connections: Vec<(M, ::InboundSubstream, Multiaddr)>, + listener: StreamFuse, + connections: Vec<(M, ::InboundSubstream, Multiaddr)>, } impl Stream for ConnectionReuseListener - where S: Stream, - M: StreamMuxer + Clone + 'static, // TODO: 'static :( + where S: Stream, + M: StreamMuxer + Clone + 'static // TODO: 'static :( { - type Item = (M::Substream, Multiaddr); - type Error = IoError; + type Item = (M::Substream, Multiaddr); + type Error = IoError; - fn poll(&mut self) -> Poll, Self::Error> { - match self.listener.poll() { - Ok(Async::Ready(Some((upgrade, client_addr)))) => { - let next_incoming = upgrade.clone().inbound(); - self.connections.push((upgrade, next_incoming, client_addr)); - }, - Ok(Async::NotReady) => (), - Ok(Async::Ready(None)) => { - if self.connections.is_empty() { - return Ok(Async::Ready(None)); - } - }, - Err(err) => { - if self.connections.is_empty() { - return Err(err); - } - }, - }; + fn poll(&mut self) -> Poll, Self::Error> { + match self.listener.poll() { + Ok(Async::Ready(Some((upgrade, client_addr)))) => { + let next_incoming = upgrade.clone().inbound(); + self.connections.push((upgrade, next_incoming, client_addr)); + } + Ok(Async::NotReady) => (), + Ok(Async::Ready(None)) => { + if self.connections.is_empty() { + return Ok(Async::Ready(None)); + } + } + Err(err) => { + if self.connections.is_empty() { + return Err(err); + } + } + }; - let mut connections_to_drop: SmallVec<[_; 8]> = SmallVec::new(); + let mut connections_to_drop: SmallVec<[_; 8]> = SmallVec::new(); - for (index, &mut (ref mut muxer, ref mut next_incoming, ref client_addr)) in - self.connections.iter_mut().enumerate() - { - match next_incoming.poll() { - Ok(Async::Ready(incoming)) => { - let mut new_next = muxer.clone().inbound(); - *next_incoming = new_next; - return Ok(Async::Ready(Some((incoming, client_addr.clone())))); - }, - Ok(Async::NotReady) => (), - Err(_) => { - connections_to_drop.push(index); - }, - }; - } + for (index, &mut (ref mut muxer, ref mut next_incoming, ref client_addr)) in + self.connections.iter_mut().enumerate() + { + match next_incoming.poll() { + Ok(Async::Ready(incoming)) => { + let mut new_next = muxer.clone().inbound(); + *next_incoming = new_next; + return Ok(Async::Ready(Some((incoming, client_addr.clone())))); + } + Ok(Async::NotReady) => (), + Err(_) => { + connections_to_drop.push(index); + } + }; + } - for &index in connections_to_drop.iter().rev() { - self.connections.remove(index); - } + for &index in connections_to_drop.iter().rev() { + self.connections.remove(index); + } - Ok(Async::NotReady) - } + Ok(Async::NotReady) + } } diff --git a/libp2p-swarm/src/muxing.rs b/libp2p-swarm/src/muxing.rs index d38576d0..d4ba2588 100644 --- a/libp2p-swarm/src/muxing.rs +++ b/libp2p-swarm/src/muxing.rs @@ -27,33 +27,35 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// > **Note**: The methods of this trait consume the object, but if the object implements `Clone` /// > then you can clone it and keep the original in order to open additional substreams. pub trait StreamMuxer { - /// Type of the object that represents the raw substream where data can be read and written. - type Substream: AsyncRead + AsyncWrite; - /// Future that will be resolved when a new incoming substream is open. - type InboundSubstream: Future; - /// Future that will be resolved when the outgoing substream is open. - type OutboundSubstream: Future; + /// Type of the object that represents the raw substream where data can be read and written. + type Substream: AsyncRead + AsyncWrite; + /// Future that will be resolved when a new incoming substream is open. + type InboundSubstream: Future; + /// Future that will be resolved when the outgoing substream is open. + type OutboundSubstream: Future; - /// Produces a future that will be resolved when a new incoming substream arrives. - fn inbound(self) -> Self::InboundSubstream; + /// Produces a future that will be resolved when a new incoming substream arrives. + fn inbound(self) -> Self::InboundSubstream; - /// Opens a new outgoing substream, and produces a future that will be resolved when it becomes - /// available. - fn outbound(self) -> Self::OutboundSubstream; + /// Opens a new outgoing substream, and produces a future that will be resolved when it becomes + /// available. + fn outbound(self) -> Self::OutboundSubstream; } -impl StreamMuxer for T where T: AsyncRead + AsyncWrite { - type Substream = Self; - type InboundSubstream = FutureResult; // TODO: use ! - type OutboundSubstream = FutureResult; // TODO: use ! +impl StreamMuxer for T + where T: AsyncRead + AsyncWrite +{ + type Substream = Self; + type InboundSubstream = FutureResult; // TODO: use ! + type OutboundSubstream = FutureResult; // TODO: use ! - #[inline] - fn inbound(self) -> Self::InboundSubstream { - ok(self) - } + #[inline] + fn inbound(self) -> Self::InboundSubstream { + ok(self) + } - #[inline] - fn outbound(self) -> Self::OutboundSubstream { - ok(self) - } + #[inline] + fn outbound(self) -> Self::OutboundSubstream { + ok(self) + } } diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index 03ce6fee..6f0100a1 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -549,7 +549,10 @@ impl<'a, T, C> UpgradedNode pub fn listen_on( self, addr: Multiaddr, - ) -> Result<(Box + 'a>, Multiaddr), (Self, Multiaddr)> + ) -> Result< + (Box + 'a>, Multiaddr), + (Self, Multiaddr), + > where C::NamesIter: Clone, // TODO: not elegant C: Clone {