mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-29 01:31:33 +00:00
Run rustfmt on the code and use tabs
This commit is contained in:
@ -46,9 +46,9 @@
|
||||
use futures::{Future, Stream, Async, Poll};
|
||||
use futures::stream::Fuse as StreamFuse;
|
||||
use multiaddr::Multiaddr;
|
||||
use muxing::StreamMuxer;
|
||||
use smallvec::SmallVec;
|
||||
use std::io::Error as IoError;
|
||||
use muxing::StreamMuxer;
|
||||
use transport::{Transport, ConnectionUpgrade, UpgradedNode};
|
||||
|
||||
/// Allows reusing the same muxed connection multiple times.
|
||||
@ -58,130 +58,129 @@ use transport::{Transport, ConnectionUpgrade, UpgradedNode};
|
||||
/// Implements the `Transport` trait.
|
||||
#[derive(Clone)]
|
||||
pub struct ConnectionReuse<T, C>
|
||||
where T: Transport,
|
||||
C: ConnectionUpgrade<T::RawConn>,
|
||||
where T: Transport,
|
||||
C: ConnectionUpgrade<T::RawConn>
|
||||
{
|
||||
// Underlying transport and connection upgrade for when we need to dial or listen.
|
||||
inner: UpgradedNode<T, C>,
|
||||
// Underlying transport and connection upgrade for when we need to dial or listen.
|
||||
inner: UpgradedNode<T, C>,
|
||||
}
|
||||
|
||||
impl<T, C> From<UpgradedNode<T, C>> for ConnectionReuse<T, C>
|
||||
where T: Transport,
|
||||
C: ConnectionUpgrade<T::RawConn>,
|
||||
where T: Transport,
|
||||
C: ConnectionUpgrade<T::RawConn>
|
||||
{
|
||||
#[inline]
|
||||
fn from(node: UpgradedNode<T, C>) -> ConnectionReuse<T, C> {
|
||||
ConnectionReuse {
|
||||
inner: node,
|
||||
}
|
||||
}
|
||||
#[inline]
|
||||
fn from(node: UpgradedNode<T, C>) -> ConnectionReuse<T, C> {
|
||||
ConnectionReuse { inner: node }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, C> Transport for ConnectionReuse<T, C>
|
||||
where T: Transport + 'static, // TODO: 'static :(
|
||||
C: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :(
|
||||
C: Clone,
|
||||
C::Output: StreamMuxer + Clone,
|
||||
C::NamesIter: Clone, // TODO: not elegant
|
||||
where T: Transport + 'static, // TODO: 'static :(
|
||||
C: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :(
|
||||
C: Clone,
|
||||
C::Output: StreamMuxer + Clone,
|
||||
C::NamesIter: Clone // TODO: not elegant
|
||||
{
|
||||
type RawConn = <C::Output as StreamMuxer>::Substream;
|
||||
type Listener = ConnectionReuseListener<Box<Stream<Item = (C::Output, Multiaddr), Error = IoError>>, C::Output>;
|
||||
type Listener = ConnectionReuseListener<
|
||||
Box<
|
||||
Stream<
|
||||
Item = (C::Output, Multiaddr),
|
||||
Error = IoError,
|
||||
>,
|
||||
>,
|
||||
C::Output,
|
||||
>;
|
||||
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
||||
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||
let (listener, new_addr) = match self.inner.listen_on(addr.clone()) {
|
||||
Ok((l, a)) => (l, a),
|
||||
Err((inner, addr)) => {
|
||||
return Err((ConnectionReuse {
|
||||
inner: inner,
|
||||
}, addr));
|
||||
}
|
||||
};
|
||||
let (listener, new_addr) = match self.inner.listen_on(addr.clone()) {
|
||||
Ok((l, a)) => (l, a),
|
||||
Err((inner, addr)) => {
|
||||
return Err((ConnectionReuse { inner: inner }, addr));
|
||||
}
|
||||
};
|
||||
|
||||
let listener = ConnectionReuseListener {
|
||||
listener: listener.fuse(),
|
||||
connections: Vec::new(),
|
||||
};
|
||||
let listener = ConnectionReuseListener {
|
||||
listener: listener.fuse(),
|
||||
connections: Vec::new(),
|
||||
};
|
||||
|
||||
Ok((listener, new_addr))
|
||||
}
|
||||
Ok((listener, new_addr))
|
||||
}
|
||||
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||
let dial = match self.inner.dial(addr) {
|
||||
Ok(l) => l,
|
||||
Err((inner, addr)) => {
|
||||
return Err((ConnectionReuse {
|
||||
inner: inner,
|
||||
}, addr));
|
||||
}
|
||||
};
|
||||
let dial = match self.inner.dial(addr) {
|
||||
Ok(l) => l,
|
||||
Err((inner, addr)) => {
|
||||
return Err((ConnectionReuse { inner: inner }, addr));
|
||||
}
|
||||
};
|
||||
|
||||
let future = dial
|
||||
.and_then(|dial| {
|
||||
dial.outbound()
|
||||
});
|
||||
Ok(Box::new(future) as Box<_>)
|
||||
}
|
||||
let future = dial.and_then(|dial| dial.outbound());
|
||||
Ok(Box::new(future) as Box<_>)
|
||||
}
|
||||
}
|
||||
|
||||
/// Implementation of `Stream<Item = (impl AsyncRead + AsyncWrite, Multiaddr)` for the
|
||||
/// `ConnectionReuse` struct.
|
||||
pub struct ConnectionReuseListener<S, M>
|
||||
where S: Stream<Item = (M, Multiaddr), Error = IoError>,
|
||||
M: StreamMuxer,
|
||||
where S: Stream<Item = (M, Multiaddr), Error = IoError>,
|
||||
M: StreamMuxer
|
||||
{
|
||||
listener: StreamFuse<S>,
|
||||
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,
|
||||
listener: StreamFuse<S>,
|
||||
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,
|
||||
}
|
||||
|
||||
impl<S, M> Stream for ConnectionReuseListener<S, M>
|
||||
where S: Stream<Item = (M, Multiaddr), Error = IoError>,
|
||||
M: StreamMuxer + Clone + 'static, // TODO: 'static :(
|
||||
where S: Stream<Item = (M, Multiaddr), Error = IoError>,
|
||||
M: StreamMuxer + Clone + 'static // TODO: 'static :(
|
||||
{
|
||||
type Item = (M::Substream, Multiaddr);
|
||||
type Error = IoError;
|
||||
type Item = (M::Substream, Multiaddr);
|
||||
type Error = IoError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
match self.listener.poll() {
|
||||
Ok(Async::Ready(Some((upgrade, client_addr)))) => {
|
||||
let next_incoming = upgrade.clone().inbound();
|
||||
self.connections.push((upgrade, next_incoming, client_addr));
|
||||
},
|
||||
Ok(Async::NotReady) => (),
|
||||
Ok(Async::Ready(None)) => {
|
||||
if self.connections.is_empty() {
|
||||
return Ok(Async::Ready(None));
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
if self.connections.is_empty() {
|
||||
return Err(err);
|
||||
}
|
||||
},
|
||||
};
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
match self.listener.poll() {
|
||||
Ok(Async::Ready(Some((upgrade, client_addr)))) => {
|
||||
let next_incoming = upgrade.clone().inbound();
|
||||
self.connections.push((upgrade, next_incoming, client_addr));
|
||||
}
|
||||
Ok(Async::NotReady) => (),
|
||||
Ok(Async::Ready(None)) => {
|
||||
if self.connections.is_empty() {
|
||||
return Ok(Async::Ready(None));
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
if self.connections.is_empty() {
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let mut connections_to_drop: SmallVec<[_; 8]> = SmallVec::new();
|
||||
let mut connections_to_drop: SmallVec<[_; 8]> = SmallVec::new();
|
||||
|
||||
for (index, &mut (ref mut muxer, ref mut next_incoming, ref client_addr)) in
|
||||
self.connections.iter_mut().enumerate()
|
||||
{
|
||||
match next_incoming.poll() {
|
||||
Ok(Async::Ready(incoming)) => {
|
||||
let mut new_next = muxer.clone().inbound();
|
||||
*next_incoming = new_next;
|
||||
return Ok(Async::Ready(Some((incoming, client_addr.clone()))));
|
||||
},
|
||||
Ok(Async::NotReady) => (),
|
||||
Err(_) => {
|
||||
connections_to_drop.push(index);
|
||||
},
|
||||
};
|
||||
}
|
||||
for (index, &mut (ref mut muxer, ref mut next_incoming, ref client_addr)) in
|
||||
self.connections.iter_mut().enumerate()
|
||||
{
|
||||
match next_incoming.poll() {
|
||||
Ok(Async::Ready(incoming)) => {
|
||||
let mut new_next = muxer.clone().inbound();
|
||||
*next_incoming = new_next;
|
||||
return Ok(Async::Ready(Some((incoming, client_addr.clone()))));
|
||||
}
|
||||
Ok(Async::NotReady) => (),
|
||||
Err(_) => {
|
||||
connections_to_drop.push(index);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
for &index in connections_to_drop.iter().rev() {
|
||||
self.connections.remove(index);
|
||||
}
|
||||
for &index in connections_to_drop.iter().rev() {
|
||||
self.connections.remove(index);
|
||||
}
|
||||
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
|
@ -27,33 +27,35 @@ use tokio_io::{AsyncRead, AsyncWrite};
|
||||
/// > **Note**: The methods of this trait consume the object, but if the object implements `Clone`
|
||||
/// > then you can clone it and keep the original in order to open additional substreams.
|
||||
pub trait StreamMuxer {
|
||||
/// Type of the object that represents the raw substream where data can be read and written.
|
||||
type Substream: AsyncRead + AsyncWrite;
|
||||
/// Future that will be resolved when a new incoming substream is open.
|
||||
type InboundSubstream: Future<Item = Self::Substream, Error = IoError>;
|
||||
/// Future that will be resolved when the outgoing substream is open.
|
||||
type OutboundSubstream: Future<Item = Self::Substream, Error = IoError>;
|
||||
/// Type of the object that represents the raw substream where data can be read and written.
|
||||
type Substream: AsyncRead + AsyncWrite;
|
||||
/// Future that will be resolved when a new incoming substream is open.
|
||||
type InboundSubstream: Future<Item = Self::Substream, Error = IoError>;
|
||||
/// Future that will be resolved when the outgoing substream is open.
|
||||
type OutboundSubstream: Future<Item = Self::Substream, Error = IoError>;
|
||||
|
||||
/// Produces a future that will be resolved when a new incoming substream arrives.
|
||||
fn inbound(self) -> Self::InboundSubstream;
|
||||
/// Produces a future that will be resolved when a new incoming substream arrives.
|
||||
fn inbound(self) -> Self::InboundSubstream;
|
||||
|
||||
/// Opens a new outgoing substream, and produces a future that will be resolved when it becomes
|
||||
/// available.
|
||||
fn outbound(self) -> Self::OutboundSubstream;
|
||||
/// Opens a new outgoing substream, and produces a future that will be resolved when it becomes
|
||||
/// available.
|
||||
fn outbound(self) -> Self::OutboundSubstream;
|
||||
}
|
||||
|
||||
impl<T> StreamMuxer for T where T: AsyncRead + AsyncWrite {
|
||||
type Substream = Self;
|
||||
type InboundSubstream = FutureResult<Self, IoError>; // TODO: use !
|
||||
type OutboundSubstream = FutureResult<Self, IoError>; // TODO: use !
|
||||
impl<T> StreamMuxer for T
|
||||
where T: AsyncRead + AsyncWrite
|
||||
{
|
||||
type Substream = Self;
|
||||
type InboundSubstream = FutureResult<Self, IoError>; // TODO: use !
|
||||
type OutboundSubstream = FutureResult<Self, IoError>; // TODO: use !
|
||||
|
||||
#[inline]
|
||||
fn inbound(self) -> Self::InboundSubstream {
|
||||
ok(self)
|
||||
}
|
||||
#[inline]
|
||||
fn inbound(self) -> Self::InboundSubstream {
|
||||
ok(self)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn outbound(self) -> Self::OutboundSubstream {
|
||||
ok(self)
|
||||
}
|
||||
#[inline]
|
||||
fn outbound(self) -> Self::OutboundSubstream {
|
||||
ok(self)
|
||||
}
|
||||
}
|
||||
|
@ -549,7 +549,10 @@ impl<'a, T, C> UpgradedNode<T, C>
|
||||
pub fn listen_on(
|
||||
self,
|
||||
addr: Multiaddr,
|
||||
) -> Result<(Box<Stream<Item = (C::Output, Multiaddr), Error = IoError> + 'a>, Multiaddr), (Self, Multiaddr)>
|
||||
) -> Result<
|
||||
(Box<Stream<Item = (C::Output, Multiaddr), Error = IoError> + 'a>, Multiaddr),
|
||||
(Self, Multiaddr),
|
||||
>
|
||||
where C::NamesIter: Clone, // TODO: not elegant
|
||||
C: Clone
|
||||
{
|
||||
|
Reference in New Issue
Block a user