// Copyright 2017 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. use crate::{ muxing::StreamMuxer, transport::{ListenerId, Transport, TransportError, TransportEvent}, Multiaddr, ProtocolName, }; use futures::{ io::{IoSlice, IoSliceMut}, prelude::*, }; use pin_project::pin_project; use std::{fmt, io, pin::Pin, task::Context, task::Poll}; #[derive(Debug, Copy, Clone)] pub enum EitherError { A(A), B(B), } impl fmt::Display for EitherError where A: fmt::Display, B: fmt::Display, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { EitherError::A(a) => a.fmt(f), EitherError::B(b) => b.fmt(f), } } } impl std::error::Error for EitherError where A: std::error::Error, B: std::error::Error, { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { EitherError::A(a) => a.source(), EitherError::B(b) => b.source(), } } } /// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to /// either `First` or `Second`. #[pin_project(project = EitherOutputProj)] #[derive(Debug, Copy, Clone)] pub enum EitherOutput { First(#[pin] A), Second(#[pin] B), } impl AsyncRead for EitherOutput where A: AsyncRead, B: AsyncRead, { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { match self.project() { EitherOutputProj::First(a) => AsyncRead::poll_read(a, cx, buf), EitherOutputProj::Second(b) => AsyncRead::poll_read(b, cx, buf), } } fn poll_read_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll> { match self.project() { EitherOutputProj::First(a) => AsyncRead::poll_read_vectored(a, cx, bufs), EitherOutputProj::Second(b) => AsyncRead::poll_read_vectored(b, cx, bufs), } } } impl AsyncWrite for EitherOutput where A: AsyncWrite, B: AsyncWrite, { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { match self.project() { EitherOutputProj::First(a) => AsyncWrite::poll_write(a, cx, buf), EitherOutputProj::Second(b) => AsyncWrite::poll_write(b, cx, buf), } } fn poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { match self.project() { EitherOutputProj::First(a) => AsyncWrite::poll_write_vectored(a, cx, bufs), EitherOutputProj::Second(b) => AsyncWrite::poll_write_vectored(b, cx, bufs), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project() { EitherOutputProj::First(a) => AsyncWrite::poll_flush(a, cx), EitherOutputProj::Second(b) => AsyncWrite::poll_flush(b, cx), } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project() { EitherOutputProj::First(a) => AsyncWrite::poll_close(a, cx), EitherOutputProj::Second(b) => AsyncWrite::poll_close(b, cx), } } } impl Stream for EitherOutput where A: TryStream, B: TryStream, { type Item = Result>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project() { EitherOutputProj::First(a) => { TryStream::try_poll_next(a, cx).map(|v| v.map(|r| r.map_err(EitherError::A))) } EitherOutputProj::Second(b) => { TryStream::try_poll_next(b, cx).map(|v| v.map(|r| r.map_err(EitherError::B))) } } } } impl Sink for EitherOutput where A: Sink, B: Sink, { type Error = EitherError; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project() { EitherOutputProj::First(a) => Sink::poll_ready(a, cx).map_err(EitherError::A), EitherOutputProj::Second(b) => Sink::poll_ready(b, cx).map_err(EitherError::B), } } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { match self.project() { EitherOutputProj::First(a) => Sink::start_send(a, item).map_err(EitherError::A), EitherOutputProj::Second(b) => Sink::start_send(b, item).map_err(EitherError::B), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project() { EitherOutputProj::First(a) => Sink::poll_flush(a, cx).map_err(EitherError::A), EitherOutputProj::Second(b) => Sink::poll_flush(b, cx).map_err(EitherError::B), } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project() { EitherOutputProj::First(a) => Sink::poll_close(a, cx).map_err(EitherError::A), EitherOutputProj::Second(b) => Sink::poll_close(b, cx).map_err(EitherError::B), } } } impl StreamMuxer for EitherOutput where A: StreamMuxer, B: StreamMuxer, { type Substream = EitherOutput; type Error = EitherError; fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { match self { EitherOutput::First(inner) => inner .poll_inbound(cx) .map_ok(EitherOutput::First) .map_err(EitherError::A), EitherOutput::Second(inner) => inner .poll_inbound(cx) .map_ok(EitherOutput::Second) .map_err(EitherError::B), } } fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { match self { EitherOutput::First(inner) => inner .poll_outbound(cx) .map_ok(EitherOutput::First) .map_err(EitherError::A), EitherOutput::Second(inner) => inner .poll_outbound(cx) .map_ok(EitherOutput::Second) .map_err(EitherError::B), } } fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll> { match self { EitherOutput::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A), EitherOutput::Second(inner) => inner.poll_address_change(cx).map_err(EitherError::B), } } fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { match self { EitherOutput::First(inner) => inner.poll_close(cx).map_err(EitherError::A), EitherOutput::Second(inner) => inner.poll_close(cx).map_err(EitherError::B), } } } /// Implements `Future` and dispatches all method calls to either `First` or `Second`. #[pin_project(project = EitherFutureProj)] #[derive(Debug, Copy, Clone)] #[must_use = "futures do nothing unless polled"] pub enum EitherFuture { First(#[pin] A), Second(#[pin] B), } impl Future for EitherFuture where AFuture: TryFuture, BFuture: TryFuture, { type Output = Result, EitherError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.project() { EitherFutureProj::First(a) => TryFuture::try_poll(a, cx) .map_ok(EitherOutput::First) .map_err(EitherError::A), EitherFutureProj::Second(a) => TryFuture::try_poll(a, cx) .map_ok(EitherOutput::Second) .map_err(EitherError::B), } } } #[pin_project(project = EitherFuture2Proj)] #[derive(Debug, Copy, Clone)] #[must_use = "futures do nothing unless polled"] pub enum EitherFuture2 { A(#[pin] A), B(#[pin] B), } impl Future for EitherFuture2 where AFut: TryFuture, BFut: TryFuture, { type Output = Result, EitherError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.project() { EitherFuture2Proj::A(a) => TryFuture::try_poll(a, cx) .map_ok(EitherOutput::First) .map_err(EitherError::A), EitherFuture2Proj::B(a) => TryFuture::try_poll(a, cx) .map_ok(EitherOutput::Second) .map_err(EitherError::B), } } } #[derive(Debug, Clone)] pub enum EitherName { A(A), B(B), } impl ProtocolName for EitherName { fn protocol_name(&self) -> &[u8] { match self { EitherName::A(a) => a.protocol_name(), EitherName::B(b) => b.protocol_name(), } } } #[pin_project(project = EitherTransportProj)] #[derive(Debug)] #[must_use = "transports do nothing unless polled"] pub enum EitherTransport { Left(#[pin] A), Right(#[pin] B), } impl Transport for EitherTransport where B: Transport, A: Transport, { type Output = EitherOutput; type Error = EitherError; type ListenerUpgrade = EitherFuture; type Dial = EitherFuture; fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { match self.project() { EitherTransportProj::Left(a) => match a.poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(event) => Poll::Ready( event .map_upgrade(EitherFuture::First) .map_err(EitherError::A), ), }, EitherTransportProj::Right(b) => match b.poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(event) => Poll::Ready( event .map_upgrade(EitherFuture::Second) .map_err(EitherError::B), ), }, } } fn remove_listener(&mut self, id: ListenerId) -> bool { match self { EitherTransport::Left(t) => t.remove_listener(id), EitherTransport::Right(t) => t.remove_listener(id), } } fn listen_on(&mut self, addr: Multiaddr) -> Result> { use TransportError::*; match self { EitherTransport::Left(a) => a.listen_on(addr).map_err(|e| match e { MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr), Other(err) => Other(EitherError::A(err)), }), EitherTransport::Right(b) => b.listen_on(addr).map_err(|e| match e { MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr), Other(err) => Other(EitherError::B(err)), }), } } fn dial(&mut self, addr: Multiaddr) -> Result> { use TransportError::*; match self { EitherTransport::Left(a) => match a.dial(addr) { Ok(connec) => Ok(EitherFuture::First(connec)), Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), Err(Other(err)) => Err(Other(EitherError::A(err))), }, EitherTransport::Right(b) => match b.dial(addr) { Ok(connec) => Ok(EitherFuture::Second(connec)), Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), Err(Other(err)) => Err(Other(EitherError::B(err))), }, } } fn dial_as_listener( &mut self, addr: Multiaddr, ) -> Result> where Self: Sized, { use TransportError::*; match self { EitherTransport::Left(a) => match a.dial_as_listener(addr) { Ok(connec) => Ok(EitherFuture::First(connec)), Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), Err(Other(err)) => Err(Other(EitherError::A(err))), }, EitherTransport::Right(b) => match b.dial_as_listener(addr) { Ok(connec) => Ok(EitherFuture::Second(connec)), Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), Err(Other(err)) => Err(Other(EitherError::B(err))), }, } } fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { match self { EitherTransport::Left(a) => a.address_translation(server, observed), EitherTransport::Right(b) => b.address_translation(server, observed), } } }