connection_reuse: drop dead connections. (#178)

Currently, connection substreams are added to
`connection_reuse::Shared::active_connections`, but never removed. This
is not least because the `StreamMuxer` trait defines its inbound and
outbound substream futures to always yield a substream and contains no
provision to signal that no more substreams can be created, which would
allow client code (e.g. `ConnectionReuse`) to detect this and purge its
caches.

This PR defines the `StreamMuxer` trait to optionally yield
inbound/outbound substreams and changes `libp2p-mplex` to handle
stream EOFs by marking the underlying resource as closed.
`ConnectionReuse` will remove stream muxers from its active connections
cache if a `None` substream is returned.
This commit is contained in:
Toralf Wittner
2018-05-14 14:49:29 +02:00
committed by Pierre Krieger
parent 11f655dd6a
commit 4382adcbde
8 changed files with 204 additions and 96 deletions

View File

@ -172,7 +172,7 @@ pub struct InboundFuture<T, Buf: Array> {
}
impl<T: AsyncRead, Buf: Array<Item = u8>> Future for InboundFuture<T, Buf> {
type Item = Substream<T, Buf>;
type Item = Option<Substream<T, Buf>>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -181,6 +181,10 @@ impl<T: AsyncRead, Buf: Array<Item = u8>> Future for InboundFuture<T, Buf> {
Async::NotReady => return Ok(Async::NotReady),
};
if lock.is_closed() {
return Ok(Async::Ready(None));
}
// Attempt to make progress, but don't block if we can't
match read_stream(&mut lock, None) {
Ok(_) => {}
@ -200,12 +204,12 @@ impl<T: AsyncRead, Buf: Array<Item = u8>> Future for InboundFuture<T, Buf> {
lock.open_stream(id);
Ok(Async::Ready(Substream::new(
Ok(Async::Ready(Some(Substream::new(
id,
self.end,
name,
Arc::clone(&self.state),
)))
))))
}
}
@ -230,7 +234,7 @@ fn nonce_to_id(id: usize, end: Endpoint) -> u32 {
}
impl<T: AsyncWrite, Buf: Array> Future for OutboundFuture<T, Buf> {
type Item = Substream<T, Buf>;
type Item = Option<Substream<T, Buf>>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -239,6 +243,10 @@ impl<T: AsyncWrite, Buf: Array> Future for OutboundFuture<T, Buf> {
Async::NotReady => return Ok(Async::NotReady),
};
if lock.is_closed() {
return Ok(Async::Ready(None));
}
loop {
let (mut id_str, id) = self.current_id.take().unwrap_or_else(|| {
let next = nonce_to_id(
@ -260,12 +268,12 @@ impl<T: AsyncWrite, Buf: Array> Future for OutboundFuture<T, Buf> {
debug_assert!(id_str.position() <= id_str.get_ref().len() as u64);
if id_str.position() == id_str.get_ref().len() as u64 {
if lock.open_stream(id) {
return Ok(Async::Ready(Substream::new(
return Ok(Async::Ready(Some(Substream::new(
id,
self.meta.end,
Bytes::from(&id_str.get_ref()[..]),
Arc::clone(&self.state),
)));
))));
}
} else {
self.current_id = Some((id_str, id));
@ -393,7 +401,12 @@ mod tests {
let mplex = Multiplex::dial(stream);
let mut substream = mplex.clone().outbound().wait().unwrap();
let mut substream = mplex
.clone()
.outbound()
.wait()
.unwrap()
.expect("outbound substream");
assert!(tokio::write_all(&mut substream, message).wait().is_ok());
@ -410,7 +423,7 @@ mod tests {
let mplex = Multiplex::listen(stream);
let mut substream = mplex.inbound().wait().unwrap();
let mut substream = mplex.inbound().wait().unwrap().expect("inbound substream");
assert_eq!(id, substream.id());
assert_eq!(
@ -435,7 +448,14 @@ mod tests {
let mut outbound: Vec<Substream<_>> = vec![];
for _ in 0..5 {
outbound.push(mplex.clone().outbound().wait().unwrap());
outbound.push(
mplex
.clone()
.outbound()
.wait()
.unwrap()
.expect("outbound substream"),
);
}
outbound.sort_by_key(|a| a.id());
@ -455,7 +475,14 @@ mod tests {
let mut inbound: Vec<Substream<_>> = vec![];
for _ in 0..5 {
inbound.push(mplex.clone().inbound().wait().unwrap());
inbound.push(
mplex
.clone()
.inbound()
.wait()
.unwrap()
.expect("inbound substream"),
);
}
inbound.sort_by_key(|a| a.id());
@ -515,7 +542,7 @@ mod tests {
let mplex = Multiplex::listen(io::Cursor::new(input));
let mut substream = mplex.inbound().wait().unwrap();
let mut substream = mplex.inbound().wait().unwrap().expect("inbound substream");
assert_eq!(substream.id(), 0);
assert_eq!(substream.name(), None);
@ -567,7 +594,7 @@ mod tests {
let mplex = Multiplex::listen(io::Cursor::new(input));
let mut substream = mplex.inbound().wait().unwrap();
let mut substream = mplex.inbound().wait().unwrap().expect("inbound substream");
assert_eq!(substream.id(), 0);
assert_eq!(substream.name(), None);
@ -615,7 +642,7 @@ mod tests {
let mplex = Multiplex::listen(io::Cursor::new(data));
let mut substream = mplex.inbound().wait().unwrap();
let mut substream = mplex.inbound().wait().unwrap().expect("inbound substream");
assert_eq!(substream.id(), 1);
@ -650,7 +677,14 @@ mod tests {
let mut outbound: Vec<Substream<_, Buffer>> = vec![];
for _ in 0..5 {
outbound.push(mplex.clone().outbound().wait().unwrap());
outbound.push(
mplex
.clone()
.outbound()
.wait()
.unwrap()
.expect("outbound substream"),
);
}
outbound.sort_by_key(|a| a.id());
@ -670,7 +704,12 @@ mod tests {
let mut inbound: Vec<Substream<_, Buffer>> = vec![];
for _ in 0..5 {
let inb: Substream<_, Buffer> = mplex.clone().inbound().wait().unwrap();
let inb: Substream<_, Buffer> = mplex
.clone()
.inbound()
.wait()
.unwrap()
.expect("inbound substream");
inbound.push(inb);
}

View File

@ -235,6 +235,7 @@ fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
let header = if let Some(header) = header {
header
} else {
lock.close();
return Ok(on_block.unwrap_or(0));
};
@ -310,6 +311,7 @@ fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
let length = if let Some(length) = length {
length
} else {
lock.close();
return Ok(on_block.unwrap_or(0));
};
@ -377,6 +379,10 @@ fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
match consumed {
Ok(consumed) => {
if consumed == 0 {
lock.close()
}
let new_remaining = remaining_bytes - consumed;
lock.read_state = Some(NewStream {
@ -446,6 +452,10 @@ fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
match read_result {
Ok(consumed) => {
if consumed == 0 {
lock.close()
}
let new_remaining = remaining_bytes - consumed;
lock.read_state = Some(ParsingMessageBody {
@ -493,6 +503,9 @@ fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
let new_len = ignore_buf.len().min(remaining_bytes);
match lock.stream.read(&mut ignore_buf[..new_len]) {
Ok(consumed) => {
if consumed == 0 {
lock.close()
}
remaining_bytes -= consumed;
lock.read_state = Some(Ignore {
substream_id,

View File

@ -75,7 +75,7 @@ pub struct MultiplexShared<T, Buf: Array> {
pub read_state: Option<MultiplexReadState>,
pub write_state: Option<MultiplexWriteState>,
pub stream: T,
// true if the stream is open, false otherwise
eof: bool, // true, if `stream` has been exhausted
pub open_streams: HashMap<u32, SubstreamMetadata<Buf>>,
pub meta_write_tasks: Vec<Task>,
// TODO: Should we use a version of this with a fixed size that doesn't allocate and return
@ -93,6 +93,7 @@ impl<T, Buf: Array> MultiplexShared<T, Buf> {
meta_write_tasks: Default::default(),
to_open: Default::default(),
stream: stream,
eof: false,
}
}
@ -108,6 +109,14 @@ impl<T, Buf: Array> MultiplexShared<T, Buf> {
trace!(target: "libp2p-mplex", "close stream {}", id);
self.open_streams.insert(id, SubstreamMetadata::Closed);
}
pub fn close(&mut self) {
self.eof = true
}
pub fn is_closed(&self) -> bool {
self.eof
}
}
pub fn buf_from_slice(slice: &[u8]) -> ByteBuf {

View File

@ -56,7 +56,7 @@ fn client_to_server_outbound() {
.map_err(|(err, _)| err)
.and_then(|(client, _)| client.unwrap().map(|v| v.0))
.and_then(|client| client.outbound())
.map(|client| Framed::<_, bytes::BytesMut>::new(client))
.map(|client| Framed::<_, bytes::BytesMut>::new(client.unwrap()))
.and_then(|client| {
client
.into_future()
@ -79,7 +79,7 @@ fn client_to_server_outbound() {
.dial(rx.recv().unwrap())
.unwrap()
.and_then(|client| client.0.inbound())
.map(|server| Framed::<_, bytes::BytesMut>::new(server))
.map(|server| Framed::<_, bytes::BytesMut>::new(server.unwrap()))
.and_then(|server| server.send("hello world".into()))
.map(|_| ());
@ -108,7 +108,7 @@ fn client_to_server_inbound() {
.map_err(|(err, _)| err)
.and_then(|(client, _)| client.unwrap().map(|v| v.0))
.and_then(|client| client.inbound())
.map(|client| Framed::<_, bytes::BytesMut>::new(client))
.map(|client| Framed::<_, bytes::BytesMut>::new(client.unwrap()))
.and_then(|client| {
client
.into_future()
@ -131,7 +131,7 @@ fn client_to_server_inbound() {
.dial(rx.recv().unwrap())
.unwrap()
.and_then(|(client, _)| client.outbound())
.map(|server| Framed::<_, bytes::BytesMut>::new(server))
.map(|server| Framed::<_, bytes::BytesMut>::new(server.unwrap()))
.and_then(|server| server.send("hello world".into()))
.map(|_| ());

View File

@ -40,7 +40,7 @@
//! `MuxedTransport` trait.
use fnv::FnvHashMap;
use futures::future::{self, FutureResult, IntoFuture};
use futures::future::{self, Either, FutureResult, IntoFuture};
use futures::{Async, Future, Poll, Stream};
use futures::stream::FuturesUnordered;
use futures::stream::Fuse as StreamFuse;
@ -48,7 +48,7 @@ use futures::sync::mpsc;
use multiaddr::Multiaddr;
use muxing::StreamMuxer;
use parking_lot::Mutex;
use std::io::Error as IoError;
use std::io::{self, Error as IoError};
use std::sync::Arc;
use transport::{MuxedTransport, Transport, UpgradedNode};
use upgrade::ConnectionUpgrade;
@ -152,52 +152,59 @@ where
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
// If we already have an active connection, use it!
if let Some(connec) = self.shared
let substream = if let Some(muxer) = self.shared
.lock()
.active_connections
.get(&addr)
.map(|c| c.clone())
.map(|muxer| muxer.clone())
{
debug!(target: "libp2p-swarm", "Using existing multiplexed connection to {}", addr);
let future = connec.outbound().map(|s| (s, addr));
return Ok(Box::new(future) as Box<_>);
}
debug!(target: "libp2p-swarm", "No existing connection to {} ; dialing", addr);
// TODO: handle if we're already in the middle in dialing that same node?
// TODO: try dialing again if the existing connection has dropped
let dial = match self.inner.dial(addr) {
Ok(l) => l,
Err((inner, addr)) => {
warn!(target: "libp2p-swarm", "Failed to dial {} because the underlying \
transport doesn't support this address", addr);
return Err((
ConnectionReuse {
inner: inner,
shared: self.shared,
},
addr,
));
}
let a = addr.clone();
Either::A(muxer.outbound().map(move |s| s.map(move |s| (s, a))))
} else {
Either::B(future::ok(None))
};
let shared = self.shared.clone();
let dial = dial.into_future().and_then(move |(connec, addr)| {
// Always replace the active connection because we are the most recent.
let inner = self.inner;
let future = substream.and_then(move |outbound| {
if let Some(o) = outbound {
debug!(target: "libp2p-swarm", "Using existing multiplexed connection to {}", addr);
return Either::A(future::ok(o));
}
// The previous stream muxer did not yield a new substream => start new dial
debug!(target: "libp2p-swarm", "No existing connection to {}; dialing", addr);
match inner.dial(addr.clone()) {
Ok(dial) => {
let future = dial.into_future().and_then(move |(muxer, addr)| {
muxer.clone().outbound().and_then(move |substream| {
if let Some(s) = substream {
// Replace the active connection because we are the most recent.
let mut lock = shared.lock();
lock.active_connections.insert(addr.clone(), connec.clone());
lock.active_connections.insert(addr.clone(), muxer.clone());
// TODO: doesn't need locking ; the sender could be extracted
let _ = lock.add_to_next_tx.unbounded_send((
connec.clone(),
connec.clone().inbound(),
muxer.clone(),
muxer.inbound(),
addr.clone(),
));
connec.outbound().map(|s| (s, addr))
Ok((s, addr))
} else {
error!(target: "libp2p-swarm", "failed to dial to {}", addr);
shared.lock().active_connections.remove(&addr);
Err(io::Error::new(io::ErrorKind::Other, "dial failed"))
}
})
});
Either::B(Either::A(future))
}
Err(_) => {
let e = io::Error::new(io::ErrorKind::Other, "transport rejected dial");
Either::B(Either::B(future::err(e)))
}
}
});
Ok(Box::new(dial) as Box<_>)
Ok(Box::new(future) as Box<_>)
}
#[inline]
@ -281,12 +288,6 @@ where
let next_incoming = muxer.clone().inbound();
self.connections
.push((muxer.clone(), next_incoming, client_addr.clone()));
// We overwrite any current active connection to that multiaddr because we
// are the freshest possible connection.
self.shared
.lock()
.active_connections
.insert(client_addr, muxer);
}
Err(err) => {
// Insert the rest of the pending upgrades, but not the current one.
@ -301,7 +302,18 @@ where
for n in (0..self.connections.len()).rev() {
let (muxer, mut next_incoming, client_addr) = self.connections.swap_remove(n);
match next_incoming.poll() {
Ok(Async::Ready(incoming)) => {
Ok(Async::Ready(None)) => {
// stream muxer gave us a `None` => connection should be considered closed
debug!(target: "libp2p-swarm", "no more inbound substreams on {}", client_addr);
self.shared.lock().active_connections.remove(&client_addr);
}
Ok(Async::Ready(Some(incoming))) => {
// We overwrite any current active connection to that multiaddr because we
// are the freshest possible connection.
self.shared
.lock()
.active_connections
.insert(client_addr.clone(), muxer.clone());
// A new substream is ready.
let mut new_next = muxer.clone().inbound();
self.connections
@ -366,7 +378,11 @@ where
for n in (0..lock.next_incoming.len()).rev() {
let (muxer, mut future, addr) = lock.next_incoming.swap_remove(n);
match future.poll() {
Ok(Async::Ready(value)) => {
Ok(Async::Ready(None)) => {
debug!(target: "libp2p-swarm", "no inbound substream for {}", addr);
lock.active_connections.remove(&addr);
}
Ok(Async::Ready(Some(value))) => {
// A substream is ready ; push back the muxer for the next time this function
// is called, then return.
debug!(target: "libp2p-swarm", "New incoming substream");

View File

@ -102,56 +102,79 @@ where
B: StreamMuxer,
{
type Substream = EitherSocket<A::Substream, B::Substream>;
type InboundSubstream = EitherTransportFuture<A::InboundSubstream, B::InboundSubstream>;
type OutboundSubstream = EitherTransportFuture<A::OutboundSubstream, B::OutboundSubstream>;
type InboundSubstream = EitherInbound<A, B>;
type OutboundSubstream = EitherOutbound<A, B>;
#[inline]
fn inbound(self) -> Self::InboundSubstream {
match self {
EitherSocket::First(a) => EitherTransportFuture::First(a.inbound()),
EitherSocket::Second(b) => EitherTransportFuture::Second(b.inbound()),
EitherSocket::First(a) => EitherInbound::A(a.inbound()),
EitherSocket::Second(b) => EitherInbound::B(b.inbound()),
}
}
#[inline]
fn outbound(self) -> Self::OutboundSubstream {
match self {
EitherSocket::First(a) => EitherTransportFuture::First(a.outbound()),
EitherSocket::Second(b) => EitherTransportFuture::Second(b.outbound()),
EitherSocket::First(a) => EitherOutbound::A(a.outbound()),
EitherSocket::Second(b) => EitherOutbound::B(b.outbound()),
}
}
}
/// Implements `Future` and redirects calls to either `First` or `Second`.
///
/// Additionally, the output will be wrapped inside a `EitherSocket`.
// TODO: This type is needed because of the lack of `impl Trait` in stable Rust.
// If Rust had impl Trait we could use the Either enum from the futures crate and add some
// modifiers to it. This custom enum is a combination of Either and these modifiers.
#[derive(Debug, Copy, Clone)]
pub enum EitherTransportFuture<A, B> {
First(A),
Second(B),
pub enum EitherInbound<A: StreamMuxer, B: StreamMuxer> {
A(A::InboundSubstream),
B(B::InboundSubstream),
}
impl<A, B> Future for EitherTransportFuture<A, B>
impl<A, B> Future for EitherInbound<A, B>
where
A: Future<Error = IoError>,
B: Future<Error = IoError>,
A: StreamMuxer,
B: StreamMuxer,
{
type Item = EitherSocket<A::Item, B::Item>;
type Item = Option<EitherSocket<A::Substream, B::Substream>>;
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
&mut EitherTransportFuture::First(ref mut a) => {
match *self {
EitherInbound::A(ref mut a) => {
let item = try_ready!(a.poll());
Ok(Async::Ready(EitherSocket::First(item)))
Ok(Async::Ready(item.map(EitherSocket::First)))
}
&mut EitherTransportFuture::Second(ref mut b) => {
EitherInbound::B(ref mut b) => {
let item = try_ready!(b.poll());
Ok(Async::Ready(EitherSocket::Second(item)))
Ok(Async::Ready(item.map(EitherSocket::Second)))
}
}
}
}
#[derive(Debug, Copy, Clone)]
pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
A(A::OutboundSubstream),
B(B::OutboundSubstream),
}
impl<A, B> Future for EitherOutbound<A, B>
where
A: StreamMuxer,
B: StreamMuxer,
{
type Item = Option<EitherSocket<A::Substream, B::Substream>>;
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match *self {
EitherOutbound::A(ref mut a) => {
let item = try_ready!(a.poll());
Ok(Async::Ready(item.map(EitherSocket::First)))
}
EitherOutbound::B(ref mut b) => {
let item = try_ready!(b.poll());
Ok(Async::Ready(item.map(EitherSocket::Second)))
}
}
}

View File

@ -29,10 +29,18 @@ use tokio_io::{AsyncRead, AsyncWrite};
pub trait StreamMuxer {
/// Type of the object that represents the raw substream where data can be read and written.
type Substream: AsyncRead + AsyncWrite;
/// Future that will be resolved when a new incoming substream is open.
type InboundSubstream: Future<Item = Self::Substream, Error = IoError>;
///
/// A `None` item signals that the underlying resource has been exhausted and
/// no more substreams can be created.
type InboundSubstream: Future<Item = Option<Self::Substream>, Error = IoError>;
/// Future that will be resolved when the outgoing substream is open.
type OutboundSubstream: Future<Item = Self::Substream, Error = IoError>;
///
/// A `None` item signals that the underlying resource has been exhausted and
/// no more substreams can be created.
type OutboundSubstream: Future<Item = Option<Self::Substream>, Error = IoError>;
/// Produces a future that will be resolved when a new incoming substream arrives.
fn inbound(self) -> Self::InboundSubstream;

View File

@ -115,7 +115,7 @@ fn client_to_server_outbound() {
.dial(rx.recv().unwrap())
.unwrap()
.and_then(|client| client.0.outbound())
.map(|server| Framed::<_, BytesMut>::new(server))
.map(|server| Framed::<_, BytesMut>::new(server.unwrap()))
.and_then(|server| server.send("hello world".into()))
.map(|_| ());
@ -229,7 +229,7 @@ fn use_opened_listen_to_dial() {
let c2 = c.clone();
c.clone().inbound().map(move |i| (c2, i))
})
.map(|(muxer, client)| (muxer, Framed::<_, BytesMut>::new(client)))
.map(|(muxer, client)| (muxer, Framed::<_, BytesMut>::new(client.unwrap())))
.and_then(|(muxer, client)| {
client
.into_future()
@ -241,7 +241,7 @@ fn use_opened_listen_to_dial() {
assert_eq!(msg, "hello world");
muxer.outbound()
})
.map(|client| Framed::<_, BytesMut>::new(client))
.map(|client| Framed::<_, BytesMut>::new(client.unwrap()))
.and_then(|client| client.into_future().map_err(|(err, _)| err))
.and_then(|(msg, _)| {
let msg = msg.unwrap();