// Copyright 2017 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. use crate::{muxing::StreamMuxer, ProtocolName, transport::ListenerEvent}; use futures::{prelude::*, io::{IoSlice, IoSliceMut}}; use pin_project::{pin_project, project}; use std::{fmt, io::{Error as IoError}, pin::Pin, task::Context, task::Poll}; #[derive(Debug, Copy, Clone)] pub enum EitherError { A(A), B(B) } impl fmt::Display for EitherError where A: fmt::Display, B: fmt::Display { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { EitherError::A(a) => a.fmt(f), EitherError::B(b) => b.fmt(f) } } } impl std::error::Error for EitherError where A: std::error::Error, B: std::error::Error { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { EitherError::A(a) => a.source(), EitherError::B(b) => b.source() } } } /// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to /// either `First` or `Second`. #[pin_project] #[derive(Debug, Copy, Clone)] pub enum EitherOutput { First(#[pin] A), Second(#[pin] B), } impl AsyncRead for EitherOutput where A: AsyncRead, B: AsyncRead, { #[project] fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { #[project] match self.project() { EitherOutput::First(a) => AsyncRead::poll_read(a, cx, buf), EitherOutput::Second(b) => AsyncRead::poll_read(b, cx, buf), } } #[project] fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context, bufs: &mut [IoSliceMut]) -> Poll> { #[project] match self.project() { EitherOutput::First(a) => AsyncRead::poll_read_vectored(a, cx, bufs), EitherOutput::Second(b) => AsyncRead::poll_read_vectored(b, cx, bufs), } } } impl AsyncWrite for EitherOutput where A: AsyncWrite, B: AsyncWrite, { #[project] fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { #[project] match self.project() { EitherOutput::First(a) => AsyncWrite::poll_write(a, cx, buf), EitherOutput::Second(b) => AsyncWrite::poll_write(b, cx, buf), } } #[project] fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context, bufs: &[IoSlice]) -> Poll> { #[project] match self.project() { EitherOutput::First(a) => AsyncWrite::poll_write_vectored(a, cx, bufs), EitherOutput::Second(b) => AsyncWrite::poll_write_vectored(b, cx, bufs), } } #[project] fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { #[project] match self.project() { EitherOutput::First(a) => AsyncWrite::poll_flush(a, cx), EitherOutput::Second(b) => AsyncWrite::poll_flush(b, cx), } } #[project] fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { #[project] match self.project() { EitherOutput::First(a) => AsyncWrite::poll_close(a, cx), EitherOutput::Second(b) => AsyncWrite::poll_close(b, cx), } } } impl Stream for EitherOutput where A: TryStream, B: TryStream, { type Item = Result>; #[project] fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { #[project] match self.project() { EitherOutput::First(a) => TryStream::try_poll_next(a, cx) .map(|v| v.map(|r| r.map_err(EitherError::A))), EitherOutput::Second(b) => TryStream::try_poll_next(b, cx) .map(|v| v.map(|r| r.map_err(EitherError::B))), } } } impl Sink for EitherOutput where A: Sink, B: Sink, { type Error = EitherError; #[project] fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { #[project] match self.project() { EitherOutput::First(a) => Sink::poll_ready(a, cx).map_err(EitherError::A), EitherOutput::Second(b) => Sink::poll_ready(b, cx).map_err(EitherError::B), } } #[project] fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { #[project] match self.project() { EitherOutput::First(a) => Sink::start_send(a, item).map_err(EitherError::A), EitherOutput::Second(b) => Sink::start_send(b, item).map_err(EitherError::B), } } #[project] fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { #[project] match self.project() { EitherOutput::First(a) => Sink::poll_flush(a, cx).map_err(EitherError::A), EitherOutput::Second(b) => Sink::poll_flush(b, cx).map_err(EitherError::B), } } #[project] fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { #[project] match self.project() { EitherOutput::First(a) => Sink::poll_close(a, cx).map_err(EitherError::A), EitherOutput::Second(b) => Sink::poll_close(b, cx).map_err(EitherError::B), } } } impl StreamMuxer for EitherOutput where A: StreamMuxer, B: StreamMuxer, { type Substream = EitherOutput; type OutboundSubstream = EitherOutbound; type Error = IoError; fn poll_inbound(&self, cx: &mut Context) -> Poll> { match self { EitherOutput::First(inner) => inner.poll_inbound(cx).map(|p| p.map(EitherOutput::First)).map_err(|e| e.into()), EitherOutput::Second(inner) => inner.poll_inbound(cx).map(|p| p.map(EitherOutput::Second)).map_err(|e| e.into()), } } fn open_outbound(&self) -> Self::OutboundSubstream { match self { EitherOutput::First(inner) => EitherOutbound::A(inner.open_outbound()), EitherOutput::Second(inner) => EitherOutbound::B(inner.open_outbound()), } } fn poll_outbound(&self, cx: &mut Context, substream: &mut Self::OutboundSubstream) -> Poll> { match (self, substream) { (EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => { inner.poll_outbound(cx, substream).map(|p| p.map(EitherOutput::First)).map_err(|e| e.into()) }, (EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => { inner.poll_outbound(cx, substream).map(|p| p.map(EitherOutput::Second)).map_err(|e| e.into()) }, _ => panic!("Wrong API usage") } } fn destroy_outbound(&self, substream: Self::OutboundSubstream) { match self { EitherOutput::First(inner) => { match substream { EitherOutbound::A(substream) => inner.destroy_outbound(substream), _ => panic!("Wrong API usage") } }, EitherOutput::Second(inner) => { match substream { EitherOutbound::B(substream) => inner.destroy_outbound(substream), _ => panic!("Wrong API usage") } }, } } fn read_substream(&self, cx: &mut Context, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll> { match (self, sub) { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { inner.read_substream(cx, sub, buf).map_err(|e| e.into()) }, (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { inner.read_substream(cx, sub, buf).map_err(|e| e.into()) }, _ => panic!("Wrong API usage") } } fn write_substream(&self, cx: &mut Context, sub: &mut Self::Substream, buf: &[u8]) -> Poll> { match (self, sub) { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { inner.write_substream(cx, sub, buf).map_err(|e| e.into()) }, (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { inner.write_substream(cx, sub, buf).map_err(|e| e.into()) }, _ => panic!("Wrong API usage") } } fn flush_substream(&self, cx: &mut Context, sub: &mut Self::Substream) -> Poll> { match (self, sub) { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { inner.flush_substream(cx, sub).map_err(|e| e.into()) }, (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { inner.flush_substream(cx, sub).map_err(|e| e.into()) }, _ => panic!("Wrong API usage") } } fn shutdown_substream(&self, cx: &mut Context, sub: &mut Self::Substream) -> Poll> { match (self, sub) { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { inner.shutdown_substream(cx, sub).map_err(|e| e.into()) }, (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { inner.shutdown_substream(cx, sub).map_err(|e| e.into()) }, _ => panic!("Wrong API usage") } } fn destroy_substream(&self, substream: Self::Substream) { match self { EitherOutput::First(inner) => { match substream { EitherOutput::First(substream) => inner.destroy_substream(substream), _ => panic!("Wrong API usage") } }, EitherOutput::Second(inner) => { match substream { EitherOutput::Second(substream) => inner.destroy_substream(substream), _ => panic!("Wrong API usage") } }, } } fn is_remote_acknowledged(&self) -> bool { match self { EitherOutput::First(inner) => inner.is_remote_acknowledged(), EitherOutput::Second(inner) => inner.is_remote_acknowledged() } } fn close(&self, cx: &mut Context) -> Poll> { match self { EitherOutput::First(inner) => inner.close(cx).map_err(|e| e.into()), EitherOutput::Second(inner) => inner.close(cx).map_err(|e| e.into()), } } fn flush_all(&self, cx: &mut Context) -> Poll> { match self { EitherOutput::First(inner) => inner.flush_all(cx).map_err(|e| e.into()), EitherOutput::Second(inner) => inner.flush_all(cx).map_err(|e| e.into()), } } } #[derive(Debug, Copy, Clone)] #[must_use = "futures do nothing unless polled"] pub enum EitherOutbound { A(A::OutboundSubstream), B(B::OutboundSubstream), } /// Implements `Stream` and dispatches all method calls to either `First` or `Second`. #[pin_project] #[derive(Debug, Copy, Clone)] #[must_use = "futures do nothing unless polled"] pub enum EitherListenStream { First(#[pin] A), Second(#[pin] B), } impl Stream for EitherListenStream where AStream: TryStream>, BStream: TryStream>, { type Item = Result>, EitherError>; #[project] fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { #[project] match self.project() { EitherListenStream::First(a) => match TryStream::try_poll_next(a, cx) { Poll::Pending => Poll::Pending, Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::First)))), Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::A(err)))), }, EitherListenStream::Second(a) => match TryStream::try_poll_next(a, cx) { Poll::Pending => Poll::Pending, Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::Second)))), Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::B(err)))), }, } } } /// Implements `Future` and dispatches all method calls to either `First` or `Second`. #[pin_project] #[derive(Debug, Copy, Clone)] #[must_use = "futures do nothing unless polled"] pub enum EitherFuture { First(#[pin] A), Second(#[pin] B), } impl Future for EitherFuture where AFuture: TryFuture, BFuture: TryFuture, { type Output = Result, EitherError>; #[project] fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { #[project] match self.project() { EitherFuture::First(a) => TryFuture::try_poll(a, cx) .map_ok(EitherOutput::First).map_err(EitherError::A), EitherFuture::Second(a) => TryFuture::try_poll(a, cx) .map_ok(EitherOutput::Second).map_err(EitherError::B), } } } #[pin_project] #[derive(Debug, Copy, Clone)] #[must_use = "futures do nothing unless polled"] pub enum EitherFuture2 { A(#[pin] A), B(#[pin] B) } impl Future for EitherFuture2 where AFut: TryFuture, BFut: TryFuture, { type Output = Result, EitherError>; #[project] fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { #[project] match self.project() { EitherFuture2::A(a) => TryFuture::try_poll(a, cx) .map_ok(EitherOutput::First).map_err(EitherError::A), EitherFuture2::B(a) => TryFuture::try_poll(a, cx) .map_ok(EitherOutput::Second).map_err(EitherError::B), } } } #[derive(Debug, Clone)] pub enum EitherName { A(A), B(B) } impl ProtocolName for EitherName { fn protocol_name(&self) -> &[u8] { match self { EitherName::A(a) => a.protocol_name(), EitherName::B(b) => b.protocol_name() } } }