From 4b41f5a9942add3f7a815bd2b79cb82eaed1ee05 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 23 Jan 2023 23:31:30 +1100 Subject: [PATCH] refactor(core)!: remove `EitherOutput` (#3341) The trick with this one is to use `futures::Either` everywhere where we may wrap something that implements any of the `futures` traits. This includes the output of `EitherFuture` itself. We also need to implement `StreamMuxer` on `future::Either` because `StreamMuxer`s may be the the `Output` of `InboundUpgrade`. --- core/CHANGELOG.md | 3 + core/src/either.rs | 207 ++++++------------------- core/src/transport/choice.rs | 5 +- core/src/upgrade/either.rs | 7 +- core/src/upgrade/select.rs | 7 +- protocols/dcutr/src/handler/relayed.rs | 6 +- protocols/identify/src/handler.rs | 9 +- protocols/kad/src/handler.rs | 6 +- swarm-derive/src/lib.rs | 10 +- swarm/src/behaviour/toggle.rs | 9 +- swarm/src/handler/either.rs | 102 +----------- swarm/src/handler/select.rs | 63 ++++---- swarm/src/lib.rs | 2 +- transports/quic/tests/smoke.rs | 5 +- transports/websocket/src/framed.rs | 11 +- 15 files changed, 123 insertions(+), 329 deletions(-) diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index f9c45f5f..f25ee514 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -27,6 +27,8 @@ - Remove `EitherFuture2` in favor of `EitherFuture`. See [PR 3340]. +- Remove `EitherOutput` in favor of `future::Either`. See [PR 3341]. + [PR 3031]: https://github.com/libp2p/rust-libp2p/pull/3031 [PR 3058]: https://github.com/libp2p/rust-libp2p/pull/3058 [PR 3097]: https://github.com/libp2p/rust-libp2p/pull/3097 @@ -36,6 +38,7 @@ [PR 3338]: https://github.com/libp2p/rust-libp2p/pull/3338 [PR 3339]: https://github.com/libp2p/rust-libp2p/pull/3339 [PR 3340]: https://github.com/libp2p/rust-libp2p/pull/3340 +[PR 3341]: https://github.com/libp2p/rust-libp2p/pull/3341 # 0.37.0 diff --git a/core/src/either.rs b/core/src/either.rs index b25e4e12..0641cd78 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -25,167 +25,30 @@ use crate::{ Multiaddr, ProtocolName, }; use either::Either; -use futures::{ - io::{IoSlice, IoSliceMut}, - prelude::*, -}; +use futures::prelude::*; use pin_project::pin_project; -use std::{io, pin::Pin, task::Context, task::Poll}; +use std::{pin::Pin, task::Context, task::Poll}; -/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to -/// either `First` or `Second`. -#[pin_project(project = EitherOutputProj)] -#[derive(Debug, Copy, Clone)] -pub enum EitherOutput { - First(#[pin] A), - Second(#[pin] B), -} - -impl AsyncRead for EitherOutput -where - A: AsyncRead, - B: AsyncRead, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - match self.project() { - EitherOutputProj::First(a) => AsyncRead::poll_read(a, cx, buf), - EitherOutputProj::Second(b) => AsyncRead::poll_read(b, cx, buf), - } - } - - fn poll_read_vectored( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &mut [IoSliceMut<'_>], - ) -> Poll> { - match self.project() { - EitherOutputProj::First(a) => AsyncRead::poll_read_vectored(a, cx, bufs), - EitherOutputProj::Second(b) => AsyncRead::poll_read_vectored(b, cx, bufs), - } - } -} - -impl AsyncWrite for EitherOutput -where - A: AsyncWrite, - B: AsyncWrite, -{ - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - match self.project() { - EitherOutputProj::First(a) => AsyncWrite::poll_write(a, cx, buf), - EitherOutputProj::Second(b) => AsyncWrite::poll_write(b, cx, buf), - } - } - - fn poll_write_vectored( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[IoSlice<'_>], - ) -> Poll> { - match self.project() { - EitherOutputProj::First(a) => AsyncWrite::poll_write_vectored(a, cx, bufs), - EitherOutputProj::Second(b) => AsyncWrite::poll_write_vectored(b, cx, bufs), - } - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.project() { - EitherOutputProj::First(a) => AsyncWrite::poll_flush(a, cx), - EitherOutputProj::Second(b) => AsyncWrite::poll_flush(b, cx), - } - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.project() { - EitherOutputProj::First(a) => AsyncWrite::poll_close(a, cx), - EitherOutputProj::Second(b) => AsyncWrite::poll_close(b, cx), - } - } -} - -impl Stream for EitherOutput -where - A: TryStream, - B: TryStream, -{ - type Item = Result>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.project() { - EitherOutputProj::First(a) => { - TryStream::try_poll_next(a, cx).map(|v| v.map(|r| r.map_err(Either::Left))) - } - EitherOutputProj::Second(b) => { - TryStream::try_poll_next(b, cx).map(|v| v.map(|r| r.map_err(Either::Right))) - } - } - } -} - -impl Sink for EitherOutput -where - A: Sink, - B: Sink, -{ - type Error = Either; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.project() { - EitherOutputProj::First(a) => Sink::poll_ready(a, cx).map_err(Either::Left), - EitherOutputProj::Second(b) => Sink::poll_ready(b, cx).map_err(Either::Right), - } - } - - fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - match self.project() { - EitherOutputProj::First(a) => Sink::start_send(a, item).map_err(Either::Left), - EitherOutputProj::Second(b) => Sink::start_send(b, item).map_err(Either::Right), - } - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.project() { - EitherOutputProj::First(a) => Sink::poll_flush(a, cx).map_err(Either::Left), - EitherOutputProj::Second(b) => Sink::poll_flush(b, cx).map_err(Either::Right), - } - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.project() { - EitherOutputProj::First(a) => Sink::poll_close(a, cx).map_err(Either::Left), - EitherOutputProj::Second(b) => Sink::poll_close(b, cx).map_err(Either::Right), - } - } -} - -impl StreamMuxer for EitherOutput +impl StreamMuxer for future::Either where A: StreamMuxer, B: StreamMuxer, { - type Substream = EitherOutput; + type Substream = future::Either; type Error = Either; fn poll_inbound( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - match self.project() { - EitherOutputProj::First(inner) => inner + match as_pin_mut(self) { + future::Either::Left(inner) => inner .poll_inbound(cx) - .map_ok(EitherOutput::First) + .map_ok(future::Either::Left) .map_err(Either::Left), - EitherOutputProj::Second(inner) => inner + future::Either::Right(inner) => inner .poll_inbound(cx) - .map_ok(EitherOutput::Second) + .map_ok(future::Either::Right) .map_err(Either::Right), } } @@ -194,22 +57,22 @@ where self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - match self.project() { - EitherOutputProj::First(inner) => inner + match as_pin_mut(self) { + future::Either::Left(inner) => inner .poll_outbound(cx) - .map_ok(EitherOutput::First) + .map_ok(future::Either::Left) .map_err(Either::Left), - EitherOutputProj::Second(inner) => inner + future::Either::Right(inner) => inner .poll_outbound(cx) - .map_ok(EitherOutput::Second) + .map_ok(future::Either::Right) .map_err(Either::Right), } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.project() { - EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(Either::Left), - EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(Either::Right), + match as_pin_mut(self) { + future::Either::Left(inner) => inner.poll_close(cx).map_err(Either::Left), + future::Either::Right(inner) => inner.poll_close(cx).map_err(Either::Right), } } @@ -217,9 +80,31 @@ where self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - match self.project() { - EitherOutputProj::First(inner) => inner.poll(cx).map_err(Either::Left), - EitherOutputProj::Second(inner) => inner.poll(cx).map_err(Either::Right), + match as_pin_mut(self) { + future::Either::Left(inner) => inner.poll(cx).map_err(Either::Left), + future::Either::Right(inner) => inner.poll(cx).map_err(Either::Right), + } + } +} + +/// Convert `Pin<&mut Either>` to `Either, Pin<&mut B>>`, +/// pinned projections of the inner variants. +/// +/// Local function until is merged. +fn as_pin_mut( + either: Pin<&mut future::Either>, +) -> future::Either, Pin<&mut B>> { + // SAFETY: `get_unchecked_mut` is fine because we don't move anything. + // We can use `new_unchecked` because the `inner` parts are guaranteed + // to be pinned, as they come from `self` which is pinned, and we never + // offer an unpinned `&mut L` or `&mut R` through `Pin<&mut Self>`. We + // also don't have an implementation of `Drop`, nor manual `Unpin`. + unsafe { + match *Pin::get_unchecked_mut(either) { + future::Either::Left(ref mut inner) => future::Either::Left(Pin::new_unchecked(inner)), + future::Either::Right(ref mut inner) => { + future::Either::Right(Pin::new_unchecked(inner)) + } } } } @@ -238,15 +123,15 @@ where AFuture: TryFuture, BFuture: TryFuture, { - type Output = Result, Either>; + type Output = Result, Either>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.project() { EitherFutureProj::First(a) => TryFuture::try_poll(a, cx) - .map_ok(EitherOutput::First) + .map_ok(future::Either::Left) .map_err(Either::Left), EitherFutureProj::Second(a) => TryFuture::try_poll(a, cx) - .map_ok(EitherOutput::Second) + .map_ok(future::Either::Right) .map_err(Either::Right), } } @@ -272,7 +157,7 @@ where B: Transport, A: Transport, { - type Output = EitherOutput; + type Output = future::Either; type Error = Either; type ListenerUpgrade = EitherFuture; type Dial = EitherFuture; diff --git a/core/src/transport/choice.rs b/core/src/transport/choice.rs index 2405f013..bb7d542d 100644 --- a/core/src/transport/choice.rs +++ b/core/src/transport/choice.rs @@ -18,9 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::either::{EitherFuture, EitherOutput}; +use crate::either::EitherFuture; use crate::transport::{ListenerId, Transport, TransportError, TransportEvent}; use either::Either; +use futures::future; use multiaddr::Multiaddr; use std::{pin::Pin, task::Context, task::Poll}; @@ -40,7 +41,7 @@ where B: Transport, A: Transport, { - type Output = EitherOutput; + type Output = future::Either; type Error = Either; type ListenerUpgrade = EitherFuture; type Dial = EitherFuture; diff --git a/core/src/upgrade/either.rs b/core/src/upgrade/either.rs index 5785fda2..563e7651 100644 --- a/core/src/upgrade/either.rs +++ b/core/src/upgrade/either.rs @@ -19,10 +19,11 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - either::{EitherFuture, EitherName, EitherOutput}, + either::{EitherFuture, EitherName}, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, }; use either::Either; +use futures::future; impl UpgradeInfo for Either where @@ -48,7 +49,7 @@ where A: InboundUpgrade, B: InboundUpgrade, { - type Output = EitherOutput; + type Output = future::Either; type Error = Either; type Future = EitherFuture; @@ -70,7 +71,7 @@ where A: OutboundUpgrade, B: OutboundUpgrade, { - type Output = EitherOutput; + type Output = future::Either; type Error = Either; type Future = EitherFuture; diff --git a/core/src/upgrade/select.rs b/core/src/upgrade/select.rs index cb3cdc23..1d5ab9ab 100644 --- a/core/src/upgrade/select.rs +++ b/core/src/upgrade/select.rs @@ -20,10 +20,11 @@ use crate::either::EitherFuture; use crate::{ - either::{EitherName, EitherOutput}, + either::EitherName, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, }; use either::Either; +use futures::future; /// Upgrade that combines two upgrades into one. Supports all the protocols supported by either /// sub-upgrade. @@ -65,7 +66,7 @@ where A: InboundUpgrade, B: InboundUpgrade, { - type Output = EitherOutput; + type Output = future::Either; type Error = Either; type Future = EitherFuture; @@ -82,7 +83,7 @@ where A: OutboundUpgrade, B: OutboundUpgrade, { - type Output = EitherOutput; + type Output = future::Either; type Error = Either; type Future = EitherFuture; diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 23a45841..0b487c30 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -22,9 +22,9 @@ use crate::protocol; use either::Either; +use futures::future; use futures::future::{BoxFuture, FutureExt}; use instant::Instant; -use libp2p_core::either::EitherOutput; use libp2p_core::multiaddr::Multiaddr; use libp2p_core::upgrade::{DeniedUpgrade, NegotiationError, UpgradeError}; use libp2p_core::ConnectedPoint; @@ -174,7 +174,7 @@ impl Handler { >, ) { match output { - EitherOutput::First(inbound_connect) => { + future::Either::Left(inbound_connect) => { let remote_addr = match &self.endpoint { ConnectedPoint::Dialer { address, role_override: _ } => address.clone(), ConnectedPoint::Listener { ..} => unreachable!("`::listen_protocol` denies all incoming substreams as a listener."), @@ -187,7 +187,7 @@ impl Handler { )); } // A connection listener denies all incoming substreams, thus none can ever be fully negotiated. - EitherOutput::Second(output) => void::unreachable(output), + future::Either::Right(output) => void::unreachable(output), } } diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index e251d552..7b5f4b58 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -26,7 +26,6 @@ use futures::future::BoxFuture; use futures::prelude::*; use futures::stream::FuturesUnordered; use futures_timer::Delay; -use libp2p_core::either::EitherOutput; use libp2p_core::upgrade::SelectUpgrade; use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, PublicKey}; use libp2p_swarm::handler::{ @@ -201,7 +200,7 @@ impl Handler { >, ) { match output { - EitherOutput::First(substream) => { + future::Either::Left(substream) => { self.events .push(ConnectionHandlerEvent::Custom(Event::Identify)); if !self.reply_streams.is_empty() { @@ -213,7 +212,7 @@ impl Handler { } self.reply_streams.push_back(substream); } - EitherOutput::Second(fut) => { + future::Either::Right(fut) => { if self.inbound_identify_push.replace(fut).is_some() { warn!( "New inbound identify push stream from {} while still \ @@ -235,14 +234,14 @@ impl Handler { >, ) { match output { - EitherOutput::First(remote_info) => { + future::Either::Left(remote_info) => { self.events .push(ConnectionHandlerEvent::Custom(Event::Identified( remote_info, ))); self.keep_alive = KeepAlive::No; } - EitherOutput::Second(()) => self + future::Either::Right(()) => self .events .push(ConnectionHandlerEvent::Custom(Event::IdentificationPushed)), } diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index a475c045..1c051624 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -27,7 +27,7 @@ use either::Either; use futures::prelude::*; use futures::stream::SelectAll; use instant::Instant; -use libp2p_core::{either::EitherOutput, upgrade, ConnectedPoint, PeerId}; +use libp2p_core::{upgrade, ConnectedPoint, PeerId}; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; @@ -561,8 +561,8 @@ where // If `self.allow_listening` is false, then we produced a `DeniedUpgrade` and `protocol` // is a `Void`. let protocol = match protocol { - EitherOutput::First(p) => p, - EitherOutput::Second(p) => void::unreachable(p), + future::Either::Left(p) => p, + future::Either::Right(p) => void::unreachable(p), }; if let ProtocolStatus::Unconfirmed = self.protocol_status { diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index 489ec8c7..a95d99a3 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -53,7 +53,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let multiaddr = quote! { #prelude_path::Multiaddr }; let trait_to_impl = quote! { #prelude_path::NetworkBehaviour }; - let either_ident = quote! { #prelude_path::EitherOutput }; + let either_ident = quote! { #prelude_path::Either }; let network_behaviour_action = quote! { #prelude_path::NetworkBehaviourAction }; let into_connection_handler = quote! { #prelude_path::IntoConnectionHandler }; let connection_handler = quote! { #prelude_path::ConnectionHandler }; @@ -531,13 +531,13 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { .enumerate() .map(|(enum_n, (field_n, field))| { let mut elem = if enum_n != 0 { - quote! { #either_ident::Second(ev) } + quote! { #either_ident::Right(ev) } } else { quote! { ev } }; for _ in 0..data_struct.fields.len() - 1 - enum_n { - elem = quote! { #either_ident::First(#elem) }; + elem = quote! { #either_ident::Left(#elem) }; } Some(match field.ident { @@ -599,12 +599,12 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { .expect("Fields of NetworkBehaviour implementation to be named."); let mut wrapped_event = if field_n != 0 { - quote!{ #either_ident::Second(event) } + quote!{ #either_ident::Right(event) } } else { quote!{ event } }; for _ in 0 .. data_struct.fields.len() - 1 - field_n { - wrapped_event = quote!{ #either_ident::First(#wrapped_event) }; + wrapped_event = quote!{ #either_ident::Left(#wrapped_event) }; } // `Dial` provides a handler of the specific behaviour triggering the diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 897d515c..8a384127 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -28,9 +28,8 @@ use crate::handler::{ use crate::upgrade::SendWrapper; use crate::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use either::Either; -use libp2p_core::{ - either::EitherOutput, upgrade::DeniedUpgrade, ConnectedPoint, Multiaddr, PeerId, -}; +use futures::future; +use libp2p_core::{upgrade::DeniedUpgrade, ConnectedPoint, Multiaddr, PeerId}; use std::{task::Context, task::Poll}; /// Implementation of `NetworkBehaviour` that can be either in the disabled or enabled state. @@ -169,8 +168,8 @@ where >, ) { let out = match out { - EitherOutput::First(out) => out, - EitherOutput::Second(v) => void::unreachable(v), + future::Either::Left(out) => out, + future::Either::Right(v) => void::unreachable(v), }; if let Either::Left(info) = info { diff --git a/swarm/src/handler/either.rs b/swarm/src/handler/either.rs index 2345830e..e2c72bb3 100644 --- a/swarm/src/handler/either.rs +++ b/swarm/src/handler/either.rs @@ -20,12 +20,12 @@ use crate::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, InboundUpgradeSend, - IntoConnectionHandler, KeepAlive, ListenUpgradeError, OutboundUpgradeSend, SubstreamProtocol, + FullyNegotiatedInbound, InboundUpgradeSend, IntoConnectionHandler, KeepAlive, + ListenUpgradeError, SubstreamProtocol, }; use crate::upgrade::SendWrapper; use either::Either; -use libp2p_core::either::EitherOutput; +use futures::future; use libp2p_core::upgrade::UpgradeError; use libp2p_core::{ConnectedPoint, PeerId}; use std::task::{Context, Poll}; @@ -101,11 +101,11 @@ where ) -> Either, FullyNegotiatedInbound> { match self { FullyNegotiatedInbound { - protocol: EitherOutput::First(protocol), + protocol: future::Either::Left(protocol), info: Either::Left(info), } => Either::Left(FullyNegotiatedInbound { protocol, info }), FullyNegotiatedInbound { - protocol: EitherOutput::Second(protocol), + protocol: future::Either::Right(protocol), info: Either::Right(info), } => Either::Right(FullyNegotiatedInbound { protocol, info }), _ => unreachable!(), @@ -113,98 +113,6 @@ where } } -impl - FullyNegotiatedOutbound, SendWrapper>, Either> -where - LOP: OutboundUpgradeSend, - ROP: OutboundUpgradeSend, -{ - fn transpose( - self, - ) -> Either, FullyNegotiatedOutbound> { - match self { - FullyNegotiatedOutbound { - protocol: EitherOutput::First(protocol), - info: Either::Left(info), - } => Either::Left(FullyNegotiatedOutbound { protocol, info }), - FullyNegotiatedOutbound { - protocol: EitherOutput::Second(protocol), - info: Either::Right(info), - } => Either::Right(FullyNegotiatedOutbound { protocol, info }), - _ => unreachable!(), - } - } -} - -impl - DialUpgradeError, Either, SendWrapper>> -where - LOP: OutboundUpgradeSend, - ROP: OutboundUpgradeSend, -{ - fn transpose(self) -> Either, DialUpgradeError> { - match self { - DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Left(error))), - info: Either::Left(info), - } => Either::Left(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error)), - info, - }), - DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Right(error))), - info: Either::Right(info), - } => Either::Right(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error)), - info, - }), - DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), - info: Either::Left(info), - } => Either::Left(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), - info, - }), - DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), - info: Either::Right(info), - } => Either::Right(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), - info, - }), - DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timer, - info: Either::Left(info), - } => Either::Left(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timer, - info, - }), - DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timer, - info: Either::Right(info), - } => Either::Right(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timer, - info, - }), - DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timeout, - info: Either::Left(info), - } => Either::Left(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timeout, - info, - }), - DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timeout, - info: Either::Right(info), - } => Either::Right(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timeout, - info, - }), - _ => unreachable!(), - } - } -} - impl ListenUpgradeError, Either, SendWrapper>> where diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index 535b06be..8c38ffe7 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -27,8 +27,8 @@ use crate::handler::{ use crate::upgrade::SendWrapper; use either::Either; +use futures::future; use libp2p_core::{ - either::EitherOutput, upgrade::{NegotiationError, ProtocolError, SelectUpgrade, UpgradeError}, ConnectedPoint, PeerId, }; @@ -101,27 +101,24 @@ impl ConnectionHandlerSelect { } impl - FullyNegotiatedOutbound< - Either, SendWrapper>, - EitherOutput, - > + FullyNegotiatedOutbound, SendWrapper>, Either> where S1OP: OutboundUpgradeSend, S2OP: OutboundUpgradeSend, S1OOI: Send + 'static, S2OOI: Send + 'static, { - fn transpose( + pub(crate) fn transpose( self, ) -> Either, FullyNegotiatedOutbound> { match self { FullyNegotiatedOutbound { - protocol: EitherOutput::First(protocol), - info: EitherOutput::First(info), + protocol: future::Either::Left(protocol), + info: Either::Left(info), } => Either::Left(FullyNegotiatedOutbound { protocol, info }), FullyNegotiatedOutbound { - protocol: EitherOutput::Second(protocol), - info: EitherOutput::Second(info), + protocol: future::Either::Right(protocol), + info: Either::Right(info), } => Either::Right(FullyNegotiatedOutbound { protocol, info }), _ => panic!("wrong API usage: the protocol doesn't match the upgrade info"), } @@ -134,16 +131,16 @@ where S1IP: InboundUpgradeSend, S2IP: InboundUpgradeSend, { - fn transpose( + pub(crate) fn transpose( self, ) -> Either, FullyNegotiatedInbound> { match self { FullyNegotiatedInbound { - protocol: EitherOutput::First(protocol), + protocol: future::Either::Left(protocol), info: (i1, _i2), } => Either::Left(FullyNegotiatedInbound { protocol, info: i1 }), FullyNegotiatedInbound { - protocol: EitherOutput::Second(protocol), + protocol: future::Either::Right(protocol), info: (_i1, i2), } => Either::Right(FullyNegotiatedInbound { protocol, info: i2 }), } @@ -151,66 +148,68 @@ where } impl - DialUpgradeError, Either, SendWrapper>> + DialUpgradeError, Either, SendWrapper>> where S1OP: OutboundUpgradeSend, S2OP: OutboundUpgradeSend, S1OOI: Send + 'static, S2OOI: Send + 'static, { - fn transpose(self) -> Either, DialUpgradeError> { + pub(crate) fn transpose( + self, + ) -> Either, DialUpgradeError> { match self { DialUpgradeError { - info: EitherOutput::First(info), + info: Either::Left(info), error: ConnectionHandlerUpgrErr::Timer, } => Either::Left(DialUpgradeError { info, error: ConnectionHandlerUpgrErr::Timer, }), DialUpgradeError { - info: EitherOutput::First(info), + info: Either::Left(info), error: ConnectionHandlerUpgrErr::Timeout, } => Either::Left(DialUpgradeError { info, error: ConnectionHandlerUpgrErr::Timeout, }), DialUpgradeError { - info: EitherOutput::First(info), + info: Either::Left(info), error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), } => Either::Left(DialUpgradeError { info, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), }), DialUpgradeError { - info: EitherOutput::First(info), + info: Either::Left(info), error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Left(err))), } => Either::Left(DialUpgradeError { info, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), }), DialUpgradeError { - info: EitherOutput::Second(info), + info: Either::Right(info), error: ConnectionHandlerUpgrErr::Timer, } => Either::Right(DialUpgradeError { info, error: ConnectionHandlerUpgrErr::Timer, }), DialUpgradeError { - info: EitherOutput::Second(info), + info: Either::Right(info), error: ConnectionHandlerUpgrErr::Timeout, } => Either::Right(DialUpgradeError { info, error: ConnectionHandlerUpgrErr::Timeout, }), DialUpgradeError { - info: EitherOutput::Second(info), + info: Either::Right(info), error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), } => Either::Right(DialUpgradeError { info, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), }), DialUpgradeError { - info: EitherOutput::Second(info), + info: Either::Right(info), error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Right(err))), } => Either::Right(DialUpgradeError { info, @@ -338,8 +337,8 @@ where TProto1: ConnectionHandler, TProto2: ConnectionHandler, { - type InEvent = EitherOutput; - type OutEvent = EitherOutput; + type InEvent = Either; + type OutEvent = Either; type Error = Either; type InboundProtocol = SelectUpgrade< SendWrapper<::InboundProtocol>, @@ -347,7 +346,7 @@ where >; type OutboundProtocol = Either, SendWrapper>; - type OutboundOpenInfo = EitherOutput; + type OutboundOpenInfo = Either; type InboundOpenInfo = (TProto1::InboundOpenInfo, TProto2::InboundOpenInfo); fn listen_protocol(&self) -> SubstreamProtocol { @@ -362,8 +361,8 @@ where fn on_behaviour_event(&mut self, event: Self::InEvent) { match event { - EitherOutput::First(event) => self.proto1.on_behaviour_event(event), - EitherOutput::Second(event) => self.proto2.on_behaviour_event(event), + Either::Left(event) => self.proto1.on_behaviour_event(event), + Either::Right(event) => self.proto2.on_behaviour_event(event), } } @@ -387,7 +386,7 @@ where > { match self.proto1.poll(cx) { Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { - return Poll::Ready(ConnectionHandlerEvent::Custom(EitherOutput::First(event))); + return Poll::Ready(ConnectionHandlerEvent::Custom(Either::Left(event))); } Poll::Ready(ConnectionHandlerEvent::Close(event)) => { return Poll::Ready(ConnectionHandlerEvent::Close(Either::Left(event))); @@ -396,7 +395,7 @@ where return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: protocol .map_upgrade(|u| Either::Left(SendWrapper(u))) - .map_info(EitherOutput::First), + .map_info(Either::Left), }); } Poll::Pending => (), @@ -404,7 +403,7 @@ where match self.proto2.poll(cx) { Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { - return Poll::Ready(ConnectionHandlerEvent::Custom(EitherOutput::Second(event))); + return Poll::Ready(ConnectionHandlerEvent::Custom(Either::Right(event))); } Poll::Ready(ConnectionHandlerEvent::Close(event)) => { return Poll::Ready(ConnectionHandlerEvent::Close(Either::Right(event))); @@ -413,7 +412,7 @@ where return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: protocol .map_upgrade(|u| Either::Right(SendWrapper(u))) - .map_info(EitherOutput::Second), + .map_info(Either::Right), }); } Poll::Pending => (), diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 0e2b0abc..ee8a2808 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -92,8 +92,8 @@ pub mod derive_prelude { pub use crate::NetworkBehaviour; pub use crate::NetworkBehaviourAction; pub use crate::PollParameters; + pub use either::Either; pub use futures::prelude as futures; - pub use libp2p_core::either::EitherOutput; pub use libp2p_core::transport::ListenerId; pub use libp2p_core::ConnectedPoint; pub use libp2p_core::Multiaddr; diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs index a1478645..7950bbdc 100644 --- a/transports/quic/tests/smoke.rs +++ b/transports/quic/tests/smoke.rs @@ -4,7 +4,6 @@ use futures::channel::{mpsc, oneshot}; use futures::future::{poll_fn, Either}; use futures::stream::StreamExt; use futures::{future, AsyncReadExt, AsyncWriteExt, FutureExt, SinkExt}; -use libp2p_core::either::EitherOutput; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt, SubstreamBox}; use libp2p_core::transport::{Boxed, OrTransport, TransportEvent}; use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr, PeerId, Transport}; @@ -131,8 +130,8 @@ fn new_tcp_quic_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) { let transport = OrTransport::new(quic_transport, tcp_transport) .map(|either_output, _| match either_output { - EitherOutput::First((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - EitherOutput::Second((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), }) .boxed(); diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index 0b069d27..318090d1 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -24,7 +24,6 @@ use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream}; use futures_rustls::{client, rustls, server}; use libp2p_core::{ connection::Endpoint, - either::EitherOutput, multiaddr::{Multiaddr, Protocol}, transport::{ListenerId, TransportError, TransportEvent}, Transport, @@ -108,7 +107,7 @@ impl WsConfig { } } -type TlsOrPlain = EitherOutput, server::TlsStream>, T>; +type TlsOrPlain = future::Either, server::TlsStream>, T>; impl Transport for WsConfig where @@ -350,11 +349,11 @@ where }) .await?; - let stream: TlsOrPlain<_> = EitherOutput::First(EitherOutput::First(stream)); + let stream: TlsOrPlain<_> = future::Either::Left(future::Either::Left(stream)); stream } else { // continue with plain stream - EitherOutput::Second(stream) + future::Either::Right(stream) }; trace!("Sending websocket handshake to {}", addr.host_port); @@ -422,12 +421,12 @@ where }) .await?; - let stream: TlsOrPlain<_> = EitherOutput::First(EitherOutput::Second(stream)); + let stream: TlsOrPlain<_> = future::Either::Left(future::Either::Right(stream)); stream } else { // continue with plain stream - EitherOutput::Second(stream) + future::Either::Right(stream) }; trace!(