// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
2018-07-17 16:31:32 +02:00
|
|
|
mod codec;
|
2017-11-01 11:59:52 +01:00
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
use std::{cmp, iter, mem, pin::Pin, task::Context, task::Poll};
|
2018-08-31 10:31:34 +02:00
|
|
|
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
2019-09-16 11:08:44 +02:00
|
|
|
use std::sync::Arc;
|
|
|
|
use std::task::Waker;
|
2018-07-17 16:31:32 +02:00
|
|
|
use bytes::Bytes;
|
2018-12-18 11:06:37 +01:00
|
|
|
use libp2p_core::{
|
2018-11-15 17:41:11 +01:00
|
|
|
Endpoint,
|
|
|
|
StreamMuxer,
|
2020-01-13 14:34:43 +01:00
|
|
|
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
|
2018-11-15 17:41:11 +01:00
|
|
|
};
|
2018-12-18 11:06:37 +01:00
|
|
|
use log::{debug, trace};
|
2018-07-17 16:31:32 +02:00
|
|
|
use parking_lot::Mutex;
|
2019-09-16 11:08:44 +02:00
|
|
|
use fnv::FnvHashSet;
|
2019-11-01 16:53:11 +01:00
|
|
|
use futures::{prelude::*, future, ready, stream::Fuse};
|
2019-09-16 11:08:44 +02:00
|
|
|
use futures::task::{ArcWake, waker_ref};
|
|
|
|
use futures_codec::Framed;
|
2018-07-17 16:31:32 +02:00
|
|
|
|
|
|
|
/// Configuration for the multiplexer.
|
2018-08-17 09:39:37 +02:00
|
|
|
#[derive(Debug, Clone)]
|
|
|
|
pub struct MplexConfig {
|
|
|
|
/// Maximum number of simultaneously-open substreams.
|
|
|
|
max_substreams: usize,
|
|
|
|
/// Maximum number of elements in the internal buffer.
|
|
|
|
max_buffer_len: usize,
|
|
|
|
/// Behaviour when the buffer size limit is reached.
|
|
|
|
max_buffer_behaviour: MaxBufferBehaviour,
|
2019-01-04 13:25:51 +01:00
|
|
|
/// When sending data, split it into frames whose maximum size is this value
|
|
|
|
/// (max 1MByte, as per the Mplex spec).
|
2018-09-06 15:43:49 +02:00
|
|
|
split_send_size: usize,
|
2018-08-17 09:39:37 +02:00
|
|
|
}
|
2018-07-17 16:31:32 +02:00
|
|
|
|
|
|
|
impl MplexConfig {
|
|
|
|
/// Builds the default configuration.
|
|
|
|
pub fn new() -> MplexConfig {
|
|
|
|
Default::default()
|
2017-11-01 11:59:52 +01:00
|
|
|
}
|
2018-08-17 09:39:37 +02:00
|
|
|
|
|
|
|
/// Sets the maximum number of simultaneously opened substreams, after which an error is
|
|
|
|
/// generated and the connection closes.
|
|
|
|
///
|
|
|
|
/// A limit is necessary in order to avoid DoS attacks.
|
|
|
|
pub fn max_substreams(&mut self, max: usize) -> &mut Self {
|
|
|
|
self.max_substreams = max;
|
|
|
|
self
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Sets the maximum number of pending incoming messages.
|
|
|
|
///
|
|
|
|
/// A limit is necessary in order to avoid DoS attacks.
|
|
|
|
pub fn max_buffer_len(&mut self, max: usize) -> &mut Self {
|
|
|
|
self.max_buffer_len = max;
|
|
|
|
self
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Sets the behaviour when the maximum buffer length has been reached.
|
|
|
|
///
|
|
|
|
/// See the documentation of `MaxBufferBehaviour`.
|
|
|
|
pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self {
|
|
|
|
self.max_buffer_behaviour = behaviour;
|
|
|
|
self
|
|
|
|
}
|
2018-11-15 17:41:11 +01:00
|
|
|
|
2019-01-04 13:25:51 +01:00
|
|
|
/// Sets the frame size used when sending data. Capped at 1Mbyte as per the
|
|
|
|
/// Mplex spec.
|
|
|
|
pub fn split_send_size(&mut self, size: usize) -> &mut Self {
|
|
|
|
let size = cmp::min(size, codec::MAX_FRAME_SIZE);
|
|
|
|
self.split_send_size = size;
|
|
|
|
self
|
|
|
|
}
|
|
|
|
|
2019-01-25 11:26:37 +01:00
|
|
|
fn upgrade<C>(self, i: C) -> Multiplex<C>
|
2018-11-15 17:41:11 +01:00
|
|
|
where
|
2019-09-16 11:08:44 +02:00
|
|
|
C: AsyncRead + AsyncWrite + Unpin
|
2018-11-15 17:41:11 +01:00
|
|
|
{
|
|
|
|
let max_buffer_len = self.max_buffer_len;
|
|
|
|
Multiplex {
|
|
|
|
inner: Mutex::new(MultiplexInner {
|
|
|
|
error: Ok(()),
|
2019-09-16 11:08:44 +02:00
|
|
|
inner: Framed::new(i, codec::Codec::new()).fuse(),
|
2018-11-15 17:41:11 +01:00
|
|
|
config: self,
|
|
|
|
buffer: Vec::with_capacity(cmp::min(max_buffer_len, 512)),
|
|
|
|
opened_substreams: Default::default(),
|
2019-01-25 11:26:37 +01:00
|
|
|
next_outbound_stream_id: 0,
|
2018-11-15 17:41:11 +01:00
|
|
|
notifier_read: Arc::new(Notifier {
|
2019-09-16 11:08:44 +02:00
|
|
|
to_wake: Mutex::new(Default::default()),
|
2018-11-15 17:41:11 +01:00
|
|
|
}),
|
|
|
|
notifier_write: Arc::new(Notifier {
|
2019-09-16 11:08:44 +02:00
|
|
|
to_wake: Mutex::new(Default::default()),
|
2018-11-15 17:41:11 +01:00
|
|
|
}),
|
2019-02-20 16:25:34 +01:00
|
|
|
is_shutdown: false,
|
2018-11-15 17:41:11 +01:00
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
2018-08-17 09:39:37 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for MplexConfig {
|
|
|
|
fn default() -> MplexConfig {
|
|
|
|
MplexConfig {
|
|
|
|
max_substreams: 128,
|
|
|
|
max_buffer_len: 4096,
|
|
|
|
max_buffer_behaviour: MaxBufferBehaviour::CloseAll,
|
2018-09-06 15:43:49 +02:00
|
|
|
split_send_size: 1024,
|
2018-08-17 09:39:37 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Behaviour when the maximum length of the buffer is reached.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MaxBufferBehaviour {
    /// Produce an error on all the substreams.
    CloseAll,
    /// No new message will be read from the underlying connection if the buffer is full.
    ///
    /// This can potentially introduce a deadlock if you are waiting for a message from a substream
    /// before processing the messages received on another substream.
    Block,
}
|
|
|
|
|
2018-11-15 17:41:11 +01:00
|
|
|
impl UpgradeInfo for MplexConfig {
|
2018-12-11 15:13:10 +01:00
|
|
|
type Info = &'static [u8];
|
|
|
|
type InfoIter = iter::Once<Self::Info>;
|
2018-11-15 17:41:11 +01:00
|
|
|
|
2018-12-11 15:13:10 +01:00
|
|
|
fn protocol_info(&self) -> Self::InfoIter {
|
|
|
|
iter::once(b"/mplex/6.7.0")
|
2018-11-15 17:41:11 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<C> InboundUpgrade<C> for MplexConfig
|
2018-07-17 16:31:32 +02:00
|
|
|
where
|
2019-09-16 11:08:44 +02:00
|
|
|
C: AsyncRead + AsyncWrite + Unpin,
|
2018-07-17 16:31:32 +02:00
|
|
|
{
|
2020-01-13 14:34:43 +01:00
|
|
|
type Output = Multiplex<C>;
|
2018-11-15 17:41:11 +01:00
|
|
|
type Error = IoError;
|
2019-09-16 11:08:44 +02:00
|
|
|
type Future = future::Ready<Result<Self::Output, IoError>>;
|
2017-12-14 17:37:32 +01:00
|
|
|
|
2020-01-13 14:34:43 +01:00
|
|
|
fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
|
2019-09-16 11:08:44 +02:00
|
|
|
future::ready(Ok(self.upgrade(socket)))
|
2017-11-01 11:59:52 +01:00
|
|
|
}
|
2018-11-15 17:41:11 +01:00
|
|
|
}
|
2017-11-01 11:59:52 +01:00
|
|
|
|
2018-11-15 17:41:11 +01:00
|
|
|
impl<C> OutboundUpgrade<C> for MplexConfig
|
|
|
|
where
|
2019-09-16 11:08:44 +02:00
|
|
|
C: AsyncRead + AsyncWrite + Unpin,
|
2018-11-15 17:41:11 +01:00
|
|
|
{
|
2020-01-13 14:34:43 +01:00
|
|
|
type Output = Multiplex<C>;
|
2018-11-15 17:41:11 +01:00
|
|
|
type Error = IoError;
|
2019-09-16 11:08:44 +02:00
|
|
|
type Future = future::Ready<Result<Self::Output, IoError>>;
|
2018-11-15 17:41:11 +01:00
|
|
|
|
2020-01-13 14:34:43 +01:00
|
|
|
fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
|
2019-09-16 11:08:44 +02:00
|
|
|
future::ready(Ok(self.upgrade(socket)))
|
2017-11-01 11:59:52 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-07-17 16:31:32 +02:00
|
|
|
/// Multiplexer. Implements the `StreamMuxer` trait.
|
|
|
|
pub struct Multiplex<C> {
|
2018-08-31 10:31:34 +02:00
|
|
|
inner: Mutex<MultiplexInner<C>>,
|
2017-11-01 11:59:52 +01:00
|
|
|
}
|
|
|
|
|
2018-07-17 16:31:32 +02:00
|
|
|
// Struct shared throughout the implementation.
|
|
|
|
struct MultiplexInner<C> {
|
2018-11-08 03:01:33 +11:00
|
|
|
// Error that happened earlier. Should poison any attempt to use this `MultiplexError`.
|
2018-08-13 11:29:07 +02:00
|
|
|
error: Result<(), IoError>,
|
2018-07-17 16:31:32 +02:00
|
|
|
// Underlying stream.
|
2019-09-16 11:08:44 +02:00
|
|
|
inner: Fuse<Framed<C, codec::Codec>>,
|
2018-08-17 09:39:37 +02:00
|
|
|
/// The original configuration.
|
|
|
|
config: MplexConfig,
|
2018-07-17 16:31:32 +02:00
|
|
|
// Buffer of elements pulled from the stream but not processed yet.
|
|
|
|
buffer: Vec<codec::Elem>,
|
|
|
|
// List of Ids of opened substreams. Used to filter out messages that don't belong to any
|
2018-08-13 11:29:07 +02:00
|
|
|
// substream. Note that this is handled exclusively by `next_match`.
|
2018-09-06 13:59:14 +02:00
|
|
|
// The `Endpoint` value denotes who initiated the substream from our point of view
|
|
|
|
// (see note [StreamId]).
|
|
|
|
opened_substreams: FnvHashSet<(u32, Endpoint)>,
|
2019-01-25 11:26:37 +01:00
|
|
|
// Id of the next outgoing substream.
|
2018-07-17 16:31:32 +02:00
|
|
|
next_outbound_stream_id: u32,
|
2019-09-16 11:08:44 +02:00
|
|
|
/// List of wakers to wake when a read event happens on the underlying stream.
|
2018-09-05 01:53:39 +02:00
|
|
|
notifier_read: Arc<Notifier>,
|
2019-09-16 11:08:44 +02:00
|
|
|
/// List of wakers to wake when a write event happens on the underlying stream.
|
2018-09-05 01:53:39 +02:00
|
|
|
notifier_write: Arc<Notifier>,
|
2018-10-11 15:43:34 +02:00
|
|
|
/// If true, the connection has been shut down. We need to be careful not to accidentally
|
|
|
|
/// call `Sink::poll_complete` or `Sink::start_send` after `Sink::close`.
|
2019-02-20 16:25:34 +01:00
|
|
|
is_shutdown: bool,
|
2018-09-05 01:53:39 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
struct Notifier {
|
2019-09-16 11:08:44 +02:00
|
|
|
/// List of wakers to wake.
|
|
|
|
to_wake: Mutex<Vec<Waker>>,
|
2018-09-05 01:53:39 +02:00
|
|
|
}
|
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
impl Notifier {
|
|
|
|
fn insert(&self, waker: &Waker) {
|
|
|
|
let mut to_wake = self.to_wake.lock();
|
|
|
|
if to_wake.iter().all(|w| !w.will_wake(waker)) {
|
|
|
|
to_wake.push(waker.clone());
|
2018-09-05 01:53:39 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
impl ArcWake for Notifier {
|
|
|
|
fn wake_by_ref(arc_self: &Arc<Self>) {
|
|
|
|
let wakers = mem::replace(&mut *arc_self.to_wake.lock(), Default::default());
|
|
|
|
for waker in wakers {
|
|
|
|
waker.wake();
|
|
|
|
}
|
|
|
|
}
|
2017-11-01 11:59:52 +01:00
|
|
|
}
|
|
|
|
|
2018-09-06 13:59:14 +02:00
|
|
|
// Note [StreamId]: mplex no longer partitions stream IDs into odd (for initiators) and
// even ones (for receivers). Streams are instead identified by a number and whether the flag
// is odd (for receivers) or even (for initiators). `Open` frames do not have a flag, but are
// sent unidirectionally. As a consequence, we need to remember if the stream was initiated by us
// or remotely and we store the information from our point of view, i.e. receiving an `Open` frame
// is stored as `(<u32>, Listener)`, sending an `Open` frame as `(<u32>, Dialer)`. Receiving
// a `Data` frame with flag `MessageReceiver` (= 1) means that we initiated the stream, so the
// entry has been stored as `(<u32>, Dialer)`. So, when looking up streams based on frames
// received, we have to invert the `Endpoint`, except for `Open`.
|
|
|
|
|
2018-08-13 11:29:07 +02:00
|
|
|
/// Processes elements in `inner` until one matching `filter` is found.
|
|
|
|
///
|
2019-09-20 10:46:13 +02:00
|
|
|
/// If `Pending` is returned, the waker is kept and notified later, just like with any `Poll`.
|
2019-09-16 11:08:44 +02:00
|
|
|
/// `Ready(Ok())` is almost always returned. An error is returned if the stream is EOF.
|
|
|
|
fn next_match<C, F, O>(inner: &mut MultiplexInner<C>, cx: &mut Context, mut filter: F) -> Poll<Result<O, IoError>>
|
|
|
|
where C: AsyncRead + AsyncWrite + Unpin,
|
2018-07-17 16:31:32 +02:00
|
|
|
F: FnMut(&codec::Elem) -> Option<O>,
|
|
|
|
{
|
2018-08-13 11:29:07 +02:00
|
|
|
// If an error happened earlier, immediately return it.
|
|
|
|
if let Err(ref err) = inner.error {
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Err(IoError::new(err.kind(), err.to_string())));
|
2018-08-13 11:29:07 +02:00
|
|
|
}
|
|
|
|
|
2018-07-17 16:31:32 +02:00
|
|
|
if let Some((offset, out)) = inner.buffer.iter().enumerate().filter_map(|(n, v)| filter(v).map(|v| (n, v))).next() {
|
2019-09-16 11:08:44 +02:00
|
|
|
// Found a matching entry in the existing buffer!
|
|
|
|
|
2018-08-17 09:39:37 +02:00
|
|
|
// The buffer was full and no longer is, so let's notify everything.
|
|
|
|
if inner.buffer.len() == inner.config.max_buffer_len {
|
2019-09-16 11:08:44 +02:00
|
|
|
ArcWake::wake_by_ref(&inner.notifier_read);
|
2018-08-17 09:39:37 +02:00
|
|
|
}
|
|
|
|
|
2018-07-17 16:31:32 +02:00
|
|
|
inner.buffer.remove(offset);
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Ok(out));
|
2017-11-22 18:01:28 +01:00
|
|
|
}
|
2017-11-01 11:59:52 +01:00
|
|
|
|
2018-07-17 16:31:32 +02:00
|
|
|
loop {
|
2018-08-17 09:39:37 +02:00
|
|
|
// Check if we reached max buffer length first.
|
|
|
|
debug_assert!(inner.buffer.len() <= inner.config.max_buffer_len);
|
|
|
|
if inner.buffer.len() == inner.config.max_buffer_len {
|
|
|
|
debug!("Reached mplex maximum buffer length");
|
|
|
|
match inner.config.max_buffer_behaviour {
|
|
|
|
MaxBufferBehaviour::CloseAll => {
|
|
|
|
inner.error = Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length"));
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length")));
|
2018-08-17 09:39:37 +02:00
|
|
|
},
|
|
|
|
MaxBufferBehaviour::Block => {
|
2019-09-16 11:08:44 +02:00
|
|
|
inner.notifier_read.insert(cx.waker());
|
|
|
|
return Poll::Pending
|
2018-08-17 09:39:37 +02:00
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
inner.notifier_read.insert(cx.waker());
|
|
|
|
let elem = match Stream::poll_next(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_read))) {
|
|
|
|
Poll::Ready(Some(Ok(item))) => item,
|
|
|
|
Poll::Ready(None) => return Poll::Ready(Err(IoErrorKind::BrokenPipe.into())),
|
|
|
|
Poll::Pending => return Poll::Pending,
|
|
|
|
Poll::Ready(Some(Err(err))) => {
|
2018-08-13 11:29:07 +02:00
|
|
|
let err2 = IoError::new(err.kind(), err.to_string());
|
|
|
|
inner.error = Err(err);
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Err(err2));
|
2018-07-17 16:31:32 +02:00
|
|
|
},
|
2017-11-01 11:59:52 +01:00
|
|
|
};
|
|
|
|
|
2018-09-05 01:53:39 +02:00
|
|
|
trace!("Received message: {:?}", elem);
|
2018-08-13 11:29:07 +02:00
|
|
|
|
2018-09-05 01:53:39 +02:00
|
|
|
// Handle substreams opening/closing.
|
|
|
|
match elem {
|
|
|
|
codec::Elem::Open { substream_id } => {
|
2018-09-06 13:59:14 +02:00
|
|
|
if !inner.opened_substreams.insert((substream_id, Endpoint::Listener)) {
|
2018-09-05 01:53:39 +02:00
|
|
|
debug!("Received open message for substream {} which was already open", substream_id)
|
2017-11-22 18:01:28 +01:00
|
|
|
}
|
2018-09-06 13:59:14 +02:00
|
|
|
}
|
|
|
|
codec::Elem::Close { substream_id, endpoint, .. } | codec::Elem::Reset { substream_id, endpoint, .. } => {
|
|
|
|
inner.opened_substreams.remove(&(substream_id, !endpoint));
|
|
|
|
}
|
2018-09-05 01:53:39 +02:00
|
|
|
_ => ()
|
|
|
|
}
|
|
|
|
|
|
|
|
if let Some(out) = filter(&elem) {
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Ok(out));
|
2018-07-17 16:31:32 +02:00
|
|
|
} else {
|
2018-09-06 13:59:14 +02:00
|
|
|
let endpoint = elem.endpoint().unwrap_or(Endpoint::Dialer);
|
|
|
|
if inner.opened_substreams.contains(&(elem.substream_id(), !endpoint)) || elem.is_open_msg() {
|
2018-09-05 01:53:39 +02:00
|
|
|
inner.buffer.push(elem);
|
|
|
|
} else if !elem.is_close_or_reset_msg() {
|
|
|
|
debug!("Ignored message {:?} because the substream wasn't open", elem);
|
|
|
|
}
|
2017-11-01 11:59:52 +01:00
|
|
|
}
|
2017-11-22 18:01:28 +01:00
|
|
|
}
|
|
|
|
}
|
2017-11-01 11:59:52 +01:00
|
|
|
|
2018-07-17 16:31:32 +02:00
|
|
|
// Small convenience function that tries to write `elem` to the stream.
|
2019-09-16 11:08:44 +02:00
|
|
|
fn poll_send<C>(inner: &mut MultiplexInner<C>, cx: &mut Context, elem: codec::Elem) -> Poll<Result<(), IoError>>
|
|
|
|
where C: AsyncRead + AsyncWrite + Unpin
|
2018-07-17 16:31:32 +02:00
|
|
|
{
|
2020-04-01 14:28:59 +02:00
|
|
|
ensure_no_error_no_close(inner)?;
|
2019-09-16 11:08:44 +02:00
|
|
|
|
|
|
|
inner.notifier_write.insert(cx.waker());
|
|
|
|
|
|
|
|
match Sink::poll_ready(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))) {
|
|
|
|
Poll::Ready(Ok(())) => {
|
|
|
|
match Sink::start_send(Pin::new(&mut inner.inner), elem) {
|
|
|
|
Ok(()) => Poll::Ready(Ok(())),
|
|
|
|
Err(err) => Poll::Ready(Err(err))
|
|
|
|
}
|
|
|
|
},
|
|
|
|
Poll::Pending => Poll::Pending,
|
2020-04-01 14:28:59 +02:00
|
|
|
Poll::Ready(Err(err)) => {
|
|
|
|
inner.error = Err(IoError::new(err.kind(), err.to_string()));
|
|
|
|
Poll::Ready(Err(err))
|
|
|
|
}
|
2017-11-01 11:59:52 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-04-01 14:28:59 +02:00
|
|
|
fn ensure_no_error_no_close<C>(inner: &mut MultiplexInner<C>) -> Result<(), IoError>
|
|
|
|
where
|
|
|
|
C: AsyncRead + AsyncWrite + Unpin
|
|
|
|
{
|
|
|
|
if inner.is_shutdown {
|
|
|
|
return Err(IoError::new(IoErrorKind::Other, "connection is shut down"))
|
|
|
|
}
|
|
|
|
if let Err(ref e) = inner.error {
|
|
|
|
return Err(IoError::new(e.kind(), e.to_string()))
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2018-07-17 16:31:32 +02:00
|
|
|
impl<C> StreamMuxer for Multiplex<C>
|
2019-09-16 11:08:44 +02:00
|
|
|
where C: AsyncRead + AsyncWrite + Unpin
|
2017-11-22 18:01:28 +01:00
|
|
|
{
|
2018-08-31 10:31:34 +02:00
|
|
|
type Substream = Substream;
|
|
|
|
type OutboundSubstream = OutboundSubstream;
|
2019-04-28 14:42:18 +03:00
|
|
|
type Error = IoError;
|
2017-11-22 18:01:28 +01:00
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
fn poll_inbound(&self, cx: &mut Context) -> Poll<Result<Self::Substream, IoError>> {
|
2018-07-17 16:31:32 +02:00
|
|
|
let mut inner = self.inner.lock();
|
2017-12-11 17:57:11 +01:00
|
|
|
|
2018-08-17 09:39:37 +02:00
|
|
|
if inner.opened_substreams.len() >= inner.config.max_substreams {
|
2018-10-29 20:38:32 +11:00
|
|
|
debug!("Refused substream; reached maximum number of substreams {}", inner.config.max_substreams);
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Err(IoError::new(IoErrorKind::ConnectionRefused,
|
|
|
|
"exceeded maximum number of open substreams")));
|
2017-12-11 17:57:11 +01:00
|
|
|
}
|
2017-11-22 18:01:28 +01:00
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
let num = ready!(next_match(&mut inner, cx, |elem| {
|
2018-07-17 16:31:32 +02:00
|
|
|
match elem {
|
2018-08-13 11:29:07 +02:00
|
|
|
codec::Elem::Open { substream_id } => Some(*substream_id),
|
2018-07-17 16:31:32 +02:00
|
|
|
_ => None,
|
|
|
|
}
|
|
|
|
}));
|
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
let num = match num {
|
|
|
|
Ok(n) => n,
|
|
|
|
Err(err) => return Poll::Ready(Err(err)),
|
|
|
|
};
|
|
|
|
|
2019-03-11 17:19:50 +01:00
|
|
|
debug!("Successfully opened inbound substream {}", num);
|
2019-09-16 11:08:44 +02:00
|
|
|
Poll::Ready(Ok(Substream {
|
2019-03-11 17:19:50 +01:00
|
|
|
current_data: Bytes::new(),
|
|
|
|
num,
|
|
|
|
endpoint: Endpoint::Listener,
|
2019-03-12 15:59:37 +01:00
|
|
|
local_open: true,
|
|
|
|
remote_open: true,
|
2019-03-11 17:19:50 +01:00
|
|
|
}))
|
2017-11-22 18:01:28 +01:00
|
|
|
}
|
|
|
|
|
2018-08-31 10:31:34 +02:00
|
|
|
fn open_outbound(&self) -> Self::OutboundSubstream {
|
|
|
|
let mut inner = self.inner.lock();
|
2017-11-22 18:01:28 +01:00
|
|
|
|
2018-08-31 10:31:34 +02:00
|
|
|
// Assign a substream ID now.
|
|
|
|
let substream_id = {
|
|
|
|
let n = inner.next_outbound_stream_id;
|
2019-01-25 11:26:37 +01:00
|
|
|
inner.next_outbound_stream_id = inner.next_outbound_stream_id.checked_add(1)
|
|
|
|
.expect("Mplex substream ID overflowed");
|
2018-08-31 10:31:34 +02:00
|
|
|
n
|
|
|
|
};
|
|
|
|
|
2018-09-06 13:59:14 +02:00
|
|
|
inner.opened_substreams.insert((substream_id, Endpoint::Dialer));
|
2018-08-31 10:31:34 +02:00
|
|
|
|
|
|
|
OutboundSubstream {
|
|
|
|
num: substream_id,
|
|
|
|
state: OutboundSubstreamState::SendElem(codec::Elem::Open { substream_id }),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
fn poll_outbound(&self, cx: &mut Context, substream: &mut Self::OutboundSubstream) -> Poll<Result<Self::Substream, IoError>> {
|
2018-08-31 10:31:34 +02:00
|
|
|
loop {
|
2018-09-05 01:53:39 +02:00
|
|
|
let mut inner = self.inner.lock();
|
|
|
|
|
2018-08-31 10:31:34 +02:00
|
|
|
let polling = match substream.state {
|
|
|
|
OutboundSubstreamState::SendElem(ref elem) => {
|
2019-09-16 11:08:44 +02:00
|
|
|
poll_send(&mut inner, cx, elem.clone())
|
2018-08-31 10:31:34 +02:00
|
|
|
},
|
|
|
|
OutboundSubstreamState::Flush => {
|
2020-04-01 14:28:59 +02:00
|
|
|
ensure_no_error_no_close(&mut inner)?;
|
2018-09-05 01:53:39 +02:00
|
|
|
let inner = &mut *inner; // Avoids borrow errors
|
2019-09-16 11:08:44 +02:00
|
|
|
inner.notifier_write.insert(cx.waker());
|
|
|
|
Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)))
|
2018-08-31 10:31:34 +02:00
|
|
|
},
|
|
|
|
OutboundSubstreamState::Done => {
|
|
|
|
panic!("Polling outbound substream after it's been succesfully open");
|
|
|
|
},
|
|
|
|
};
|
|
|
|
|
|
|
|
match polling {
|
2019-09-16 11:08:44 +02:00
|
|
|
Poll::Ready(Ok(())) => (),
|
|
|
|
Poll::Pending => return Poll::Pending,
|
|
|
|
Poll::Ready(Err(err)) => {
|
2018-08-31 10:31:34 +02:00
|
|
|
debug!("Failed to open outbound substream {}", substream.num);
|
2018-09-10 14:03:38 +02:00
|
|
|
inner.buffer.retain(|elem| {
|
|
|
|
elem.substream_id() != substream.num || elem.endpoint() == Some(Endpoint::Dialer)
|
|
|
|
});
|
2020-04-01 14:28:59 +02:00
|
|
|
inner.error = Err(IoError::new(err.kind(), err.to_string()));
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Err(err));
|
2018-08-31 10:31:34 +02:00
|
|
|
},
|
|
|
|
};
|
|
|
|
|
2018-09-05 01:53:39 +02:00
|
|
|
drop(inner);
|
|
|
|
|
2018-08-31 10:31:34 +02:00
|
|
|
// Going to next step.
|
|
|
|
match substream.state {
|
|
|
|
OutboundSubstreamState::SendElem(_) => {
|
|
|
|
substream.state = OutboundSubstreamState::Flush;
|
|
|
|
},
|
|
|
|
OutboundSubstreamState::Flush => {
|
|
|
|
debug!("Successfully opened outbound substream {}", substream.num);
|
|
|
|
substream.state = OutboundSubstreamState::Done;
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Ok(Substream {
|
2018-08-31 10:31:34 +02:00
|
|
|
num: substream.num,
|
|
|
|
current_data: Bytes::new(),
|
|
|
|
endpoint: Endpoint::Dialer,
|
2019-03-12 15:59:37 +01:00
|
|
|
local_open: true,
|
|
|
|
remote_open: true,
|
2019-03-11 17:19:50 +01:00
|
|
|
}));
|
2018-08-31 10:31:34 +02:00
|
|
|
},
|
|
|
|
OutboundSubstreamState::Done => unreachable!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
|
|
|
|
// Nothing to do.
|
|
|
|
}
|
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
fn read_substream(&self, cx: &mut Context, substream: &mut Self::Substream, buf: &mut [u8]) -> Poll<Result<usize, IoError>> {
|
2018-07-17 16:31:32 +02:00
|
|
|
loop {
|
2018-08-13 11:29:07 +02:00
|
|
|
// First, transfer from `current_data`.
|
2019-01-30 15:41:54 +01:00
|
|
|
if !substream.current_data.is_empty() {
|
2018-08-31 10:31:34 +02:00
|
|
|
let len = cmp::min(substream.current_data.len(), buf.len());
|
|
|
|
buf[..len].copy_from_slice(&substream.current_data.split_to(len));
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Ok(len));
|
2018-07-17 16:31:32 +02:00
|
|
|
}
|
2017-11-22 18:01:28 +01:00
|
|
|
|
2019-03-12 15:59:37 +01:00
|
|
|
// If the remote writing side is closed, return EOF.
|
|
|
|
if !substream.remote_open {
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Ok(0));
|
2019-03-12 15:59:37 +01:00
|
|
|
}
|
|
|
|
|
2018-08-13 11:29:07 +02:00
|
|
|
// Try to find a packet of data in the buffer.
|
2018-07-17 16:31:32 +02:00
|
|
|
let mut inner = self.inner.lock();
|
2019-09-16 11:08:44 +02:00
|
|
|
let next_data_poll = next_match(&mut inner, cx, |elem| {
|
2018-07-17 16:31:32 +02:00
|
|
|
match elem {
|
2018-09-06 13:59:14 +02:00
|
|
|
codec::Elem::Data { substream_id, endpoint, data, .. }
|
|
|
|
if *substream_id == substream.num && *endpoint != substream.endpoint => // see note [StreamId]
|
|
|
|
{
|
2019-03-12 15:59:37 +01:00
|
|
|
Some(Some(data.clone()))
|
|
|
|
}
|
|
|
|
codec::Elem::Close { substream_id, endpoint }
|
|
|
|
if *substream_id == substream.num && *endpoint != substream.endpoint => // see note [StreamId]
|
|
|
|
{
|
|
|
|
Some(None)
|
2018-09-06 13:59:14 +02:00
|
|
|
}
|
|
|
|
_ => None
|
2018-07-17 16:31:32 +02:00
|
|
|
}
|
|
|
|
});
|
2017-11-22 18:01:28 +01:00
|
|
|
|
2018-08-31 10:31:34 +02:00
|
|
|
// We're in a loop, so all we need to do is set `substream.current_data` to the data we
|
2018-07-17 16:31:32 +02:00
|
|
|
// just read and wait for the next iteration.
|
2019-09-16 11:08:44 +02:00
|
|
|
match next_data_poll {
|
|
|
|
Poll::Ready(Ok(Some(data))) => substream.current_data = data,
|
|
|
|
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
|
|
|
|
Poll::Ready(Ok(None)) => {
|
2019-03-12 15:59:37 +01:00
|
|
|
substream.remote_open = false;
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Ok(0));
|
2019-03-12 15:59:37 +01:00
|
|
|
},
|
2019-09-16 11:08:44 +02:00
|
|
|
Poll::Pending => {
|
2018-10-29 20:38:32 +11:00
|
|
|
// There was no data packet in the buffer about this substream; maybe it's
|
2018-08-13 11:29:07 +02:00
|
|
|
// because it has been closed.
|
2018-09-06 13:59:14 +02:00
|
|
|
if inner.opened_substreams.contains(&(substream.num, substream.endpoint)) {
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Pending
|
2018-08-13 11:29:07 +02:00
|
|
|
} else {
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Ok(0))
|
2018-08-13 11:29:07 +02:00
|
|
|
}
|
|
|
|
},
|
2018-07-17 16:31:32 +02:00
|
|
|
}
|
2017-11-22 18:01:28 +01:00
|
|
|
}
|
2017-11-01 11:59:52 +01:00
|
|
|
}
|
2018-03-21 15:41:24 +01:00
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
fn write_substream(&self, cx: &mut Context, substream: &mut Self::Substream, buf: &[u8]) -> Poll<Result<usize, IoError>> {
|
2019-03-12 15:59:37 +01:00
|
|
|
if !substream.local_open {
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Err(IoErrorKind::BrokenPipe.into()));
|
2019-03-12 15:59:37 +01:00
|
|
|
}
|
|
|
|
|
2018-09-06 15:43:49 +02:00
|
|
|
let mut inner = self.inner.lock();
|
|
|
|
|
|
|
|
let to_write = cmp::min(buf.len(), inner.config.split_send_size);
|
|
|
|
|
2018-07-17 16:31:32 +02:00
|
|
|
let elem = codec::Elem::Data {
|
2018-08-31 10:31:34 +02:00
|
|
|
substream_id: substream.num,
|
2019-12-21 15:35:55 +01:00
|
|
|
data: Bytes::copy_from_slice(&buf[..to_write]),
|
2018-08-31 10:31:34 +02:00
|
|
|
endpoint: substream.endpoint,
|
2018-07-17 16:31:32 +02:00
|
|
|
};
|
2018-03-21 15:41:24 +01:00
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
match poll_send(&mut inner, cx, elem) {
|
|
|
|
Poll::Ready(Ok(())) => Poll::Ready(Ok(to_write)),
|
|
|
|
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
|
|
|
|
Poll::Pending => Poll::Pending,
|
2018-03-21 15:41:24 +01:00
|
|
|
}
|
2018-07-17 16:31:32 +02:00
|
|
|
}
|
2018-03-21 15:41:24 +01:00
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
fn flush_substream(&self, cx: &mut Context, _substream: &mut Self::Substream) -> Poll<Result<(), IoError>> {
|
2018-07-17 16:31:32 +02:00
|
|
|
let mut inner = self.inner.lock();
|
2020-04-01 14:28:59 +02:00
|
|
|
ensure_no_error_no_close(&mut inner)?;
|
2018-10-11 15:43:34 +02:00
|
|
|
let inner = &mut *inner; // Avoids borrow errors
|
2019-09-16 11:08:44 +02:00
|
|
|
inner.notifier_write.insert(cx.waker());
|
2020-04-01 14:28:59 +02:00
|
|
|
let result = Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)));
|
|
|
|
if let Poll::Ready(Err(err)) = &result {
|
|
|
|
inner.error = Err(IoError::new(err.kind(), err.to_string()));
|
|
|
|
}
|
|
|
|
result
|
2018-07-17 16:31:32 +02:00
|
|
|
}
|
2018-03-21 15:41:24 +01:00
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
fn shutdown_substream(&self, cx: &mut Context, sub: &mut Self::Substream) -> Poll<Result<(), IoError>> {
|
2019-03-12 15:59:37 +01:00
|
|
|
if !sub.local_open {
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Ok(()));
|
2019-03-12 15:59:37 +01:00
|
|
|
}
|
|
|
|
|
2018-11-16 11:35:57 +01:00
|
|
|
let elem = codec::Elem::Close {
|
2018-10-11 10:35:14 +02:00
|
|
|
substream_id: sub.num,
|
|
|
|
endpoint: sub.endpoint,
|
2018-07-17 16:31:32 +02:00
|
|
|
};
|
2018-03-21 15:41:24 +01:00
|
|
|
|
2018-07-17 16:31:32 +02:00
|
|
|
let mut inner = self.inner.lock();
|
2019-09-16 11:08:44 +02:00
|
|
|
let result = poll_send(&mut inner, cx, elem);
|
|
|
|
if let Poll::Ready(Ok(())) = result {
|
2019-03-12 15:59:37 +01:00
|
|
|
sub.local_open = false;
|
|
|
|
}
|
|
|
|
result
|
2018-07-17 16:31:32 +02:00
|
|
|
}
|
2018-03-21 15:41:24 +01:00
|
|
|
|
2018-10-11 10:35:14 +02:00
|
|
|
fn destroy_substream(&self, sub: Self::Substream) {
|
2018-09-10 10:21:19 +02:00
|
|
|
self.inner.lock().buffer.retain(|elem| {
|
2018-10-11 10:35:14 +02:00
|
|
|
elem.substream_id() != sub.num || elem.endpoint() == Some(sub.endpoint)
|
2018-09-10 10:21:19 +02:00
|
|
|
})
|
2018-03-21 15:41:24 +01:00
|
|
|
}
|
2018-09-14 13:18:10 +02:00
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
fn close(&self, cx: &mut Context) -> Poll<Result<(), IoError>> {
|
2018-10-11 10:35:14 +02:00
|
|
|
let inner = &mut *self.inner.lock();
|
2020-04-01 14:28:59 +02:00
|
|
|
if inner.is_shutdown {
|
|
|
|
return Poll::Ready(Ok(()))
|
|
|
|
}
|
|
|
|
if let Err(ref e) = inner.error {
|
|
|
|
return Poll::Ready(Err(IoError::new(e.kind(), e.to_string())))
|
|
|
|
}
|
2019-09-16 11:08:44 +02:00
|
|
|
inner.notifier_write.insert(cx.waker());
|
|
|
|
match Sink::poll_close(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))) {
|
|
|
|
Poll::Ready(Ok(())) => {
|
|
|
|
inner.is_shutdown = true;
|
|
|
|
Poll::Ready(Ok(()))
|
|
|
|
}
|
2020-04-01 14:28:59 +02:00
|
|
|
Poll::Ready(Err(err)) => {
|
|
|
|
inner.error = Err(IoError::new(err.kind(), err.to_string()));
|
|
|
|
Poll::Ready(Err(err))
|
|
|
|
}
|
2019-09-16 11:08:44 +02:00
|
|
|
Poll::Pending => Poll::Pending,
|
|
|
|
}
|
2018-09-14 13:18:10 +02:00
|
|
|
}
|
|
|
|
|
2019-09-16 11:08:44 +02:00
|
|
|
fn flush_all(&self, cx: &mut Context) -> Poll<Result<(), IoError>> {
|
2018-10-11 10:35:14 +02:00
|
|
|
let inner = &mut *self.inner.lock();
|
2018-10-11 15:43:34 +02:00
|
|
|
if inner.is_shutdown {
|
2019-09-16 11:08:44 +02:00
|
|
|
return Poll::Ready(Ok(()))
|
2018-10-11 15:43:34 +02:00
|
|
|
}
|
2020-04-01 14:28:59 +02:00
|
|
|
if let Err(ref e) = inner.error {
|
|
|
|
return Poll::Ready(Err(IoError::new(e.kind(), e.to_string())))
|
|
|
|
}
|
2019-09-16 11:08:44 +02:00
|
|
|
inner.notifier_write.insert(cx.waker());
|
2020-04-01 14:28:59 +02:00
|
|
|
let result = Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)));
|
|
|
|
if let Poll::Ready(Err(err)) = &result {
|
|
|
|
inner.error = Err(IoError::new(err.kind(), err.to_string()));
|
|
|
|
}
|
|
|
|
result
|
2018-09-14 13:18:10 +02:00
|
|
|
}
|
2017-11-01 11:59:52 +01:00
|
|
|
}
|
2018-08-31 10:31:34 +02:00
|
|
|
|
|
|
|
/// Active attempt to open an outbound substream.
|
|
|
|
pub struct OutboundSubstream {
|
|
|
|
/// Substream number.
|
|
|
|
num: u32,
|
|
|
|
state: OutboundSubstreamState,
|
|
|
|
}
|
|
|
|
|
|
|
|
enum OutboundSubstreamState {
|
|
|
|
/// We need to send `Elem` on the underlying stream.
|
|
|
|
SendElem(codec::Elem),
|
|
|
|
/// We need to flush the underlying stream.
|
|
|
|
Flush,
|
|
|
|
/// The substream is open and the `OutboundSubstream` is now useless.
|
|
|
|
Done,
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Active substream to the remote.
|
|
|
|
pub struct Substream {
|
|
|
|
/// Substream number.
|
|
|
|
num: u32,
|
|
|
|
// Read buffer. Contains data read from `inner` but not yet dispatched by a call to `read()`.
|
|
|
|
current_data: Bytes,
|
|
|
|
endpoint: Endpoint,
|
2019-03-12 15:59:37 +01:00
|
|
|
/// If true, our writing side is still open.
|
|
|
|
local_open: bool,
|
|
|
|
/// If true, the remote writing side is still open.
|
|
|
|
remote_open: bool,
|
2018-08-31 10:31:34 +02:00
|
|
|
}
|