// Copyright 2019 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. #[allow(deprecated)] use crate::handler::IntoConnectionHandler; use crate::handler::{ AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, InboundUpgradeSend, KeepAlive, ListenUpgradeError, OutboundUpgradeSend, SubstreamProtocol, }; use crate::upgrade::SendWrapper; use either::Either; use futures::future; use libp2p_core::{ upgrade::{NegotiationError, ProtocolError, SelectUpgrade, UpgradeError}, ConnectedPoint, }; use libp2p_identity::PeerId; use std::{cmp, task::Context, task::Poll}; /// Implementation of `IntoConnectionHandler` that combines two protocols into one. #[derive(Debug, Clone)] pub struct IntoConnectionHandlerSelect { /// The first protocol. proto1: TProto1, /// The second protocol. proto2: TProto2, } impl IntoConnectionHandlerSelect { /// Builds a `IntoConnectionHandlerSelect`. pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self { IntoConnectionHandlerSelect { proto1, proto2 } } pub fn into_inner(self) -> (TProto1, TProto2) { (self.proto1, self.proto2) } } #[allow(deprecated)] impl IntoConnectionHandler for IntoConnectionHandlerSelect where TProto1: IntoConnectionHandler, TProto2: IntoConnectionHandler, { type Handler = ConnectionHandlerSelect; fn into_handler( self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint, ) -> Self::Handler { ConnectionHandlerSelect { proto1: self.proto1.into_handler(remote_peer_id, connected_point), proto2: self.proto2.into_handler(remote_peer_id, connected_point), } } fn inbound_protocol(&self) -> ::InboundProtocol { SelectUpgrade::new( SendWrapper(self.proto1.inbound_protocol()), SendWrapper(self.proto2.inbound_protocol()), ) } } /// Implementation of [`ConnectionHandler`] that combines two protocols into one. #[derive(Debug, Clone)] pub struct ConnectionHandlerSelect { /// The first protocol. proto1: TProto1, /// The second protocol. proto2: TProto2, } impl ConnectionHandlerSelect { /// Builds a [`ConnectionHandlerSelect`]. pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self { ConnectionHandlerSelect { proto1, proto2 } } pub fn into_inner(self) -> (TProto1, TProto2) { (self.proto1, self.proto2) } } impl FullyNegotiatedOutbound, SendWrapper>, Either> where S1OP: OutboundUpgradeSend, S2OP: OutboundUpgradeSend, S1OOI: Send + 'static, S2OOI: Send + 'static, { pub(crate) fn transpose( self, ) -> Either, FullyNegotiatedOutbound> { match self { FullyNegotiatedOutbound { protocol: future::Either::Left(protocol), info: Either::Left(info), } => Either::Left(FullyNegotiatedOutbound { protocol, info }), FullyNegotiatedOutbound { protocol: future::Either::Right(protocol), info: Either::Right(info), } => Either::Right(FullyNegotiatedOutbound { protocol, info }), _ => panic!("wrong API usage: the protocol doesn't match the upgrade info"), } } } impl FullyNegotiatedInbound, SendWrapper>, (S1IOI, S2IOI)> where S1IP: InboundUpgradeSend, S2IP: InboundUpgradeSend, { pub(crate) fn transpose( self, ) -> Either, FullyNegotiatedInbound> { match self { FullyNegotiatedInbound { protocol: future::Either::Left(protocol), info: (i1, _i2), } => Either::Left(FullyNegotiatedInbound { protocol, info: i1 }), FullyNegotiatedInbound { protocol: future::Either::Right(protocol), info: (_i1, i2), } => Either::Right(FullyNegotiatedInbound { protocol, info: i2 }), } } } impl DialUpgradeError, Either, SendWrapper>> where S1OP: OutboundUpgradeSend, S2OP: OutboundUpgradeSend, S1OOI: Send + 'static, S2OOI: Send + 'static, { pub(crate) fn transpose( self, ) -> Either, DialUpgradeError> { match self { DialUpgradeError { info: Either::Left(info), error: ConnectionHandlerUpgrErr::Timer, } => Either::Left(DialUpgradeError { info, error: ConnectionHandlerUpgrErr::Timer, }), DialUpgradeError { info: Either::Left(info), error: ConnectionHandlerUpgrErr::Timeout, } => Either::Left(DialUpgradeError { info, error: ConnectionHandlerUpgrErr::Timeout, }), DialUpgradeError { info: Either::Left(info), error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), } => Either::Left(DialUpgradeError { info, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), }), DialUpgradeError { info: Either::Left(info), error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Left(err))), } => Either::Left(DialUpgradeError { info, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), }), DialUpgradeError { info: Either::Right(info), error: ConnectionHandlerUpgrErr::Timer, } => Either::Right(DialUpgradeError { info, error: ConnectionHandlerUpgrErr::Timer, }), DialUpgradeError { info: Either::Right(info), error: ConnectionHandlerUpgrErr::Timeout, } => Either::Right(DialUpgradeError { info, error: ConnectionHandlerUpgrErr::Timeout, }), DialUpgradeError { info: Either::Right(info), error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), } => Either::Right(DialUpgradeError { info, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), }), DialUpgradeError { info: Either::Right(info), error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Right(err))), } => Either::Right(DialUpgradeError { info, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), }), _ => panic!("Wrong API usage; the upgrade error doesn't match the outbound open info"), } } } impl ConnectionHandlerSelect where TProto1: ConnectionHandler, TProto2: ConnectionHandler, { fn on_listen_upgrade_error( &mut self, ListenUpgradeError { info: (i1, i2), error, }: ListenUpgradeError< ::InboundOpenInfo, ::InboundProtocol, >, ) { match error { ConnectionHandlerUpgrErr::Timer => { self.proto1 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i1, error: ConnectionHandlerUpgrErr::Timer, })); self.proto2 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i2, error: ConnectionHandlerUpgrErr::Timer, })); } ConnectionHandlerUpgrErr::Timeout => { self.proto1 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i1, error: ConnectionHandlerUpgrErr::Timeout, })); self.proto2 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i2, error: ConnectionHandlerUpgrErr::Timeout, })); } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { self.proto1 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i1, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( NegotiationError::Failed, )), })); self.proto2 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i2, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( NegotiationError::Failed, )), })); } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( NegotiationError::ProtocolError(e), )) => { let (e1, e2); match e { ProtocolError::IoError(e) => { e1 = NegotiationError::ProtocolError(ProtocolError::IoError( e.kind().into(), )); e2 = NegotiationError::ProtocolError(ProtocolError::IoError(e)) } ProtocolError::InvalidMessage => { e1 = NegotiationError::ProtocolError(ProtocolError::InvalidMessage); e2 = NegotiationError::ProtocolError(ProtocolError::InvalidMessage) } ProtocolError::InvalidProtocol => { e1 = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol); e2 = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol) } ProtocolError::TooManyProtocols => { e1 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols); e2 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols) } } self.proto1 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i1, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e1)), })); self.proto2 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i2, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e2)), })); } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Left(e))) => { self.proto1 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i1, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), })); } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Right(e))) => { self.proto2 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i2, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), })); } } } } impl ConnectionHandler for ConnectionHandlerSelect where TProto1: ConnectionHandler, TProto2: ConnectionHandler, { type InEvent = Either; type OutEvent = Either; type Error = Either; type InboundProtocol = SelectUpgrade< SendWrapper<::InboundProtocol>, SendWrapper<::InboundProtocol>, >; type OutboundProtocol = Either, SendWrapper>; type OutboundOpenInfo = Either; type InboundOpenInfo = (TProto1::InboundOpenInfo, TProto2::InboundOpenInfo); fn listen_protocol(&self) -> SubstreamProtocol { let proto1 = self.proto1.listen_protocol(); let proto2 = self.proto2.listen_protocol(); let timeout = *std::cmp::max(proto1.timeout(), proto2.timeout()); let (u1, i1) = proto1.into_upgrade(); let (u2, i2) = proto2.into_upgrade(); let choice = SelectUpgrade::new(SendWrapper(u1), SendWrapper(u2)); SubstreamProtocol::new(choice, (i1, i2)).with_timeout(timeout) } fn on_behaviour_event(&mut self, event: Self::InEvent) { match event { Either::Left(event) => self.proto1.on_behaviour_event(event), Either::Right(event) => self.proto2.on_behaviour_event(event), } } fn connection_keep_alive(&self) -> KeepAlive { cmp::max( self.proto1.connection_keep_alive(), self.proto2.connection_keep_alive(), ) } fn poll( &mut self, cx: &mut Context<'_>, ) -> Poll< ConnectionHandlerEvent< Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error, >, > { match self.proto1.poll(cx) { Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { return Poll::Ready(ConnectionHandlerEvent::Custom(Either::Left(event))); } Poll::Ready(ConnectionHandlerEvent::Close(event)) => { return Poll::Ready(ConnectionHandlerEvent::Close(Either::Left(event))); } Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: protocol .map_upgrade(|u| Either::Left(SendWrapper(u))) .map_info(Either::Left), }); } Poll::Pending => (), }; match self.proto2.poll(cx) { Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { return Poll::Ready(ConnectionHandlerEvent::Custom(Either::Right(event))); } Poll::Ready(ConnectionHandlerEvent::Close(event)) => { return Poll::Ready(ConnectionHandlerEvent::Close(Either::Right(event))); } Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: protocol .map_upgrade(|u| Either::Right(SendWrapper(u))) .map_info(Either::Right), }); } Poll::Pending => (), }; Poll::Pending } fn on_connection_event( &mut self, event: ConnectionEvent< Self::InboundProtocol, Self::OutboundProtocol, Self::InboundOpenInfo, Self::OutboundOpenInfo, >, ) { match event { ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { match fully_negotiated_outbound.transpose() { Either::Left(f) => self .proto1 .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(f)), Either::Right(f) => self .proto2 .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(f)), } } ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { match fully_negotiated_inbound.transpose() { Either::Left(f) => self .proto1 .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(f)), Either::Right(f) => self .proto2 .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(f)), } } ConnectionEvent::AddressChange(address) => { self.proto1 .on_connection_event(ConnectionEvent::AddressChange(AddressChange { new_address: address.new_address, })); self.proto2 .on_connection_event(ConnectionEvent::AddressChange(AddressChange { new_address: address.new_address, })); } ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { match dial_upgrade_error.transpose() { Either::Left(err) => self .proto1 .on_connection_event(ConnectionEvent::DialUpgradeError(err)), Either::Right(err) => self .proto2 .on_connection_event(ConnectionEvent::DialUpgradeError(err)), } } ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { self.on_listen_upgrade_error(listen_upgrade_error) } } } }