feat: don't report inbound stream upgrade errors to handler

When an inbound stream upgrade fails, there isn't a whole lot we can do about that in the handler. In fact, for several errors, we wouldn't even know which specific handler to target, for example, `NegotiationFailed`. Similiarly, in case of an IO error during the upgrade, we don't know which handler the stream was eventually meant to be for.

Pull-Request: #3605.
This commit is contained in:
Thomas Eizinger
2023-05-08 06:54:50 +02:00
committed by GitHub
parent 2130923aa5
commit 53e5370919
21 changed files with 90 additions and 553 deletions

View File

@ -54,7 +54,6 @@ enum EventType {
ReservationReqDenied,
ReservationReqDenyFailed,
ReservationTimedOut,
CircuitReqReceiveFailed,
CircuitReqDenied,
CircuitReqDenyFailed,
CircuitReqOutboundConnectFailed,
@ -75,9 +74,6 @@ impl From<&libp2p_relay::Event> for EventType {
EventType::ReservationReqDenyFailed
}
libp2p_relay::Event::ReservationTimedOut { .. } => EventType::ReservationTimedOut,
libp2p_relay::Event::CircuitReqReceiveFailed { .. } => {
EventType::CircuitReqReceiveFailed
}
libp2p_relay::Event::CircuitReqDenied { .. } => EventType::CircuitReqDenied,
libp2p_relay::Event::CircuitReqOutboundConnectFailed { .. } => {
EventType::CircuitReqOutboundConnectFailed

View File

@ -212,45 +212,12 @@ impl Handler {
<Self as ConnectionHandler>::InboundProtocol,
>,
) {
match error {
ConnectionHandlerUpgrErr::Timeout => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::InboundNegotiationFailed {
error: ConnectionHandlerUpgrErr::Timeout,
},
));
}
ConnectionHandlerUpgrErr::Timer => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::InboundNegotiationFailed {
error: ConnectionHandlerUpgrErr::Timer,
},
));
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
// The remote merely doesn't support the DCUtR protocol.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
self.keep_alive = KeepAlive::No;
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::InboundNegotiationFailed {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::Failed,
)),
},
));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error.map_upgrade_err(|e| {
e.map_err(|e| match e {
Either::Left(e) => Either::Left(e),
Either::Right(v) => void::unreachable(v),
})
}));
}
}
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(
match error {
Either::Left(e) => Either::Left(e),
Either::Right(v) => void::unreachable(v),
},
)));
}
fn on_dial_upgrade_error(

View File

@ -526,7 +526,7 @@ impl ConnectionHandler for Handler {
handler.on_fully_negotiated_outbound(fully_negotiated_outbound)
}
ConnectionEvent::DialUpgradeError(DialUpgradeError {
error: ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer,
error: ConnectionHandlerUpgrErr::Timeout,
..
}) => {
log::debug!("Dial upgrade error: Protocol negotiation timeout");

View File

@ -150,14 +150,7 @@ impl ConnectionHandler for Handler {
}));
}
ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => {
match error {
ConnectionHandlerUpgrErr::Timeout => {}
ConnectionHandlerUpgrErr::Timer => {}
ConnectionHandlerUpgrErr::Upgrade(error) => match error {
libp2p_core::UpgradeError::Select(_) => {}
libp2p_core::UpgradeError::Apply(v) => void::unreachable(v),
},
}
void::unreachable(error)
}
}
}

View File

@ -30,8 +30,7 @@ use libp2p_swarm::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ListenUpgradeError,
},
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, StreamProtocol,
SubstreamProtocol,
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, SubstreamProtocol,
};
use log::error;
use void::Void;
@ -106,14 +105,7 @@ impl ConnectionHandler for Handler {
}
ConnectionEvent::AddressChange(_) => {}
ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => {
match error {
ConnectionHandlerUpgrErr::Timeout => {}
ConnectionHandlerUpgrErr::Timer => {}
ConnectionHandlerUpgrErr::Upgrade(error) => match error {
libp2p_core::UpgradeError::Select(_) => {}
libp2p_core::UpgradeError::Apply(v) => void::unreachable(v),
},
}
void::unreachable(error)
}
}
}

View File

@ -6,6 +6,11 @@
- Hide internals of `Connection` and expose only `AsyncRead` and `AsyncWrite`.
See [PR 3829].
- Remove `Event::CircuitReqReceiveFailed` and `Event::InboundCircuitReqFailed` variants.
These variants are no longer constructed.
See [PR 3605].
[PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3829]: https://github.com/libp2p/rust-libp2p/pull/3829

View File

@ -150,10 +150,6 @@ pub enum Event {
},
/// An inbound reservation has timed out.
ReservationTimedOut { src_peer_id: PeerId },
CircuitReqReceiveFailed {
src_peer_id: PeerId,
error: ConnectionHandlerUpgrErr<void::Void>,
},
/// An inbound circuit request has been denied.
CircuitReqDenied {
src_peer_id: PeerId,
@ -537,15 +533,6 @@ impl NetworkBehaviour for Behaviour {
};
self.queued_actions.push_back(action.into());
}
handler::Event::CircuitReqReceiveFailed { error } => {
self.queued_actions.push_back(
ToSwarm::GenerateEvent(Event::CircuitReqReceiveFailed {
src_peer_id: event_source,
error,
})
.into(),
);
}
handler::Event::CircuitReqDenied {
circuit_id,
dst_peer_id,

View File

@ -163,10 +163,6 @@ pub enum Event {
inbound_circuit_req: inbound_hop::CircuitReq,
endpoint: ConnectedPoint,
},
/// Receiving an inbound circuit request failed.
CircuitReqReceiveFailed {
error: ConnectionHandlerUpgrErr<void::Void>,
},
/// An inbound circuit request has been denied.
CircuitReqDenied {
circuit_id: Option<CircuitId>,
@ -252,10 +248,6 @@ impl fmt::Debug for Event {
.debug_struct("Event::CircuitReqReceived")
.field("endpoint", endpoint)
.finish(),
Event::CircuitReqReceiveFailed { error } => f
.debug_struct("Event::CircuitReqReceiveFailed")
.field("error", error)
.finish(),
Event::CircuitReqDenied {
circuit_id,
dst_peer_id,
@ -471,41 +463,16 @@ impl Handler {
fn on_listen_upgrade_error(
&mut self,
ListenUpgradeError { error, .. }: ListenUpgradeError<
ListenUpgradeError {
error: inbound_hop::UpgradeError::Fatal(error),
..
}: ListenUpgradeError<
<Self as ConnectionHandler>::InboundOpenInfo,
<Self as ConnectionHandler>::InboundProtocol,
>,
) {
let non_fatal_error = match error {
ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout,
ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer,
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::Failed,
)) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::Failed,
)),
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::ProtocolError(e),
)) => {
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)),
));
return;
}
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(
inbound_hop::UpgradeError::Fatal(error),
)) => {
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Apply(Either::Left(error)),
));
return;
}
};
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::CircuitReqReceiveFailed {
error: non_fatal_error,
},
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Apply(Either::Left(error)),
));
}
@ -524,10 +491,6 @@ impl Handler {
ConnectionHandlerUpgrErr::Timeout,
proto::Status::CONNECTION_FAILED,
),
ConnectionHandlerUpgrErr::Timer => (
ConnectionHandlerUpgrErr::Timer,
proto::Status::CONNECTION_FAILED,
),
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::Failed,
)) => {

View File

@ -79,10 +79,6 @@ pub enum Event {
src_peer_id: PeerId,
limit: Option<protocol::Limit>,
},
InboundCircuitReqFailed {
relay_peer_id: PeerId,
error: ConnectionHandlerUpgrErr<void::Void>,
},
/// An inbound circuit request has been denied.
InboundCircuitReqDenied { src_peer_id: PeerId },
/// Denying an inbound circuit request failed.
@ -282,10 +278,6 @@ impl NetworkBehaviour for Behaviour {
handler::Event::InboundCircuitEstablished { src_peer_id, limit } => {
Event::InboundCircuitEstablished { src_peer_id, limit }
}
handler::Event::InboundCircuitReqFailed { error } => Event::InboundCircuitReqFailed {
relay_peer_id: event_source,
error,
},
handler::Event::InboundCircuitReqDenied { src_peer_id } => {
Event::InboundCircuitReqDenied { src_peer_id }
}

View File

@ -97,10 +97,6 @@ pub enum Event {
src_peer_id: PeerId,
limit: Option<protocol::Limit>,
},
/// An inbound circuit request has failed.
InboundCircuitReqFailed {
error: ConnectionHandlerUpgrErr<void::Void>,
},
/// An inbound circuit request has been denied.
InboundCircuitReqDenied { src_peer_id: PeerId },
/// Denying an inbound circuit request failed.
@ -295,41 +291,16 @@ impl Handler {
fn on_listen_upgrade_error(
&mut self,
ListenUpgradeError { error, .. }: ListenUpgradeError<
ListenUpgradeError {
error: inbound_stop::UpgradeError::Fatal(error),
..
}: ListenUpgradeError<
<Self as ConnectionHandler>::InboundOpenInfo,
<Self as ConnectionHandler>::InboundProtocol,
>,
) {
let non_fatal_error = match error {
ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout,
ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer,
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::Failed,
)) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::Failed,
)),
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::ProtocolError(e),
)) => {
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)),
));
return;
}
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(
inbound_stop::UpgradeError::Fatal(error),
)) => {
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Apply(Either::Left(error)),
));
return;
}
};
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::InboundCircuitReqFailed {
error: non_fatal_error,
},
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Apply(Either::Left(error)),
));
}
@ -347,7 +318,6 @@ impl Handler {
OutboundOpenInfo::Reserve { mut to_listener } => {
let non_fatal_error = match error {
ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout,
ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer,
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::Failed,
)) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
@ -410,7 +380,6 @@ impl Handler {
OutboundOpenInfo::Connect { send_back } => {
let non_fatal_error = match error {
ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout,
ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer,
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::Failed,
)) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(

View File

@ -4,6 +4,11 @@
See [PR 3715].
- Remove deprecated `RequestResponse` prefixed items. See [PR 3702].
- Remove `InboundFailure::UnsupportedProtocols` and `InboundFailure::InboundTimeout`.
These variants are no longer constructed.
See [PR 3605].
[PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3702]: https://github.com/libp2p/rust-libp2p/pull/3702

View File

@ -161,30 +161,14 @@ where
}
fn on_listen_upgrade_error(
&mut self,
ListenUpgradeError { info, error }: ListenUpgradeError<
ListenUpgradeError { error, .. }: ListenUpgradeError<
<Self as ConnectionHandler>::InboundOpenInfo,
<Self as ConnectionHandler>::InboundProtocol,
>,
) {
match error {
ConnectionHandlerUpgrErr::Timeout => {
self.pending_events.push_back(Event::InboundTimeout(info))
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
// The local peer merely doesn't support the protocol(s) requested.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
// An event is reported to permit user code to react to the fact that
// the local peer does not support the requested protocol(s).
self.pending_events
.push_back(Event::InboundUnsupportedProtocols(info));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error);
}
}
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(
error,
)));
}
}
@ -214,11 +198,6 @@ where
OutboundTimeout(RequestId),
/// An outbound request failed to negotiate a mutually supported protocol.
OutboundUnsupportedProtocols(RequestId),
/// An inbound request timed out while waiting for the request
/// or sending the response.
InboundTimeout(RequestId),
/// An inbound request failed to negotiate a mutually supported protocol.
InboundUnsupportedProtocols(RequestId),
}
impl<TCodec: Codec> fmt::Debug for Event<TCodec> {
@ -255,14 +234,6 @@ impl<TCodec: Codec> fmt::Debug for Event<TCodec> {
.debug_tuple("Event::OutboundUnsupportedProtocols")
.field(request_id)
.finish(),
Event::InboundTimeout(request_id) => f
.debug_tuple("Event::InboundTimeout")
.field(request_id)
.finish(),
Event::InboundUnsupportedProtocols(request_id) => f
.debug_tuple("Event::InboundUnsupportedProtocols")
.field(request_id)
.finish(),
}
}
}

View File

@ -857,20 +857,6 @@ where
error: OutboundFailure::Timeout,
}));
}
handler::Event::InboundTimeout(request_id) => {
// Note: `Event::InboundTimeout` is emitted both for timing
// out to receive the request and for timing out sending the response. In the former
// case the request is never added to `pending_outbound_responses` and thus one can
// not assert the request_id to be present before removing it.
self.remove_pending_outbound_response(&peer, connection, request_id);
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
peer,
request_id,
error: InboundFailure::Timeout,
}));
}
handler::Event::OutboundUnsupportedProtocols(request_id) => {
let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
debug_assert!(
@ -885,17 +871,6 @@ where
error: OutboundFailure::UnsupportedProtocols,
}));
}
handler::Event::InboundUnsupportedProtocols(request_id) => {
// Note: No need to call `self.remove_pending_outbound_response`,
// `Event::Request` was never emitted for this request and
// thus request was never added to `pending_outbound_responses`.
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
peer,
request_id,
error: InboundFailure::UnsupportedProtocols,
}));
}
}
}

View File

@ -10,6 +10,12 @@
- Return a bool from `ExternalAddresses::on_swarm_event` and `ListenAddresses::on_swarm_event` indicating whether any state was changed.
See [PR 3865].
- Remove `ConnectionHandlerUpgrErr::Timer` variant.
This variant was never constructed and thus dead code.
See [PR 3605].
[PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605
[PR 3746]: https://github.com/libp2p/rust-libp2p/pull/3746
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3746]: https://github.com/libp2p/rust-libp2p/pull/3746
[PR 3865]: https://github.com/libp2p/rust-libp2p/pull/3865

View File

@ -21,9 +21,9 @@
use crate::behaviour::FromSwarm;
use crate::connection::ConnectionId;
use crate::handler::{
AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent,
ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
KeepAlive, ListenUpgradeError, SubstreamProtocol,
AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, ListenUpgradeError,
SubstreamProtocol,
};
use crate::upgrade::SendWrapper;
use crate::{
@ -252,14 +252,8 @@ where
};
let err = match err {
ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout,
ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer,
ConnectionHandlerUpgrErr::Upgrade(err) => {
ConnectionHandlerUpgrErr::Upgrade(err.map_err(|err| match err {
Either::Left(e) => e,
Either::Right(v) => void::unreachable(v),
}))
}
Either::Left(e) => e,
Either::Right(v) => void::unreachable(v),
};
inner.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
@ -371,33 +365,3 @@ where
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::dummy;
/// A disabled [`ToggleConnectionHandler`] can receive listen upgrade errors in
/// the following two cases:
///
/// 1. Protocol negotiation on an incoming stream failed with no protocol
/// being agreed on.
///
/// 2. When combining [`ConnectionHandler`] implementations a single
/// [`ConnectionHandler`] might be notified of an inbound upgrade error
/// unrelated to its own upgrade logic. For example when nesting a
/// [`ToggleConnectionHandler`] in a
/// [`ConnectionHandlerSelect`](crate::connection_handler::ConnectionHandlerSelect)
/// the former might receive an inbound upgrade error even when disabled.
///
/// [`ToggleConnectionHandler`] should ignore the error in both of these cases.
#[test]
fn ignore_listen_upgrade_error_when_disabled() {
let mut handler = ToggleConnectionHandler::<dummy::ConnectionHandler> { inner: None };
handler.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
info: Either::Right(()),
error: ConnectionHandlerUpgrErr::Timeout,
}));
}
}

View File

@ -273,12 +273,26 @@ where
));
continue;
}
Poll::Ready(Some((info, Err(error)))) => {
Poll::Ready(Some((
info,
Err(ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error))),
))) => {
handler.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError { info, error },
));
continue;
}
Poll::Ready(Some((
_,
Err(ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e))),
))) => {
log::debug!("failed to upgrade inbound stream: {e}");
continue;
}
Poll::Ready(Some((_, Err(ConnectionHandlerUpgrErr::Timeout)))) => {
log::debug!("inbound stream upgrade timed out");
continue;
}
}
// Ask the handler whether it wants the connection (and the handler itself)

View File

@ -133,7 +133,6 @@ impl crate::handler::ConnectionHandler for ConnectionHandler {
}) => void::unreachable(protocol),
ConnectionEvent::DialUpgradeError(DialUpgradeError { info: _, error }) => match error {
ConnectionHandlerUpgrErr::Timeout => unreachable!(),
ConnectionHandlerUpgrErr::Timer => unreachable!(),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => void::unreachable(e),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)) => {
unreachable!("Denied upgrade does not support any protocols")

View File

@ -229,15 +229,11 @@ impl<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI>
/// Whether the event concerns an inbound stream.
pub fn is_inbound(&self) -> bool {
// Note: This will get simpler with https://github.com/libp2p/rust-libp2p/pull/3605.
match self {
ConnectionEvent::FullyNegotiatedInbound(_)
| ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)), // Only `Select` is relevant, the others may be for other handlers too.
..
}) => true,
ConnectionEvent::FullyNegotiatedInbound(_) | ConnectionEvent::ListenUpgradeError(_) => {
true
}
ConnectionEvent::FullyNegotiatedOutbound(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::AddressChange(_)
| ConnectionEvent::DialUpgradeError(_) => false,
}
@ -282,7 +278,7 @@ pub struct DialUpgradeError<OOI, OP: OutboundUpgradeSend> {
/// that upgrading an inbound substream to the given protocol has failed.
pub struct ListenUpgradeError<IOI, IP: InboundUpgradeSend> {
pub info: IOI,
pub error: ConnectionHandlerUpgrErr<IP::Error>,
pub error: IP::Error,
}
/// Configuration of inbound or outbound substream protocol(s)
@ -468,8 +464,6 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
pub enum ConnectionHandlerUpgrErr<TUpgrErr> {
/// The opening attempt timed out before the negotiation was fully completed.
Timeout,
/// There was an error in the timer used.
Timer,
/// Error while upgrading the substream to the protocol we want.
Upgrade(UpgradeError<TUpgrErr>),
}
@ -482,7 +476,6 @@ impl<TUpgrErr> ConnectionHandlerUpgrErr<TUpgrErr> {
{
match self {
ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout,
ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer,
ConnectionHandlerUpgrErr::Upgrade(e) => ConnectionHandlerUpgrErr::Upgrade(f(e)),
}
}
@ -497,9 +490,6 @@ where
ConnectionHandlerUpgrErr::Timeout => {
write!(f, "Timeout error while opening a substream")
}
ConnectionHandlerUpgrErr::Timer => {
write!(f, "Timer error while opening a substream")
}
ConnectionHandlerUpgrErr::Upgrade(err) => {
write!(f, "Upgrade: ")?;
crate::print_error_chain(f, err)
@ -515,7 +505,6 @@ where
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
ConnectionHandlerUpgrErr::Timeout => None,
ConnectionHandlerUpgrErr::Timer => None,
ConnectionHandlerUpgrErr::Upgrade(_) => None,
}
}

View File

@ -25,10 +25,9 @@ use crate::handler::{
InboundUpgradeSend, KeepAlive, ListenUpgradeError, SubstreamProtocol,
};
use crate::upgrade::SendWrapper;
use crate::ConnectionHandlerUpgrErr;
use either::Either;
use futures::future;
use libp2p_core::{ConnectedPoint, UpgradeError};
use libp2p_core::ConnectedPoint;
use libp2p_identity::PeerId;
use std::task::{Context, Poll};
@ -125,61 +124,13 @@ where
fn transpose(self) -> Either<ListenUpgradeError<LIOI, LIP>, ListenUpgradeError<RIOI, RIP>> {
match self {
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Left(error))),
error: Either::Left(error),
info: Either::Left(info),
} => Either::Left(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error)),
info,
}),
} => Either::Left(ListenUpgradeError { error, info }),
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Right(error))),
error: Either::Right(error),
info: Either::Right(info),
} => Either::Right(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error)),
info,
}),
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
info: Either::Left(info),
} => Either::Left(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
info,
}),
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
info: Either::Right(info),
} => Either::Right(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
info,
}),
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timer,
info: Either::Left(info),
} => Either::Left(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timer,
info,
}),
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timer,
info: Either::Right(info),
} => Either::Right(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timer,
info,
}),
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timeout,
info: Either::Left(info),
} => Either::Left(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timeout,
info,
}),
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timeout,
info: Either::Right(info),
} => Either::Right(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timeout,
info,
}),
} => Either::Right(ListenUpgradeError { error, info }),
_ => unreachable!(),
}
}

View File

@ -24,14 +24,13 @@
#[allow(deprecated)]
use crate::handler::IntoConnectionHandler;
use crate::handler::{
AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent,
ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
KeepAlive, ListenUpgradeError, SubstreamProtocol,
AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, ListenUpgradeError,
SubstreamProtocol,
};
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, UpgradeInfoSend};
use crate::NegotiatedSubstream;
use futures::{future::BoxFuture, prelude::*};
use libp2p_core::upgrade::{NegotiationError, ProtocolError, UpgradeError};
use libp2p_core::ConnectedPoint;
use libp2p_identity::PeerId;
use rand::Rng;
@ -89,128 +88,20 @@ where
fn on_listen_upgrade_error(
&mut self,
ListenUpgradeError { error, mut info }: ListenUpgradeError<
ListenUpgradeError {
error: (key, error),
mut info,
}: ListenUpgradeError<
<Self as ConnectionHandler>::InboundOpenInfo,
<Self as ConnectionHandler>::InboundProtocol,
>,
) {
match error {
ConnectionHandlerUpgrErr::Timer => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Timer,
},
));
}
}
}
ConnectionHandlerUpgrErr::Timeout => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Timeout,
},
));
}
}
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::Failed,
)),
},
));
}
}
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(e),
)) => match e {
ProtocolError::IoError(e) => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e = NegotiationError::ProtocolError(ProtocolError::IoError(
e.kind().into(),
));
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
e,
)),
},
));
}
}
}
ProtocolError::InvalidMessage => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e = NegotiationError::ProtocolError(ProtocolError::InvalidMessage);
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
e,
)),
},
));
}
}
}
ProtocolError::InvalidProtocol => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol);
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
e,
)),
},
));
}
}
}
ProtocolError::TooManyProtocols => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e =
NegotiationError::ProtocolError(ProtocolError::TooManyProtocols);
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
e,
)),
},
));
}
}
}
},
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply((k, e))) => {
if let Some(h) = self.handlers.get_mut(&k) {
if let Some(i) = info.take(&k) {
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
},
));
}
}
if let Some(h) = self.handlers.get_mut(&key) {
if let Some(i) = info.take(&key) {
h.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
info: i,
error,
}));
}
}
}

View File

@ -29,7 +29,7 @@ use crate::upgrade::SendWrapper;
use either::Either;
use futures::future;
use libp2p_core::{
upgrade::{NegotiationError, ProtocolError, SelectUpgrade, UpgradeError},
upgrade::{SelectUpgrade, UpgradeError},
ConnectedPoint,
};
use libp2p_identity::PeerId;
@ -161,13 +161,6 @@ where
self,
) -> Either<DialUpgradeError<S1OOI, S1OP>, DialUpgradeError<S2OOI, S2OP>> {
match self {
DialUpgradeError {
info: Either::Left(info),
error: ConnectionHandlerUpgrErr::Timer,
} => Either::Left(DialUpgradeError {
info,
error: ConnectionHandlerUpgrErr::Timer,
}),
DialUpgradeError {
info: Either::Left(info),
error: ConnectionHandlerUpgrErr::Timeout,
@ -189,13 +182,6 @@ where
info,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)),
}),
DialUpgradeError {
info: Either::Right(info),
error: ConnectionHandlerUpgrErr::Timer,
} => Either::Right(DialUpgradeError {
info,
error: ConnectionHandlerUpgrErr::Timer,
}),
DialUpgradeError {
info: Either::Right(info),
error: ConnectionHandlerUpgrErr::Timeout,
@ -238,96 +224,18 @@ where
>,
) {
match error {
ConnectionHandlerUpgrErr::Timer => {
Either::Left(error) => {
self.proto1
.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
info: i1,
error: ConnectionHandlerUpgrErr::Timer,
error,
}));
}
Either::Right(error) => {
self.proto2
.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
info: i2,
error: ConnectionHandlerUpgrErr::Timer,
}));
}
ConnectionHandlerUpgrErr::Timeout => {
self.proto1
.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
info: i1,
error: ConnectionHandlerUpgrErr::Timeout,
}));
self.proto2
.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
info: i2,
error: ConnectionHandlerUpgrErr::Timeout,
}));
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
self.proto1
.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
info: i1,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::Failed,
)),
}));
self.proto2
.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
info: i2,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::Failed,
)),
}));
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(e),
)) => {
let (e1, e2);
match e {
ProtocolError::IoError(e) => {
e1 = NegotiationError::ProtocolError(ProtocolError::IoError(
e.kind().into(),
));
e2 = NegotiationError::ProtocolError(ProtocolError::IoError(e))
}
ProtocolError::InvalidMessage => {
e1 = NegotiationError::ProtocolError(ProtocolError::InvalidMessage);
e2 = NegotiationError::ProtocolError(ProtocolError::InvalidMessage)
}
ProtocolError::InvalidProtocol => {
e1 = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol);
e2 = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol)
}
ProtocolError::TooManyProtocols => {
e1 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols);
e2 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols)
}
}
self.proto1
.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
info: i1,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e1)),
}));
self.proto2
.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
info: i2,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e2)),
}));
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Left(e))) => {
self.proto1
.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
info: i1,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
}));
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Right(e))) => {
self.proto2
.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
info: i2,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
error,
}));
}
}