diff --git a/core/src/connection_reuse.rs b/core/src/connection_reuse.rs index ab07bb97..e5db222a 100644 --- a/core/src/connection_reuse.rs +++ b/core/src/connection_reuse.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. +// Copyright 2018 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -40,116 +40,151 @@ //! `MuxedTransport` trait. use fnv::FnvHashMap; -use futures::future::{self, Either, FutureResult}; -use futures::{Async, Future, Poll, Stream}; +use futures::future::{self, FutureResult}; +use futures::{Async, Future, Poll, Stream, stream, task}; use futures::stream::FuturesUnordered; -use futures::sync::mpsc; use multiaddr::Multiaddr; use muxing::{self, StreamMuxer}; use parking_lot::Mutex; -use std::io::{self, Error as IoError}; -use std::sync::Arc; +use std::collections::hash_map::Entry; +use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write}; +use std::mem; +use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering}; use tokio_io::{AsyncRead, AsyncWrite}; -use transport::{MuxedTransport, Transport, UpgradedNode}; -use upgrade::ConnectionUpgrade; +use transport::{MuxedTransport, Transport}; /// Allows reusing the same muxed connection multiple times. /// /// Can be created from an `UpgradedNode` through the `From` trait. /// /// Implements the `Transport` trait. -pub struct ConnectionReuse +pub struct ConnectionReuse where T: Transport, - T::Output: AsyncRead + AsyncWrite, - C: ConnectionUpgrade, - C::Output: StreamMuxer, + T: Transport, + M: StreamMuxer, { - // Underlying transport and connection upgrade for when we need to dial or listen. - inner: UpgradedNode, - - // Struct shared between most of the `ConnectionReuse` infrastructure. - shared: Arc>>, + /// Struct shared between most of the `ConnectionReuse` infrastructure. + shared: Arc>>, } -impl Clone for ConnectionReuse +impl Clone for ConnectionReuse where - T: Transport + Clone, - T::Output: AsyncRead + AsyncWrite, - C: ConnectionUpgrade + Clone, - C::Output: StreamMuxer + T: Transport, + T: Transport, + M: StreamMuxer, { #[inline] fn clone(&self) -> Self { ConnectionReuse { - inner: self.inner.clone(), shared: self.shared.clone(), } } } -struct Shared { - // List of active muxers. - active_connections: FnvHashMap>, - - // List of pending inbound substreams from dialed nodes. - // Only add to this list elements received through `add_to_next_rx`. - next_incoming: Vec<(Arc, Multiaddr)>, - - // New elements are not directly added to `next_incoming`. Instead they are sent to this - // channel. This is done so that we can wake up tasks whenever a new element is added. - add_to_next_rx: mpsc::UnboundedReceiver<(Arc, Multiaddr)>, - - // Other side of `add_to_next_rx`. - add_to_next_tx: mpsc::UnboundedSender<(Arc, Multiaddr)>, -} - -impl From> for ConnectionReuse +/// Struct shared between most of the `ConnectionReuse` infrastructure. +struct Shared where T: Transport, - T::Output: AsyncRead + AsyncWrite, - C: ConnectionUpgrade, - C::Output: StreamMuxer, + T: Transport, + M: StreamMuxer, +{ + /// Underlying transport and connection upgrade, used when we need to dial or listen. + transport: T, + + /// All the connections that were opened, whether successful and/or active or not. + // TODO: this will grow forever + connections: FnvHashMap>, + + /// Tasks to notify when one or more new elements were added to `connections`. + notify_on_new_connec: FnvHashMap, + + /// Next `connection_id` to use when opening a connection. + next_connection_id: u64, + + /// Next `listener_id` for the next listener we create. + next_listener_id: u64, +} + +enum PeerState where M: StreamMuxer { + /// Connection is active and can be used to open substreams. + Active { + /// The muxer to open new substreams. + muxer: Arc, + /// Custom data passed to the output. + custom_data: D, + /// Future of the address of the client. + client_addr: Multiaddr, + /// Unique identifier for this connection in the `ConnectionReuse`. + connection_id: u64, + /// Number of open substreams. + num_substreams: u64, + /// Id of the listener that created this connection, or `None` if it was opened by a + /// dialer. + listener_id: Option, + }, + + /// Connection is pending. + // TODO: stronger Future type + Pending { + /// Future that produces the muxer. + future: Box>, + /// All the tasks to notify when `future` resolves. + notify: FnvHashMap, + }, + + /// An earlier connection attempt errored. + Errored(IoError), + + /// The `PeerState` is poisonned. Happens if a panic happened while executing some of the + /// functions. + Poisonned, +} + +impl ConnectionReuse +where + T: Transport, + T: Transport, + M: StreamMuxer, { #[inline] - fn from(node: UpgradedNode) -> ConnectionReuse { - let (tx, rx) = mpsc::unbounded(); - + pub(crate) fn new(node: T) -> ConnectionReuse { ConnectionReuse { - inner: node, shared: Arc::new(Mutex::new(Shared { - active_connections: Default::default(), - next_incoming: Vec::new(), - add_to_next_rx: rx, - add_to_next_tx: tx, + transport: node, + connections: Default::default(), + notify_on_new_connec: Default::default(), + next_connection_id: 0, + next_listener_id: 0, })), } } } -impl Transport for ConnectionReuse +impl Transport for ConnectionReuse where T: Transport + 'static, // TODO: 'static :( - T::Output: AsyncRead + AsyncWrite, - C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :( - C::Output: StreamMuxer, - C::MultiaddrFuture: Future, - C::NamesIter: Clone, // TODO: not elegant + T: Transport + Clone + 'static, // TODO: 'static :( + M: StreamMuxer + 'static, + D: Clone + 'static, + T: Clone, { - type Output = muxing::SubstreamRef>; + type Output = (D, ConnectionReuseSubstream); type MultiaddrFuture = future::FutureResult; type Listener = Box>; type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; - type Dial = Box>; + type Dial = ConnectionReuseDial; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - let (listener, new_addr) = match self.inner.listen_on(addr.clone()) { + let mut shared = self.shared.lock(); + + let (listener, new_addr) = match shared.transport.clone().listen_on(addr.clone()) { Ok((l, a)) => (l, a), - Err((inner, addr)) => { + Err((_, addr)) => { return Err(( ConnectionReuse { - inner: inner, - shared: self.shared, + shared: self.shared.clone(), }, addr, )); @@ -157,103 +192,68 @@ where }; let listener = listener - .fuse() .map(|upgr| { upgr.and_then(|(out, addr)| { + trace!("Waiting for remote's address as listener"); addr.map(move |addr| (out, addr)) }) - }); + }) + .fuse(); + + let listener_id = shared.next_listener_id; + shared.next_listener_id += 1; let listener = ConnectionReuseListener { shared: self.shared.clone(), - listener: listener, + listener, + listener_id, current_upgrades: FuturesUnordered::new(), - connections: Vec::new(), }; Ok((Box::new(listener) as Box<_>, new_addr)) } + #[inline] fn dial(self, addr: Multiaddr) -> Result { - // If we already have an active connection, use it! - let substream = if let Some(muxer) = self.shared - .lock() - .active_connections - .get(&addr) - .map(|muxer| muxer.clone()) - { - let a = addr.clone(); - Either::A(muxing::outbound_from_ref_and_wrap(muxer).map(|o| o.map(move |s| (s, future::ok(a))))) - } else { - Either::B(future::ok(None)) + let mut shared = self.shared.lock(); + + // If an earlier attempt to dial this multiaddress failed, we clear the error. Otherwise + // the returned `Future` will immediately produce the error. + let must_clear = match shared.connections.get(&addr) { + Some(&PeerState::Errored(ref err)) => { + trace!("Clearing existing connection to {} which errored earlier: {:?}", addr, err); + true + }, + _ => false, }; + if must_clear { + shared.connections.remove(&addr); + } - let shared = self.shared.clone(); - let inner = self.inner; - let future = substream.and_then(move |outbound| { - if let Some(o) = outbound { - debug!("Using existing multiplexed connection to {}", addr); - return Either::A(future::ok(o)); - } - // The previous stream muxer did not yield a new substream => start new dial - debug!("No existing connection to {}; dialing", addr); - match inner.dial(addr.clone()) { - Ok(dial) => { - let future = dial - .and_then(move |(muxer, addr_fut)| { - trace!("Waiting for remote's address"); - addr_fut.map(move |addr| (Arc::new(muxer), addr)) - }) - .and_then(move |(muxer, addr)| { - muxing::outbound_from_ref(muxer.clone()).and_then(move |substream| { - if let Some(s) = substream { - // Replace the active connection because we are the most recent. - let mut lock = shared.lock(); - lock.active_connections.insert(addr.clone(), muxer.clone()); - // TODO: doesn't need locking ; the sender could be extracted - let _ = lock.add_to_next_tx.unbounded_send(( - muxer.clone(), - addr.clone(), - )); - let s = muxing::substream_from_ref(muxer, s); - Ok((s, future::ok(addr))) - } else { - error!("failed to dial to {}", addr); - shared.lock().active_connections.remove(&addr); - Err(io::Error::new(io::ErrorKind::Other, "dial failed")) - } - }) - }); - Either::B(Either::A(future)) - } - Err(_) => { - let e = io::Error::new(io::ErrorKind::Other, "transport rejected dial"); - Either::B(Either::B(future::err(e))) - } - } - }); - - Ok(Box::new(future) as Box<_>) + Ok(ConnectionReuseDial { + outbound: None, + shared: self.shared.clone(), + addr, + }) } #[inline] fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.inner.transport().nat_traversal(server, observed) + self.shared.lock().transport.nat_traversal(server, observed) } } -impl MuxedTransport for ConnectionReuse +impl MuxedTransport for ConnectionReuse where T: Transport + 'static, // TODO: 'static :( - T::Output: AsyncRead + AsyncWrite, - C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :( - C::Output: StreamMuxer, - C::MultiaddrFuture: Future, - C::NamesIter: Clone, // TODO: not elegant + T: Transport + Clone + 'static, // TODO: 'static :( + M: StreamMuxer + 'static, + D: Clone + 'static, + T: Clone, { - type Incoming = ConnectionReuseIncoming; + type Incoming = ConnectionReuseIncoming; type IncomingUpgrade = - future::FutureResult<(muxing::SubstreamRef>, Self::MultiaddrFuture), IoError>; + future::FutureResult<((D, ConnectionReuseSubstream), Self::MultiaddrFuture), IoError>; #[inline] fn next_incoming(self) -> Self::Incoming { @@ -263,27 +263,230 @@ where } } -/// Implementation of `Stream` for the connections incoming from listening on a specific address. -pub struct ConnectionReuseListener -where - M: StreamMuxer, -{ - // The main listener. `S` is from the underlying transport. - listener: S, - current_upgrades: FuturesUnordered, - connections: Vec<(Arc, Multiaddr)>, - - // Shared between the whole connection reuse mechanism. - shared: Arc>>, +static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0); +// `TASK_ID` is used internally to uniquely identify each task. +task_local!{ + static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) } -impl Stream for ConnectionReuseListener -where - S: Stream, - F: Future, - M: StreamMuxer + 'static, // TODO: 'static :( +/// Implementation of `Future` for dialing a node. +pub struct ConnectionReuseDial +where + T: Transport, + T: Transport, + M: StreamMuxer, { - type Item = FutureResult<(muxing::SubstreamRef>, FutureResult), IoError>; + /// The future that will construct the substream, the connection id the muxer comes from, and + /// the `Future` of the client's multiaddr. + /// If `None`, we need to grab a new outbound substream from the muxer. + outbound: Option>, + + // Shared between the whole connection reuse mechanism. + shared: Arc>>, + + // The address we're trying to dial. + addr: Multiaddr, +} + +struct ConnectionReuseDialOut +where + M: StreamMuxer, +{ + /// Custom data for the connection. + custom_data: D, + /// The pending outbound substream. + stream: muxing::OutboundSubstreamRefWrapFuture>, + /// Id of the connection that was used to create the substream. + connection_id: u64, + /// Address of the remote. + client_addr: Multiaddr, +} + +impl Future for ConnectionReuseDial +where + T: Transport + Clone, + M: StreamMuxer + 'static, + D: Clone + 'static, + ::Dial: 'static, + ::MultiaddrFuture: 'static, +{ + type Item = ((D, ConnectionReuseSubstream), FutureResult); + type Error = IoError; + + fn poll(&mut self) -> Poll { + loop { + let should_kill_existing_muxer; + if let Some(mut outbound) = self.outbound.take() { + match outbound.stream.poll() { + Ok(Async::Ready(Some(inner))) => { + trace!("Opened new outgoing substream to {}", self.addr); + let substream = ConnectionReuseSubstream { + connection_id: outbound.connection_id, + shared: self.shared.clone(), + inner, + addr: outbound.client_addr.clone(), + }; + return Ok(Async::Ready(((outbound.custom_data, substream), future::ok(outbound.client_addr)))); + }, + Ok(Async::NotReady) => { + self.outbound = Some(outbound); + return Ok(Async::NotReady); + }, + Ok(Async::Ready(None)) => { + // The muxer can no longer produce outgoing substreams. + // Let's reopen a connection. + trace!("Closing existing connection to {} ; can't produce outgoing substreams", self.addr); + should_kill_existing_muxer = true; + }, + Err(err) => { + // If we get an error while opening a substream, we decide to ignore it + // and open a new muxer. + // If opening the muxer produces an error, *then* we will return it. + debug!("Error while opening outgoing substream to {}: {:?}", self.addr, err); + should_kill_existing_muxer = true; + }, + } + } else { + should_kill_existing_muxer = false; + } + + // If we reach this point, that means we have to fill `self.outbound`. + // If `should_kill_existing_muxer`, do not use any existing connection but create a + // new one instead. + let mut shared = self.shared.lock(); + let shared = &mut *shared; // Avoids borrow errors + + // TODO: could be optimized + if should_kill_existing_muxer { + shared.connections.remove(&self.addr); + } + let connec = match shared.connections.entry(self.addr.clone()) { + Entry::Occupied(e) => e.into_mut(), + Entry::Vacant(e) => { + // Build the connection. + let state = match shared.transport.clone().dial(self.addr.clone()) { + Ok(future) => { + trace!("Opened new connection to {:?}", self.addr); + let future = future.and_then(|(out, addr)| addr.map(move |a| (out, a))); + let future = Box::new(future); + PeerState::Pending { future, notify: Default::default() } + }, + Err(_) => { + trace!("Failed to open connection to {:?}, multiaddr not supported", self.addr); + let err = IoError::new(IoErrorKind::ConnectionRefused, "multiaddr not supported"); + PeerState::Errored(err) + }, + }; + + for task in shared.notify_on_new_connec.drain() { + task.1.notify(); + } + + e.insert(state) + }, + }; + + match mem::replace(&mut *connec, PeerState::Poisonned) { + PeerState::Active { muxer, custom_data, connection_id, listener_id, mut num_substreams, client_addr } => { + let outbound = muxing::outbound_from_ref_and_wrap(muxer.clone()); + num_substreams += 1; + *connec = PeerState::Active { muxer, custom_data: custom_data.clone(), connection_id, listener_id, num_substreams, client_addr: client_addr.clone() }; + trace!("Using existing connection to {} to open outbound substream", self.addr); + self.outbound = Some(ConnectionReuseDialOut { + custom_data, + stream: outbound, + connection_id, + client_addr, + }); + }, + PeerState::Pending { mut future, mut notify } => { + match future.poll() { + Ok(Async::Ready(((custom_data, muxer), client_addr))) => { + trace!("Successful new connection to {} ({})", self.addr, client_addr); + for task in notify { + task.1.notify(); + } + let muxer = Arc::new(muxer); + let first_outbound = muxing::outbound_from_ref_and_wrap(muxer.clone()); + let connection_id = shared.next_connection_id; + shared.next_connection_id += 1; + *connec = PeerState::Active { muxer, custom_data: custom_data.clone(), connection_id, num_substreams: 1, listener_id: None, client_addr: client_addr.clone() }; + self.outbound = Some(ConnectionReuseDialOut { + custom_data, + stream: first_outbound, + connection_id, + client_addr, + }); + }, + Ok(Async::NotReady) => { + notify.insert(TASK_ID.with(|&t| t), task::current()); + *connec = PeerState::Pending { future, notify }; + return Ok(Async::NotReady); + }, + Err(err) => { + trace!("Failed new connection to {}: {:?}", self.addr, err); + let io_err = IoError::new(err.kind(), err.to_string()); + *connec = PeerState::Errored(err); + return Err(io_err); + }, + } + }, + PeerState::Errored(err) => { + trace!("Existing new connection to {} errored earlier: {:?}", self.addr, err); + let io_err = IoError::new(err.kind(), err.to_string()); + *connec = PeerState::Errored(err); + return Err(io_err); + }, + PeerState::Poisonned => { + panic!("Poisonned peer state"); + }, + } + } + } +} + +impl Drop for ConnectionReuseDial +where + T: Transport, + T: Transport, + M: StreamMuxer, +{ + fn drop(&mut self) { + if let Some(outbound) = self.outbound.take() { + let mut shared = self.shared.lock(); + remove_one_substream(&mut *shared, outbound.connection_id, &outbound.client_addr); + } + } +} + +/// Implementation of `Stream` for the connections incoming from listening on a specific address. +pub struct ConnectionReuseListener +where + T: Transport, + T: Transport, + M: StreamMuxer, +{ + /// The main listener. + listener: stream::Fuse, + /// Identifier for this listener. Used to determine which connections were opened by it. + listener_id: u64, + /// Opened connections that need to be upgraded. + current_upgrades: FuturesUnordered>>, + + /// Shared between the whole connection reuse mechanism. + shared: Arc>>, +} + +impl Stream for ConnectionReuseListener +where + T: Transport, + T: Transport, + M: StreamMuxer, + D: Clone, + L: Stream, + Lu: Future + 'static, +{ + type Item = FutureResult<((D, ConnectionReuseSubstream), FutureResult), IoError>; type Error = IoError; fn poll(&mut self) -> Poll, Self::Error> { @@ -293,143 +496,312 @@ where loop { match self.listener.poll() { Ok(Async::Ready(Some(upgrade))) => { - self.current_upgrades.push(upgrade); + trace!("New incoming connection"); + self.current_upgrades.push(Box::new(upgrade)); } Ok(Async::NotReady) => break, Ok(Async::Ready(None)) => { - debug!("listener has been closed"); - if self.connections.is_empty() && self.current_upgrades.is_empty() { - return Ok(Async::Ready(None)); - } - break + debug!("Listener has been closed"); + break; } Err(err) => { - debug!("error while polling listener: {:?}", err); - if self.connections.is_empty() && self.current_upgrades.is_empty() { - return Err(err); - } - break + debug!("Error while polling listener: {:?}", err); + return Err(err); } - } + }; } + // Process the connections being upgraded. loop { match self.current_upgrades.poll() { - Ok(Async::Ready(Some((muxer, client_addr)))) => { - self.connections.push((Arc::new(muxer), client_addr.clone())); + Ok(Async::Ready(Some(((custom_data, muxer), client_addr)))) => { + // Successfully upgraded a new incoming connection. + trace!("New multiplexed connection from {}", client_addr); + let mut shared = self.shared.lock(); + let muxer = Arc::new(muxer); + let connection_id = shared.next_connection_id; + shared.next_connection_id += 1; + let state = PeerState::Active { muxer, custom_data, connection_id, listener_id: Some(self.listener_id), num_substreams: 1, client_addr: client_addr.clone() }; + shared.connections.insert(client_addr, state); + for to_notify in shared.notify_on_new_connec.drain() { + to_notify.1.notify(); + } } + Ok(Async::Ready(None)) | Ok(Async::NotReady) => { + break; + }, Err(err) => { - debug!("error while upgrading listener connection: {:?}", err); - return Ok(Async::Ready(Some(future::err(err)))); - } - _ => break, - } - } - - // Check whether any incoming substream is ready. - for n in (0..self.connections.len()).rev() { - let (muxer, client_addr) = self.connections.swap_remove(n); - match muxer.poll_inbound() { - Ok(Async::Ready(None)) => { - // stream muxer gave us a `None` => connection should be considered closed - debug!("no more inbound substreams on {}", client_addr); - self.shared.lock().active_connections.remove(&client_addr); - } - Ok(Async::Ready(Some(incoming))) => { - // We overwrite any current active connection to that multiaddr because we - // are the freshest possible connection. - self.shared - .lock() - .active_connections - .insert(client_addr.clone(), muxer.clone()); - // A new substream is ready. - self.connections - .push((muxer.clone(), client_addr.clone())); - let incoming = muxing::substream_from_ref(muxer, incoming); - return Ok(Async::Ready(Some( - future::ok((incoming, future::ok(client_addr))), - ))); - } - Ok(Async::NotReady) => { - self.connections.push((muxer, client_addr)); - } - Err(err) => { - debug!("error while upgrading the multiplexed incoming connection: {:?}", err); - // Insert the rest of the pending connections, but not the current one. + // Insert the rest of the pending upgrades, but not the current one. + debug!("Error while upgrading listener connection: {:?}", err); return Ok(Async::Ready(Some(future::err(err)))); } } } - // Nothing is ready, return `NotReady`. - Ok(Async::NotReady) + // Poll all the incoming connections on all the connections we opened. + let mut shared = self.shared.lock(); + match poll_incoming(&self.shared, &mut shared, Some(self.listener_id)) { + Ok(Async::Ready(None)) => { + if self.listener.is_done() && self.current_upgrades.is_empty() { + Ok(Async::Ready(None)) + } else { + Ok(Async::NotReady) + } + }, + Ok(Async::Ready(Some(substream))) => { + Ok(Async::Ready(Some(substream))) + }, + Ok(Async::NotReady) => { + Ok(Async::NotReady) + } + Err(err) => { + Ok(Async::Ready(Some(future::err(err)))) + } + } } } /// Implementation of `Future` that yields the next incoming substream from a dialed connection. -pub struct ConnectionReuseIncoming +pub struct ConnectionReuseIncoming where + T: Transport, + T: Transport, M: StreamMuxer, { // Shared between the whole connection reuse system. - shared: Arc>>, + shared: Arc>>, } -impl Future for ConnectionReuseIncoming +impl Future for ConnectionReuseIncoming where + T: Transport, + T: Transport, M: StreamMuxer, + D: Clone, { - type Item = future::FutureResult<(muxing::SubstreamRef>, future::FutureResult), IoError>; + type Item = future::FutureResult<((D, ConnectionReuseSubstream), future::FutureResult), IoError>; type Error = IoError; + #[inline] fn poll(&mut self) -> Poll { - let mut lock = self.shared.lock(); - - // Try to get any new muxer from `add_to_next_rx`. - // We push the new muxers to a channel instead of adding them to `next_incoming`, so that - // tasks are notified when something is pushed. - loop { - match lock.add_to_next_rx.poll() { - Ok(Async::Ready(Some(elem))) => { - lock.next_incoming.push(elem); - } - Ok(Async::NotReady) => break, - Ok(Async::Ready(None)) | Err(_) => unreachable!( - "the sender and receiver are both in the same struct, therefore \ - the link can never break" - ), - } + let mut shared = self.shared.lock(); + match poll_incoming(&self.shared, &mut shared, None) { + Ok(Async::Ready(Some(substream))) => { + Ok(Async::Ready(substream)) + }, + Ok(Async::Ready(None)) | Ok(Async::NotReady) => { + // TODO: will add an element to the list every time + shared.notify_on_new_connec.insert(TASK_ID.with(|&v| v), task::current()); + Ok(Async::NotReady) + }, + Err(err) => Err(err) } - - // Check whether any incoming substream is ready. - for n in (0..lock.next_incoming.len()).rev() { - let (muxer, addr) = lock.next_incoming.swap_remove(n); - match muxer.poll_inbound() { - Ok(Async::Ready(None)) => { - debug!("no inbound substream for {}", addr); - lock.active_connections.remove(&addr); - } - Ok(Async::Ready(Some(value))) => { - // A substream is ready ; push back the muxer for the next time this function - // is called, then return. - debug!("New incoming substream"); - lock.next_incoming.push((muxer.clone(), addr.clone())); - let substream = muxing::substream_from_ref(muxer, value); - return Ok(Async::Ready(future::ok((substream, future::ok(addr))))); - } - Ok(Async::NotReady) => { - lock.next_incoming.push((muxer, addr)); - } - Err(err) => { - // In case of error, we just not push back the element, which drops it. - debug!("ConnectionReuse incoming: one of the \ - multiplexed substreams produced an error: {:?}", - err); - } - } - } - - // Nothing is ready. - Ok(Async::NotReady) + } +} + +/// Polls the incoming substreams on all the incoming connections that match the `listener`. +/// +/// Returns `Ready(None)` if no connection is matching the `listener`. Returns `NotReady` if +/// one or more connections are matching the `listener` but they are not ready. +fn poll_incoming(shared_arc: &Arc>>, shared: &mut Shared, listener: Option) + -> Poll), FutureResult), IoError>>, IoError> +where + T: Transport, + T: Transport, + M: StreamMuxer, + D: Clone, +{ + // Keys of the elements in `shared.connections` to remove afterwards. + let mut to_remove = Vec::new(); + // Substream to return, if any found. + let mut ret_value = None; + let mut found_one = false; + + for (addr, state) in shared.connections.iter_mut() { + match *state { + PeerState::Active { ref custom_data, ref muxer, ref mut num_substreams, connection_id, ref client_addr, listener_id } => { + if listener_id != listener { + continue; + } + found_one = true; + + match muxer.poll_inbound() { + Ok(Async::Ready(Some(inner))) => { + trace!("New incoming substream from {}", client_addr); + *num_substreams += 1; + let substream = ConnectionReuseSubstream { + inner: muxing::substream_from_ref(muxer.clone(), inner), + shared: shared_arc.clone(), + connection_id, + addr: client_addr.clone(), + }; + ret_value = Some(Ok(((custom_data.clone(), substream), future::ok(client_addr.clone())))); + break; + }, + Ok(Async::Ready(None)) => { + // The muxer isn't capable of opening any inbound stream anymore, so + // we close the connection entirely. + trace!("Removing existing connection to {} as it cannot open inbound anymore", addr); + to_remove.push(addr.clone()); + }, + Ok(Async::NotReady) => (), + Err(err) => { + // If an error happens while opening an inbound stream, we close the + // connection entirely. + trace!("Error while opening inbound substream to {}: {:?}", addr, err); + to_remove.push(addr.clone()); + ret_value = Some(Err(err)); + break; + }, + } + }, + PeerState::Pending { ref mut notify, .. } => { + notify.insert(TASK_ID.with(|&t| t), task::current()); + }, + PeerState::Errored(_) => {}, + PeerState::Poisonned => { + panic!("Poisonned peer state"); + }, + } + } + + for to_remove in to_remove { + shared.connections.remove(&to_remove); + } + + match ret_value { + Some(Ok(val)) => Ok(Async::Ready(Some(future::ok(val)))), + Some(Err(err)) => Err(err), + None => { + if found_one { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(None)) + } + }, + } +} + +/// Removes one substream from an active connection. Closes the connection if necessary. +fn remove_one_substream(shared: &mut Shared, connec_id: u64, addr: &Multiaddr) +where + T: Transport, + T: Transport, + M: StreamMuxer, +{ + shared.connections.retain(|_, connec| { + if let PeerState::Active { connection_id, ref mut num_substreams, .. } = connec { + if *connection_id == connec_id { + *num_substreams -= 1; + if *num_substreams == 0 { + trace!("All substreams to {} closed ; closing main connection", addr); + return false; + } + } + } + + true + }); +} + +/// Wraps around the `Substream`. +pub struct ConnectionReuseSubstream +where + T: Transport, + T: Transport, + M: StreamMuxer, +{ + inner: muxing::SubstreamRef>, + shared: Arc>>, + /// Id this connection was created from. + connection_id: u64, + /// Address of the remote. + addr: Multiaddr, +} + +impl Deref for ConnectionReuseSubstream +where + T: Transport, + T: Transport, + M: StreamMuxer, +{ + type Target = muxing::SubstreamRef>; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for ConnectionReuseSubstream +where + T: Transport, + T: Transport, + M: StreamMuxer, +{ + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Read for ConnectionReuseSubstream +where + T: Transport, + T: Transport, + M: StreamMuxer, +{ + #[inline] + fn read(&mut self, buf: &mut [u8]) -> Result { + self.inner.read(buf) + } +} + +impl AsyncRead for ConnectionReuseSubstream +where + T: Transport, + T: Transport, + M: StreamMuxer, +{ +} + +impl Write for ConnectionReuseSubstream +where + T: Transport, + T: Transport, + M: StreamMuxer, +{ + #[inline] + fn write(&mut self, buf: &[u8]) -> Result { + self.inner.write(buf) + } + + #[inline] + fn flush(&mut self) -> Result<(), IoError> { + self.inner.flush() + } +} + +impl AsyncWrite for ConnectionReuseSubstream +where + T: Transport, + T: Transport, + M: StreamMuxer, +{ + #[inline] + fn shutdown(&mut self) -> Poll<(), IoError> { + self.inner.shutdown() + } +} + +impl Drop for ConnectionReuseSubstream +where + T: Transport, + T: Transport, + M: StreamMuxer, +{ + fn drop(&mut self) { + let mut shared = self.shared.lock(); + remove_one_substream(&mut *shared, self.connection_id, &self.addr); } } diff --git a/core/src/transport/mod.rs b/core/src/transport/mod.rs index 6b7ab637..690dbf4f 100644 --- a/core/src/transport/mod.rs +++ b/core/src/transport/mod.rs @@ -29,8 +29,10 @@ //! `UpgradedNode::or_upgrade` methods, you can combine multiple transports and/or upgrades //! together in a complex chain of protocols negotiation. +use connection_reuse::ConnectionReuse; use futures::prelude::*; use multiaddr::Multiaddr; +use muxing::StreamMuxer; use std::io::Error as IoError; use tokio_io::{AsyncRead, AsyncWrite}; use upgrade::{ConnectionUpgrade, Endpoint}; @@ -116,7 +118,7 @@ pub trait Transport { /// implementation of `Transport` is only responsible for handling the protocols it supports. /// /// Returns `None` if nothing can be determined. This happens if this trait implementation - /// doesn't recognize the protocols, or if `server` and `observed` are unrelated. + /// doesn't recognize the protocols, or if `server` and `observed` are related. fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option; /// Applies a function on the output of the `Transport`. @@ -207,6 +209,17 @@ pub trait Transport { DummyMuxing::new(self) } + /// Turns this `Transport` into a `ConnectionReuse`. If the `Output` implements the + /// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`. + #[inline] + fn into_connection_reuse(self) -> ConnectionReuse + where + Self: Sized + Transport, + M: StreamMuxer, + { + ConnectionReuse::new(self) + } + /// Wraps around the `Transport` and makes it interruptible. #[inline] fn interruptible(self) -> (interruptible::Interruptible, interruptible::Interrupt) diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 8c21897b..f1a4fd17 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -18,10 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use connection_reuse::ConnectionReuse; use futures::prelude::*; use multiaddr::Multiaddr; -use muxing::StreamMuxer; use std::io::Error as IoError; use tokio_io::{AsyncRead, AsyncWrite}; use transport::{MuxedTransport, Transport}; @@ -52,16 +50,6 @@ where T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade + 'a, { - /// Turns this upgraded node into a `ConnectionReuse`. If the `Output` implements the - /// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`. - #[inline] - pub fn into_connection_reuse(self) -> ConnectionReuse - where - C::Output: StreamMuxer, - { - From::from(self) - } - /// Returns a reference to the inner `Transport`. #[inline] pub fn transport(&self) -> &T { diff --git a/core/tests/multiplex.rs b/core/tests/multiplex.rs index 2cdc0580..989216cc 100644 --- a/core/tests/multiplex.rs +++ b/core/tests/multiplex.rs @@ -78,7 +78,9 @@ fn client_to_server_outbound() { let bg_thread = thread::spawn(move || { let future = rx .with_upgrade(multiplex::MplexConfig::new()) + .map(|val, _| ((), val)) .into_connection_reuse() + .map(|((), val), _| val) .listen_on("/memory".parse().unwrap()) .unwrap_or_else(|_| panic!()).0 .into_future() @@ -124,7 +126,9 @@ fn connection_reused_for_dialing() { let bg_thread = thread::spawn(move || { let future = OnlyOnce::from(rx) .with_upgrade(multiplex::MplexConfig::new()) + .map(|val, _| ((), val)) .into_connection_reuse() + .map(|((), val), _| val) .listen_on("/memory".parse().unwrap()) .unwrap_or_else(|_| panic!()).0 .into_future() @@ -160,7 +164,9 @@ fn connection_reused_for_dialing() { let transport = OnlyOnce::from(tx) .with_upgrade(multiplex::MplexConfig::new()) - .into_connection_reuse(); + .map(|val, _| ((), val)) + .into_connection_reuse() + .map(|((), val), _| val); let future = transport .clone() @@ -229,7 +235,9 @@ fn use_opened_listen_to_dial() { let transport = OnlyOnce::from(tx) .with_upgrade(multiplex::MplexConfig::new()) - .into_connection_reuse(); + .map(|val, _| ((), val)) + .into_connection_reuse() + .map(|((), val), _| val); let future = transport .clone() diff --git a/examples/echo-dialer.rs b/examples/echo-dialer.rs index 18e71a58..bc8f4123 100644 --- a/examples/echo-dialer.rs +++ b/examples/echo-dialer.rs @@ -75,7 +75,9 @@ fn main() { // `Transport` because the output of the upgrade is not a stream but a controller for // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // a `Transport`. - .into_connection_reuse(); + .map(|val, _| ((), val)) + .into_connection_reuse() + .map(|((), val), _| val); // Building a struct that represents the protocol that we are going to use for dialing. let proto = SimpleProtocol::new("/echo/1.0.0", |socket| { diff --git a/examples/echo-server.rs b/examples/echo-server.rs index 4ad06e66..e6b67867 100644 --- a/examples/echo-server.rs +++ b/examples/echo-server.rs @@ -75,7 +75,9 @@ fn main() { // `Transport` because the output of the upgrade is not a stream but a controller for // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // a `Transport`. - .into_connection_reuse(); + .map(|val, _| ((), val)) + .into_connection_reuse() + .map(|((), val), _| val); // We now have a `transport` variable that can be used either to dial nodes or listen to // incoming connections, and that will automatically apply secio and multiplex on top diff --git a/examples/floodsub.rs b/examples/floodsub.rs index e86e101d..6b45ca42 100644 --- a/examples/floodsub.rs +++ b/examples/floodsub.rs @@ -76,7 +76,9 @@ fn main() { // `Transport` because the output of the upgrade is not a stream but a controller for // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // a `Transport`. - .into_connection_reuse(); + .map(|val, _| ((), val)) + .into_connection_reuse() + .map(|((), val), _| val); // We now have a `transport` variable that can be used either to dial nodes or listen to // incoming connections, and that will automatically apply secio and multiplex on top diff --git a/examples/kademlia.rs b/examples/kademlia.rs index 9fe2b422..aa5fa950 100644 --- a/examples/kademlia.rs +++ b/examples/kademlia.rs @@ -18,6 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +#![type_length_limit = "2097152"] + extern crate bigint; extern crate bytes; extern crate env_logger; @@ -84,7 +86,9 @@ fn main() { // `Transport` because the output of the upgrade is not a stream but a controller for // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // a `Transport`. - .into_connection_reuse(); + .map(|val, _| ((), val)) + .into_connection_reuse() + .map(|((), val), _| val); let addr_resolver = { let peer_store = peer_store.clone(); diff --git a/examples/ping-client.rs b/examples/ping-client.rs index 8ca38c91..db85970b 100644 --- a/examples/ping-client.rs +++ b/examples/ping-client.rs @@ -68,7 +68,9 @@ fn main() { // `Transport` because the output of the upgrade is not a stream but a controller for // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // a `Transport`. - .into_connection_reuse(); + .map(|val, _| ((), val)) + .into_connection_reuse() + .map(|((), val), _| val); // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming // connections for us. The second parameter we pass is the connection upgrade that is accepted diff --git a/examples/relay.rs b/examples/relay.rs index effeb129..5d453d53 100644 --- a/examples/relay.rs +++ b/examples/relay.rs @@ -128,7 +128,9 @@ fn run_dialer(opts: DialerOpts) -> Result<(), Box> { let transport = { let tcp = TcpConfig::new() .with_upgrade(libp2p_yamux::Config::default()) - .into_connection_reuse(); + .map(|val, _| ((), val)) + .into_connection_reuse() + .map(|((), val), _| val); RelayTransport::new(opts.me, tcp, store, iter::once(opts.relay)).with_dummy_muxing() }; @@ -161,7 +163,9 @@ fn run_listener(opts: ListenerOpts) -> Result<(), Box> { let transport = TcpConfig::new() .with_upgrade(libp2p_yamux::Config::default()) - .into_connection_reuse(); + .map(|val, _| ((), val)) + .into_connection_reuse() + .map(|((), val), _| val); let relay = RelayConfig::new(opts.me, transport.clone(), store);