mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-04-25 11:02:12 +00:00
Implement ProtocolsHandler methods in wrappers. (#1710)
* Implement ProtocolsHandler methods in wrappers. This PR forwards calls to some ProtocolsHandler methods that were previously not implemented in wrappers such as `MapInEvent`. It is unclear though how this can be implemented in some handlers such as `MultiHandler` as the information at hand does not enable it to decide which handler to forward the call to. * Add `MultiHandler::inject_listen_ugrade_error`.
This commit is contained in:
parent
cbdbf656c0
commit
21f9447796
@ -156,9 +156,7 @@ pub trait ProtocolsHandler: Send + 'static {
|
|||||||
/// Indicates to the handler that upgrading an inbound substream to the given protocol has failed.
|
/// Indicates to the handler that upgrading an inbound substream to the given protocol has failed.
|
||||||
fn inject_listen_upgrade_error(
|
fn inject_listen_upgrade_error(
|
||||||
&mut self,
|
&mut self,
|
||||||
_: ProtocolsHandlerUpgrErr<
|
_: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>
|
||||||
<Self::InboundProtocol as InboundUpgradeSend>::Error
|
|
||||||
>
|
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
/// Returns until when the connection should be kept alive.
|
/// Returns until when the connection should be kept alive.
|
||||||
@ -189,7 +187,6 @@ pub trait ProtocolsHandler: Send + 'static {
|
|||||||
>;
|
>;
|
||||||
|
|
||||||
/// Adds a closure that turns the input event into something else.
|
/// Adds a closure that turns the input event into something else.
|
||||||
#[inline]
|
|
||||||
fn map_in_event<TNewIn, TMap>(self, map: TMap) -> MapInEvent<Self, TNewIn, TMap>
|
fn map_in_event<TNewIn, TMap>(self, map: TMap) -> MapInEvent<Self, TNewIn, TMap>
|
||||||
where
|
where
|
||||||
Self: Sized,
|
Self: Sized,
|
||||||
@ -199,7 +196,6 @@ pub trait ProtocolsHandler: Send + 'static {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Adds a closure that turns the output event into something else.
|
/// Adds a closure that turns the output event into something else.
|
||||||
#[inline]
|
|
||||||
fn map_out_event<TMap, TNewOut>(self, map: TMap) -> MapOutEvent<Self, TMap>
|
fn map_out_event<TMap, TNewOut>(self, map: TMap) -> MapOutEvent<Self, TMap>
|
||||||
where
|
where
|
||||||
Self: Sized,
|
Self: Sized,
|
||||||
@ -214,7 +210,6 @@ pub trait ProtocolsHandler: Send + 'static {
|
|||||||
/// > **Note**: The largest `KeepAlive` returned by the two handlers takes precedence,
|
/// > **Note**: The largest `KeepAlive` returned by the two handlers takes precedence,
|
||||||
/// > i.e. is returned from [`ProtocolsHandler::connection_keep_alive`] by the returned
|
/// > i.e. is returned from [`ProtocolsHandler::connection_keep_alive`] by the returned
|
||||||
/// > handler.
|
/// > handler.
|
||||||
#[inline]
|
|
||||||
fn select<TProto2>(self, other: TProto2) -> ProtocolsHandlerSelect<Self, TProto2>
|
fn select<TProto2>(self, other: TProto2) -> ProtocolsHandlerSelect<Self, TProto2>
|
||||||
where
|
where
|
||||||
Self: Sized,
|
Self: Sized,
|
||||||
@ -226,7 +221,6 @@ pub trait ProtocolsHandler: Send + 'static {
|
|||||||
/// exclusively.
|
/// exclusively.
|
||||||
///
|
///
|
||||||
/// > **Note**: This method should not be redefined in a custom `ProtocolsHandler`.
|
/// > **Note**: This method should not be redefined in a custom `ProtocolsHandler`.
|
||||||
#[inline]
|
|
||||||
fn into_node_handler_builder(self) -> NodeHandlerWrapperBuilder<Self>
|
fn into_node_handler_builder(self) -> NodeHandlerWrapperBuilder<Self>
|
||||||
where
|
where
|
||||||
Self: Sized,
|
Self: Sized,
|
||||||
@ -331,7 +325,6 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
|
|||||||
{
|
{
|
||||||
/// If this is an `OutboundSubstreamRequest`, maps the `info` member from a
|
/// If this is an `OutboundSubstreamRequest`, maps the `info` member from a
|
||||||
/// `TOutboundOpenInfo` to something else.
|
/// `TOutboundOpenInfo` to something else.
|
||||||
#[inline]
|
|
||||||
pub fn map_outbound_open_info<F, I>(
|
pub fn map_outbound_open_info<F, I>(
|
||||||
self,
|
self,
|
||||||
map: F,
|
map: F,
|
||||||
@ -353,7 +346,6 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
|
|||||||
|
|
||||||
/// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`)
|
/// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`)
|
||||||
/// to something else.
|
/// to something else.
|
||||||
#[inline]
|
|
||||||
pub fn map_protocol<F, I>(
|
pub fn map_protocol<F, I>(
|
||||||
self,
|
self,
|
||||||
map: F,
|
map: F,
|
||||||
@ -374,7 +366,6 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// If this is a `Custom` event, maps the content to something else.
|
/// If this is a `Custom` event, maps the content to something else.
|
||||||
#[inline]
|
|
||||||
pub fn map_custom<F, I>(
|
pub fn map_custom<F, I>(
|
||||||
self,
|
self,
|
||||||
map: F,
|
map: F,
|
||||||
@ -392,7 +383,6 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// If this is a `Close` event, maps the content to something else.
|
/// If this is a `Close` event, maps the content to something else.
|
||||||
#[inline]
|
|
||||||
pub fn map_close<F, I>(
|
pub fn map_close<F, I>(
|
||||||
self,
|
self,
|
||||||
map: F,
|
map: F,
|
||||||
@ -421,6 +411,20 @@ pub enum ProtocolsHandlerUpgrErr<TUpgrErr> {
|
|||||||
Upgrade(UpgradeError<TUpgrErr>),
|
Upgrade(UpgradeError<TUpgrErr>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<TUpgrErr> ProtocolsHandlerUpgrErr<TUpgrErr> {
|
||||||
|
/// Map the inner [`UpgradeError`] type.
|
||||||
|
pub fn map_upgrade_err<F, E>(self, f: F) -> ProtocolsHandlerUpgrErr<E>
|
||||||
|
where
|
||||||
|
F: FnOnce(UpgradeError<TUpgrErr>) -> UpgradeError<E>
|
||||||
|
{
|
||||||
|
match self {
|
||||||
|
ProtocolsHandlerUpgrErr::Timeout => ProtocolsHandlerUpgrErr::Timeout,
|
||||||
|
ProtocolsHandlerUpgrErr::Timer => ProtocolsHandlerUpgrErr::Timer,
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(e) => ProtocolsHandlerUpgrErr::Upgrade(f(e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<TUpgrErr> fmt::Display for ProtocolsHandlerUpgrErr<TUpgrErr>
|
impl<TUpgrErr> fmt::Display for ProtocolsHandlerUpgrErr<TUpgrErr>
|
||||||
where
|
where
|
||||||
TUpgrErr: fmt::Display,
|
TUpgrErr: fmt::Display,
|
||||||
|
@ -26,7 +26,7 @@ use crate::protocols_handler::{
|
|||||||
ProtocolsHandlerEvent,
|
ProtocolsHandlerEvent,
|
||||||
ProtocolsHandlerUpgrErr
|
ProtocolsHandlerUpgrErr
|
||||||
};
|
};
|
||||||
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade};
|
use libp2p_core::{Multiaddr, upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade}};
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use void::Void;
|
use void::Void;
|
||||||
|
|
||||||
@ -71,8 +71,12 @@ impl ProtocolsHandler for DummyProtocolsHandler {
|
|||||||
|
|
||||||
fn inject_event(&mut self, _: Self::InEvent) {}
|
fn inject_event(&mut self, _: Self::InEvent) {}
|
||||||
|
|
||||||
|
fn inject_address_change(&mut self, _: &Multiaddr) {}
|
||||||
|
|
||||||
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error>) {}
|
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error>) {}
|
||||||
|
|
||||||
|
fn inject_listen_upgrade_error(&mut self, _: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Error>) {}
|
||||||
|
|
||||||
fn connection_keep_alive(&self) -> KeepAlive {
|
fn connection_keep_alive(&self) -> KeepAlive {
|
||||||
self.keep_alive
|
self.keep_alive
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,7 @@ use crate::protocols_handler::{
|
|||||||
ProtocolsHandlerEvent,
|
ProtocolsHandlerEvent,
|
||||||
ProtocolsHandlerUpgrErr
|
ProtocolsHandlerUpgrErr
|
||||||
};
|
};
|
||||||
|
use libp2p_core::Multiaddr;
|
||||||
use std::{marker::PhantomData, task::Context, task::Poll};
|
use std::{marker::PhantomData, task::Context, task::Poll};
|
||||||
|
|
||||||
/// Wrapper around a protocol handler that turns the input event into something else.
|
/// Wrapper around a protocol handler that turns the input event into something else.
|
||||||
@ -38,7 +38,6 @@ pub struct MapInEvent<TProtoHandler, TNewIn, TMap> {
|
|||||||
|
|
||||||
impl<TProtoHandler, TMap, TNewIn> MapInEvent<TProtoHandler, TNewIn, TMap> {
|
impl<TProtoHandler, TMap, TNewIn> MapInEvent<TProtoHandler, TNewIn, TMap> {
|
||||||
/// Creates a `MapInEvent`.
|
/// Creates a `MapInEvent`.
|
||||||
#[inline]
|
|
||||||
pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self {
|
pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self {
|
||||||
MapInEvent {
|
MapInEvent {
|
||||||
inner,
|
inner,
|
||||||
@ -62,12 +61,10 @@ where
|
|||||||
type OutboundProtocol = TProtoHandler::OutboundProtocol;
|
type OutboundProtocol = TProtoHandler::OutboundProtocol;
|
||||||
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;
|
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
|
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
|
||||||
self.inner.listen_protocol()
|
self.inner.listen_protocol()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn inject_fully_negotiated_inbound(
|
fn inject_fully_negotiated_inbound(
|
||||||
&mut self,
|
&mut self,
|
||||||
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output
|
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output
|
||||||
@ -75,7 +72,6 @@ where
|
|||||||
self.inner.inject_fully_negotiated_inbound(protocol)
|
self.inner.inject_fully_negotiated_inbound(protocol)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn inject_fully_negotiated_outbound(
|
fn inject_fully_negotiated_outbound(
|
||||||
&mut self,
|
&mut self,
|
||||||
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
|
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
|
||||||
@ -84,24 +80,31 @@ where
|
|||||||
self.inner.inject_fully_negotiated_outbound(protocol, info)
|
self.inner.inject_fully_negotiated_outbound(protocol, info)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn inject_event(&mut self, event: TNewIn) {
|
fn inject_event(&mut self, event: TNewIn) {
|
||||||
if let Some(event) = (self.map)(event) {
|
if let Some(event) = (self.map)(event) {
|
||||||
self.inner.inject_event(event);
|
self.inner.inject_event(event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
fn inject_address_change(&mut self, addr: &Multiaddr) {
|
||||||
|
self.inner.inject_address_change(addr)
|
||||||
|
}
|
||||||
|
|
||||||
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
|
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
|
||||||
self.inner.inject_dial_upgrade_error(info, error)
|
self.inner.inject_dial_upgrade_error(info, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
fn inject_listen_upgrade_error(
|
||||||
|
&mut self,
|
||||||
|
error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>
|
||||||
|
) {
|
||||||
|
self.inner.inject_listen_upgrade_error(error)
|
||||||
|
}
|
||||||
|
|
||||||
fn connection_keep_alive(&self) -> KeepAlive {
|
fn connection_keep_alive(&self) -> KeepAlive {
|
||||||
self.inner.connection_keep_alive()
|
self.inner.connection_keep_alive()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn poll(
|
fn poll(
|
||||||
&mut self,
|
&mut self,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
|
@ -26,7 +26,7 @@ use crate::protocols_handler::{
|
|||||||
ProtocolsHandlerEvent,
|
ProtocolsHandlerEvent,
|
||||||
ProtocolsHandlerUpgrErr
|
ProtocolsHandlerUpgrErr
|
||||||
};
|
};
|
||||||
|
use libp2p_core::Multiaddr;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
/// Wrapper around a protocol handler that turns the output event into something else.
|
/// Wrapper around a protocol handler that turns the output event into something else.
|
||||||
@ -37,7 +37,6 @@ pub struct MapOutEvent<TProtoHandler, TMap> {
|
|||||||
|
|
||||||
impl<TProtoHandler, TMap> MapOutEvent<TProtoHandler, TMap> {
|
impl<TProtoHandler, TMap> MapOutEvent<TProtoHandler, TMap> {
|
||||||
/// Creates a `MapOutEvent`.
|
/// Creates a `MapOutEvent`.
|
||||||
#[inline]
|
|
||||||
pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self {
|
pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self {
|
||||||
MapOutEvent {
|
MapOutEvent {
|
||||||
inner,
|
inner,
|
||||||
@ -60,12 +59,10 @@ where
|
|||||||
type OutboundProtocol = TProtoHandler::OutboundProtocol;
|
type OutboundProtocol = TProtoHandler::OutboundProtocol;
|
||||||
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;
|
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
|
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
|
||||||
self.inner.listen_protocol()
|
self.inner.listen_protocol()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn inject_fully_negotiated_inbound(
|
fn inject_fully_negotiated_inbound(
|
||||||
&mut self,
|
&mut self,
|
||||||
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output
|
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output
|
||||||
@ -73,7 +70,6 @@ where
|
|||||||
self.inner.inject_fully_negotiated_inbound(protocol)
|
self.inner.inject_fully_negotiated_inbound(protocol)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn inject_fully_negotiated_outbound(
|
fn inject_fully_negotiated_outbound(
|
||||||
&mut self,
|
&mut self,
|
||||||
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
|
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
|
||||||
@ -82,22 +78,29 @@ where
|
|||||||
self.inner.inject_fully_negotiated_outbound(protocol, info)
|
self.inner.inject_fully_negotiated_outbound(protocol, info)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn inject_event(&mut self, event: Self::InEvent) {
|
fn inject_event(&mut self, event: Self::InEvent) {
|
||||||
self.inner.inject_event(event)
|
self.inner.inject_event(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
fn inject_address_change(&mut self, addr: &Multiaddr) {
|
||||||
|
self.inner.inject_address_change(addr)
|
||||||
|
}
|
||||||
|
|
||||||
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
|
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
|
||||||
self.inner.inject_dial_upgrade_error(info, error)
|
self.inner.inject_dial_upgrade_error(info, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
fn inject_listen_upgrade_error(
|
||||||
|
&mut self,
|
||||||
|
error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>
|
||||||
|
) {
|
||||||
|
self.inner.inject_listen_upgrade_error(error)
|
||||||
|
}
|
||||||
|
|
||||||
fn connection_keep_alive(&self) -> KeepAlive {
|
fn connection_keep_alive(&self) -> KeepAlive {
|
||||||
self.inner.connection_keep_alive()
|
self.inner.connection_keep_alive()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn poll(
|
fn poll(
|
||||||
&mut self,
|
&mut self,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
|
@ -36,7 +36,8 @@ use crate::upgrade::{
|
|||||||
UpgradeInfoSend
|
UpgradeInfoSend
|
||||||
};
|
};
|
||||||
use futures::{future::BoxFuture, prelude::*};
|
use futures::{future::BoxFuture, prelude::*};
|
||||||
use libp2p_core::{ConnectedPoint, PeerId, upgrade::ProtocolName};
|
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
|
||||||
|
use libp2p_core::upgrade::{ProtocolName, UpgradeError, NegotiationError, ProtocolError};
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
@ -135,6 +136,12 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn inject_address_change(&mut self, addr: &Multiaddr) {
|
||||||
|
for h in self.handlers.values_mut() {
|
||||||
|
h.inject_address_change(addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn inject_dial_upgrade_error (
|
fn inject_dial_upgrade_error (
|
||||||
&mut self,
|
&mut self,
|
||||||
(key, arg): Self::OutboundOpenInfo,
|
(key, arg): Self::OutboundOpenInfo,
|
||||||
@ -147,6 +154,53 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn inject_listen_upgrade_error(
|
||||||
|
&mut self,
|
||||||
|
error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>
|
||||||
|
) {
|
||||||
|
match error {
|
||||||
|
ProtocolsHandlerUpgrErr::Timer =>
|
||||||
|
for h in self.handlers.values_mut() {
|
||||||
|
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timer)
|
||||||
|
}
|
||||||
|
ProtocolsHandlerUpgrErr::Timeout =>
|
||||||
|
for h in self.handlers.values_mut() {
|
||||||
|
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timeout)
|
||||||
|
}
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) =>
|
||||||
|
for h in self.handlers.values_mut() {
|
||||||
|
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)))
|
||||||
|
}
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::ProtocolError(e))) =>
|
||||||
|
match e {
|
||||||
|
ProtocolError::IoError(e) =>
|
||||||
|
for h in self.handlers.values_mut() {
|
||||||
|
let e = NegotiationError::ProtocolError(ProtocolError::IoError(e.kind().into()));
|
||||||
|
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
|
||||||
|
}
|
||||||
|
ProtocolError::InvalidMessage =>
|
||||||
|
for h in self.handlers.values_mut() {
|
||||||
|
let e = NegotiationError::ProtocolError(ProtocolError::InvalidMessage);
|
||||||
|
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
|
||||||
|
}
|
||||||
|
ProtocolError::InvalidProtocol =>
|
||||||
|
for h in self.handlers.values_mut() {
|
||||||
|
let e = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol);
|
||||||
|
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
|
||||||
|
}
|
||||||
|
ProtocolError::TooManyProtocols =>
|
||||||
|
for h in self.handlers.values_mut() {
|
||||||
|
let e = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols);
|
||||||
|
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply((k, e))) =>
|
||||||
|
if let Some(h) = self.handlers.get_mut(&k) {
|
||||||
|
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn connection_keep_alive(&self) -> KeepAlive {
|
fn connection_keep_alive(&self) -> KeepAlive {
|
||||||
self.handlers.values()
|
self.handlers.values()
|
||||||
.map(|h| h.connection_keep_alive())
|
.map(|h| h.connection_keep_alive())
|
||||||
|
@ -30,9 +30,10 @@ use crate::protocols_handler::{
|
|||||||
|
|
||||||
use libp2p_core::{
|
use libp2p_core::{
|
||||||
ConnectedPoint,
|
ConnectedPoint,
|
||||||
|
Multiaddr,
|
||||||
PeerId,
|
PeerId,
|
||||||
either::{EitherError, EitherOutput},
|
either::{EitherError, EitherOutput},
|
||||||
upgrade::{EitherUpgrade, SelectUpgrade, UpgradeError}
|
upgrade::{EitherUpgrade, SelectUpgrade, UpgradeError, NegotiationError, ProtocolError}
|
||||||
};
|
};
|
||||||
use std::{cmp, task::Context, task::Poll};
|
use std::{cmp, task::Context, task::Poll};
|
||||||
|
|
||||||
@ -47,7 +48,6 @@ pub struct IntoProtocolsHandlerSelect<TProto1, TProto2> {
|
|||||||
|
|
||||||
impl<TProto1, TProto2> IntoProtocolsHandlerSelect<TProto1, TProto2> {
|
impl<TProto1, TProto2> IntoProtocolsHandlerSelect<TProto1, TProto2> {
|
||||||
/// Builds a `IntoProtocolsHandlerSelect`.
|
/// Builds a `IntoProtocolsHandlerSelect`.
|
||||||
#[inline]
|
|
||||||
pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self {
|
pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self {
|
||||||
IntoProtocolsHandlerSelect {
|
IntoProtocolsHandlerSelect {
|
||||||
proto1,
|
proto1,
|
||||||
@ -86,7 +86,6 @@ pub struct ProtocolsHandlerSelect<TProto1, TProto2> {
|
|||||||
|
|
||||||
impl<TProto1, TProto2> ProtocolsHandlerSelect<TProto1, TProto2> {
|
impl<TProto1, TProto2> ProtocolsHandlerSelect<TProto1, TProto2> {
|
||||||
/// Builds a `ProtocolsHandlerSelect`.
|
/// Builds a `ProtocolsHandlerSelect`.
|
||||||
#[inline]
|
|
||||||
pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self {
|
pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self {
|
||||||
ProtocolsHandlerSelect {
|
ProtocolsHandlerSelect {
|
||||||
proto1,
|
proto1,
|
||||||
@ -107,7 +106,6 @@ where
|
|||||||
type OutboundProtocol = EitherUpgrade<SendWrapper<TProto1::OutboundProtocol>, SendWrapper<TProto2::OutboundProtocol>>;
|
type OutboundProtocol = EitherUpgrade<SendWrapper<TProto1::OutboundProtocol>, SendWrapper<TProto2::OutboundProtocol>>;
|
||||||
type OutboundOpenInfo = EitherOutput<TProto1::OutboundOpenInfo, TProto2::OutboundOpenInfo>;
|
type OutboundOpenInfo = EitherOutput<TProto1::OutboundOpenInfo, TProto2::OutboundOpenInfo>;
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
|
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
|
||||||
let proto1 = self.proto1.listen_protocol();
|
let proto1 = self.proto1.listen_protocol();
|
||||||
let proto2 = self.proto2.listen_protocol();
|
let proto2 = self.proto2.listen_protocol();
|
||||||
@ -138,7 +136,6 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn inject_event(&mut self, event: Self::InEvent) {
|
fn inject_event(&mut self, event: Self::InEvent) {
|
||||||
match event {
|
match event {
|
||||||
EitherOutput::First(event) => self.proto1.inject_event(event),
|
EitherOutput::First(event) => self.proto1.inject_event(event),
|
||||||
@ -146,7 +143,11 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
fn inject_address_change(&mut self, addr: &Multiaddr) {
|
||||||
|
self.proto1.inject_address_change(addr);
|
||||||
|
self.proto2.inject_address_change(addr)
|
||||||
|
}
|
||||||
|
|
||||||
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
|
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
|
||||||
match (info, error) {
|
match (info, error) {
|
||||||
(EitherOutput::First(info), ProtocolsHandlerUpgrErr::Timer) => {
|
(EitherOutput::First(info), ProtocolsHandlerUpgrErr::Timer) => {
|
||||||
@ -182,7 +183,52 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
fn inject_listen_upgrade_error(&mut self, error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>) {
|
||||||
|
match error {
|
||||||
|
ProtocolsHandlerUpgrErr::Timer => {
|
||||||
|
self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timer);
|
||||||
|
self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timer);
|
||||||
|
}
|
||||||
|
ProtocolsHandlerUpgrErr::Timeout => {
|
||||||
|
self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timeout);
|
||||||
|
self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timeout);
|
||||||
|
}
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
|
||||||
|
self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)));
|
||||||
|
self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)));
|
||||||
|
}
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::ProtocolError(e))) => {
|
||||||
|
let (e1, e2);
|
||||||
|
match e {
|
||||||
|
ProtocolError::IoError(e) => {
|
||||||
|
e1 = NegotiationError::ProtocolError(ProtocolError::IoError(e.kind().into()));
|
||||||
|
e2 = NegotiationError::ProtocolError(ProtocolError::IoError(e))
|
||||||
|
}
|
||||||
|
ProtocolError::InvalidMessage => {
|
||||||
|
e1 = NegotiationError::ProtocolError(ProtocolError::InvalidMessage);
|
||||||
|
e2 = NegotiationError::ProtocolError(ProtocolError::InvalidMessage)
|
||||||
|
}
|
||||||
|
ProtocolError::InvalidProtocol => {
|
||||||
|
e1 = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol);
|
||||||
|
e2 = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol)
|
||||||
|
}
|
||||||
|
ProtocolError::TooManyProtocols => {
|
||||||
|
e1 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols);
|
||||||
|
e2 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e1)));
|
||||||
|
self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e2)))
|
||||||
|
}
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => {
|
||||||
|
self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)))
|
||||||
|
}
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => {
|
||||||
|
self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn connection_keep_alive(&self) -> KeepAlive {
|
fn connection_keep_alive(&self) -> KeepAlive {
|
||||||
cmp::max(self.proto1.connection_keep_alive(), self.proto2.connection_keep_alive())
|
cmp::max(self.proto1.connection_keep_alive(), self.proto2.connection_keep_alive())
|
||||||
}
|
}
|
||||||
|
@ -28,13 +28,12 @@ use crate::protocols_handler::{
|
|||||||
ProtocolsHandlerUpgrErr,
|
ProtocolsHandlerUpgrErr,
|
||||||
IntoProtocolsHandler
|
IntoProtocolsHandler
|
||||||
};
|
};
|
||||||
|
|
||||||
use libp2p_core::{
|
use libp2p_core::{
|
||||||
ConnectedPoint,
|
ConnectedPoint,
|
||||||
PeerId,
|
PeerId,
|
||||||
Multiaddr,
|
Multiaddr,
|
||||||
connection::ConnectionId,
|
connection::ConnectionId,
|
||||||
either::EitherOutput,
|
either::{EitherError, EitherOutput},
|
||||||
upgrade::{DeniedUpgrade, EitherUpgrade}
|
upgrade::{DeniedUpgrade, EitherUpgrade}
|
||||||
};
|
};
|
||||||
use std::{error, task::Context, task::Poll};
|
use std::{error, task::Context, task::Poll};
|
||||||
@ -250,11 +249,27 @@ where
|
|||||||
.inject_event(event)
|
.inject_event(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn inject_address_change(&mut self, addr: &Multiaddr) {
|
||||||
|
if let Some(inner) = self.inner.as_mut() {
|
||||||
|
inner.inject_address_change(addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
|
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
|
||||||
self.inner.as_mut().expect("Can't receive an outbound substream if disabled; QED")
|
self.inner.as_mut().expect("Can't receive an outbound substream if disabled; QED")
|
||||||
.inject_dial_upgrade_error(info, err)
|
.inject_dial_upgrade_error(info, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn inject_listen_upgrade_error(&mut self, err: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>) {
|
||||||
|
if let Some(inner) = self.inner.as_mut() {
|
||||||
|
let err = err.map_upgrade_err(|e| e.map_err(|e| match e {
|
||||||
|
EitherError::A(e) => e,
|
||||||
|
EitherError::B(v) => void::unreachable(v)
|
||||||
|
}));
|
||||||
|
inner.inject_listen_upgrade_error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn connection_keep_alive(&self) -> KeepAlive {
|
fn connection_keep_alive(&self) -> KeepAlive {
|
||||||
self.inner.as_ref().map(|h| h.connection_keep_alive())
|
self.inner.as_ref().map(|h| h.connection_keep_alive())
|
||||||
.unwrap_or(KeepAlive::No)
|
.unwrap_or(KeepAlive::No)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user