// Copyright 2018 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. //! Contains the `ConnectionReuse` struct. Stores open muxed connections to nodes so that dialing //! a node reuses the same connection instead of opening a new one. //! //! A `ConnectionReuse` can only be created from an `UpgradedNode` whose `ConnectionUpgrade` //! yields as `StreamMuxer`. //! //! # Behaviour //! //! The API exposed by the `ConnectionReuse` struct consists in the `Transport` trait //! implementation, with the `dial` and `listen_on` methods. //! //! When called on a `ConnectionReuse`, the `listen_on` method will listen on the given //! multiaddress (by using the underlying `Transport`), then will apply a `flat_map` on the //! incoming connections so that we actually listen to the incoming substreams of each connection. //! //! When called on a `ConnectionReuse`, the `dial` method will try to use a connection that has //! already been opened earlier, and open an outgoing substream on it. If none is available, it //! will dial the given multiaddress. Dialed node can also spontaneously open new substreams with //! us. In order to handle these new substreams you should use the `next_incoming` method of the //! `MuxedTransport` trait. use fnv::FnvHashMap; use futures::future::{self, FutureResult}; use futures::{Async, Future, Poll, Stream, stream, task}; use futures::stream::FuturesUnordered; use multiaddr::Multiaddr; use muxing::{self, StreamMuxer}; use parking_lot::Mutex; use std::collections::hash_map::Entry; use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write}; use std::mem; use std::ops::{Deref, DerefMut}; use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering}; use tokio_io::{AsyncRead, AsyncWrite}; use transport::{MuxedTransport, Transport}; /// Allows reusing the same muxed connection multiple times. /// /// Can be created from an `UpgradedNode` through the `From` trait. /// /// Implements the `Transport` trait. pub struct ConnectionReuse where T: Transport, T: Transport, M: StreamMuxer, { /// Struct shared between most of the `ConnectionReuse` infrastructure. shared: Arc>>, } impl Clone for ConnectionReuse where T: Transport, T: Transport, M: StreamMuxer, { #[inline] fn clone(&self) -> Self { ConnectionReuse { shared: self.shared.clone(), } } } /// Struct shared between most of the `ConnectionReuse` infrastructure. struct Shared where T: Transport, T: Transport, M: StreamMuxer, { /// Underlying transport and connection upgrade, used when we need to dial or listen. transport: T, /// All the connections that were opened, whether successful and/or active or not. // TODO: this will grow forever connections: FnvHashMap>, /// Tasks to notify when one or more new elements were added to `connections`. notify_on_new_connec: FnvHashMap, /// Next `connection_id` to use when opening a connection. next_connection_id: u64, /// Next `listener_id` for the next listener we create. next_listener_id: u64, } enum PeerState where M: StreamMuxer { /// Connection is active and can be used to open substreams. Active { /// The muxer to open new substreams. muxer: Arc, /// Custom data passed to the output. custom_data: D, /// Future of the address of the client. client_addr: Multiaddr, /// Unique identifier for this connection in the `ConnectionReuse`. connection_id: u64, /// Number of open substreams. num_substreams: u64, /// Id of the listener that created this connection, or `None` if it was opened by a /// dialer. listener_id: Option, }, /// Connection is pending. // TODO: stronger Future type Pending { /// Future that produces the muxer. future: Box>, /// All the tasks to notify when `future` resolves. notify: FnvHashMap, }, /// An earlier connection attempt errored. Errored(IoError), /// The `PeerState` is poisonned. Happens if a panic happened while executing some of the /// functions. Poisonned, } impl ConnectionReuse where T: Transport, T: Transport, M: StreamMuxer, { #[inline] pub(crate) fn new(node: T) -> ConnectionReuse { ConnectionReuse { shared: Arc::new(Mutex::new(Shared { transport: node, connections: Default::default(), notify_on_new_connec: Default::default(), next_connection_id: 0, next_listener_id: 0, })), } } } impl Transport for ConnectionReuse where T: Transport + 'static, // TODO: 'static :( T: Transport + Clone + 'static, // TODO: 'static :( M: StreamMuxer + 'static, D: Clone + 'static, T: Clone, { type Output = (D, ConnectionReuseSubstream); type MultiaddrFuture = future::FutureResult; type Listener = Box>; type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; type Dial = ConnectionReuseDial; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { let mut shared = self.shared.lock(); let (listener, new_addr) = match shared.transport.clone().listen_on(addr.clone()) { Ok((l, a)) => (l, a), Err((_, addr)) => { return Err(( ConnectionReuse { shared: self.shared.clone(), }, addr, )); } }; let listener = listener .map(|upgr| { upgr.and_then(|(out, addr)| { trace!("Waiting for remote's address as listener"); addr.map(move |addr| (out, addr)) }) }) .fuse(); let listener_id = shared.next_listener_id; shared.next_listener_id += 1; let listener = ConnectionReuseListener { shared: self.shared.clone(), listener, listener_id, current_upgrades: FuturesUnordered::new(), }; Ok((Box::new(listener) as Box<_>, new_addr)) } #[inline] fn dial(self, addr: Multiaddr) -> Result { let mut shared = self.shared.lock(); // If an earlier attempt to dial this multiaddress failed, we clear the error. Otherwise // the returned `Future` will immediately produce the error. let must_clear = match shared.connections.get(&addr) { Some(&PeerState::Errored(ref err)) => { trace!("Clearing existing connection to {} which errored earlier: {:?}", addr, err); true }, _ => false, }; if must_clear { shared.connections.remove(&addr); } Ok(ConnectionReuseDial { outbound: None, shared: self.shared.clone(), addr, }) } #[inline] fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.shared.lock().transport.nat_traversal(server, observed) } } impl MuxedTransport for ConnectionReuse where T: Transport + 'static, // TODO: 'static :( T: Transport + Clone + 'static, // TODO: 'static :( M: StreamMuxer + 'static, D: Clone + 'static, T: Clone, { type Incoming = ConnectionReuseIncoming; type IncomingUpgrade = future::FutureResult<((D, ConnectionReuseSubstream), Self::MultiaddrFuture), IoError>; #[inline] fn next_incoming(self) -> Self::Incoming { ConnectionReuseIncoming { shared: self.shared.clone(), } } } static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0); // `TASK_ID` is used internally to uniquely identify each task. task_local!{ static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) } /// Implementation of `Future` for dialing a node. pub struct ConnectionReuseDial where T: Transport, T: Transport, M: StreamMuxer, { /// The future that will construct the substream, the connection id the muxer comes from, and /// the `Future` of the client's multiaddr. /// If `None`, we need to grab a new outbound substream from the muxer. outbound: Option>, // Shared between the whole connection reuse mechanism. shared: Arc>>, // The address we're trying to dial. addr: Multiaddr, } struct ConnectionReuseDialOut where M: StreamMuxer, { /// Custom data for the connection. custom_data: D, /// The pending outbound substream. stream: muxing::OutboundSubstreamRefWrapFuture>, /// Id of the connection that was used to create the substream. connection_id: u64, /// Address of the remote. client_addr: Multiaddr, } impl Future for ConnectionReuseDial where T: Transport + Clone, M: StreamMuxer + 'static, D: Clone + 'static, ::Dial: 'static, ::MultiaddrFuture: 'static, { type Item = ((D, ConnectionReuseSubstream), FutureResult); type Error = IoError; fn poll(&mut self) -> Poll { loop { let should_kill_existing_muxer; if let Some(mut outbound) = self.outbound.take() { match outbound.stream.poll() { Ok(Async::Ready(Some(inner))) => { trace!("Opened new outgoing substream to {}", self.addr); let substream = ConnectionReuseSubstream { connection_id: outbound.connection_id, shared: self.shared.clone(), inner, addr: outbound.client_addr.clone(), }; return Ok(Async::Ready(((outbound.custom_data, substream), future::ok(outbound.client_addr)))); }, Ok(Async::NotReady) => { self.outbound = Some(outbound); return Ok(Async::NotReady); }, Ok(Async::Ready(None)) => { // The muxer can no longer produce outgoing substreams. // Let's reopen a connection. trace!("Closing existing connection to {} ; can't produce outgoing substreams", self.addr); should_kill_existing_muxer = true; }, Err(err) => { // If we get an error while opening a substream, we decide to ignore it // and open a new muxer. // If opening the muxer produces an error, *then* we will return it. debug!("Error while opening outgoing substream to {}: {:?}", self.addr, err); should_kill_existing_muxer = true; }, } } else { should_kill_existing_muxer = false; } // If we reach this point, that means we have to fill `self.outbound`. // If `should_kill_existing_muxer`, do not use any existing connection but create a // new one instead. let mut shared = self.shared.lock(); let shared = &mut *shared; // Avoids borrow errors // TODO: could be optimized if should_kill_existing_muxer { shared.connections.remove(&self.addr); } let connec = match shared.connections.entry(self.addr.clone()) { Entry::Occupied(e) => e.into_mut(), Entry::Vacant(e) => { // Build the connection. let state = match shared.transport.clone().dial(self.addr.clone()) { Ok(future) => { trace!("Opened new connection to {:?}", self.addr); let future = future.and_then(|(out, addr)| addr.map(move |a| (out, a))); let future = Box::new(future); PeerState::Pending { future, notify: Default::default() } }, Err(_) => { trace!("Failed to open connection to {:?}, multiaddr not supported", self.addr); let err = IoError::new(IoErrorKind::ConnectionRefused, "multiaddr not supported"); PeerState::Errored(err) }, }; for task in shared.notify_on_new_connec.drain() { task.1.notify(); } e.insert(state) }, }; match mem::replace(&mut *connec, PeerState::Poisonned) { PeerState::Active { muxer, custom_data, connection_id, listener_id, mut num_substreams, client_addr } => { let outbound = muxing::outbound_from_ref_and_wrap(muxer.clone()); num_substreams += 1; *connec = PeerState::Active { muxer, custom_data: custom_data.clone(), connection_id, listener_id, num_substreams, client_addr: client_addr.clone() }; trace!("Using existing connection to {} to open outbound substream", self.addr); self.outbound = Some(ConnectionReuseDialOut { custom_data, stream: outbound, connection_id, client_addr, }); }, PeerState::Pending { mut future, mut notify } => { match future.poll() { Ok(Async::Ready(((custom_data, muxer), client_addr))) => { trace!("Successful new connection to {} ({})", self.addr, client_addr); for task in notify { task.1.notify(); } let muxer = Arc::new(muxer); let first_outbound = muxing::outbound_from_ref_and_wrap(muxer.clone()); let connection_id = shared.next_connection_id; shared.next_connection_id += 1; *connec = PeerState::Active { muxer, custom_data: custom_data.clone(), connection_id, num_substreams: 1, listener_id: None, client_addr: client_addr.clone() }; self.outbound = Some(ConnectionReuseDialOut { custom_data, stream: first_outbound, connection_id, client_addr, }); }, Ok(Async::NotReady) => { notify.insert(TASK_ID.with(|&t| t), task::current()); *connec = PeerState::Pending { future, notify }; return Ok(Async::NotReady); }, Err(err) => { trace!("Failed new connection to {}: {:?}", self.addr, err); let io_err = IoError::new(err.kind(), err.to_string()); *connec = PeerState::Errored(err); return Err(io_err); }, } }, PeerState::Errored(err) => { trace!("Existing new connection to {} errored earlier: {:?}", self.addr, err); let io_err = IoError::new(err.kind(), err.to_string()); *connec = PeerState::Errored(err); return Err(io_err); }, PeerState::Poisonned => { panic!("Poisonned peer state"); }, } } } } impl Drop for ConnectionReuseDial where T: Transport, T: Transport, M: StreamMuxer, { fn drop(&mut self) { if let Some(outbound) = self.outbound.take() { let mut shared = self.shared.lock(); remove_one_substream(&mut *shared, outbound.connection_id, &outbound.client_addr); } } } /// Implementation of `Stream` for the connections incoming from listening on a specific address. pub struct ConnectionReuseListener where T: Transport, T: Transport, M: StreamMuxer, { /// The main listener. listener: stream::Fuse, /// Identifier for this listener. Used to determine which connections were opened by it. listener_id: u64, /// Opened connections that need to be upgraded. current_upgrades: FuturesUnordered>>, /// Shared between the whole connection reuse mechanism. shared: Arc>>, } impl Stream for ConnectionReuseListener where T: Transport, T: Transport, M: StreamMuxer, D: Clone, L: Stream, Lu: Future + 'static, { type Item = FutureResult<((D, ConnectionReuseSubstream), FutureResult), IoError>; type Error = IoError; fn poll(&mut self) -> Poll, Self::Error> { // Check for any incoming connection on the listening socket. // Note that since `self.listener` is a `Fuse`, it's not a problem to continue polling even // after it is finished or after it error'ed. loop { match self.listener.poll() { Ok(Async::Ready(Some(upgrade))) => { trace!("New incoming connection"); self.current_upgrades.push(Box::new(upgrade)); } Ok(Async::NotReady) => break, Ok(Async::Ready(None)) => { debug!("Listener has been closed"); break; } Err(err) => { debug!("Error while polling listener: {:?}", err); return Err(err); } }; } // Process the connections being upgraded. loop { match self.current_upgrades.poll() { Ok(Async::Ready(Some(((custom_data, muxer), client_addr)))) => { // Successfully upgraded a new incoming connection. trace!("New multiplexed connection from {}", client_addr); let mut shared = self.shared.lock(); let muxer = Arc::new(muxer); let connection_id = shared.next_connection_id; shared.next_connection_id += 1; let state = PeerState::Active { muxer, custom_data, connection_id, listener_id: Some(self.listener_id), num_substreams: 1, client_addr: client_addr.clone() }; shared.connections.insert(client_addr, state); for to_notify in shared.notify_on_new_connec.drain() { to_notify.1.notify(); } } Ok(Async::Ready(None)) | Ok(Async::NotReady) => { break; }, Err(err) => { // Insert the rest of the pending upgrades, but not the current one. debug!("Error while upgrading listener connection: {:?}", err); return Ok(Async::Ready(Some(future::err(err)))); } } } // Poll all the incoming connections on all the connections we opened. let mut shared = self.shared.lock(); match poll_incoming(&self.shared, &mut shared, Some(self.listener_id)) { Ok(Async::Ready(None)) => { if self.listener.is_done() && self.current_upgrades.is_empty() { Ok(Async::Ready(None)) } else { Ok(Async::NotReady) } }, Ok(Async::Ready(Some(substream))) => { Ok(Async::Ready(Some(substream))) }, Ok(Async::NotReady) => { Ok(Async::NotReady) } Err(err) => { Ok(Async::Ready(Some(future::err(err)))) } } } } /// Implementation of `Future` that yields the next incoming substream from a dialed connection. pub struct ConnectionReuseIncoming where T: Transport, T: Transport, M: StreamMuxer, { // Shared between the whole connection reuse system. shared: Arc>>, } impl Future for ConnectionReuseIncoming where T: Transport, T: Transport, M: StreamMuxer, D: Clone, { type Item = future::FutureResult<((D, ConnectionReuseSubstream), future::FutureResult), IoError>; type Error = IoError; #[inline] fn poll(&mut self) -> Poll { let mut shared = self.shared.lock(); match poll_incoming(&self.shared, &mut shared, None) { Ok(Async::Ready(Some(substream))) => { Ok(Async::Ready(substream)) }, Ok(Async::Ready(None)) | Ok(Async::NotReady) => { // TODO: will add an element to the list every time shared.notify_on_new_connec.insert(TASK_ID.with(|&v| v), task::current()); Ok(Async::NotReady) }, Err(err) => Err(err) } } } /// Polls the incoming substreams on all the incoming connections that match the `listener`. /// /// Returns `Ready(None)` if no connection is matching the `listener`. Returns `NotReady` if /// one or more connections are matching the `listener` but they are not ready. fn poll_incoming(shared_arc: &Arc>>, shared: &mut Shared, listener: Option) -> Poll), FutureResult), IoError>>, IoError> where T: Transport, T: Transport, M: StreamMuxer, D: Clone, { // Keys of the elements in `shared.connections` to remove afterwards. let mut to_remove = Vec::new(); // Substream to return, if any found. let mut ret_value = None; let mut found_one = false; for (addr, state) in shared.connections.iter_mut() { match *state { PeerState::Active { ref custom_data, ref muxer, ref mut num_substreams, connection_id, ref client_addr, listener_id } => { if listener_id != listener { continue; } found_one = true; match muxer.poll_inbound() { Ok(Async::Ready(Some(inner))) => { trace!("New incoming substream from {}", client_addr); *num_substreams += 1; let substream = ConnectionReuseSubstream { inner: muxing::substream_from_ref(muxer.clone(), inner), shared: shared_arc.clone(), connection_id, addr: client_addr.clone(), }; ret_value = Some(Ok(((custom_data.clone(), substream), future::ok(client_addr.clone())))); break; }, Ok(Async::Ready(None)) => { // The muxer isn't capable of opening any inbound stream anymore, so // we close the connection entirely. trace!("Removing existing connection to {} as it cannot open inbound anymore", addr); to_remove.push(addr.clone()); }, Ok(Async::NotReady) => (), Err(err) => { // If an error happens while opening an inbound stream, we close the // connection entirely. trace!("Error while opening inbound substream to {}: {:?}", addr, err); to_remove.push(addr.clone()); ret_value = Some(Err(err)); break; }, } }, PeerState::Pending { ref mut notify, .. } => { notify.insert(TASK_ID.with(|&t| t), task::current()); }, PeerState::Errored(_) => {}, PeerState::Poisonned => { panic!("Poisonned peer state"); }, } } for to_remove in to_remove { shared.connections.remove(&to_remove); } match ret_value { Some(Ok(val)) => Ok(Async::Ready(Some(future::ok(val)))), Some(Err(err)) => Err(err), None => { if found_one { Ok(Async::NotReady) } else { Ok(Async::Ready(None)) } }, } } /// Removes one substream from an active connection. Closes the connection if necessary. fn remove_one_substream(shared: &mut Shared, connec_id: u64, addr: &Multiaddr) where T: Transport, T: Transport, M: StreamMuxer, { shared.connections.retain(|_, connec| { if let PeerState::Active { connection_id, ref mut num_substreams, .. } = connec { if *connection_id == connec_id { *num_substreams -= 1; if *num_substreams == 0 { trace!("All substreams to {} closed ; closing main connection", addr); return false; } } } true }); } /// Wraps around the `Substream`. pub struct ConnectionReuseSubstream where T: Transport, T: Transport, M: StreamMuxer, { inner: muxing::SubstreamRef>, shared: Arc>>, /// Id this connection was created from. connection_id: u64, /// Address of the remote. addr: Multiaddr, } impl Deref for ConnectionReuseSubstream where T: Transport, T: Transport, M: StreamMuxer, { type Target = muxing::SubstreamRef>; #[inline] fn deref(&self) -> &Self::Target { &self.inner } } impl DerefMut for ConnectionReuseSubstream where T: Transport, T: Transport, M: StreamMuxer, { #[inline] fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner } } impl Read for ConnectionReuseSubstream where T: Transport, T: Transport, M: StreamMuxer, { #[inline] fn read(&mut self, buf: &mut [u8]) -> Result { self.inner.read(buf) } } impl AsyncRead for ConnectionReuseSubstream where T: Transport, T: Transport, M: StreamMuxer, { } impl Write for ConnectionReuseSubstream where T: Transport, T: Transport, M: StreamMuxer, { #[inline] fn write(&mut self, buf: &[u8]) -> Result { self.inner.write(buf) } #[inline] fn flush(&mut self) -> Result<(), IoError> { self.inner.flush() } } impl AsyncWrite for ConnectionReuseSubstream where T: Transport, T: Transport, M: StreamMuxer, { #[inline] fn shutdown(&mut self) -> Poll<(), IoError> { self.inner.shutdown() } } impl Drop for ConnectionReuseSubstream where T: Transport, T: Transport, M: StreamMuxer, { fn drop(&mut self) { let mut shared = self.shared.lock(); remove_one_substream(&mut *shared, self.connection_id, &self.addr); } }