diff --git a/examples/file-sharing/src/network.rs b/examples/file-sharing/src/network.rs index 0e83998f..404aef01 100644 --- a/examples/file-sharing/src/network.rs +++ b/examples/file-sharing/src/network.rs @@ -16,7 +16,7 @@ use libp2p::{ multiaddr::Protocol, noise, request_response::{self, ProtocolSupport, RequestId, ResponseChannel}, - swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, StreamUpgradeError, Swarm, SwarmBuilder, SwarmEvent}, tcp, yamux, PeerId, Transport, }; @@ -216,7 +216,7 @@ impl EventLoop { async fn handle_event( &mut self, - event: SwarmEvent, io::Error>>, + event: SwarmEvent, io::Error>>, ) { match event { SwarmEvent::Behaviour(ComposedEvent::Kademlia( diff --git a/protocols/dcutr/src/behaviour_impl.rs b/protocols/dcutr/src/behaviour_impl.rs index 42920b2f..2c1698f2 100644 --- a/protocols/dcutr/src/behaviour_impl.rs +++ b/protocols/dcutr/src/behaviour_impl.rs @@ -30,7 +30,7 @@ use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailu use libp2p_swarm::dial_opts::{self, DialOpts}; use libp2p_swarm::{dummy, ConnectionDenied, ConnectionId, THandler, THandlerOutEvent}; use libp2p_swarm::{ - ConnectionHandlerUpgrErr, ExternalAddresses, NetworkBehaviour, NotifyHandler, PollParameters, + ExternalAddresses, NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError, THandlerInEvent, ToSwarm, }; use std::collections::{HashMap, HashSet, VecDeque}; @@ -65,7 +65,7 @@ pub enum Error { #[error("Failed to dial peer.")] Dial, #[error("Failed to establish substream: {0}.")] - Handler(ConnectionHandlerUpgrErr), + Handler(StreamUpgradeError), } pub struct Behaviour { diff --git a/protocols/dcutr/src/handler/direct.rs b/protocols/dcutr/src/handler/direct.rs index aab21248..c1470840 100644 --- a/protocols/dcutr/src/handler/direct.rs +++ b/protocols/dcutr/src/handler/direct.rs @@ -23,8 +23,7 @@ use libp2p_core::upgrade::DeniedUpgrade; use libp2p_swarm::handler::ConnectionEvent; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, - SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol, }; use std::task::{Context, Poll}; use void::Void; @@ -42,7 +41,7 @@ pub struct Handler { impl ConnectionHandler for Handler { type InEvent = void::Void; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr; + type Error = StreamUpgradeError; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade; type OutboundOpenInfo = Void; diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index ed0d7f6f..c9f49ff9 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -26,15 +26,14 @@ use futures::future; use futures::future::{BoxFuture, FutureExt}; use instant::Instant; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::upgrade::{DeniedUpgrade, NegotiationError, UpgradeError}; +use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::ConnectedPoint; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, - SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol, }; use std::collections::VecDeque; use std::fmt; @@ -82,11 +81,11 @@ pub enum Event { remote_addr: Multiaddr, }, InboundNegotiationFailed { - error: ConnectionHandlerUpgrErr, + error: StreamUpgradeError, }, InboundConnectNegotiated(Vec), OutboundNegotiationFailed { - error: ConnectionHandlerUpgrErr, + error: StreamUpgradeError, }, OutboundConnectNegotiated { remote_addrs: Vec, @@ -127,7 +126,7 @@ pub struct Handler { endpoint: ConnectedPoint, /// A pending fatal error that results in the connection being closed. pending_error: Option< - ConnectionHandlerUpgrErr< + StreamUpgradeError< Either, >, >, @@ -212,12 +211,10 @@ impl Handler { ::InboundProtocol, >, ) { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply( - match error { - Either::Left(e) => Either::Left(e), - Either::Right(v) => void::unreachable(v), - }, - ))); + self.pending_error = Some(StreamUpgradeError::Apply(match error { + Either::Left(e) => Either::Left(e), + Either::Right(v) => void::unreachable(v), + })); } fn on_dial_upgrade_error( @@ -230,29 +227,27 @@ impl Handler { self.keep_alive = KeepAlive::No; match error { - ConnectionHandlerUpgrErr::Timeout => { + StreamUpgradeError::Timeout => { self.queued_events.push_back(ConnectionHandlerEvent::Custom( Event::OutboundNegotiationFailed { - error: ConnectionHandlerUpgrErr::Timeout, + error: StreamUpgradeError::Timeout, }, )); } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + StreamUpgradeError::NegotiationFailed => { // The remote merely doesn't support the DCUtR protocol. // This is no reason to close the connection, which may // successfully communicate with other protocols already. self.queued_events.push_back(ConnectionHandlerEvent::Custom( Event::OutboundNegotiationFailed { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::Failed, - )), + error: StreamUpgradeError::NegotiationFailed, }, )); } _ => { // Anything else is considered a fatal error or misbehaviour of // the remote peer and results in closing the connection. - self.pending_error = Some(error.map_upgrade_err(|e| e.map_err(Either::Right))); + self.pending_error = Some(error.map_upgrade_err(Either::Right)); } } } @@ -261,7 +256,7 @@ impl Handler { impl ConnectionHandler for Handler { type InEvent = Command; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr< + type Error = StreamUpgradeError< Either, >; type InboundProtocol = Either; @@ -352,9 +347,9 @@ impl ConnectionHandler for Handler { )); } Err(e) => { - return Poll::Ready(ConnectionHandlerEvent::Close( - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Left(e))), - )) + return Poll::Ready(ConnectionHandlerEvent::Close(StreamUpgradeError::Apply( + Either::Left(e), + ))) } } } diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 3ef29616..6e673516 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -27,10 +27,10 @@ use futures::future::Either; use futures::prelude::*; use futures::StreamExt; use instant::Instant; -use libp2p_core::upgrade::{DeniedUpgrade, NegotiationError, UpgradeError}; +use libp2p_core::upgrade::DeniedUpgrade; use libp2p_swarm::handler::{ - ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, + ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, + FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError, SubstreamProtocol, }; use libp2p_swarm::NegotiatedSubstream; @@ -526,20 +526,17 @@ impl ConnectionHandler for Handler { handler.on_fully_negotiated_outbound(fully_negotiated_outbound) } ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timeout, + error: StreamUpgradeError::Timeout, .. }) => { log::debug!("Dial upgrade error: Protocol negotiation timeout"); } ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), + error: StreamUpgradeError::Apply(e), .. }) => void::unreachable(e), ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::Failed, - )), + error: StreamUpgradeError::NegotiationFailed, .. }) => { // The protocol is not supported @@ -551,10 +548,7 @@ impl ConnectionHandler for Handler { }); } ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::ProtocolError(e), - )), + error: StreamUpgradeError::Io(e), .. }) => { log::debug!("Protocol negotiation failed: {e}") diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index b585513d..450d6dc5 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -25,8 +25,8 @@ use libp2p_identity::PeerId; use libp2p_identity::PublicKey; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::{ - AddressScore, ConnectionDenied, ConnectionHandlerUpgrErr, DialError, ExternalAddresses, - ListenAddresses, NetworkBehaviour, NotifyHandler, PollParameters, StreamProtocol, + AddressScore, ConnectionDenied, DialError, ExternalAddresses, ListenAddresses, + NetworkBehaviour, NotifyHandler, PollParameters, StreamProtocol, StreamUpgradeError, THandlerInEvent, ToSwarm, }; use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent}; @@ -492,7 +492,7 @@ pub enum Event { /// The peer with whom the error originated. peer_id: PeerId, /// The error that occurred. - error: ConnectionHandlerUpgrErr, + error: StreamUpgradeError, }, } diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 21be0afc..0c72635d 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -34,8 +34,8 @@ use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, - NegotiatedSubstream, StreamProtocol, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, StreamProtocol, + StreamUpgradeError, SubstreamProtocol, }; use log::warn; use smallvec::SmallVec; @@ -111,7 +111,7 @@ pub enum Event { /// We received a request for identification. Identify, /// Failed to identify the remote, or to reply to an identification request. - IdentificationError(ConnectionHandlerUpgrErr), + IdentificationError(StreamUpgradeError), } impl Handler { @@ -205,13 +205,7 @@ impl Handler { ::OutboundProtocol, >, ) { - use libp2p_core::upgrade::UpgradeError; - - let err = err.map_upgrade_err(|e| match e { - UpgradeError::Select(e) => UpgradeError::Select(e), - UpgradeError::Apply(Either::Left(ioe)) => UpgradeError::Apply(ioe), - UpgradeError::Apply(Either::Right(ioe)) => UpgradeError::Apply(ioe), - }); + let err = err.map_upgrade_err(|e| e.into_inner()); self.events .push(ConnectionHandlerEvent::Custom(Event::IdentificationError( err, @@ -317,9 +311,7 @@ impl ConnectionHandler for Handler { Event::Identification(peer_id), )), Poll::Ready(Some(Err(err))) => Poll::Ready(ConnectionHandlerEvent::Custom( - Event::IdentificationError(ConnectionHandlerUpgrErr::Upgrade( - libp2p_core::upgrade::UpgradeError::Apply(err), - )), + Event::IdentificationError(StreamUpgradeError::Apply(err)), )), Poll::Ready(None) | Poll::Pending => Poll::Pending, } diff --git a/protocols/kad/src/handler_priv.rs b/protocols/kad/src/handler_priv.rs index b99eb295..3ac82960 100644 --- a/protocols/kad/src/handler_priv.rs +++ b/protocols/kad/src/handler_priv.rs @@ -33,8 +33,8 @@ use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, - NegotiatedSubstream, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, StreamUpgradeError, + SubstreamProtocol, }; use log::trace; use std::collections::VecDeque; @@ -325,7 +325,7 @@ pub enum KademliaHandlerEvent { #[derive(Debug)] pub enum KademliaHandlerQueryErr { /// Error while trying to perform the query. - Upgrade(ConnectionHandlerUpgrErr), + Upgrade(StreamUpgradeError), /// Received an answer that doesn't correspond to the request. UnexpectedMessage, /// I/O error in the substream. @@ -361,8 +361,8 @@ impl error::Error for KademliaHandlerQueryErr { } } -impl From> for KademliaHandlerQueryErr { - fn from(err: ConnectionHandlerUpgrErr) -> Self { +impl From> for KademliaHandlerQueryErr { + fn from(err: StreamUpgradeError) -> Self { KademliaHandlerQueryErr::Upgrade(err) } } diff --git a/protocols/perf/src/client/behaviour.rs b/protocols/perf/src/client/behaviour.rs index dade91c5..920baed1 100644 --- a/protocols/perf/src/client/behaviour.rs +++ b/protocols/perf/src/client/behaviour.rs @@ -28,8 +28,8 @@ use std::{ use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_swarm::{ - derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionHandlerUpgrErr, - ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, PollParameters, THandlerInEvent, + derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionId, FromSwarm, + NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use void::Void; @@ -41,7 +41,7 @@ use super::{RunId, RunParams, RunStats}; #[derive(Debug)] pub struct Event { pub id: RunId, - pub result: Result>, + pub result: Result>, } #[derive(Default)] diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index 25844d53..437d63f6 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -31,7 +31,7 @@ use libp2p_swarm::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }, - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, StreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError, SubstreamProtocol, }; use void::Void; @@ -47,7 +47,7 @@ pub struct Command { #[derive(Debug)] pub struct Event { pub(crate) id: RunId, - pub(crate) result: Result>, + pub(crate) result: Result>, } pub struct Handler { diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 1f6ca00d..22aee9d5 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -23,13 +23,12 @@ use futures::future::BoxFuture; use futures::prelude::*; use futures_timer::Delay; use libp2p_core::upgrade::ReadyUpgrade; -use libp2p_core::{upgrade::NegotiationError, UpgradeError}; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, - NegotiatedSubstream, StreamProtocol, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, StreamProtocol, + StreamUpgradeError, SubstreamProtocol, }; use std::collections::VecDeque; use std::{ @@ -238,14 +237,14 @@ impl Handler { self.outbound = None; // Request a new substream on the next `poll`. let error = match error { - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + StreamUpgradeError::NegotiationFailed => { debug_assert_eq!(self.state, State::Active); self.state = State::Inactive { reported: false }; return; } // Note: This timeout only covers protocol negotiation. - ConnectionHandlerUpgrErr::Timeout => Failure::Timeout, + StreamUpgradeError::Timeout => Failure::Timeout, e => Failure::Other { error: Box::new(e) }, }; diff --git a/protocols/relay/src/behaviour.rs b/protocols/relay/src/behaviour.rs index 015e62ab..b2d6538f 100644 --- a/protocols/relay/src/behaviour.rs +++ b/protocols/relay/src/behaviour.rs @@ -33,9 +33,8 @@ use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_swarm::behaviour::{ConnectionClosed, FromSwarm}; use libp2p_swarm::{ - dummy, ConnectionDenied, ConnectionHandlerUpgrErr, ConnectionId, ExternalAddresses, - NetworkBehaviour, NotifyHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, - ToSwarm, + dummy, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour, NotifyHandler, + PollParameters, StreamUpgradeError, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use std::collections::{hash_map, HashMap, HashSet, VecDeque}; use std::num::NonZeroU32; @@ -170,7 +169,7 @@ pub enum Event { CircuitReqOutboundConnectFailed { src_peer_id: PeerId, dst_peer_id: PeerId, - error: ConnectionHandlerUpgrErr, + error: StreamUpgradeError, }, /// Accepting an inbound circuit request failed. CircuitReqAcceptFailed { diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index cc4a4811..580a69c7 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -30,15 +30,15 @@ use futures::io::AsyncWriteExt; use futures::stream::{FuturesUnordered, StreamExt}; use futures_timer::Delay; use instant::Instant; -use libp2p_core::{upgrade, ConnectedPoint, Multiaddr}; +use libp2p_core::{ConnectedPoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, ConnectionId, KeepAlive, - NegotiatedSubstream, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, ConnectionId, KeepAlive, NegotiatedSubstream, + StreamUpgradeError, SubstreamProtocol, }; use std::collections::VecDeque; use std::fmt; @@ -203,7 +203,7 @@ pub enum Event { src_connection_id: ConnectionId, inbound_circuit_req: inbound_hop::CircuitReq, status: proto::Status, - error: ConnectionHandlerUpgrErr, + error: StreamUpgradeError, }, /// An inbound circuit has closed. CircuitClosed { @@ -347,7 +347,7 @@ pub struct Handler { /// A pending fatal error that results in the connection being closed. pending_error: Option< - ConnectionHandlerUpgrErr< + StreamUpgradeError< Either, >, >, @@ -471,9 +471,7 @@ impl Handler { ::InboundProtocol, >, ) { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(Either::Left(error)), - )); + self.pending_error = Some(StreamUpgradeError::Apply(Either::Left(error))); } fn on_dial_upgrade_error( @@ -487,33 +485,23 @@ impl Handler { >, ) { let (non_fatal_error, status) = match error { - ConnectionHandlerUpgrErr::Timeout => ( - ConnectionHandlerUpgrErr::Timeout, + StreamUpgradeError::Timeout => ( + StreamUpgradeError::Timeout, proto::Status::CONNECTION_FAILED, ), - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )) => { + StreamUpgradeError::NegotiationFailed => { // The remote has previously done a reservation. Doing a reservation but not // supporting the stop protocol is pointless, thus disconnecting. - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select(upgrade::NegotiationError::Failed), - )); + self.pending_error = Some(StreamUpgradeError::NegotiationFailed); return; } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)), - )); + StreamUpgradeError::Io(e) => { + self.pending_error = Some(StreamUpgradeError::Io(e)); return; } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => match error { + StreamUpgradeError::Apply(error) => match error { outbound_stop::UpgradeError::Fatal(error) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(Either::Right(error)), - )); + self.pending_error = Some(StreamUpgradeError::Apply(Either::Right(error))); return; } outbound_stop::UpgradeError::CircuitFailed(error) => { @@ -525,10 +513,7 @@ impl Handler { proto::Status::PERMISSION_DENIED } }; - ( - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)), - status, - ) + (StreamUpgradeError::Apply(error), status) } }, }; @@ -563,7 +548,7 @@ type Futures = FuturesUnordered>; impl ConnectionHandler for Handler { type InEvent = In; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr< + type Error = StreamUpgradeError< Either, >; type InboundProtocol = inbound_hop::Upgrade; diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index c0e6b4d4..656196fb 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -39,9 +39,9 @@ use libp2p_identity::PeerId; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}; use libp2p_swarm::dial_opts::DialOpts; use libp2p_swarm::{ - dummy, ConnectionDenied, ConnectionHandler, ConnectionHandlerUpgrErr, ConnectionId, - DialFailure, NegotiatedSubstream, NetworkBehaviour, NotifyHandler, PollParameters, THandler, - THandlerInEvent, THandlerOutEvent, ToSwarm, + dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NegotiatedSubstream, + NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, }; use std::collections::{hash_map, HashMap, VecDeque}; use std::io::{Error, ErrorKind, IoSlice}; @@ -64,7 +64,7 @@ pub enum Event { relay_peer_id: PeerId, /// Indicates whether the request replaces an existing reservation. renewal: bool, - error: ConnectionHandlerUpgrErr, + error: StreamUpgradeError, }, OutboundCircuitEstablished { relay_peer_id: PeerId, @@ -72,7 +72,7 @@ pub enum Event { }, OutboundCircuitReqFailed { relay_peer_id: PeerId, - error: ConnectionHandlerUpgrErr, + error: StreamUpgradeError, }, /// An inbound circuit has been established. InboundCircuitEstablished { diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index 7769f6fd..a2178ce4 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -29,15 +29,14 @@ use futures::stream::{FuturesUnordered, StreamExt}; use futures_timer::Delay; use instant::Instant; use libp2p_core::multiaddr::Protocol; -use libp2p_core::{upgrade, Multiaddr}; +use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, - SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol, }; use log::debug; use std::collections::{HashMap, VecDeque}; @@ -85,12 +84,12 @@ pub enum Event { ReservationReqFailed { /// Indicates whether the request replaces an existing reservation. renewal: bool, - error: ConnectionHandlerUpgrErr, + error: StreamUpgradeError, }, /// An outbound circuit has been established. OutboundCircuitEstablished { limit: Option }, OutboundCircuitReqFailed { - error: ConnectionHandlerUpgrErr, + error: StreamUpgradeError, }, /// An inbound circuit has been established. InboundCircuitEstablished { @@ -112,7 +111,7 @@ pub struct Handler { remote_addr: Multiaddr, /// A pending fatal error that results in the connection being closed. pending_error: Option< - ConnectionHandlerUpgrErr< + StreamUpgradeError< Either, >, >, @@ -299,9 +298,7 @@ impl Handler { ::InboundProtocol, >, ) { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(Either::Left(error)), - )); + self.pending_error = Some(StreamUpgradeError::Apply(Either::Left(error))); } fn on_dial_upgrade_error( @@ -317,42 +314,25 @@ impl Handler { match open_info { OutboundOpenInfo::Reserve { mut to_listener } => { let non_fatal_error = match error { - ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )), - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - ), - )); + StreamUpgradeError::Timeout => StreamUpgradeError::Timeout, + StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed, + StreamUpgradeError::Io(e) => { + self.pending_error = Some(StreamUpgradeError::Io(e)); return; } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => { - match error { - outbound_hop::UpgradeError::Fatal(error) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(Either::Right(error)), - )); - return; - } - outbound_hop::UpgradeError::ReservationFailed(error) => { - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply( - error, - )) - } - outbound_hop::UpgradeError::CircuitFailed(_) => { - unreachable!( - "Do not emitt `CircuitFailed` for outgoing reservation." - ) - } + StreamUpgradeError::Apply(error) => match error { + outbound_hop::UpgradeError::Fatal(error) => { + self.pending_error = + Some(StreamUpgradeError::Apply(Either::Right(error))); + return; } - } + outbound_hop::UpgradeError::ReservationFailed(error) => { + StreamUpgradeError::Apply(error) + } + outbound_hop::UpgradeError::CircuitFailed(_) => { + unreachable!("Do not emitt `CircuitFailed` for outgoing reservation.") + } + }, }; if self.pending_error.is_none() { @@ -379,42 +359,25 @@ impl Handler { } OutboundOpenInfo::Connect { send_back } => { let non_fatal_error = match error { - ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )), - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - ), - )); + StreamUpgradeError::Timeout => StreamUpgradeError::Timeout, + StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed, + StreamUpgradeError::Io(e) => { + self.pending_error = Some(StreamUpgradeError::Io(e)); return; } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => { - match error { - outbound_hop::UpgradeError::Fatal(error) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(Either::Right(error)), - )); - return; - } - outbound_hop::UpgradeError::CircuitFailed(error) => { - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply( - error, - )) - } - outbound_hop::UpgradeError::ReservationFailed(_) => { - unreachable!( - "Do not emitt `ReservationFailed` for outgoing circuit." - ) - } + StreamUpgradeError::Apply(error) => match error { + outbound_hop::UpgradeError::Fatal(error) => { + self.pending_error = + Some(StreamUpgradeError::Apply(Either::Right(error))); + return; } - } + outbound_hop::UpgradeError::CircuitFailed(error) => { + StreamUpgradeError::Apply(error) + } + outbound_hop::UpgradeError::ReservationFailed(_) => { + unreachable!("Do not emitt `ReservationFailed` for outgoing circuit.") + } + }, }; let _ = send_back.send(Err(())); @@ -432,7 +395,7 @@ impl Handler { impl ConnectionHandler for Handler { type InEvent = In; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr< + type Error = StreamUpgradeError< Either, >; type InboundProtocol = inbound_stop::Upgrade; diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 737d820e..7ee1b13f 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -28,13 +28,12 @@ use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered}; use instant::Instant; -use libp2p_core::upgrade::{NegotiationError, UpgradeError}; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }; use libp2p_swarm::{ - handler::{ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive}, + handler::{ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError}, SubstreamProtocol, }; use smallvec::SmallVec; @@ -67,7 +66,7 @@ where /// The current connection keep-alive. keep_alive: KeepAlive, /// A pending fatal error that results in the connection being closed. - pending_error: Option>, + pending_error: Option>, /// Queue of events to emit in `poll()`. pending_events: VecDeque>, /// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`. @@ -140,10 +139,10 @@ where >, ) { match error { - ConnectionHandlerUpgrErr::Timeout => { + StreamUpgradeError::Timeout => { self.pending_events.push_back(Event::OutboundTimeout(info)); } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + StreamUpgradeError::NegotiationFailed => { // The remote merely doesn't support the protocol(s) we requested. // This is no reason to close the connection, which may // successfully communicate with other protocols already. @@ -166,9 +165,7 @@ where ::InboundProtocol, >, ) { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply( - error, - ))); + self.pending_error = Some(StreamUpgradeError::Apply(error)); } } @@ -244,7 +241,7 @@ where { type InEvent = RequestProtocol; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr; + type Error = StreamUpgradeError; type InboundProtocol = ResponseProtocol; type OutboundProtocol = RequestProtocol; type OutboundOpenInfo = RequestId; diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index aeab2912..0563b8c2 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -23,13 +23,20 @@ See changelog for `0.42` on how to migrate. See [PR 3884]. +- Remove `ConnectionHandlerUpgrErr::Timer` variant. + This variant was never constructed and thus dead code. + See [PR 3605]. + +- Flatten `ConnectionHandlerUpgrErr` and rename to `StreamUpgradeError`. + See [PR 3882]. + +[PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 [PR 3746]: https://github.com/libp2p/rust-libp2p/pull/3746 [PR 3865]: https://github.com/libp2p/rust-libp2p/pull/3865 -[PR 3886]: https://github.com/libp2p/rust-libp2p/pull/3886 +[PR 3882]: https://github.com/libp2p/rust-libp2p/pull/3882 [PR 3884]: https://github.com/libp2p/rust-libp2p/pull/3884 -[PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605 -[PR 3746]: https://github.com/libp2p/rust-libp2p/pull/3746 +[PR 3886]: https://github.com/libp2p/rust-libp2p/pull/3886 ## 0.42.2 diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index b911df6a..1d4d867d 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -32,7 +32,7 @@ use crate::handler::{ FullyNegotiatedOutbound, ListenUpgradeError, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper}; -use crate::{ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, SubstreamProtocol}; +use crate::{ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol}; use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::StreamExt; @@ -41,9 +41,11 @@ use instant::Instant; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt, SubstreamBox}; -use libp2p_core::upgrade::{InboundUpgradeApply, OutboundUpgradeApply}; +use libp2p_core::upgrade; +use libp2p_core::upgrade::{ + InboundUpgradeApply, NegotiationError, OutboundUpgradeApply, ProtocolError, +}; use libp2p_core::Endpoint; -use libp2p_core::{upgrade, UpgradeError}; use libp2p_identity::PeerId; use std::future::Future; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -220,7 +222,7 @@ where handler.on_connection_event(ConnectionEvent::DialUpgradeError( DialUpgradeError { info, - error: ConnectionHandlerUpgrErr::Timeout, + error: StreamUpgradeError::Timeout, }, )); continue; @@ -273,23 +275,21 @@ where )); continue; } - Poll::Ready(Some(( - info, - Err(ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error))), - ))) => { + Poll::Ready(Some((info, Err(StreamUpgradeError::Apply(error))))) => { handler.on_connection_event(ConnectionEvent::ListenUpgradeError( ListenUpgradeError { info, error }, )); continue; } - Poll::Ready(Some(( - _, - Err(ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e))), - ))) => { + Poll::Ready(Some((_, Err(StreamUpgradeError::Io(e))))) => { log::debug!("failed to upgrade inbound stream: {e}"); continue; } - Poll::Ready(Some((_, Err(ConnectionHandlerUpgrErr::Timeout)))) => { + Poll::Ready(Some((_, Err(StreamUpgradeError::NegotiationFailed)))) => { + log::debug!("no protocol could be agreed upon for inbound stream"); + continue; + } + Poll::Ready(Some((_, Err(StreamUpgradeError::Timeout)))) => { log::debug!("inbound stream upgrade timed out"); continue; } @@ -488,11 +488,11 @@ impl Unpin for SubstreamUpgrade {} impl Future for SubstreamUpgrade where - Upgrade: Future>> + Unpin, + Upgrade: Future>> + Unpin, { type Output = ( UserData, - Result>, + Result>, ); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { @@ -502,28 +502,34 @@ where self.user_data .take() .expect("Future not to be polled again once ready."), - Err(ConnectionHandlerUpgrErr::Timeout), + Err(StreamUpgradeError::Timeout), )) } Poll::Pending => {} } - match self.upgrade.poll_unpin(cx) { - Poll::Ready(Ok(upgrade)) => Poll::Ready(( - self.user_data - .take() - .expect("Future not to be polled again once ready."), - Ok(upgrade), - )), - Poll::Ready(Err(err)) => Poll::Ready(( - self.user_data - .take() - .expect("Future not to be polled again once ready."), - Err(ConnectionHandlerUpgrErr::Upgrade(err)), - )), - Poll::Pending => Poll::Pending, - } + let result = futures::ready!(self.upgrade.poll_unpin(cx)); + let user_data = self + .user_data + .take() + .expect("Future not to be polled again once ready."); + + Poll::Ready(( + user_data, + result.map_err(|e| match e { + upgrade::UpgradeError::Select(NegotiationError::Failed) => { + StreamUpgradeError::NegotiationFailed + } + upgrade::UpgradeError::Select(NegotiationError::ProtocolError( + ProtocolError::IoError(e), + )) => StreamUpgradeError::Io(e), + upgrade::UpgradeError::Select(NegotiationError::ProtocolError(other)) => { + StreamUpgradeError::Io(io::Error::new(io::ErrorKind::Other, other)) + } + upgrade::UpgradeError::Apply(e) => StreamUpgradeError::Apply(e), + }), + )) } } @@ -683,7 +689,7 @@ mod tests { assert!(matches!( connection.handler.error.unwrap(), - ConnectionHandlerUpgrErr::Timeout + StreamUpgradeError::Timeout )) } @@ -786,7 +792,7 @@ mod tests { struct MockConnectionHandler { outbound_requested: bool, - error: Option>, + error: Option>, upgrade_timeout: Duration, } diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 491ed964..c605e80e 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -4,12 +4,12 @@ use crate::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use crate::{ - ConnectionDenied, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, - SubstreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, + ConnectionDenied, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol, + THandler, THandlerInEvent, THandlerOutEvent, }; use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::Endpoint; -use libp2p_core::{Multiaddr, UpgradeError}; +use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use std::task::{Context, Poll}; use void::Void; @@ -132,9 +132,9 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { protocol, .. }) => void::unreachable(protocol), ConnectionEvent::DialUpgradeError(DialUpgradeError { info: _, error }) => match error { - ConnectionHandlerUpgrErr::Timeout => unreachable!(), - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => void::unreachable(e), - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)) => { + StreamUpgradeError::Timeout => unreachable!(), + StreamUpgradeError::Apply(e) => void::unreachable(e), + StreamUpgradeError::NegotiationFailed | StreamUpgradeError::Io(_) => { unreachable!("Denied upgrade does not support any protocols") } }, diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 4ce02c34..e595dccd 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -49,8 +49,8 @@ mod select; pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend}; use instant::Instant; -use libp2p_core::{upgrade::UpgradeError, Multiaddr}; -use std::{cmp::Ordering, error, fmt, task::Context, task::Poll, time::Duration}; +use libp2p_core::Multiaddr; +use std::{cmp::Ordering, error, fmt, io, task::Context, task::Poll, time::Duration}; pub use map_in::MapInEvent; pub use map_out::MapOutEvent; @@ -270,7 +270,7 @@ pub struct AddressChange<'a> { /// that upgrading an outbound substream to the given protocol has failed. pub struct DialUpgradeError { pub info: OOI, - pub error: ConnectionHandlerUpgrErr, + pub error: StreamUpgradeError, } /// [`ConnectionEvent`] variant that informs the handler @@ -458,54 +458,67 @@ impl } } +#[deprecated(note = "Renamed to `StreamUpgradeError`")] +pub type ConnectionHandlerUpgrErr = StreamUpgradeError; + /// Error that can happen on an outbound substream opening attempt. #[derive(Debug)] -pub enum ConnectionHandlerUpgrErr { +pub enum StreamUpgradeError { /// The opening attempt timed out before the negotiation was fully completed. Timeout, - /// Error while upgrading the substream to the protocol we want. - Upgrade(UpgradeError), + /// The upgrade produced an error. + Apply(TUpgrErr), + /// No protocol could be agreed upon. + NegotiationFailed, + /// An IO or otherwise unrecoverable error happened. + Io(io::Error), } -impl ConnectionHandlerUpgrErr { - /// Map the inner [`UpgradeError`] type. - pub fn map_upgrade_err(self, f: F) -> ConnectionHandlerUpgrErr +impl StreamUpgradeError { + /// Map the inner [`StreamUpgradeError`] type. + pub fn map_upgrade_err(self, f: F) -> StreamUpgradeError where - F: FnOnce(UpgradeError) -> UpgradeError, + F: FnOnce(TUpgrErr) -> E, { match self { - ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, - ConnectionHandlerUpgrErr::Upgrade(e) => ConnectionHandlerUpgrErr::Upgrade(f(e)), + StreamUpgradeError::Timeout => StreamUpgradeError::Timeout, + StreamUpgradeError::Apply(e) => StreamUpgradeError::Apply(f(e)), + StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed, + StreamUpgradeError::Io(e) => StreamUpgradeError::Io(e), } } } -impl fmt::Display for ConnectionHandlerUpgrErr +impl fmt::Display for StreamUpgradeError where TUpgrErr: error::Error + 'static, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - ConnectionHandlerUpgrErr::Timeout => { + StreamUpgradeError::Timeout => { write!(f, "Timeout error while opening a substream") } - ConnectionHandlerUpgrErr::Upgrade(err) => { - write!(f, "Upgrade: ")?; + StreamUpgradeError::Apply(err) => { + write!(f, "Apply: ")?; crate::print_error_chain(f, err) } + StreamUpgradeError::NegotiationFailed => { + write!(f, "no protocols could be agreed upon") + } + StreamUpgradeError::Io(e) => { + write!(f, "IO error: ")?; + crate::print_error_chain(f, e) + } } } } -impl error::Error for ConnectionHandlerUpgrErr +impl error::Error for StreamUpgradeError where TUpgrErr: error::Error + 'static, { fn source(&self) -> Option<&(dyn error::Error + 'static)> { - match self { - ConnectionHandlerUpgrErr::Timeout => None, - ConnectionHandlerUpgrErr::Upgrade(_) => None, - } + None } } diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index 29ba45ab..2ab45292 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -19,8 +19,8 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, + ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, + FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError, SubstreamProtocol, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; @@ -37,7 +37,7 @@ where /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, /// If `Some`, something bad happened and we should shut down the handler with an error. - pending_error: Option::Error>>, + pending_error: Option::Error>>, /// Queue of events to produce in `poll()`. events_out: SmallVec<[TEvent; 4]>, /// Queue of outbound substreams to open. @@ -123,7 +123,7 @@ where { type InEvent = TOutbound; type OutEvent = TEvent; - type Error = ConnectionHandlerUpgrErr<::Error>; + type Error = StreamUpgradeError<::Error>; type InboundProtocol = TInbound; type OutboundProtocol = TOutbound; type OutboundOpenInfo = (); diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index 197c8a5a..d8d56396 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -19,14 +19,14 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, - ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - InboundUpgradeSend, KeepAlive, ListenUpgradeError, OutboundUpgradeSend, SubstreamProtocol, + AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, + FullyNegotiatedInbound, FullyNegotiatedOutbound, InboundUpgradeSend, KeepAlive, + ListenUpgradeError, OutboundUpgradeSend, StreamUpgradeError, SubstreamProtocol, }; use crate::upgrade::SendWrapper; use either::Either; use futures::future; -use libp2p_core::upgrade::{SelectUpgrade, UpgradeError}; +use libp2p_core::upgrade::SelectUpgrade; use std::{cmp, task::Context, task::Poll}; /// Implementation of [`ConnectionHandler`] that combines two protocols into one. @@ -110,47 +110,32 @@ where match self { DialUpgradeError { info: Either::Left(info), - error: ConnectionHandlerUpgrErr::Timeout, + error: StreamUpgradeError::Apply(Either::Left(err)), } => Either::Left(DialUpgradeError { info, - error: ConnectionHandlerUpgrErr::Timeout, + error: StreamUpgradeError::Apply(err), + }), + DialUpgradeError { + info: Either::Right(info), + error: StreamUpgradeError::Apply(Either::Right(err)), + } => Either::Right(DialUpgradeError { + info, + error: StreamUpgradeError::Apply(err), }), DialUpgradeError { info: Either::Left(info), - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), + error: e, } => Either::Left(DialUpgradeError { info, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), - }), - DialUpgradeError { - info: Either::Left(info), - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Left(err))), - } => Either::Left(DialUpgradeError { - info, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), + error: e.map_upgrade_err(|_| panic!("already handled above")), }), DialUpgradeError { info: Either::Right(info), - error: ConnectionHandlerUpgrErr::Timeout, + error: e, } => Either::Right(DialUpgradeError { info, - error: ConnectionHandlerUpgrErr::Timeout, + error: e.map_upgrade_err(|_| panic!("already handled above")), }), - DialUpgradeError { - info: Either::Right(info), - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), - } => Either::Right(DialUpgradeError { - info, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), - }), - DialUpgradeError { - info: Either::Right(info), - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Right(err))), - } => Either::Right(DialUpgradeError { - info, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), - }), - _ => panic!("Wrong API usage; the upgrade error doesn't match the outbound open info"), } } } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 5e7c44d9..11f3e425 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -122,8 +122,8 @@ pub use connection::pool::{ConnectionCounters, ConnectionLimits}; pub use connection::{ConnectionError, ConnectionId}; pub use executor::Executor; pub use handler::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr, - KeepAlive, OneShotHandler, OneShotHandlerConfig, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, KeepAlive, OneShotHandler, + OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol, }; #[cfg(feature = "macros")] pub use libp2p_swarm_derive::NetworkBehaviour;