// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{muxing::StreamMuxer, ProtocolName, transport::ListenerEvent};
use futures::prelude::*;
use std::{fmt, io::{Error as IoError, Read, Write}};
use tokio_io::{AsyncRead, AsyncWrite};
/// An error that is exactly one of two possible error types.
///
/// `A` is the type carried by the [`EitherError::A`] variant and `B` the type
/// carried by [`EitherError::B`].
#[derive(Debug, Copy, Clone)]
pub enum EitherError<A, B> {
    /// The error came from the first alternative.
    A(A),
    /// The error came from the second alternative.
    B(B),
}
/// Displays the error by delegating to the `Display` implementation of
/// whichever variant is present; the wrapper adds no text of its own.
impl<A, B> fmt::Display for EitherError<A, B>
where
    A: fmt::Display,
    B: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The formatter is handed through unchanged, so width/fill/precision
        // flags requested by the caller reach the inner value.
        match self {
            EitherError::A(a) => a.fmt(f),
            EitherError::B(b) => b.fmt(f),
        }
    }
}
impl std::error::Error for EitherError
where
A: std::error::Error,
B: std::error::Error
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
EitherError::A(a) => a.source(),
EitherError::B(b) => b.source()
}
}
}
/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to
/// either `First` or `Second`.
///
/// `A` is the type held by [`EitherOutput::First`] and `B` the type held by
/// [`EitherOutput::Second`].
#[derive(Debug, Copy, Clone)]
pub enum EitherOutput<A, B> {
    /// The value is of the first type.
    First(A),
    /// The value is of the second type.
    Second(B),
}
impl AsyncRead for EitherOutput
where
A: AsyncRead,
B: AsyncRead,
{
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
match self {
EitherOutput::First(a) => a.prepare_uninitialized_buffer(buf),
EitherOutput::Second(b) => b.prepare_uninitialized_buffer(buf),
}
}
fn read_buf(&mut self, buf: &mut Bu) -> Poll {
match self {
EitherOutput::First(a) => a.read_buf(buf),
EitherOutput::Second(b) => b.read_buf(buf),
}
}
}
impl Read for EitherOutput
where
A: Read,
B: Read,
{
fn read(&mut self, buf: &mut [u8]) -> Result {
match self {
EitherOutput::First(a) => a.read(buf),
EitherOutput::Second(b) => b.read(buf),
}
}
}
/// Forwards the `AsyncWrite` operations to whichever variant is present.
impl<A, B> AsyncWrite for EitherOutput<A, B>
where
    A: AsyncWrite,
    B: AsyncWrite,
{
    /// Forwards to the inner value's `shutdown` and returns its result
    /// unchanged.
    fn shutdown(&mut self) -> Poll<(), IoError> {
        match self {
            EitherOutput::First(a) => a.shutdown(),
            EitherOutput::Second(b) => b.shutdown(),
        }
    }
}
impl Write for EitherOutput
where
A: Write,
B: Write,
{
fn write(&mut self, buf: &[u8]) -> Result {
match self {
EitherOutput::First(a) => a.write(buf),
EitherOutput::Second(b) => b.write(buf),
}
}
fn flush(&mut self) -> Result<(), IoError> {
match self {
EitherOutput::First(a) => a.flush(),
EitherOutput::Second(b) => b.flush(),
}
}
}
impl StreamMuxer for EitherOutput
where
A: StreamMuxer,
B: StreamMuxer,
{
type Substream = EitherOutput;
type OutboundSubstream = EitherOutbound;
type Error = IoError;
fn poll_inbound(&self) -> Poll {
match self {
EitherOutput::First(inner) => inner.poll_inbound().map(|p| p.map(EitherOutput::First)).map_err(|e| e.into()),
EitherOutput::Second(inner) => inner.poll_inbound().map(|p| p.map(EitherOutput::Second)).map_err(|e| e.into()),
}
}
fn open_outbound(&self) -> Self::OutboundSubstream {
match self {
EitherOutput::First(inner) => EitherOutbound::A(inner.open_outbound()),
EitherOutput::Second(inner) => EitherOutbound::B(inner.open_outbound()),
}
}
fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => {
inner.poll_outbound(substream).map(|p| p.map(EitherOutput::First)).map_err(|e| e.into())
},
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => {
inner.poll_outbound(substream).map(|p| p.map(EitherOutput::Second)).map_err(|e| e.into())
},
_ => panic!("Wrong API usage")
}
}
fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
match self {
EitherOutput::First(inner) => {
match substream {
EitherOutbound::A(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage")
}
},
EitherOutput::Second(inner) => {
match substream {
EitherOutbound::B(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage")
}
},
}
}
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
match self {
EitherOutput::First(ref inner) => inner.prepare_uninitialized_buffer(buf),
EitherOutput::Second(ref inner) => inner.prepare_uninitialized_buffer(buf),
}
}
fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.read_substream(sub, buf).map_err(|e| e.into())
},
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.read_substream(sub, buf).map_err(|e| e.into())
},
_ => panic!("Wrong API usage")
}
}
fn write_substream(&self, sub: &mut Self::Substream, buf: &[u8]) -> Poll {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.write_substream(sub, buf).map_err(|e| e.into())
},
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.write_substream(sub, buf).map_err(|e| e.into())
},
_ => panic!("Wrong API usage")
}
}
fn flush_substream(&self, sub: &mut Self::Substream) -> Poll<(), Self::Error> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.flush_substream(sub).map_err(|e| e.into())
},
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.flush_substream(sub).map_err(|e| e.into())
},
_ => panic!("Wrong API usage")
}
}
fn shutdown_substream(&self, sub: &mut Self::Substream) -> Poll<(), Self::Error> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.shutdown_substream(sub).map_err(|e| e.into())
},
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.shutdown_substream(sub).map_err(|e| e.into())
},
_ => panic!("Wrong API usage")
}
}
fn destroy_substream(&self, substream: Self::Substream) {
match self {
EitherOutput::First(inner) => {
match substream {
EitherOutput::First(substream) => inner.destroy_substream(substream),
_ => panic!("Wrong API usage")
}
},
EitherOutput::Second(inner) => {
match substream {
EitherOutput::Second(substream) => inner.destroy_substream(substream),
_ => panic!("Wrong API usage")
}
},
}
}
fn is_remote_acknowledged(&self) -> bool {
match self {
EitherOutput::First(inner) => inner.is_remote_acknowledged(),
EitherOutput::Second(inner) => inner.is_remote_acknowledged()
}
}
fn close(&self) -> Poll<(), Self::Error> {
match self {
EitherOutput::First(inner) => inner.close().map_err(|e| e.into()),
EitherOutput::Second(inner) => inner.close().map_err(|e| e.into()),
}
}
fn flush_all(&self) -> Poll<(), Self::Error> {
match self {
EitherOutput::First(inner) => inner.flush_all().map_err(|e| e.into()),
EitherOutput::Second(inner) => inner.flush_all().map_err(|e| e.into()),
}
}
}
#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherOutbound {
A(A::OutboundSubstream),
B(B::OutboundSubstream),
}
/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
///
/// `A` is the type held by [`EitherListenStream::First`] and `B` the type held
/// by [`EitherListenStream::Second`].
#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherListenStream<A, B> {
    /// The stream is of the first type.
    First(A),
    /// The stream is of the second type.
    Second(B),
}
impl Stream for EitherListenStream
where
AStream: Stream- >,
BStream: Stream
- >,
{
type Item = ListenerEvent>;
type Error = EitherError;
fn poll(&mut self) -> Poll