swarm/handler: replace inject_* methods (#3085)

Previously, we had one callback for each kind of message that a `ConnectionHandler` would receive from either its `NetworkBehaviour` or the connection itself.

With this patch, we combine these functions, resulting in two callbacks:

- `on_behaviour_event`
- `on_connection_event`

Resolves #3080.
This commit is contained in:
João Oliveira
2022-11-17 17:19:36 +00:00
committed by GitHub
parent 6d49bf4a53
commit 7803524a76
30 changed files with 1718 additions and 1094 deletions

View File

@ -31,10 +31,13 @@ use instant::Instant;
use libp2p_core::either::EitherError;
use libp2p_core::multiaddr::Protocol;
use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::handler::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper};
use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ListenUpgradeError, SendWrapper,
};
use libp2p_swarm::{
dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr,
IntoConnectionHandler, KeepAlive, NegotiatedSubstream, SubstreamProtocol,
IntoConnectionHandler, KeepAlive, SubstreamProtocol,
};
use log::debug;
use std::collections::{HashMap, VecDeque};
@ -152,7 +155,7 @@ impl IntoConnectionHandler for Prototype {
};
if let Some(event) = self.initial_in {
handler.inject_event(event)
handler.on_behaviour_event(event)
}
Either::Left(handler)
@ -209,25 +212,16 @@ pub struct Handler {
send_error_futs: FuturesUnordered<BoxFuture<'static, ()>>,
}
impl ConnectionHandler for Handler {
type InEvent = In;
type OutEvent = Event;
type Error = ConnectionHandlerUpgrErr<
EitherError<inbound_stop::FatalUpgradeError, outbound_hop::FatalUpgradeError>,
>;
type InboundProtocol = inbound_stop::Upgrade;
type OutboundProtocol = outbound_hop::Upgrade;
type OutboundOpenInfo = OutboundOpenInfo;
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(inbound_stop::Upgrade {}, ())
}
fn inject_fully_negotiated_inbound(
impl Handler {
fn on_fully_negotiated_inbound(
&mut self,
inbound_circuit: inbound_stop::Circuit,
_: Self::InboundOpenInfo,
FullyNegotiatedInbound {
protocol: inbound_circuit,
..
}: FullyNegotiatedInbound<
<Self as ConnectionHandler>::InboundProtocol,
<Self as ConnectionHandler>::InboundOpenInfo,
>,
) {
match &mut self.reservation {
Reservation::Accepted { pending_msgs, .. }
@ -280,10 +274,15 @@ impl ConnectionHandler for Handler {
}
}
fn inject_fully_negotiated_outbound(
fn on_fully_negotiated_outbound(
&mut self,
output: <Self::OutboundProtocol as upgrade::OutboundUpgrade<NegotiatedSubstream>>::Output,
info: Self::OutboundOpenInfo,
FullyNegotiatedOutbound {
protocol: output,
info,
}: FullyNegotiatedOutbound<
<Self as ConnectionHandler>::OutboundProtocol,
<Self as ConnectionHandler>::OutboundOpenInfo,
>,
) {
match (output, info) {
// Outbound reservation
@ -340,36 +339,12 @@ impl ConnectionHandler for Handler {
}
}
fn inject_event(&mut self, event: Self::InEvent) {
match event {
In::Reserve { to_listener } => {
self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
outbound_hop::Upgrade::Reserve,
OutboundOpenInfo::Reserve { to_listener },
),
});
}
In::EstablishCircuit {
send_back,
dst_peer_id,
} => {
self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
outbound_hop::Upgrade::Connect { dst_peer_id },
OutboundOpenInfo::Connect { send_back },
),
});
}
}
}
fn inject_listen_upgrade_error(
fn on_listen_upgrade_error(
&mut self,
_: Self::InboundOpenInfo,
error: ConnectionHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>,
ListenUpgradeError { error, .. }: ListenUpgradeError<
<Self as ConnectionHandler>::InboundOpenInfo,
<Self as ConnectionHandler>::InboundProtocol,
>,
) {
let non_fatal_error = match error {
ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout,
@ -404,10 +379,15 @@ impl ConnectionHandler for Handler {
));
}
fn inject_dial_upgrade_error(
fn on_dial_upgrade_error(
&mut self,
open_info: Self::OutboundOpenInfo,
error: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
DialUpgradeError {
info: open_info,
error,
}: DialUpgradeError<
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::OutboundProtocol,
>,
) {
match open_info {
OutboundOpenInfo::Reserve { mut to_listener } => {
@ -524,6 +504,48 @@ impl ConnectionHandler for Handler {
}
}
}
}
impl ConnectionHandler for Handler {
type InEvent = In;
type OutEvent = Event;
type Error = ConnectionHandlerUpgrErr<
EitherError<inbound_stop::FatalUpgradeError, outbound_hop::FatalUpgradeError>,
>;
type InboundProtocol = inbound_stop::Upgrade;
type OutboundProtocol = outbound_hop::Upgrade;
type OutboundOpenInfo = OutboundOpenInfo;
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(inbound_stop::Upgrade {}, ())
}
fn on_behaviour_event(&mut self, event: Self::InEvent) {
match event {
In::Reserve { to_listener } => {
self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
outbound_hop::Upgrade::Reserve,
OutboundOpenInfo::Reserve { to_listener },
),
});
}
In::EstablishCircuit {
send_back,
dst_peer_id,
} => {
self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
outbound_hop::Upgrade::Connect { dst_peer_id },
OutboundOpenInfo::Connect { send_back },
),
});
}
}
}
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
@ -610,6 +632,32 @@ impl ConnectionHandler for Handler {
Poll::Pending
}
fn on_connection_event(
&mut self,
event: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
self.on_fully_negotiated_inbound(fully_negotiated_inbound)
}
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
self.on_fully_negotiated_outbound(fully_negotiated_outbound)
}
ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
self.on_listen_upgrade_error(listen_upgrade_error)
}
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)
}
ConnectionEvent::AddressChange(_) => {}
}
}
}
enum Reservation {

View File

@ -33,8 +33,10 @@ use instant::Instant;
use libp2p_core::connection::ConnectionId;
use libp2p_core::either::EitherError;
use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::handler::SendWrapper;
use libp2p_swarm::handler::{InboundUpgradeSend, OutboundUpgradeSend};
use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ListenUpgradeError, SendWrapper,
};
use libp2p_swarm::{
dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr,
IntoConnectionHandler, KeepAlive, NegotiatedSubstream, SubstreamProtocol,
@ -429,6 +431,188 @@ pub struct Handler {
circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>,
}
impl Handler {
fn on_fully_negotiated_inbound(
&mut self,
FullyNegotiatedInbound {
protocol: request, ..
}: FullyNegotiatedInbound<
<Self as ConnectionHandler>::InboundProtocol,
<Self as ConnectionHandler>::InboundOpenInfo,
>,
) {
match request {
inbound_hop::Req::Reserve(inbound_reservation_req) => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::ReservationReqReceived {
inbound_reservation_req,
endpoint: self.endpoint.clone(),
renewed: self.active_reservation.is_some(),
},
));
}
inbound_hop::Req::Connect(inbound_circuit_req) => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::CircuitReqReceived {
inbound_circuit_req,
endpoint: self.endpoint.clone(),
},
));
}
}
}
fn on_fully_negotiated_outbound(
&mut self,
FullyNegotiatedOutbound {
protocol: (dst_stream, dst_pending_data),
info: outbound_open_info,
}: FullyNegotiatedOutbound<
<Self as ConnectionHandler>::OutboundProtocol,
<Self as ConnectionHandler>::OutboundOpenInfo,
>,
) {
let OutboundOpenInfo {
circuit_id,
inbound_circuit_req,
src_peer_id,
src_connection_id,
} = outbound_open_info;
let (tx, rx) = oneshot::channel();
self.alive_lend_out_substreams.push(rx);
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::OutboundConnectNegotiated {
circuit_id,
src_peer_id,
src_connection_id,
inbound_circuit_req,
dst_handler_notifier: tx,
dst_stream,
dst_pending_data,
},
));
}
fn on_listen_upgrade_error(
&mut self,
ListenUpgradeError { error, .. }: ListenUpgradeError<
<Self as ConnectionHandler>::InboundOpenInfo,
<Self as ConnectionHandler>::InboundProtocol,
>,
) {
let non_fatal_error = match error {
ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout,
ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer,
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::Failed,
)) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::Failed,
)),
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::ProtocolError(e),
)) => {
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)),
));
return;
}
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(
inbound_hop::UpgradeError::Fatal(error),
)) => {
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Apply(EitherError::A(error)),
));
return;
}
};
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::CircuitReqReceiveFailed {
error: non_fatal_error,
},
));
}
fn on_dial_upgrade_error(
&mut self,
DialUpgradeError {
info: open_info,
error,
}: DialUpgradeError<
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::OutboundProtocol,
>,
) {
let (non_fatal_error, status) = match error {
ConnectionHandlerUpgrErr::Timeout => {
(ConnectionHandlerUpgrErr::Timeout, Status::ConnectionFailed)
}
ConnectionHandlerUpgrErr::Timer => {
(ConnectionHandlerUpgrErr::Timer, Status::ConnectionFailed)
}
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::Failed,
)) => {
// The remote has previously done a reservation. Doing a reservation but not
// supporting the stop protocol is pointless, thus disconnecting.
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Select(upgrade::NegotiationError::Failed),
));
return;
}
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::ProtocolError(e),
)) => {
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)),
));
return;
}
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => match error {
outbound_stop::UpgradeError::Fatal(error) => {
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Apply(EitherError::B(error)),
));
return;
}
outbound_stop::UpgradeError::CircuitFailed(error) => {
let status = match error {
outbound_stop::CircuitFailedReason::ResourceLimitExceeded => {
Status::ResourceLimitExceeded
}
outbound_stop::CircuitFailedReason::PermissionDenied => {
Status::PermissionDenied
}
};
(
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)),
status,
)
}
},
};
let OutboundOpenInfo {
circuit_id,
inbound_circuit_req,
src_peer_id,
src_connection_id,
} = open_info;
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::OutboundConnectNegotiationFailed {
circuit_id,
src_peer_id,
src_connection_id,
inbound_circuit_req,
status,
error: non_fatal_error,
},
));
}
}
enum ReservationRequestFuture {
Accepting(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>),
Denying(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>),
@ -458,62 +642,7 @@ impl ConnectionHandler for Handler {
)
}
fn inject_fully_negotiated_inbound(
&mut self,
request: <Self::InboundProtocol as upgrade::InboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::InboundOpenInfo,
) {
match request {
inbound_hop::Req::Reserve(inbound_reservation_req) => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::ReservationReqReceived {
inbound_reservation_req,
endpoint: self.endpoint.clone(),
renewed: self.active_reservation.is_some(),
},
));
}
inbound_hop::Req::Connect(inbound_circuit_req) => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::CircuitReqReceived {
inbound_circuit_req,
endpoint: self.endpoint.clone(),
},
));
}
}
}
fn inject_fully_negotiated_outbound(
&mut self,
(dst_stream, dst_pending_data): <Self::OutboundProtocol as upgrade::OutboundUpgrade<
NegotiatedSubstream,
>>::Output,
outbound_open_info: Self::OutboundOpenInfo,
) {
let OutboundOpenInfo {
circuit_id,
inbound_circuit_req,
src_peer_id,
src_connection_id,
} = outbound_open_info;
let (tx, rx) = oneshot::channel();
self.alive_lend_out_substreams.push(rx);
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::OutboundConnectNegotiated {
circuit_id,
src_peer_id,
src_connection_id,
inbound_circuit_req,
dst_handler_notifier: tx,
dst_stream,
dst_pending_data,
},
));
}
fn inject_event(&mut self, event: Self::InEvent) {
fn on_behaviour_event(&mut self, event: Self::InEvent) {
match event {
In::AcceptReservationReq {
inbound_reservation_req,
@ -607,117 +736,6 @@ impl ConnectionHandler for Handler {
}
}
fn inject_listen_upgrade_error(
&mut self,
_: Self::InboundOpenInfo,
error: ConnectionHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>,
) {
let non_fatal_error = match error {
ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout,
ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer,
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::Failed,
)) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::Failed,
)),
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::ProtocolError(e),
)) => {
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)),
));
return;
}
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(
inbound_hop::UpgradeError::Fatal(error),
)) => {
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Apply(EitherError::A(error)),
));
return;
}
};
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::CircuitReqReceiveFailed {
error: non_fatal_error,
},
));
}
fn inject_dial_upgrade_error(
&mut self,
open_info: Self::OutboundOpenInfo,
error: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
) {
let (non_fatal_error, status) = match error {
ConnectionHandlerUpgrErr::Timeout => {
(ConnectionHandlerUpgrErr::Timeout, Status::ConnectionFailed)
}
ConnectionHandlerUpgrErr::Timer => {
(ConnectionHandlerUpgrErr::Timer, Status::ConnectionFailed)
}
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::Failed,
)) => {
// The remote has previously done a reservation. Doing a reservation but not
// supporting the stop protocol is pointless, thus disconnecting.
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Select(upgrade::NegotiationError::Failed),
));
return;
}
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select(
upgrade::NegotiationError::ProtocolError(e),
)) => {
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)),
));
return;
}
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => match error {
outbound_stop::UpgradeError::Fatal(error) => {
self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(
upgrade::UpgradeError::Apply(EitherError::B(error)),
));
return;
}
outbound_stop::UpgradeError::CircuitFailed(error) => {
let status = match error {
outbound_stop::CircuitFailedReason::ResourceLimitExceeded => {
Status::ResourceLimitExceeded
}
outbound_stop::CircuitFailedReason::PermissionDenied => {
Status::PermissionDenied
}
};
(
ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)),
status,
)
}
},
};
let OutboundOpenInfo {
circuit_id,
inbound_circuit_req,
src_peer_id,
src_connection_id,
} = open_info;
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::OutboundConnectNegotiationFailed {
circuit_id,
src_peer_id,
src_connection_id,
inbound_circuit_req,
status,
error: non_fatal_error,
},
));
}
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
@ -933,6 +951,32 @@ impl ConnectionHandler for Handler {
Poll::Pending
}
fn on_connection_event(
&mut self,
event: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
self.on_fully_negotiated_inbound(fully_negotiated_inbound)
}
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
self.on_fully_negotiated_outbound(fully_negotiated_outbound)
}
ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
self.on_listen_upgrade_error(listen_upgrade_error)
}
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)
}
ConnectionEvent::AddressChange(_) => {}
}
}
}
pub struct OutboundOpenInfo {