2018-09-14 13:18:10 +02:00
|
|
|
// Copyright 2018 Parity Technologies (UK) Ltd.
|
|
|
|
//
|
|
|
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
|
|
|
// copy of this software and associated documentation files (the "Software"),
|
|
|
|
// to deal in the Software without restriction, including without limitation
|
|
|
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
|
|
// and/or sell copies of the Software, and to permit persons to whom the
|
|
|
|
// Software is furnished to do so, subject to the following conditions:
|
|
|
|
//
|
|
|
|
// The above copyright notice and this permission notice shall be included in
|
|
|
|
// all copies or substantial portions of the Software.
|
|
|
|
//
|
|
|
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
|
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
|
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
|
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
|
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
|
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
|
|
// DEALINGS IN THE SOFTWARE.
|
|
|
|
|
2018-10-01 11:18:00 +02:00
|
|
|
use futures::prelude::*;
|
2019-01-07 11:21:09 +01:00
|
|
|
use crate::muxing;
|
2018-09-14 13:18:10 +02:00
|
|
|
use smallvec::SmallVec;
|
2019-09-16 11:08:44 +02:00
|
|
|
use std::{fmt, io::Error as IoError, pin::Pin, sync::Arc, task::Context, task::Poll};
|
2018-09-14 13:18:10 +02:00
|
|
|
|
2018-11-08 03:01:33 +11:00
|
|
|
// Implementation notes
|
2018-09-14 13:18:10 +02:00
|
|
|
// =================
|
|
|
|
//
|
|
|
|
// In order to minimize the risk of bugs in higher-level code, we want to avoid as much as
|
|
|
|
// possible having a racy API. The behaviour of methods should be well-defined and predictable.
|
|
|
|
//
|
|
|
|
// In order to respect this coding practice, we should theoretically provide events such as "data
|
|
|
|
// incoming on a substream", or "a substream is ready to be written". This would however make the
|
|
|
|
// API of `NodeStream` really painful to use. Instead, we really want to provide an object that
|
|
|
|
// implements the `AsyncRead` and `AsyncWrite` traits.
|
|
|
|
//
|
|
|
|
// This substream object raises the question of how to keep the `NodeStream` and the various
|
2019-03-11 17:19:50 +01:00
|
|
|
// substreams in sync without exposing a racy API. The answer is that the `NodeStream` holds
|
|
|
|
// ownership of the connection. Shutting down the `NodeStream` or destroying it will close all the
|
|
|
|
// existing substreams. The user of the `NodeStream` should be aware of that.
|
2018-09-14 13:18:10 +02:00
|
|
|
|
|
|
|
/// Implementation of `Stream` that handles a node.
|
|
|
|
///
|
|
|
|
/// The stream will receive substreams and can be used to open new outgoing substreams. Destroying
|
|
|
|
/// the `NodeStream` will **not** close the existing substreams.
|
|
|
|
///
|
|
|
|
/// The stream will close once both the inbound and outbound channels are closed, and no more
|
|
|
|
/// outbound substream attempt is pending.
|
2018-10-17 10:17:40 +01:00
|
|
|
pub struct NodeStream<TMuxer, TUserData>
|
2018-09-14 13:18:10 +02:00
|
|
|
where
|
|
|
|
TMuxer: muxing::StreamMuxer,
|
|
|
|
{
|
|
|
|
/// The muxer used to manage substreams.
|
|
|
|
muxer: Arc<TMuxer>,
|
|
|
|
/// List of substreams we are currently opening.
|
|
|
|
outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>,
|
|
|
|
}
|
|
|
|
|
2019-03-11 17:19:50 +01:00
|
|
|
/// Future that signals the remote that we have closed the connection.
///
/// It is produced by `NodeStream::close` and keeps its own handle on the muxer.
pub struct Close<TMuxer> {
    /// Shared handle to the muxer that is being closed.
    muxer: Arc<TMuxer>,
}
|
|
|
|
|
2018-09-14 13:18:10 +02:00
|
|
|
/// A successfully opened substream.
|
|
|
|
pub type Substream<TMuxer> = muxing::SubstreamRef<Arc<TMuxer>>;
|
|
|
|
|
|
|
|
/// Event that can happen on the `NodeStream`.
|
|
|
|
pub enum NodeEvent<TMuxer, TUserData>
|
|
|
|
where
|
|
|
|
TMuxer: muxing::StreamMuxer,
|
|
|
|
{
|
|
|
|
/// A new inbound substream arrived.
|
|
|
|
InboundSubstream {
|
2019-03-11 17:19:50 +01:00
|
|
|
/// The newly-opened substream. Will return EOF of an error if the `NodeStream` is
|
|
|
|
/// destroyed or `close_graceful` is called.
|
2018-09-14 13:18:10 +02:00
|
|
|
substream: Substream<TMuxer>,
|
|
|
|
},
|
|
|
|
|
|
|
|
/// An outbound substream has successfully been opened.
|
|
|
|
OutboundSubstream {
|
|
|
|
/// User data that has been passed to the `open_substream` method.
|
|
|
|
user_data: TUserData,
|
2019-03-11 17:19:50 +01:00
|
|
|
/// The newly-opened substream. Will return EOF of an error if the `NodeStream` is
|
|
|
|
/// destroyed or `close_graceful` is called.
|
2018-09-14 13:18:10 +02:00
|
|
|
substream: Substream<TMuxer>,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Opaque identifier for an outbound substream that is in the process of being opened.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct OutboundSubstreamId(usize);
|
|
|
|
|
2018-10-17 10:17:40 +01:00
|
|
|
impl<TMuxer, TUserData> NodeStream<TMuxer, TUserData>
|
2018-09-14 13:18:10 +02:00
|
|
|
where
|
|
|
|
TMuxer: muxing::StreamMuxer,
|
|
|
|
{
|
|
|
|
/// Creates a new node events stream.
|
|
|
|
#[inline]
|
2018-10-17 10:17:40 +01:00
|
|
|
pub fn new(muxer: TMuxer) -> Self {
|
2018-09-14 13:18:10 +02:00
|
|
|
NodeStream {
|
|
|
|
muxer: Arc::new(muxer),
|
|
|
|
outbound_substreams: SmallVec::new(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Starts the process of opening a new outbound substream.
|
|
|
|
///
|
|
|
|
/// After calling this method, polling the stream should eventually produce either an
|
|
|
|
/// `OutboundSubstream` event or an `OutboundClosed` event containing the user data that has
|
|
|
|
/// been passed to this method.
|
2019-03-11 17:19:50 +01:00
|
|
|
pub fn open_substream(&mut self, user_data: TUserData) {
|
2018-09-14 13:18:10 +02:00
|
|
|
let raw = self.muxer.open_outbound();
|
|
|
|
self.outbound_substreams.push((user_data, raw));
|
|
|
|
}
|
|
|
|
|
2019-02-20 16:25:34 +01:00
|
|
|
/// Returns `true` if the remote has shown any sign of activity after the muxer has been open.
|
|
|
|
///
|
|
|
|
/// See `StreamMuxer::is_remote_acknowledged`.
|
|
|
|
pub fn is_remote_acknowledged(&self) -> bool {
|
|
|
|
self.muxer.is_remote_acknowledged()
|
|
|
|
}
|
|
|
|
|
2019-03-11 17:19:50 +01:00
|
|
|
/// Destroys the node stream and returns all the pending outbound substreams, plus an object
|
|
|
|
/// that signals the remote that we shut down the connection.
|
|
|
|
#[must_use]
|
|
|
|
pub fn close(mut self) -> (Close<TMuxer>, Vec<TUserData>) {
|
|
|
|
let substreams = self.cancel_outgoing();
|
|
|
|
let close = Close { muxer: self.muxer.clone() };
|
|
|
|
(close, substreams)
|
2018-10-15 10:42:11 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Destroys all outbound streams and returns the corresponding user data.
|
|
|
|
pub fn cancel_outgoing(&mut self) -> Vec<TUserData> {
|
2018-09-14 13:18:10 +02:00
|
|
|
let mut out = Vec::with_capacity(self.outbound_substreams.len());
|
2019-11-19 11:18:16 +01:00
|
|
|
for (user_data, outbound) in self.outbound_substreams.drain(..) {
|
2018-09-14 13:18:10 +02:00
|
|
|
out.push(user_data);
|
|
|
|
self.muxer.destroy_outbound(outbound);
|
|
|
|
}
|
|
|
|
out
|
|
|
|
}
|
2018-10-15 10:42:11 +02:00
|
|
|
|
2019-03-11 17:19:50 +01:00
|
|
|
/// Provides an API similar to `Future`.
|
2019-09-16 11:08:44 +02:00
|
|
|
pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<NodeEvent<TMuxer, TUserData>, IoError>> {
|
2018-09-14 13:18:10 +02:00
|
|
|
// Polling inbound substream.
|
2019-09-16 11:08:44 +02:00
|
|
|
match self.muxer.poll_inbound(cx) {
|
|
|
|
Poll::Ready(Ok(substream)) => {
|
2019-03-11 17:19:50 +01:00
|
|
|
let substream = muxing::substream_from_ref(self.muxer.clone(), substream);
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Ok(NodeEvent::InboundSubstream {
|
2019-03-11 17:19:50 +01:00
|
|
|
substream,
|
|
|
|
}));
|
2018-09-14 13:18:10 +02:00
|
|
|
}
|
2019-09-16 11:08:44 +02:00
|
|
|
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
|
|
|
|
Poll::Pending => {}
|
2018-09-14 13:18:10 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Polling outbound substreams.
|
|
|
|
// We remove each element from `outbound_substreams` one by one and add them back.
|
|
|
|
for n in (0..self.outbound_substreams.len()).rev() {
|
|
|
|
let (user_data, mut outbound) = self.outbound_substreams.swap_remove(n);
|
2019-09-16 11:08:44 +02:00
|
|
|
match self.muxer.poll_outbound(cx, &mut outbound) {
|
|
|
|
Poll::Ready(Ok(substream)) => {
|
2018-09-14 13:18:10 +02:00
|
|
|
let substream = muxing::substream_from_ref(self.muxer.clone(), substream);
|
|
|
|
self.muxer.destroy_outbound(outbound);
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Ok(NodeEvent::OutboundSubstream {
|
2018-09-14 13:18:10 +02:00
|
|
|
user_data,
|
|
|
|
substream,
|
2019-03-11 17:19:50 +01:00
|
|
|
}));
|
2018-09-14 13:18:10 +02:00
|
|
|
}
|
2019-09-16 11:08:44 +02:00
|
|
|
Poll::Pending => {
|
2018-09-14 13:18:10 +02:00
|
|
|
self.outbound_substreams.push((user_data, outbound));
|
|
|
|
}
|
2019-09-16 11:08:44 +02:00
|
|
|
Poll::Ready(Err(err)) => {
|
2018-09-14 13:18:10 +02:00
|
|
|
self.muxer.destroy_outbound(outbound);
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Err(err.into()));
|
2018-09-14 13:18:10 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Nothing happened. Register our task to be notified and return.
|
2019-09-16 11:08:44 +02:00
|
|
|
Poll::Pending
|
2018-09-14 13:18:10 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-10-17 10:17:40 +01:00
|
|
|
impl<TMuxer, TUserData> fmt::Debug for NodeStream<TMuxer, TUserData>
|
2018-09-14 13:18:10 +02:00
|
|
|
where
|
|
|
|
TMuxer: muxing::StreamMuxer,
|
|
|
|
{
|
2019-02-11 14:58:15 +01:00
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
2018-09-14 13:18:10 +02:00
|
|
|
f.debug_struct("NodeStream")
|
|
|
|
.field("outbound_substreams", &self.outbound_substreams.len())
|
|
|
|
.finish()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-10-17 10:17:40 +01:00
|
|
|
impl<TMuxer, TUserData> Drop for NodeStream<TMuxer, TUserData>
|
2018-09-14 13:18:10 +02:00
|
|
|
where
|
|
|
|
TMuxer: muxing::StreamMuxer,
|
|
|
|
{
|
|
|
|
fn drop(&mut self) {
|
|
|
|
// The substreams that were produced will continue to work, as the muxer is held in an Arc.
|
|
|
|
// However we will no longer process any further inbound or outbound substream, and we
|
|
|
|
// therefore close everything.
|
2019-11-19 11:18:16 +01:00
|
|
|
for (_, outbound) in self.outbound_substreams.drain(..) {
|
2018-09-14 13:18:10 +02:00
|
|
|
self.muxer.destroy_outbound(outbound);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-03-11 17:19:50 +01:00
|
|
|
impl<TMuxer> Future for Close<TMuxer>
|
|
|
|
where
|
|
|
|
TMuxer: muxing::StreamMuxer,
|
|
|
|
{
|
2019-09-16 11:08:44 +02:00
|
|
|
type Output = Result<(), IoError>;
|
2019-03-11 17:19:50 +01:00
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
|
|
|
match self.muxer.close(cx) {
|
|
|
|
Poll::Pending => Poll::Pending,
|
|
|
|
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
|
|
|
|
Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
|
|
|
|
}
|
2019-03-11 17:19:50 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<TMuxer> fmt::Debug for Close<TMuxer>
|
|
|
|
where
|
|
|
|
TMuxer: muxing::StreamMuxer,
|
|
|
|
{
|
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
|
|
|
f.debug_struct("Close")
|
|
|
|
.finish()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-11-03 09:54:31 +01:00
|
|
|
impl<TMuxer, TUserData> fmt::Debug for NodeEvent<TMuxer, TUserData>
|
|
|
|
where
|
|
|
|
TMuxer: muxing::StreamMuxer,
|
|
|
|
TMuxer::Substream: fmt::Debug,
|
|
|
|
TUserData: fmt::Debug,
|
2018-09-14 13:18:10 +02:00
|
|
|
{
|
2019-02-11 14:58:15 +01:00
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
2018-09-14 13:18:10 +02:00
|
|
|
match self {
|
2018-11-03 09:54:31 +01:00
|
|
|
NodeEvent::InboundSubstream { substream } => {
|
|
|
|
f.debug_struct("NodeEvent::OutboundClosed")
|
|
|
|
.field("substream", substream)
|
2018-09-14 13:18:10 +02:00
|
|
|
.finish()
|
|
|
|
},
|
2018-11-03 09:54:31 +01:00
|
|
|
NodeEvent::OutboundSubstream { user_data, substream } => {
|
|
|
|
f.debug_struct("NodeEvent::OutboundSubstream")
|
|
|
|
.field("user_data", user_data)
|
|
|
|
.field("substream", substream)
|
2018-09-14 13:18:10 +02:00
|
|
|
.finish()
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
2018-11-03 09:54:31 +01:00
|
|
|
}
|