mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-14 02:21:21 +00:00
swarm/: Rename ProtocolsHandler to ConnectionHandler (#2527)
A `ProtocolsHandler`, now `ConnectionHandler`, handels a connection, not a protocol. Thus the name `CONNECTIONHandler` is more appropriate. Next to the rename of `ProtocolsHandler` this commit renames the `mod protocols_handler` to `mod handler`. Finally all combinators (e.g. `ProtocolsHandlerSelect`) are renamed appropriately.
This commit is contained in:
117
swarm/src/handler/dummy.rs
Normal file
117
swarm/src/handler/dummy.rs
Normal file
@ -0,0 +1,117 @@
|
||||
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::handler::{
|
||||
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
|
||||
SubstreamProtocol,
|
||||
};
|
||||
use crate::NegotiatedSubstream;
|
||||
use libp2p_core::{
|
||||
upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade},
|
||||
Multiaddr,
|
||||
};
|
||||
use std::task::{Context, Poll};
|
||||
use void::Void;
|
||||
|
||||
/// Implementation of [`ConnectionHandler`] that doesn't handle anything.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DummyConnectionHandler {
|
||||
pub keep_alive: KeepAlive,
|
||||
}
|
||||
|
||||
impl Default for DummyConnectionHandler {
|
||||
fn default() -> Self {
|
||||
DummyConnectionHandler {
|
||||
keep_alive: KeepAlive::No,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ConnectionHandler for DummyConnectionHandler {
|
||||
type InEvent = Void;
|
||||
type OutEvent = Void;
|
||||
type Error = Void;
|
||||
type InboundProtocol = DeniedUpgrade;
|
||||
type OutboundProtocol = DeniedUpgrade;
|
||||
type OutboundOpenInfo = Void;
|
||||
type InboundOpenInfo = ();
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||
SubstreamProtocol::new(DeniedUpgrade, ())
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_inbound(
|
||||
&mut self,
|
||||
_: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
|
||||
_: Self::InboundOpenInfo,
|
||||
) {
|
||||
unreachable!("`DeniedUpgrade` is never successful.");
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(
|
||||
&mut self,
|
||||
_: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
|
||||
v: Self::OutboundOpenInfo,
|
||||
) {
|
||||
void::unreachable(v)
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, v: Self::InEvent) {
|
||||
void::unreachable(v)
|
||||
}
|
||||
|
||||
fn inject_address_change(&mut self, _: &Multiaddr) {}
|
||||
|
||||
fn inject_dial_upgrade_error(
|
||||
&mut self,
|
||||
_: Self::OutboundOpenInfo,
|
||||
_: ConnectionHandlerUpgrErr<
|
||||
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
|
||||
>,
|
||||
) {
|
||||
}
|
||||
|
||||
fn inject_listen_upgrade_error(
|
||||
&mut self,
|
||||
_: Self::InboundOpenInfo,
|
||||
_: ConnectionHandlerUpgrErr<
|
||||
<Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Error,
|
||||
>,
|
||||
) {
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
self.keep_alive
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<
|
||||
ConnectionHandlerEvent<
|
||||
Self::OutboundProtocol,
|
||||
Self::OutboundOpenInfo,
|
||||
Self::OutEvent,
|
||||
Self::Error,
|
||||
>,
|
||||
> {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
305
swarm/src/handler/either.rs
Normal file
305
swarm/src/handler/either.rs
Normal file
@ -0,0 +1,305 @@
|
||||
// Copyright 2021 Protocol Labs.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::handler::{
|
||||
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler,
|
||||
KeepAlive, SubstreamProtocol,
|
||||
};
|
||||
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper};
|
||||
use either::Either;
|
||||
use libp2p_core::either::{EitherError, EitherOutput};
|
||||
use libp2p_core::upgrade::{EitherUpgrade, UpgradeError};
|
||||
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
pub enum IntoEitherHandler<L, R> {
|
||||
Left(L),
|
||||
Right(R),
|
||||
}
|
||||
|
||||
/// Implementation of a [`IntoConnectionHandler`] that represents either of two [`IntoConnectionHandler`]
|
||||
/// implementations.
|
||||
impl<L, R> IntoConnectionHandler for IntoEitherHandler<L, R>
|
||||
where
|
||||
L: IntoConnectionHandler,
|
||||
R: IntoConnectionHandler,
|
||||
{
|
||||
type Handler = Either<L::Handler, R::Handler>;
|
||||
|
||||
fn into_handler(self, p: &PeerId, c: &ConnectedPoint) -> Self::Handler {
|
||||
match self {
|
||||
IntoEitherHandler::Left(into_handler) => Either::Left(into_handler.into_handler(p, c)),
|
||||
IntoEitherHandler::Right(into_handler) => {
|
||||
Either::Right(into_handler.into_handler(p, c))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn inbound_protocol(&self) -> <Self::Handler as ConnectionHandler>::InboundProtocol {
|
||||
match self {
|
||||
IntoEitherHandler::Left(into_handler) => {
|
||||
EitherUpgrade::A(SendWrapper(into_handler.inbound_protocol()))
|
||||
}
|
||||
IntoEitherHandler::Right(into_handler) => {
|
||||
EitherUpgrade::B(SendWrapper(into_handler.inbound_protocol()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Implementation of a [`ConnectionHandler`] that represents either of two [`ConnectionHandler`]
|
||||
/// implementations.
|
||||
impl<L, R> ConnectionHandler for Either<L, R>
|
||||
where
|
||||
L: ConnectionHandler,
|
||||
R: ConnectionHandler,
|
||||
{
|
||||
type InEvent = Either<L::InEvent, R::InEvent>;
|
||||
type OutEvent = Either<L::OutEvent, R::OutEvent>;
|
||||
type Error = Either<L::Error, R::Error>;
|
||||
type InboundProtocol =
|
||||
EitherUpgrade<SendWrapper<L::InboundProtocol>, SendWrapper<R::InboundProtocol>>;
|
||||
type OutboundProtocol =
|
||||
EitherUpgrade<SendWrapper<L::OutboundProtocol>, SendWrapper<R::OutboundProtocol>>;
|
||||
type InboundOpenInfo = Either<L::InboundOpenInfo, R::InboundOpenInfo>;
|
||||
type OutboundOpenInfo = Either<L::OutboundOpenInfo, R::OutboundOpenInfo>;
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||
match self {
|
||||
Either::Left(a) => a
|
||||
.listen_protocol()
|
||||
.map_upgrade(|u| EitherUpgrade::A(SendWrapper(u)))
|
||||
.map_info(Either::Left),
|
||||
Either::Right(b) => b
|
||||
.listen_protocol()
|
||||
.map_upgrade(|u| EitherUpgrade::B(SendWrapper(u)))
|
||||
.map_info(Either::Right),
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(
|
||||
&mut self,
|
||||
output: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
|
||||
info: Self::OutboundOpenInfo,
|
||||
) {
|
||||
match (self, output, info) {
|
||||
(Either::Left(handler), EitherOutput::First(output), Either::Left(info)) => {
|
||||
handler.inject_fully_negotiated_outbound(output, info)
|
||||
}
|
||||
(Either::Right(handler), EitherOutput::Second(output), Either::Right(info)) => {
|
||||
handler.inject_fully_negotiated_outbound(output, info)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_inbound(
|
||||
&mut self,
|
||||
output: <Self::InboundProtocol as InboundUpgradeSend>::Output,
|
||||
info: Self::InboundOpenInfo,
|
||||
) {
|
||||
match (self, output, info) {
|
||||
(Either::Left(handler), EitherOutput::First(output), Either::Left(info)) => {
|
||||
handler.inject_fully_negotiated_inbound(output, info)
|
||||
}
|
||||
(Either::Right(handler), EitherOutput::Second(output), Either::Right(info)) => {
|
||||
handler.inject_fully_negotiated_inbound(output, info)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, event: Self::InEvent) {
|
||||
match (self, event) {
|
||||
(Either::Left(handler), Either::Left(event)) => handler.inject_event(event),
|
||||
(Either::Right(handler), Either::Right(event)) => handler.inject_event(event),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_address_change(&mut self, addr: &Multiaddr) {
|
||||
match self {
|
||||
Either::Left(handler) => handler.inject_address_change(addr),
|
||||
Either::Right(handler) => handler.inject_address_change(addr),
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_dial_upgrade_error(
|
||||
&mut self,
|
||||
info: Self::OutboundOpenInfo,
|
||||
error: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
|
||||
) {
|
||||
match error {
|
||||
ConnectionHandlerUpgrErr::Timer => match (self, info) {
|
||||
(Either::Left(handler), Either::Left(info)) => {
|
||||
handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timer);
|
||||
}
|
||||
(Either::Right(handler), Either::Right(info)) => {
|
||||
handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timer);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
},
|
||||
ConnectionHandlerUpgrErr::Timeout => match (self, info) {
|
||||
(Either::Left(handler), Either::Left(info)) => {
|
||||
handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout);
|
||||
}
|
||||
(Either::Right(handler), Either::Right(info)) => {
|
||||
handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
},
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)) => match (self, info) {
|
||||
(Either::Left(handler), Either::Left(info)) => {
|
||||
handler.inject_dial_upgrade_error(
|
||||
info,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
|
||||
);
|
||||
}
|
||||
(Either::Right(handler), Either::Right(info)) => {
|
||||
handler.inject_dial_upgrade_error(
|
||||
info,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
},
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => {
|
||||
match (self, info) {
|
||||
(Either::Left(handler), Either::Left(info)) => {
|
||||
handler.inject_dial_upgrade_error(
|
||||
info,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => {
|
||||
match (self, info) {
|
||||
(Either::Right(handler), Either::Right(info)) => {
|
||||
handler.inject_dial_upgrade_error(
|
||||
info,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_listen_upgrade_error(
|
||||
&mut self,
|
||||
info: Self::InboundOpenInfo,
|
||||
error: ConnectionHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>,
|
||||
) {
|
||||
match error {
|
||||
ConnectionHandlerUpgrErr::Timer => match (self, info) {
|
||||
(Either::Left(handler), Either::Left(info)) => {
|
||||
handler.inject_listen_upgrade_error(info, ConnectionHandlerUpgrErr::Timer);
|
||||
}
|
||||
(Either::Right(handler), Either::Right(info)) => {
|
||||
handler.inject_listen_upgrade_error(info, ConnectionHandlerUpgrErr::Timer);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
},
|
||||
ConnectionHandlerUpgrErr::Timeout => match (self, info) {
|
||||
(Either::Left(handler), Either::Left(info)) => {
|
||||
handler.inject_listen_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout);
|
||||
}
|
||||
(Either::Right(handler), Either::Right(info)) => {
|
||||
handler.inject_listen_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
},
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)) => match (self, info) {
|
||||
(Either::Left(handler), Either::Left(info)) => {
|
||||
handler.inject_listen_upgrade_error(
|
||||
info,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
|
||||
);
|
||||
}
|
||||
(Either::Right(handler), Either::Right(info)) => {
|
||||
handler.inject_listen_upgrade_error(
|
||||
info,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
},
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => {
|
||||
match (self, info) {
|
||||
(Either::Left(handler), Either::Left(info)) => {
|
||||
handler.inject_listen_upgrade_error(
|
||||
info,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => {
|
||||
match (self, info) {
|
||||
(Either::Right(handler), Either::Right(info)) => {
|
||||
handler.inject_listen_upgrade_error(
|
||||
info,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
match self {
|
||||
Either::Left(handler) => handler.connection_keep_alive(),
|
||||
Either::Right(handler) => handler.connection_keep_alive(),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<
|
||||
ConnectionHandlerEvent<
|
||||
Self::OutboundProtocol,
|
||||
Self::OutboundOpenInfo,
|
||||
Self::OutEvent,
|
||||
Self::Error,
|
||||
>,
|
||||
> {
|
||||
let event = match self {
|
||||
Either::Left(handler) => futures::ready!(handler.poll(cx))
|
||||
.map_custom(Either::Left)
|
||||
.map_close(Either::Left)
|
||||
.map_protocol(|p| EitherUpgrade::A(SendWrapper(p)))
|
||||
.map_outbound_open_info(Either::Left),
|
||||
Either::Right(handler) => futures::ready!(handler.poll(cx))
|
||||
.map_custom(Either::Right)
|
||||
.map_close(Either::Right)
|
||||
.map_protocol(|p| EitherUpgrade::B(SendWrapper(p)))
|
||||
.map_outbound_open_info(Either::Right),
|
||||
};
|
||||
|
||||
Poll::Ready(event)
|
||||
}
|
||||
}
|
125
swarm/src/handler/map_in.rs
Normal file
125
swarm/src/handler/map_in.rs
Normal file
@ -0,0 +1,125 @@
|
||||
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::handler::{
|
||||
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
|
||||
SubstreamProtocol,
|
||||
};
|
||||
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend};
|
||||
use libp2p_core::Multiaddr;
|
||||
use std::{fmt::Debug, marker::PhantomData, task::Context, task::Poll};
|
||||
|
||||
/// Wrapper around a protocol handler that turns the input event into something else.
|
||||
pub struct MapInEvent<TProtoHandler, TNewIn, TMap> {
|
||||
inner: TProtoHandler,
|
||||
map: TMap,
|
||||
marker: PhantomData<TNewIn>,
|
||||
}
|
||||
|
||||
impl<TProtoHandler, TMap, TNewIn> MapInEvent<TProtoHandler, TNewIn, TMap> {
|
||||
/// Creates a `MapInEvent`.
|
||||
pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self {
|
||||
MapInEvent {
|
||||
inner,
|
||||
map,
|
||||
marker: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TProtoHandler, TMap, TNewIn> ConnectionHandler for MapInEvent<TProtoHandler, TNewIn, TMap>
|
||||
where
|
||||
TProtoHandler: ConnectionHandler,
|
||||
TMap: Fn(TNewIn) -> Option<TProtoHandler::InEvent>,
|
||||
TNewIn: Debug + Send + 'static,
|
||||
TMap: Send + 'static,
|
||||
{
|
||||
type InEvent = TNewIn;
|
||||
type OutEvent = TProtoHandler::OutEvent;
|
||||
type Error = TProtoHandler::Error;
|
||||
type InboundProtocol = TProtoHandler::InboundProtocol;
|
||||
type OutboundProtocol = TProtoHandler::OutboundProtocol;
|
||||
type InboundOpenInfo = TProtoHandler::InboundOpenInfo;
|
||||
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||
self.inner.listen_protocol()
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_inbound(
|
||||
&mut self,
|
||||
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output,
|
||||
info: Self::InboundOpenInfo,
|
||||
) {
|
||||
self.inner.inject_fully_negotiated_inbound(protocol, info)
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(
|
||||
&mut self,
|
||||
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
|
||||
info: Self::OutboundOpenInfo,
|
||||
) {
|
||||
self.inner.inject_fully_negotiated_outbound(protocol, info)
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, event: TNewIn) {
|
||||
if let Some(event) = (self.map)(event) {
|
||||
self.inner.inject_event(event);
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_address_change(&mut self, addr: &Multiaddr) {
|
||||
self.inner.inject_address_change(addr)
|
||||
}
|
||||
|
||||
fn inject_dial_upgrade_error(
|
||||
&mut self,
|
||||
info: Self::OutboundOpenInfo,
|
||||
error: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
|
||||
) {
|
||||
self.inner.inject_dial_upgrade_error(info, error)
|
||||
}
|
||||
|
||||
fn inject_listen_upgrade_error(
|
||||
&mut self,
|
||||
info: Self::InboundOpenInfo,
|
||||
error: ConnectionHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>,
|
||||
) {
|
||||
self.inner.inject_listen_upgrade_error(info, error)
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
self.inner.connection_keep_alive()
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<
|
||||
ConnectionHandlerEvent<
|
||||
Self::OutboundProtocol,
|
||||
Self::OutboundOpenInfo,
|
||||
Self::OutEvent,
|
||||
Self::Error,
|
||||
>,
|
||||
> {
|
||||
self.inner.poll(cx)
|
||||
}
|
||||
}
|
125
swarm/src/handler/map_out.rs
Normal file
125
swarm/src/handler/map_out.rs
Normal file
@ -0,0 +1,125 @@
|
||||
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::handler::{
|
||||
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
|
||||
SubstreamProtocol,
|
||||
};
|
||||
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend};
|
||||
use libp2p_core::Multiaddr;
|
||||
use std::fmt::Debug;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
/// Wrapper around a protocol handler that turns the output event into something else.
|
||||
pub struct MapOutEvent<TProtoHandler, TMap> {
|
||||
inner: TProtoHandler,
|
||||
map: TMap,
|
||||
}
|
||||
|
||||
impl<TProtoHandler, TMap> MapOutEvent<TProtoHandler, TMap> {
|
||||
/// Creates a `MapOutEvent`.
|
||||
pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self {
|
||||
MapOutEvent { inner, map }
|
||||
}
|
||||
}
|
||||
|
||||
impl<TProtoHandler, TMap, TNewOut> ConnectionHandler for MapOutEvent<TProtoHandler, TMap>
|
||||
where
|
||||
TProtoHandler: ConnectionHandler,
|
||||
TMap: FnMut(TProtoHandler::OutEvent) -> TNewOut,
|
||||
TNewOut: Debug + Send + 'static,
|
||||
TMap: Send + 'static,
|
||||
{
|
||||
type InEvent = TProtoHandler::InEvent;
|
||||
type OutEvent = TNewOut;
|
||||
type Error = TProtoHandler::Error;
|
||||
type InboundProtocol = TProtoHandler::InboundProtocol;
|
||||
type OutboundProtocol = TProtoHandler::OutboundProtocol;
|
||||
type InboundOpenInfo = TProtoHandler::InboundOpenInfo;
|
||||
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||
self.inner.listen_protocol()
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_inbound(
|
||||
&mut self,
|
||||
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output,
|
||||
info: Self::InboundOpenInfo,
|
||||
) {
|
||||
self.inner.inject_fully_negotiated_inbound(protocol, info)
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(
|
||||
&mut self,
|
||||
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
|
||||
info: Self::OutboundOpenInfo,
|
||||
) {
|
||||
self.inner.inject_fully_negotiated_outbound(protocol, info)
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, event: Self::InEvent) {
|
||||
self.inner.inject_event(event)
|
||||
}
|
||||
|
||||
fn inject_address_change(&mut self, addr: &Multiaddr) {
|
||||
self.inner.inject_address_change(addr)
|
||||
}
|
||||
|
||||
fn inject_dial_upgrade_error(
|
||||
&mut self,
|
||||
info: Self::OutboundOpenInfo,
|
||||
error: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
|
||||
) {
|
||||
self.inner.inject_dial_upgrade_error(info, error)
|
||||
}
|
||||
|
||||
fn inject_listen_upgrade_error(
|
||||
&mut self,
|
||||
info: Self::InboundOpenInfo,
|
||||
error: ConnectionHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>,
|
||||
) {
|
||||
self.inner.inject_listen_upgrade_error(info, error)
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
self.inner.connection_keep_alive()
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<
|
||||
ConnectionHandlerEvent<
|
||||
Self::OutboundProtocol,
|
||||
Self::OutboundOpenInfo,
|
||||
Self::OutEvent,
|
||||
Self::Error,
|
||||
>,
|
||||
> {
|
||||
self.inner.poll(cx).map(|ev| match ev {
|
||||
ConnectionHandlerEvent::Custom(ev) => ConnectionHandlerEvent::Custom((self.map)(ev)),
|
||||
ConnectionHandlerEvent::Close(err) => ConnectionHandlerEvent::Close(err),
|
||||
ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
|
||||
ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
548
swarm/src/handler/multi.rs
Normal file
548
swarm/src/handler/multi.rs
Normal file
@ -0,0 +1,548 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
//! A [`ConnectionHandler`] implementation that combines multiple other [`ConnectionHandler`]s
|
||||
//! indexed by some key.
|
||||
|
||||
use crate::handler::{
|
||||
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler,
|
||||
KeepAlive, SubstreamProtocol,
|
||||
};
|
||||
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, UpgradeInfoSend};
|
||||
use crate::NegotiatedSubstream;
|
||||
use futures::{future::BoxFuture, prelude::*};
|
||||
use libp2p_core::upgrade::{NegotiationError, ProtocolError, ProtocolName, UpgradeError};
|
||||
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
|
||||
use rand::Rng;
|
||||
use std::{
|
||||
cmp,
|
||||
collections::{HashMap, HashSet},
|
||||
error,
|
||||
fmt::{self, Debug},
|
||||
hash::Hash,
|
||||
iter::{self, FromIterator},
|
||||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
/// A [`ConnectionHandler`] for multiple [`ConnectionHandler`]s of the same type.
|
||||
#[derive(Clone)]
|
||||
pub struct MultiHandler<K, H> {
|
||||
handlers: HashMap<K, H>,
|
||||
}
|
||||
|
||||
impl<K, H> fmt::Debug for MultiHandler<K, H>
|
||||
where
|
||||
K: fmt::Debug + Eq + Hash,
|
||||
H: fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("MultiHandler")
|
||||
.field("handlers", &self.handlers)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, H> MultiHandler<K, H>
|
||||
where
|
||||
K: Hash + Eq,
|
||||
H: ConnectionHandler,
|
||||
{
|
||||
/// Create and populate a `MultiHandler` from the given handler iterator.
|
||||
///
|
||||
/// It is an error for any two protocols handlers to share the same protocol name.
|
||||
pub fn try_from_iter<I>(iter: I) -> Result<Self, DuplicateProtonameError>
|
||||
where
|
||||
I: IntoIterator<Item = (K, H)>,
|
||||
{
|
||||
let m = MultiHandler {
|
||||
handlers: HashMap::from_iter(iter),
|
||||
};
|
||||
uniq_proto_names(
|
||||
m.handlers
|
||||
.values()
|
||||
.map(|h| h.listen_protocol().into_upgrade().0),
|
||||
)?;
|
||||
Ok(m)
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, H> ConnectionHandler for MultiHandler<K, H>
|
||||
where
|
||||
K: Clone + Debug + Hash + Eq + Send + 'static,
|
||||
H: ConnectionHandler,
|
||||
H::InboundProtocol: InboundUpgradeSend,
|
||||
H::OutboundProtocol: OutboundUpgradeSend,
|
||||
{
|
||||
type InEvent = (K, <H as ConnectionHandler>::InEvent);
|
||||
type OutEvent = (K, <H as ConnectionHandler>::OutEvent);
|
||||
type Error = <H as ConnectionHandler>::Error;
|
||||
type InboundProtocol = Upgrade<K, <H as ConnectionHandler>::InboundProtocol>;
|
||||
type OutboundProtocol = <H as ConnectionHandler>::OutboundProtocol;
|
||||
type InboundOpenInfo = Info<K, <H as ConnectionHandler>::InboundOpenInfo>;
|
||||
type OutboundOpenInfo = (K, <H as ConnectionHandler>::OutboundOpenInfo);
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||
let (upgrade, info, timeout) = self
|
||||
.handlers
|
||||
.iter()
|
||||
.map(|(key, handler)| {
|
||||
let proto = handler.listen_protocol();
|
||||
let timeout = *proto.timeout();
|
||||
let (upgrade, info) = proto.into_upgrade();
|
||||
(key.clone(), (upgrade, info, timeout))
|
||||
})
|
||||
.fold(
|
||||
(Upgrade::new(), Info::new(), Duration::from_secs(0)),
|
||||
|(mut upg, mut inf, mut timeout), (k, (u, i, t))| {
|
||||
upg.upgrades.push((k.clone(), u));
|
||||
inf.infos.push((k, i));
|
||||
timeout = cmp::max(timeout, t);
|
||||
(upg, inf, timeout)
|
||||
},
|
||||
);
|
||||
SubstreamProtocol::new(upgrade, info).with_timeout(timeout)
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(
|
||||
&mut self,
|
||||
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
|
||||
(key, arg): Self::OutboundOpenInfo,
|
||||
) {
|
||||
if let Some(h) = self.handlers.get_mut(&key) {
|
||||
h.inject_fully_negotiated_outbound(protocol, arg)
|
||||
} else {
|
||||
log::error!("inject_fully_negotiated_outbound: no handler for key")
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_inbound(
|
||||
&mut self,
|
||||
(key, arg): <Self::InboundProtocol as InboundUpgradeSend>::Output,
|
||||
mut info: Self::InboundOpenInfo,
|
||||
) {
|
||||
if let Some(h) = self.handlers.get_mut(&key) {
|
||||
if let Some(i) = info.take(&key) {
|
||||
h.inject_fully_negotiated_inbound(arg, i)
|
||||
}
|
||||
} else {
|
||||
log::error!("inject_fully_negotiated_inbound: no handler for key")
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, (key, event): Self::InEvent) {
|
||||
if let Some(h) = self.handlers.get_mut(&key) {
|
||||
h.inject_event(event)
|
||||
} else {
|
||||
log::error!("inject_event: no handler for key")
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_address_change(&mut self, addr: &Multiaddr) {
|
||||
for h in self.handlers.values_mut() {
|
||||
h.inject_address_change(addr)
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_dial_upgrade_error(
|
||||
&mut self,
|
||||
(key, arg): Self::OutboundOpenInfo,
|
||||
error: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
|
||||
) {
|
||||
if let Some(h) = self.handlers.get_mut(&key) {
|
||||
h.inject_dial_upgrade_error(arg, error)
|
||||
} else {
|
||||
log::error!("inject_dial_upgrade_error: no handler for protocol")
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_listen_upgrade_error(
|
||||
&mut self,
|
||||
mut info: Self::InboundOpenInfo,
|
||||
error: ConnectionHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>,
|
||||
) {
|
||||
match error {
|
||||
ConnectionHandlerUpgrErr::Timer => {
|
||||
for (k, h) in &mut self.handlers {
|
||||
if let Some(i) = info.take(k) {
|
||||
h.inject_listen_upgrade_error(i, ConnectionHandlerUpgrErr::Timer)
|
||||
}
|
||||
}
|
||||
}
|
||||
ConnectionHandlerUpgrErr::Timeout => {
|
||||
for (k, h) in &mut self.handlers {
|
||||
if let Some(i) = info.take(k) {
|
||||
h.inject_listen_upgrade_error(i, ConnectionHandlerUpgrErr::Timeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
|
||||
for (k, h) in &mut self.handlers {
|
||||
if let Some(i) = info.take(k) {
|
||||
h.inject_listen_upgrade_error(
|
||||
i,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
|
||||
NegotiationError::Failed,
|
||||
)),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
|
||||
NegotiationError::ProtocolError(e),
|
||||
)) => match e {
|
||||
ProtocolError::IoError(e) => {
|
||||
for (k, h) in &mut self.handlers {
|
||||
if let Some(i) = info.take(k) {
|
||||
let e = NegotiationError::ProtocolError(ProtocolError::IoError(
|
||||
e.kind().into(),
|
||||
));
|
||||
h.inject_listen_upgrade_error(
|
||||
i,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e)),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
ProtocolError::InvalidMessage => {
|
||||
for (k, h) in &mut self.handlers {
|
||||
if let Some(i) = info.take(k) {
|
||||
let e = NegotiationError::ProtocolError(ProtocolError::InvalidMessage);
|
||||
h.inject_listen_upgrade_error(
|
||||
i,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e)),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
ProtocolError::InvalidProtocol => {
|
||||
for (k, h) in &mut self.handlers {
|
||||
if let Some(i) = info.take(k) {
|
||||
let e = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol);
|
||||
h.inject_listen_upgrade_error(
|
||||
i,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e)),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
ProtocolError::TooManyProtocols => {
|
||||
for (k, h) in &mut self.handlers {
|
||||
if let Some(i) = info.take(k) {
|
||||
let e =
|
||||
NegotiationError::ProtocolError(ProtocolError::TooManyProtocols);
|
||||
h.inject_listen_upgrade_error(
|
||||
i,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e)),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply((k, e))) => {
|
||||
if let Some(h) = self.handlers.get_mut(&k) {
|
||||
if let Some(i) = info.take(&k) {
|
||||
h.inject_listen_upgrade_error(
|
||||
i,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
self.handlers
|
||||
.values()
|
||||
.map(|h| h.connection_keep_alive())
|
||||
.max()
|
||||
.unwrap_or(KeepAlive::No)
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<
|
||||
ConnectionHandlerEvent<
|
||||
Self::OutboundProtocol,
|
||||
Self::OutboundOpenInfo,
|
||||
Self::OutEvent,
|
||||
Self::Error,
|
||||
>,
|
||||
> {
|
||||
// Calling `gen_range(0, 0)` (see below) would panic, so we have return early to avoid
|
||||
// that situation.
|
||||
if self.handlers.is_empty() {
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
// Not always polling handlers in the same order should give anyone the chance to make progress.
|
||||
let pos = rand::thread_rng().gen_range(0, self.handlers.len());
|
||||
|
||||
for (k, h) in self.handlers.iter_mut().skip(pos) {
|
||||
if let Poll::Ready(e) = h.poll(cx) {
|
||||
let e = e
|
||||
.map_outbound_open_info(|i| (k.clone(), i))
|
||||
.map_custom(|p| (k.clone(), p));
|
||||
return Poll::Ready(e);
|
||||
}
|
||||
}
|
||||
|
||||
for (k, h) in self.handlers.iter_mut().take(pos) {
|
||||
if let Poll::Ready(e) = h.poll(cx) {
|
||||
let e = e
|
||||
.map_outbound_open_info(|i| (k.clone(), i))
|
||||
.map_custom(|p| (k.clone(), p));
|
||||
return Poll::Ready(e);
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
/// A [`IntoConnectionHandler`] for multiple other `IntoConnectionHandler`s.
|
||||
#[derive(Clone)]
|
||||
pub struct IntoMultiHandler<K, H> {
|
||||
handlers: HashMap<K, H>,
|
||||
}
|
||||
|
||||
impl<K, H> fmt::Debug for IntoMultiHandler<K, H>
|
||||
where
|
||||
K: fmt::Debug + Eq + Hash,
|
||||
H: fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("IntoMultiHandler")
|
||||
.field("handlers", &self.handlers)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, H> IntoMultiHandler<K, H>
|
||||
where
|
||||
K: Hash + Eq,
|
||||
H: IntoConnectionHandler,
|
||||
{
|
||||
/// Create and populate an `IntoMultiHandler` from the given iterator.
|
||||
///
|
||||
/// It is an error for any two protocols handlers to share the same protocol name.
|
||||
pub fn try_from_iter<I>(iter: I) -> Result<Self, DuplicateProtonameError>
|
||||
where
|
||||
I: IntoIterator<Item = (K, H)>,
|
||||
{
|
||||
let m = IntoMultiHandler {
|
||||
handlers: HashMap::from_iter(iter),
|
||||
};
|
||||
uniq_proto_names(m.handlers.values().map(|h| h.inbound_protocol()))?;
|
||||
Ok(m)
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, H> IntoConnectionHandler for IntoMultiHandler<K, H>
|
||||
where
|
||||
K: Debug + Clone + Eq + Hash + Send + 'static,
|
||||
H: IntoConnectionHandler,
|
||||
{
|
||||
type Handler = MultiHandler<K, H::Handler>;
|
||||
|
||||
fn into_handler(self, p: &PeerId, c: &ConnectedPoint) -> Self::Handler {
|
||||
MultiHandler {
|
||||
handlers: self
|
||||
.handlers
|
||||
.into_iter()
|
||||
.map(|(k, h)| (k, h.into_handler(p, c)))
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
fn inbound_protocol(&self) -> <Self::Handler as ConnectionHandler>::InboundProtocol {
|
||||
Upgrade {
|
||||
upgrades: self
|
||||
.handlers
|
||||
.iter()
|
||||
.map(|(k, h)| (k.clone(), h.inbound_protocol()))
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Index and protocol name pair used as `UpgradeInfo::Info`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct IndexedProtoName<H>(usize, H);
|
||||
|
||||
impl<H: ProtocolName> ProtocolName for IndexedProtoName<H> {
|
||||
fn protocol_name(&self) -> &[u8] {
|
||||
self.1.protocol_name()
|
||||
}
|
||||
}
|
||||
|
||||
/// The aggregated `InboundOpenInfo`s of supported inbound substream protocols.
|
||||
#[derive(Clone)]
|
||||
pub struct Info<K, I> {
|
||||
infos: Vec<(K, I)>,
|
||||
}
|
||||
|
||||
impl<K: Eq, I> Info<K, I> {
|
||||
fn new() -> Self {
|
||||
Info { infos: Vec::new() }
|
||||
}
|
||||
|
||||
pub fn take(&mut self, k: &K) -> Option<I> {
|
||||
if let Some(p) = self.infos.iter().position(|(key, _)| key == k) {
|
||||
return Some(self.infos.remove(p).1);
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Inbound and outbound upgrade for all [`ConnectionHandler`]s.
|
||||
#[derive(Clone)]
|
||||
pub struct Upgrade<K, H> {
|
||||
upgrades: Vec<(K, H)>,
|
||||
}
|
||||
|
||||
impl<K, H> Upgrade<K, H> {
|
||||
fn new() -> Self {
|
||||
Upgrade {
|
||||
upgrades: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, H> fmt::Debug for Upgrade<K, H>
|
||||
where
|
||||
K: fmt::Debug + Eq + Hash,
|
||||
H: fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Upgrade")
|
||||
.field("upgrades", &self.upgrades)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, H> UpgradeInfoSend for Upgrade<K, H>
|
||||
where
|
||||
H: UpgradeInfoSend,
|
||||
K: Send + 'static,
|
||||
{
|
||||
type Info = IndexedProtoName<H::Info>;
|
||||
type InfoIter = std::vec::IntoIter<Self::Info>;
|
||||
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
self.upgrades
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, (_, h))| iter::repeat(i).zip(h.protocol_info()))
|
||||
.flatten()
|
||||
.map(|(i, h)| IndexedProtoName(i, h))
|
||||
.collect::<Vec<_>>()
|
||||
.into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, H> InboundUpgradeSend for Upgrade<K, H>
|
||||
where
|
||||
H: InboundUpgradeSend,
|
||||
K: Send + 'static,
|
||||
{
|
||||
type Output = (K, <H as InboundUpgradeSend>::Output);
|
||||
type Error = (K, <H as InboundUpgradeSend>::Error);
|
||||
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
|
||||
|
||||
fn upgrade_inbound(mut self, resource: NegotiatedSubstream, info: Self::Info) -> Self::Future {
|
||||
let IndexedProtoName(index, info) = info;
|
||||
let (key, upgrade) = self.upgrades.remove(index);
|
||||
upgrade
|
||||
.upgrade_inbound(resource, info)
|
||||
.map(move |out| match out {
|
||||
Ok(o) => Ok((key, o)),
|
||||
Err(e) => Err((key, e)),
|
||||
})
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, H> OutboundUpgradeSend for Upgrade<K, H>
|
||||
where
|
||||
H: OutboundUpgradeSend,
|
||||
K: Send + 'static,
|
||||
{
|
||||
type Output = (K, <H as OutboundUpgradeSend>::Output);
|
||||
type Error = (K, <H as OutboundUpgradeSend>::Error);
|
||||
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
|
||||
|
||||
fn upgrade_outbound(mut self, resource: NegotiatedSubstream, info: Self::Info) -> Self::Future {
|
||||
let IndexedProtoName(index, info) = info;
|
||||
let (key, upgrade) = self.upgrades.remove(index);
|
||||
upgrade
|
||||
.upgrade_outbound(resource, info)
|
||||
.map(move |out| match out {
|
||||
Ok(o) => Ok((key, o)),
|
||||
Err(e) => Err((key, e)),
|
||||
})
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
/// Check that no two protocol names are equal.
|
||||
fn uniq_proto_names<I, T>(iter: I) -> Result<(), DuplicateProtonameError>
|
||||
where
|
||||
I: Iterator<Item = T>,
|
||||
T: UpgradeInfoSend,
|
||||
{
|
||||
let mut set = HashSet::new();
|
||||
for infos in iter {
|
||||
for i in infos.protocol_info() {
|
||||
let v = Vec::from(i.protocol_name());
|
||||
if set.contains(&v) {
|
||||
return Err(DuplicateProtonameError(v));
|
||||
} else {
|
||||
set.insert(v);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// It is an error if two handlers share the same protocol name.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DuplicateProtonameError(Vec<u8>);
|
||||
|
||||
impl DuplicateProtonameError {
|
||||
/// The protocol name bytes that occured in more than one handler.
|
||||
pub fn protocol_name(&self) -> &[u8] {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for DuplicateProtonameError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
if let Ok(s) = std::str::from_utf8(&self.0) {
|
||||
write!(f, "duplicate protocol name: {}", s)
|
||||
} else {
|
||||
write!(f, "duplicate protocol name: {:?}", self.0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl error::Error for DuplicateProtonameError {}
|
265
swarm/src/handler/one_shot.rs
Normal file
265
swarm/src/handler/one_shot.rs
Normal file
@ -0,0 +1,265 @@
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::handler::{
|
||||
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
|
||||
SubstreamProtocol,
|
||||
};
|
||||
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend};
|
||||
use instant::Instant;
|
||||
use smallvec::SmallVec;
|
||||
use std::{error, fmt::Debug, task::Context, task::Poll, time::Duration};
|
||||
|
||||
/// A [`ConnectionHandler`] that opens a new substream for each request.
|
||||
// TODO: Debug
|
||||
pub struct OneShotHandler<TInbound, TOutbound, TEvent>
|
||||
where
|
||||
TOutbound: OutboundUpgradeSend,
|
||||
{
|
||||
/// The upgrade for inbound substreams.
|
||||
listen_protocol: SubstreamProtocol<TInbound, ()>,
|
||||
/// If `Some`, something bad happened and we should shut down the handler with an error.
|
||||
pending_error: Option<ConnectionHandlerUpgrErr<<TOutbound as OutboundUpgradeSend>::Error>>,
|
||||
/// Queue of events to produce in `poll()`.
|
||||
events_out: SmallVec<[TEvent; 4]>,
|
||||
/// Queue of outbound substreams to open.
|
||||
dial_queue: SmallVec<[TOutbound; 4]>,
|
||||
/// Current number of concurrent outbound substreams being opened.
|
||||
dial_negotiated: u32,
|
||||
/// Value to return from `connection_keep_alive`.
|
||||
keep_alive: KeepAlive,
|
||||
/// The configuration container for the handler
|
||||
config: OneShotHandlerConfig,
|
||||
}
|
||||
|
||||
impl<TInbound, TOutbound, TEvent> OneShotHandler<TInbound, TOutbound, TEvent>
|
||||
where
|
||||
TOutbound: OutboundUpgradeSend,
|
||||
{
|
||||
/// Creates a `OneShotHandler`.
|
||||
pub fn new(
|
||||
listen_protocol: SubstreamProtocol<TInbound, ()>,
|
||||
config: OneShotHandlerConfig,
|
||||
) -> Self {
|
||||
OneShotHandler {
|
||||
listen_protocol,
|
||||
pending_error: None,
|
||||
events_out: SmallVec::new(),
|
||||
dial_queue: SmallVec::new(),
|
||||
dial_negotiated: 0,
|
||||
keep_alive: KeepAlive::Yes,
|
||||
config,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of pending requests.
|
||||
pub fn pending_requests(&self) -> u32 {
|
||||
self.dial_negotiated + self.dial_queue.len() as u32
|
||||
}
|
||||
|
||||
/// Returns a reference to the listen protocol configuration.
|
||||
///
|
||||
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound
|
||||
/// > substreams, not the ones already being negotiated.
|
||||
pub fn listen_protocol_ref(&self) -> &SubstreamProtocol<TInbound, ()> {
|
||||
&self.listen_protocol
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to the listen protocol configuration.
|
||||
///
|
||||
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound
|
||||
/// > substreams, not the ones already being negotiated.
|
||||
pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol<TInbound, ()> {
|
||||
&mut self.listen_protocol
|
||||
}
|
||||
|
||||
/// Opens an outbound substream with `upgrade`.
|
||||
pub fn send_request(&mut self, upgrade: TOutbound) {
|
||||
self.keep_alive = KeepAlive::Yes;
|
||||
self.dial_queue.push(upgrade);
|
||||
}
|
||||
}
|
||||
|
||||
impl<TInbound, TOutbound, TEvent> Default for OneShotHandler<TInbound, TOutbound, TEvent>
|
||||
where
|
||||
TOutbound: OutboundUpgradeSend,
|
||||
TInbound: InboundUpgradeSend + Default,
|
||||
{
|
||||
fn default() -> Self {
|
||||
OneShotHandler::new(
|
||||
SubstreamProtocol::new(Default::default(), ()),
|
||||
OneShotHandlerConfig::default(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TInbound, TOutbound, TEvent> ConnectionHandler for OneShotHandler<TInbound, TOutbound, TEvent>
|
||||
where
|
||||
TInbound: InboundUpgradeSend + Send + 'static,
|
||||
TOutbound: Debug + OutboundUpgradeSend,
|
||||
TInbound::Output: Into<TEvent>,
|
||||
TOutbound::Output: Into<TEvent>,
|
||||
TOutbound::Error: error::Error + Send + 'static,
|
||||
SubstreamProtocol<TInbound, ()>: Clone,
|
||||
TEvent: Debug + Send + 'static,
|
||||
{
|
||||
type InEvent = TOutbound;
|
||||
type OutEvent = TEvent;
|
||||
type Error = ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>;
|
||||
type InboundProtocol = TInbound;
|
||||
type OutboundProtocol = TOutbound;
|
||||
type OutboundOpenInfo = ();
|
||||
type InboundOpenInfo = ();
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||
self.listen_protocol.clone()
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_inbound(
|
||||
&mut self,
|
||||
out: <Self::InboundProtocol as InboundUpgradeSend>::Output,
|
||||
(): Self::InboundOpenInfo,
|
||||
) {
|
||||
// If we're shutting down the connection for inactivity, reset the timeout.
|
||||
if !self.keep_alive.is_yes() {
|
||||
self.keep_alive = KeepAlive::Until(Instant::now() + self.config.keep_alive_timeout);
|
||||
}
|
||||
|
||||
self.events_out.push(out.into());
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(
|
||||
&mut self,
|
||||
out: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
|
||||
_: Self::OutboundOpenInfo,
|
||||
) {
|
||||
self.dial_negotiated -= 1;
|
||||
self.events_out.push(out.into());
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, event: Self::InEvent) {
|
||||
self.send_request(event);
|
||||
}
|
||||
|
||||
fn inject_dial_upgrade_error(
|
||||
&mut self,
|
||||
_info: Self::OutboundOpenInfo,
|
||||
error: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
|
||||
) {
|
||||
if self.pending_error.is_none() {
|
||||
self.pending_error = Some(error);
|
||||
}
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
self.keep_alive
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<
|
||||
ConnectionHandlerEvent<
|
||||
Self::OutboundProtocol,
|
||||
Self::OutboundOpenInfo,
|
||||
Self::OutEvent,
|
||||
Self::Error,
|
||||
>,
|
||||
> {
|
||||
if let Some(err) = self.pending_error.take() {
|
||||
return Poll::Ready(ConnectionHandlerEvent::Close(err));
|
||||
}
|
||||
|
||||
if !self.events_out.is_empty() {
|
||||
return Poll::Ready(ConnectionHandlerEvent::Custom(self.events_out.remove(0)));
|
||||
} else {
|
||||
self.events_out.shrink_to_fit();
|
||||
}
|
||||
|
||||
if !self.dial_queue.is_empty() {
|
||||
if self.dial_negotiated < self.config.max_dial_negotiated {
|
||||
self.dial_negotiated += 1;
|
||||
let upgrade = self.dial_queue.remove(0);
|
||||
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: SubstreamProtocol::new(upgrade, ())
|
||||
.with_timeout(self.config.outbound_substream_timeout),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
self.dial_queue.shrink_to_fit();
|
||||
|
||||
if self.dial_negotiated == 0 && self.keep_alive.is_yes() {
|
||||
self.keep_alive = KeepAlive::Until(Instant::now() + self.config.keep_alive_timeout);
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
/// Configuration parameters for the `OneShotHandler`
|
||||
#[derive(Debug)]
|
||||
pub struct OneShotHandlerConfig {
|
||||
/// Keep-alive timeout for idle connections.
|
||||
pub keep_alive_timeout: Duration,
|
||||
/// Timeout for outbound substream upgrades.
|
||||
pub outbound_substream_timeout: Duration,
|
||||
/// Maximum number of concurrent outbound substreams being opened.
|
||||
pub max_dial_negotiated: u32,
|
||||
}
|
||||
|
||||
impl Default for OneShotHandlerConfig {
|
||||
fn default() -> Self {
|
||||
OneShotHandlerConfig {
|
||||
keep_alive_timeout: Duration::from_secs(10),
|
||||
outbound_substream_timeout: Duration::from_secs(10),
|
||||
max_dial_negotiated: 8,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use futures::executor::block_on;
|
||||
use futures::future::poll_fn;
|
||||
use libp2p_core::upgrade::DeniedUpgrade;
|
||||
use void::Void;
|
||||
|
||||
#[test]
|
||||
fn do_not_keep_idle_connection_alive() {
|
||||
let mut handler: OneShotHandler<_, DeniedUpgrade, Void> = OneShotHandler::new(
|
||||
SubstreamProtocol::new(DeniedUpgrade {}, ()),
|
||||
Default::default(),
|
||||
);
|
||||
|
||||
block_on(poll_fn(|cx| loop {
|
||||
if let Poll::Pending = handler.poll(cx) {
|
||||
return Poll::Ready(());
|
||||
}
|
||||
}));
|
||||
|
||||
assert!(matches!(
|
||||
handler.connection_keep_alive(),
|
||||
KeepAlive::Until(_)
|
||||
));
|
||||
}
|
||||
}
|
372
swarm/src/handler/select.rs
Normal file
372
swarm/src/handler/select.rs
Normal file
@ -0,0 +1,372 @@
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::handler::{
|
||||
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler,
|
||||
KeepAlive, SubstreamProtocol,
|
||||
};
|
||||
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper};
|
||||
|
||||
use libp2p_core::{
|
||||
either::{EitherError, EitherOutput},
|
||||
upgrade::{EitherUpgrade, NegotiationError, ProtocolError, SelectUpgrade, UpgradeError},
|
||||
ConnectedPoint, Multiaddr, PeerId,
|
||||
};
|
||||
use std::{cmp, task::Context, task::Poll};
|
||||
|
||||
/// Implementation of `IntoConnectionHandler` that combines two protocols into one.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct IntoConnectionHandlerSelect<TProto1, TProto2> {
|
||||
/// The first protocol.
|
||||
proto1: TProto1,
|
||||
/// The second protocol.
|
||||
proto2: TProto2,
|
||||
}
|
||||
|
||||
impl<TProto1, TProto2> IntoConnectionHandlerSelect<TProto1, TProto2> {
|
||||
/// Builds a `IntoConnectionHandlerSelect`.
|
||||
pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self {
|
||||
IntoConnectionHandlerSelect { proto1, proto2 }
|
||||
}
|
||||
|
||||
pub fn into_inner(self) -> (TProto1, TProto2) {
|
||||
(self.proto1, self.proto2)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TProto1, TProto2> IntoConnectionHandler for IntoConnectionHandlerSelect<TProto1, TProto2>
|
||||
where
|
||||
TProto1: IntoConnectionHandler,
|
||||
TProto2: IntoConnectionHandler,
|
||||
{
|
||||
type Handler = ConnectionHandlerSelect<TProto1::Handler, TProto2::Handler>;
|
||||
|
||||
fn into_handler(
|
||||
self,
|
||||
remote_peer_id: &PeerId,
|
||||
connected_point: &ConnectedPoint,
|
||||
) -> Self::Handler {
|
||||
ConnectionHandlerSelect {
|
||||
proto1: self.proto1.into_handler(remote_peer_id, connected_point),
|
||||
proto2: self.proto2.into_handler(remote_peer_id, connected_point),
|
||||
}
|
||||
}
|
||||
|
||||
fn inbound_protocol(&self) -> <Self::Handler as ConnectionHandler>::InboundProtocol {
|
||||
SelectUpgrade::new(
|
||||
SendWrapper(self.proto1.inbound_protocol()),
|
||||
SendWrapper(self.proto2.inbound_protocol()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Implementation of [`ConnectionHandler`] that combines two protocols into one.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ConnectionHandlerSelect<TProto1, TProto2> {
|
||||
/// The first protocol.
|
||||
proto1: TProto1,
|
||||
/// The second protocol.
|
||||
proto2: TProto2,
|
||||
}
|
||||
|
||||
impl<TProto1, TProto2> ConnectionHandlerSelect<TProto1, TProto2> {
|
||||
/// Builds a [`ConnectionHandlerSelect`].
|
||||
pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self {
|
||||
ConnectionHandlerSelect { proto1, proto2 }
|
||||
}
|
||||
|
||||
pub fn into_inner(self) -> (TProto1, TProto2) {
|
||||
(self.proto1, self.proto2)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TProto1, TProto2> ConnectionHandler for ConnectionHandlerSelect<TProto1, TProto2>
|
||||
where
|
||||
TProto1: ConnectionHandler,
|
||||
TProto2: ConnectionHandler,
|
||||
{
|
||||
type InEvent = EitherOutput<TProto1::InEvent, TProto2::InEvent>;
|
||||
type OutEvent = EitherOutput<TProto1::OutEvent, TProto2::OutEvent>;
|
||||
type Error = EitherError<TProto1::Error, TProto2::Error>;
|
||||
type InboundProtocol = SelectUpgrade<
|
||||
SendWrapper<<TProto1 as ConnectionHandler>::InboundProtocol>,
|
||||
SendWrapper<<TProto2 as ConnectionHandler>::InboundProtocol>,
|
||||
>;
|
||||
type OutboundProtocol = EitherUpgrade<
|
||||
SendWrapper<TProto1::OutboundProtocol>,
|
||||
SendWrapper<TProto2::OutboundProtocol>,
|
||||
>;
|
||||
type OutboundOpenInfo = EitherOutput<TProto1::OutboundOpenInfo, TProto2::OutboundOpenInfo>;
|
||||
type InboundOpenInfo = (TProto1::InboundOpenInfo, TProto2::InboundOpenInfo);
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||
let proto1 = self.proto1.listen_protocol();
|
||||
let proto2 = self.proto2.listen_protocol();
|
||||
let timeout = *std::cmp::max(proto1.timeout(), proto2.timeout());
|
||||
let (u1, i1) = proto1.into_upgrade();
|
||||
let (u2, i2) = proto2.into_upgrade();
|
||||
let choice = SelectUpgrade::new(SendWrapper(u1), SendWrapper(u2));
|
||||
SubstreamProtocol::new(choice, (i1, i2)).with_timeout(timeout)
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(
|
||||
&mut self,
|
||||
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
|
||||
endpoint: Self::OutboundOpenInfo,
|
||||
) {
|
||||
match (protocol, endpoint) {
|
||||
(EitherOutput::First(protocol), EitherOutput::First(info)) => {
|
||||
self.proto1.inject_fully_negotiated_outbound(protocol, info)
|
||||
}
|
||||
(EitherOutput::Second(protocol), EitherOutput::Second(info)) => {
|
||||
self.proto2.inject_fully_negotiated_outbound(protocol, info)
|
||||
}
|
||||
(EitherOutput::First(_), EitherOutput::Second(_)) => {
|
||||
panic!("wrong API usage: the protocol doesn't match the upgrade info")
|
||||
}
|
||||
(EitherOutput::Second(_), EitherOutput::First(_)) => {
|
||||
panic!("wrong API usage: the protocol doesn't match the upgrade info")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_inbound(
|
||||
&mut self,
|
||||
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output,
|
||||
(i1, i2): Self::InboundOpenInfo,
|
||||
) {
|
||||
match protocol {
|
||||
EitherOutput::First(protocol) => {
|
||||
self.proto1.inject_fully_negotiated_inbound(protocol, i1)
|
||||
}
|
||||
EitherOutput::Second(protocol) => {
|
||||
self.proto2.inject_fully_negotiated_inbound(protocol, i2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, event: Self::InEvent) {
|
||||
match event {
|
||||
EitherOutput::First(event) => self.proto1.inject_event(event),
|
||||
EitherOutput::Second(event) => self.proto2.inject_event(event),
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_address_change(&mut self, new_address: &Multiaddr) {
|
||||
self.proto1.inject_address_change(new_address);
|
||||
self.proto2.inject_address_change(new_address)
|
||||
}
|
||||
|
||||
fn inject_dial_upgrade_error(
|
||||
&mut self,
|
||||
info: Self::OutboundOpenInfo,
|
||||
error: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
|
||||
) {
|
||||
match (info, error) {
|
||||
(EitherOutput::First(info), ConnectionHandlerUpgrErr::Timer) => self
|
||||
.proto1
|
||||
.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timer),
|
||||
(EitherOutput::First(info), ConnectionHandlerUpgrErr::Timeout) => self
|
||||
.proto1
|
||||
.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout),
|
||||
(
|
||||
EitherOutput::First(info),
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
|
||||
) => self.proto1.inject_dial_upgrade_error(
|
||||
info,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
|
||||
),
|
||||
(
|
||||
EitherOutput::First(info),
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(err))),
|
||||
) => self.proto1.inject_dial_upgrade_error(
|
||||
info,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)),
|
||||
),
|
||||
(
|
||||
EitherOutput::First(_),
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(_))),
|
||||
) => {
|
||||
panic!("Wrong API usage; the upgrade error doesn't match the outbound open info");
|
||||
}
|
||||
(EitherOutput::Second(info), ConnectionHandlerUpgrErr::Timeout) => self
|
||||
.proto2
|
||||
.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout),
|
||||
(EitherOutput::Second(info), ConnectionHandlerUpgrErr::Timer) => self
|
||||
.proto2
|
||||
.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timer),
|
||||
(
|
||||
EitherOutput::Second(info),
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
|
||||
) => self.proto2.inject_dial_upgrade_error(
|
||||
info,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
|
||||
),
|
||||
(
|
||||
EitherOutput::Second(info),
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(err))),
|
||||
) => self.proto2.inject_dial_upgrade_error(
|
||||
info,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)),
|
||||
),
|
||||
(
|
||||
EitherOutput::Second(_),
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(_))),
|
||||
) => {
|
||||
panic!("Wrong API usage; the upgrade error doesn't match the outbound open info");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_listen_upgrade_error(
|
||||
&mut self,
|
||||
(i1, i2): Self::InboundOpenInfo,
|
||||
error: ConnectionHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>,
|
||||
) {
|
||||
match error {
|
||||
ConnectionHandlerUpgrErr::Timer => {
|
||||
self.proto1
|
||||
.inject_listen_upgrade_error(i1, ConnectionHandlerUpgrErr::Timer);
|
||||
self.proto2
|
||||
.inject_listen_upgrade_error(i2, ConnectionHandlerUpgrErr::Timer)
|
||||
}
|
||||
ConnectionHandlerUpgrErr::Timeout => {
|
||||
self.proto1
|
||||
.inject_listen_upgrade_error(i1, ConnectionHandlerUpgrErr::Timeout);
|
||||
self.proto2
|
||||
.inject_listen_upgrade_error(i2, ConnectionHandlerUpgrErr::Timeout)
|
||||
}
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
|
||||
self.proto1.inject_listen_upgrade_error(
|
||||
i1,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
|
||||
NegotiationError::Failed,
|
||||
)),
|
||||
);
|
||||
self.proto2.inject_listen_upgrade_error(
|
||||
i2,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
|
||||
NegotiationError::Failed,
|
||||
)),
|
||||
);
|
||||
}
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
|
||||
NegotiationError::ProtocolError(e),
|
||||
)) => {
|
||||
let (e1, e2);
|
||||
match e {
|
||||
ProtocolError::IoError(e) => {
|
||||
e1 = NegotiationError::ProtocolError(ProtocolError::IoError(
|
||||
e.kind().into(),
|
||||
));
|
||||
e2 = NegotiationError::ProtocolError(ProtocolError::IoError(e))
|
||||
}
|
||||
ProtocolError::InvalidMessage => {
|
||||
e1 = NegotiationError::ProtocolError(ProtocolError::InvalidMessage);
|
||||
e2 = NegotiationError::ProtocolError(ProtocolError::InvalidMessage)
|
||||
}
|
||||
ProtocolError::InvalidProtocol => {
|
||||
e1 = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol);
|
||||
e2 = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol)
|
||||
}
|
||||
ProtocolError::TooManyProtocols => {
|
||||
e1 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols);
|
||||
e2 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols)
|
||||
}
|
||||
}
|
||||
self.proto1.inject_listen_upgrade_error(
|
||||
i1,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e1)),
|
||||
);
|
||||
self.proto2.inject_listen_upgrade_error(
|
||||
i2,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e2)),
|
||||
)
|
||||
}
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => {
|
||||
self.proto1.inject_listen_upgrade_error(
|
||||
i1,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
|
||||
)
|
||||
}
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => {
|
||||
self.proto2.inject_listen_upgrade_error(
|
||||
i2,
|
||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
cmp::max(
|
||||
self.proto1.connection_keep_alive(),
|
||||
self.proto2.connection_keep_alive(),
|
||||
)
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<
|
||||
ConnectionHandlerEvent<
|
||||
Self::OutboundProtocol,
|
||||
Self::OutboundOpenInfo,
|
||||
Self::OutEvent,
|
||||
Self::Error,
|
||||
>,
|
||||
> {
|
||||
match self.proto1.poll(cx) {
|
||||
Poll::Ready(ConnectionHandlerEvent::Custom(event)) => {
|
||||
return Poll::Ready(ConnectionHandlerEvent::Custom(EitherOutput::First(event)));
|
||||
}
|
||||
Poll::Ready(ConnectionHandlerEvent::Close(event)) => {
|
||||
return Poll::Ready(ConnectionHandlerEvent::Close(EitherError::A(event)));
|
||||
}
|
||||
Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => {
|
||||
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: protocol
|
||||
.map_upgrade(|u| EitherUpgrade::A(SendWrapper(u)))
|
||||
.map_info(EitherOutput::First),
|
||||
});
|
||||
}
|
||||
Poll::Pending => (),
|
||||
};
|
||||
|
||||
match self.proto2.poll(cx) {
|
||||
Poll::Ready(ConnectionHandlerEvent::Custom(event)) => {
|
||||
return Poll::Ready(ConnectionHandlerEvent::Custom(EitherOutput::Second(event)));
|
||||
}
|
||||
Poll::Ready(ConnectionHandlerEvent::Close(event)) => {
|
||||
return Poll::Ready(ConnectionHandlerEvent::Close(EitherError::B(event)));
|
||||
}
|
||||
Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => {
|
||||
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: protocol
|
||||
.map_upgrade(|u| EitherUpgrade::B(SendWrapper(u)))
|
||||
.map_info(EitherOutput::Second),
|
||||
});
|
||||
}
|
||||
Poll::Pending => (),
|
||||
};
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user