feat(swarm): rename Custom variant to NotifyBehaviour

Rename `ConnectionHandlerEvent::Custom` to `ConnectionHandlerEvent::NotifyBehaviour`.

Related #3848.

Pull-Request: #3955.
This commit is contained in:
Thomas Coratger
2023-05-16 21:20:00 +02:00
committed by GitHub
parent fbd471cd79
commit 9f3c85164c
18 changed files with 221 additions and 175 deletions

View File

@ -70,7 +70,7 @@ impl ConnectionHandler for Handler {
> { > {
if !self.reported { if !self.reported {
self.reported = true; self.reported = true;
return Poll::Ready(ConnectionHandlerEvent::Custom( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::DirectConnectionEstablished, Event::DirectConnectionEstablished,
)); ));
} }

View File

@ -171,12 +171,13 @@ impl Handler {
ConnectedPoint::Dialer { address, role_override: _ } => address.clone(), ConnectedPoint::Dialer { address, role_override: _ } => address.clone(),
ConnectedPoint::Listener { ..} => unreachable!("`<Handler as ConnectionHandler>::listen_protocol` denies all incoming substreams as a listener."), ConnectedPoint::Listener { ..} => unreachable!("`<Handler as ConnectionHandler>::listen_protocol` denies all incoming substreams as a listener."),
}; };
self.queued_events.push_back(ConnectionHandlerEvent::Custom( self.queued_events
Event::InboundConnectRequest { .push_back(ConnectionHandlerEvent::NotifyBehaviour(
inbound_connect: Box::new(inbound_connect), Event::InboundConnectRequest {
remote_addr, inbound_connect: Box::new(inbound_connect),
}, remote_addr,
)); },
));
} }
// A connection listener denies all incoming substreams, thus none can ever be fully negotiated. // A connection listener denies all incoming substreams, thus none can ever be fully negotiated.
future::Either::Right(output) => void::unreachable(output), future::Either::Right(output) => void::unreachable(output),
@ -197,11 +198,12 @@ impl Handler {
self.endpoint.is_listener(), self.endpoint.is_listener(),
"A connection dialer never initiates a connection upgrade." "A connection dialer never initiates a connection upgrade."
); );
self.queued_events.push_back(ConnectionHandlerEvent::Custom( self.queued_events
Event::OutboundConnectNegotiated { .push_back(ConnectionHandlerEvent::NotifyBehaviour(
remote_addrs: obs_addrs, Event::OutboundConnectNegotiated {
}, remote_addrs: obs_addrs,
)); },
));
} }
fn on_listen_upgrade_error( fn on_listen_upgrade_error(
@ -228,21 +230,23 @@ impl Handler {
match error { match error {
StreamUpgradeError::Timeout => { StreamUpgradeError::Timeout => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom( self.queued_events
Event::OutboundNegotiationFailed { .push_back(ConnectionHandlerEvent::NotifyBehaviour(
error: StreamUpgradeError::Timeout, Event::OutboundNegotiationFailed {
}, error: StreamUpgradeError::Timeout,
)); },
));
} }
StreamUpgradeError::NegotiationFailed => { StreamUpgradeError::NegotiationFailed => {
// The remote merely doesn't support the DCUtR protocol. // The remote merely doesn't support the DCUtR protocol.
// This is no reason to close the connection, which may // This is no reason to close the connection, which may
// successfully communicate with other protocols already. // successfully communicate with other protocols already.
self.queued_events.push_back(ConnectionHandlerEvent::Custom( self.queued_events
Event::OutboundNegotiationFailed { .push_back(ConnectionHandlerEvent::NotifyBehaviour(
error: StreamUpgradeError::NegotiationFailed, Event::OutboundNegotiationFailed {
}, error: StreamUpgradeError::NegotiationFailed,
)); },
));
} }
_ => { _ => {
// Anything else is considered a fatal error or misbehaviour of // Anything else is considered a fatal error or misbehaviour of
@ -342,7 +346,7 @@ impl ConnectionHandler for Handler {
self.inbound_connect = None; self.inbound_connect = None;
match result { match result {
Ok(addresses) => { Ok(addresses) => {
return Poll::Ready(ConnectionHandlerEvent::Custom( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::InboundConnectNegotiated(addresses), Event::InboundConnectNegotiated(addresses),
)); ));
} }

View File

@ -232,9 +232,9 @@ impl EnabledHandler {
if !self.peer_kind_sent { if !self.peer_kind_sent {
if let Some(peer_kind) = self.peer_kind.as_ref() { if let Some(peer_kind) = self.peer_kind.as_ref() {
self.peer_kind_sent = true; self.peer_kind_sent = true;
return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
peer_kind.clone(), HandlerEvent::PeerKind(peer_kind.clone()),
))); ));
} }
} }
@ -261,7 +261,7 @@ impl EnabledHandler {
self.last_io_activity = Instant::now(); self.last_io_activity = Instant::now();
self.inbound_substream = self.inbound_substream =
Some(InboundSubstreamState::WaitingInput(substream)); Some(InboundSubstreamState::WaitingInput(substream));
return Poll::Ready(ConnectionHandlerEvent::Custom(message)); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(message));
} }
Poll::Ready(Some(Err(error))) => { Poll::Ready(Some(Err(error))) => {
log::debug!("Failed to read from inbound stream: {error}"); log::debug!("Failed to read from inbound stream: {error}");
@ -466,9 +466,9 @@ impl ConnectionHandler for Handler {
Handler::Disabled(DisabledHandler::ProtocolUnsupported { peer_kind_sent }) => { Handler::Disabled(DisabledHandler::ProtocolUnsupported { peer_kind_sent }) => {
if !*peer_kind_sent { if !*peer_kind_sent {
*peer_kind_sent = true; *peer_kind_sent = true;
return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
PeerKind::NotSupported, HandlerEvent::PeerKind(PeerKind::NotSupported),
))); ));
} }
Poll::Pending Poll::Pending

View File

@ -174,13 +174,13 @@ impl Handler {
future::Either::Left(remote_info) => { future::Either::Left(remote_info) => {
self.update_supported_protocols_for_remote(&remote_info); self.update_supported_protocols_for_remote(&remote_info);
self.events self.events
.push(ConnectionHandlerEvent::Custom(Event::Identified( .push(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
remote_info, remote_info,
))); )));
} }
future::Either::Right(()) => self future::Either::Right(()) => self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
.events Event::IdentificationPushed,
.push(ConnectionHandlerEvent::Custom(Event::IdentificationPushed)), )),
} }
} }
@ -192,10 +192,9 @@ impl Handler {
>, >,
) { ) {
let err = err.map_upgrade_err(|e| e.into_inner()); let err = err.map_upgrade_err(|e| e.into_inner());
self.events self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
.push(ConnectionHandlerEvent::Custom(Event::IdentificationError( Event::IdentificationError(err),
err, ));
)));
self.trigger_next_identify.reset(self.interval); self.trigger_next_identify.reset(self.interval);
} }
@ -309,7 +308,9 @@ impl ConnectionHandler for Handler {
if let Ok(info) = res { if let Ok(info) = res {
self.update_supported_protocols_for_remote(&info); self.update_supported_protocols_for_remote(&info);
return Poll::Ready(ConnectionHandlerEvent::Custom(Event::Identified(info))); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
info,
)));
} }
} }
@ -319,7 +320,7 @@ impl ConnectionHandler for Handler {
.map(|()| Event::Identification) .map(|()| Event::Identification)
.unwrap_or_else(|err| Event::IdentificationError(StreamUpgradeError::Apply(err))); .unwrap_or_else(|err| Event::IdentificationError(StreamUpgradeError::Apply(err)));
return Poll::Ready(ConnectionHandlerEvent::Custom(event)); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
} }
Poll::Pending Poll::Pending

View File

@ -707,7 +707,7 @@ where
> { > {
if let ProtocolStatus::Confirmed = self.protocol_status { if let ProtocolStatus::Confirmed = self.protocol_status {
self.protocol_status = ProtocolStatus::Reported; self.protocol_status = ProtocolStatus::Reported;
return Poll::Ready(ConnectionHandlerEvent::Custom( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::ProtocolConfirmed { KademliaHandlerEvent::ProtocolConfirmed {
endpoint: self.endpoint.clone(), endpoint: self.endpoint.clone(),
}, },
@ -826,7 +826,7 @@ where
Err(error) => { Err(error) => {
*this = OutboundSubstreamState::Done; *this = OutboundSubstreamState::Done;
let event = user_data.map(|user_data| { let event = user_data.map(|user_data| {
ConnectionHandlerEvent::Custom( ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::QueryError { KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error), error: KademliaHandlerQueryErr::Io(error),
user_data, user_data,
@ -844,10 +844,12 @@ where
Poll::Ready(Err(error)) => { Poll::Ready(Err(error)) => {
*this = OutboundSubstreamState::Done; *this = OutboundSubstreamState::Done;
let event = user_data.map(|user_data| { let event = user_data.map(|user_data| {
ConnectionHandlerEvent::Custom(KademliaHandlerEvent::QueryError { ConnectionHandlerEvent::NotifyBehaviour(
error: KademliaHandlerQueryErr::Io(error), KademliaHandlerEvent::QueryError {
user_data, error: KademliaHandlerQueryErr::Io(error),
}) user_data,
},
)
}); });
return Poll::Ready(event); return Poll::Ready(event);
@ -870,10 +872,12 @@ where
Poll::Ready(Err(error)) => { Poll::Ready(Err(error)) => {
*this = OutboundSubstreamState::Done; *this = OutboundSubstreamState::Done;
let event = user_data.map(|user_data| { let event = user_data.map(|user_data| {
ConnectionHandlerEvent::Custom(KademliaHandlerEvent::QueryError { ConnectionHandlerEvent::NotifyBehaviour(
error: KademliaHandlerQueryErr::Io(error), KademliaHandlerEvent::QueryError {
user_data, error: KademliaHandlerQueryErr::Io(error),
}) user_data,
},
)
}); });
return Poll::Ready(event); return Poll::Ready(event);
@ -886,7 +890,9 @@ where
*this = OutboundSubstreamState::Closing(substream); *this = OutboundSubstreamState::Closing(substream);
let event = process_kad_response(msg, user_data); let event = process_kad_response(msg, user_data);
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event))); return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
event,
)));
} }
Poll::Pending => { Poll::Pending => {
*this = OutboundSubstreamState::WaitingAnswer(substream, user_data); *this = OutboundSubstreamState::WaitingAnswer(substream, user_data);
@ -899,7 +905,9 @@ where
user_data, user_data,
}; };
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event))); return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
event,
)));
} }
Poll::Ready(None) => { Poll::Ready(None) => {
*this = OutboundSubstreamState::Done; *this = OutboundSubstreamState::Done;
@ -910,7 +918,9 @@ where
user_data, user_data,
}; };
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event))); return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
event,
)));
} }
} }
} }
@ -918,7 +928,7 @@ where
*this = OutboundSubstreamState::Done; *this = OutboundSubstreamState::Done;
let event = KademliaHandlerEvent::QueryError { error, user_data }; let event = KademliaHandlerEvent::QueryError { error, user_data };
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event))); return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(event)));
} }
OutboundSubstreamState::Closing(mut stream) => match stream.poll_close_unpin(cx) { OutboundSubstreamState::Closing(mut stream) => match stream.poll_close_unpin(cx) {
Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => return Poll::Ready(None), Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => return Poll::Ready(None),
@ -971,7 +981,7 @@ where
Poll::Ready(Some(Ok(KadRequestMsg::FindNode { key }))) => { Poll::Ready(Some(Ok(KadRequestMsg::FindNode { key }))) => {
*this = *this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::Custom( return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::FindNodeReq { KademliaHandlerEvent::FindNodeReq {
key, key,
request_id: KademliaRequestId { request_id: KademliaRequestId {
@ -983,7 +993,7 @@ where
Poll::Ready(Some(Ok(KadRequestMsg::GetProviders { key }))) => { Poll::Ready(Some(Ok(KadRequestMsg::GetProviders { key }))) => {
*this = *this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::Custom( return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::GetProvidersReq { KademliaHandlerEvent::GetProvidersReq {
key, key,
request_id: KademliaRequestId { request_id: KademliaRequestId {
@ -998,14 +1008,14 @@ where
connection_id, connection_id,
substream, substream,
}; };
return Poll::Ready(Some(ConnectionHandlerEvent::Custom( return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::AddProvider { key, provider }, KademliaHandlerEvent::AddProvider { key, provider },
))); )));
} }
Poll::Ready(Some(Ok(KadRequestMsg::GetValue { key }))) => { Poll::Ready(Some(Ok(KadRequestMsg::GetValue { key }))) => {
*this = *this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::Custom( return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::GetRecord { KademliaHandlerEvent::GetRecord {
key, key,
request_id: KademliaRequestId { request_id: KademliaRequestId {
@ -1017,7 +1027,7 @@ where
Poll::Ready(Some(Ok(KadRequestMsg::PutValue { record }))) => { Poll::Ready(Some(Ok(KadRequestMsg::PutValue { record }))) => {
*this = *this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::Custom( return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::PutRecord { KademliaHandlerEvent::PutRecord {
record, record,
request_id: KademliaRequestId { request_id: KademliaRequestId {

View File

@ -146,7 +146,7 @@ impl ConnectionHandler for Handler {
.pop_front() .pop_front()
.expect("requested stream without pending command"); .expect("requested stream without pending command");
self.queued_events self.queued_events
.push_back(ConnectionHandlerEvent::Custom(Event { .push_back(ConnectionHandlerEvent::NotifyBehaviour(Event {
id, id,
result: Err(error), result: Err(error),
})); }));
@ -179,7 +179,7 @@ impl ConnectionHandler for Handler {
while let Poll::Ready(Some(result)) = self.outbound.poll_next_unpin(cx) { while let Poll::Ready(Some(result)) = self.outbound.poll_next_unpin(cx) {
match result { match result {
Ok(event) => return Poll::Ready(ConnectionHandlerEvent::Custom(event)), Ok(event) => return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)),
Err(e) => { Err(e) => {
panic!("{e:?}") panic!("{e:?}")
} }

View File

@ -129,7 +129,9 @@ impl ConnectionHandler for Handler {
> { > {
while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) { while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) {
match result { match result {
Ok(stats) => return Poll::Ready(ConnectionHandlerEvent::Custom(Event { stats })), Ok(stats) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { stats }))
}
Err(e) => { Err(e) => {
error!("{e:?}") error!("{e:?}")
} }

View File

@ -258,7 +258,9 @@ impl ConnectionHandler for Handler {
} }
State::Inactive { reported: false } => { State::Inactive { reported: false } => {
self.state = State::Inactive { reported: true }; self.state = State::Inactive { reported: true };
return Poll::Ready(ConnectionHandlerEvent::Custom(Err(Failure::Unsupported))); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(
Failure::Unsupported,
)));
} }
State::Active => {} State::Active => {}
} }
@ -274,7 +276,7 @@ impl ConnectionHandler for Handler {
Poll::Ready(Ok(stream)) => { Poll::Ready(Ok(stream)) => {
// A ping from a remote peer has been answered, wait for the next. // A ping from a remote peer has been answered, wait for the next.
self.inbound = Some(protocol::recv_ping(stream).boxed()); self.inbound = Some(protocol::recv_ping(stream).boxed());
return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(Success::Pong))); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(Success::Pong)));
} }
} }
} }
@ -299,7 +301,7 @@ impl ConnectionHandler for Handler {
return Poll::Ready(ConnectionHandlerEvent::Close(error)); return Poll::Ready(ConnectionHandlerEvent::Close(error));
} }
return Poll::Ready(ConnectionHandlerEvent::Custom(Err(error))); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(error)));
} }
} }
@ -318,9 +320,9 @@ impl ConnectionHandler for Handler {
self.failures = 0; self.failures = 0;
self.timer.reset(self.config.interval); self.timer.reset(self.config.interval);
self.outbound = Some(OutboundState::Idle(stream)); self.outbound = Some(OutboundState::Idle(stream));
return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(Success::Ping { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(
rtt, Success::Ping { rtt },
}))); )));
} }
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
self.pending_errors self.pending_errors

View File

@ -410,21 +410,23 @@ impl Handler {
) { ) {
match request { match request {
inbound_hop::Req::Reserve(inbound_reservation_req) => { inbound_hop::Req::Reserve(inbound_reservation_req) => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom( self.queued_events
Event::ReservationReqReceived { .push_back(ConnectionHandlerEvent::NotifyBehaviour(
inbound_reservation_req, Event::ReservationReqReceived {
endpoint: self.endpoint.clone(), inbound_reservation_req,
renewed: self.active_reservation.is_some(), endpoint: self.endpoint.clone(),
}, renewed: self.active_reservation.is_some(),
)); },
));
} }
inbound_hop::Req::Connect(inbound_circuit_req) => { inbound_hop::Req::Connect(inbound_circuit_req) => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom( self.queued_events
Event::CircuitReqReceived { .push_back(ConnectionHandlerEvent::NotifyBehaviour(
inbound_circuit_req, Event::CircuitReqReceived {
endpoint: self.endpoint.clone(), inbound_circuit_req,
}, endpoint: self.endpoint.clone(),
)); },
));
} }
} }
} }
@ -448,17 +450,18 @@ impl Handler {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.alive_lend_out_substreams.push(rx); self.alive_lend_out_substreams.push(rx);
self.queued_events.push_back(ConnectionHandlerEvent::Custom( self.queued_events
Event::OutboundConnectNegotiated { .push_back(ConnectionHandlerEvent::NotifyBehaviour(
circuit_id, Event::OutboundConnectNegotiated {
src_peer_id, circuit_id,
src_connection_id, src_peer_id,
inbound_circuit_req, src_connection_id,
dst_handler_notifier: tx, inbound_circuit_req,
dst_stream, dst_handler_notifier: tx,
dst_pending_data, dst_stream,
}, dst_pending_data,
)); },
));
} }
fn on_listen_upgrade_error( fn on_listen_upgrade_error(
@ -525,16 +528,17 @@ impl Handler {
src_connection_id, src_connection_id,
} = open_info; } = open_info;
self.queued_events.push_back(ConnectionHandlerEvent::Custom( self.queued_events
Event::OutboundConnectNegotiationFailed { .push_back(ConnectionHandlerEvent::NotifyBehaviour(
circuit_id, Event::OutboundConnectNegotiationFailed {
src_peer_id, circuit_id,
src_connection_id, src_peer_id,
inbound_circuit_req, src_connection_id,
status, inbound_circuit_req,
error: non_fatal_error, status,
}, error: non_fatal_error,
)); },
));
} }
} }
@ -692,18 +696,22 @@ impl ConnectionHandler for Handler {
{ {
match result { match result {
Ok(()) => { Ok(()) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(Event::CircuitClosed { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
circuit_id, Event::CircuitClosed {
dst_peer_id, circuit_id,
error: None, dst_peer_id,
})) error: None,
},
))
} }
Err(e) => { Err(e) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(Event::CircuitClosed { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
circuit_id, Event::CircuitClosed {
dst_peer_id, circuit_id,
error: Some(e), dst_peer_id,
})) error: Some(e),
},
))
} }
} }
} }
@ -714,13 +722,15 @@ impl ConnectionHandler for Handler {
{ {
match result { match result {
Ok(()) => { Ok(()) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(Event::CircuitReqDenied { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
circuit_id, Event::CircuitReqDenied {
dst_peer_id, circuit_id,
})); dst_peer_id,
},
));
} }
Err(error) => { Err(error) => {
return Poll::Ready(ConnectionHandlerEvent::Custom( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::CircuitReqDenyFailed { Event::CircuitReqDenyFailed {
circuit_id, circuit_id,
dst_peer_id, dst_peer_id,
@ -773,7 +783,7 @@ impl ConnectionHandler for Handler {
self.circuits.push(circuit); self.circuits.push(circuit);
return Poll::Ready(ConnectionHandlerEvent::Custom( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::CircuitReqAccepted { Event::CircuitReqAccepted {
circuit_id, circuit_id,
dst_peer_id, dst_peer_id,
@ -781,7 +791,7 @@ impl ConnectionHandler for Handler {
)); ));
} }
Err((circuit_id, dst_peer_id, error)) => { Err((circuit_id, dst_peer_id, error)) => {
return Poll::Ready(ConnectionHandlerEvent::Custom( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::CircuitReqAcceptFailed { Event::CircuitReqAcceptFailed {
circuit_id, circuit_id,
dst_peer_id, dst_peer_id,
@ -799,7 +809,7 @@ impl ConnectionHandler for Handler {
.map(|fut| fut.poll_unpin(cx)) .map(|fut| fut.poll_unpin(cx))
{ {
self.active_reservation = None; self.active_reservation = None;
return Poll::Ready(ConnectionHandlerEvent::Custom( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::ReservationTimedOut {}, Event::ReservationTimedOut {},
)); ));
} }
@ -816,12 +826,12 @@ impl ConnectionHandler for Handler {
.active_reservation .active_reservation
.replace(Delay::new(self.config.reservation_duration)) .replace(Delay::new(self.config.reservation_duration))
.is_some(); .is_some();
return Poll::Ready(ConnectionHandlerEvent::Custom( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::ReservationReqAccepted { renewed }, Event::ReservationReqAccepted { renewed },
)); ));
} }
Err(error) => { Err(error) => {
return Poll::Ready(ConnectionHandlerEvent::Custom( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::ReservationReqAcceptFailed { error }, Event::ReservationReqAcceptFailed { error },
)); ));
} }
@ -834,12 +844,12 @@ impl ConnectionHandler for Handler {
match result { match result {
Ok(()) => { Ok(()) => {
return Poll::Ready(ConnectionHandlerEvent::Custom( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::ReservationReqDenied {}, Event::ReservationReqDenied {},
)) ))
} }
Err(error) => { Err(error) => {
return Poll::Ready(ConnectionHandlerEvent::Custom( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::ReservationReqDenyFailed { error }, Event::ReservationReqDenyFailed { error },
)); ));
} }

View File

@ -194,9 +194,10 @@ impl Handler {
relay_addr: self.remote_addr.clone(), relay_addr: self.remote_addr.clone(),
}); });
self.queued_events.push_back(ConnectionHandlerEvent::Custom( self.queued_events
Event::InboundCircuitEstablished { src_peer_id, limit }, .push_back(ConnectionHandlerEvent::NotifyBehaviour(
)); Event::InboundCircuitEstablished { src_peer_id, limit },
));
} }
Reservation::None => { Reservation::None => {
let src_peer_id = inbound_circuit.src_peer_id(); let src_peer_id = inbound_circuit.src_peer_id();
@ -254,7 +255,7 @@ impl Handler {
); );
self.queued_events self.queued_events
.push_back(ConnectionHandlerEvent::Custom(event)); .push_back(ConnectionHandlerEvent::NotifyBehaviour(event));
} }
// Outbound circuit // Outbound circuit
@ -272,9 +273,10 @@ impl Handler {
})) { })) {
Ok(()) => { Ok(()) => {
self.alive_lend_out_substreams.push(rx); self.alive_lend_out_substreams.push(rx);
self.queued_events.push_back(ConnectionHandlerEvent::Custom( self.queued_events
Event::OutboundCircuitEstablished { limit }, .push_back(ConnectionHandlerEvent::NotifyBehaviour(
)); Event::OutboundCircuitEstablished { limit },
));
} }
Err(_) => debug!( Err(_) => debug!(
"Oneshot to `client::transport::Dial` future dropped. \ "Oneshot to `client::transport::Dial` future dropped. \
@ -350,12 +352,13 @@ impl Handler {
} }
let renewal = self.reservation.failed(); let renewal = self.reservation.failed();
self.queued_events.push_back(ConnectionHandlerEvent::Custom( self.queued_events
Event::ReservationReqFailed { .push_back(ConnectionHandlerEvent::NotifyBehaviour(
renewal, Event::ReservationReqFailed {
error: non_fatal_error, renewal,
}, error: non_fatal_error,
)); },
));
} }
OutboundOpenInfo::Connect { send_back } => { OutboundOpenInfo::Connect { send_back } => {
let non_fatal_error = match error { let non_fatal_error = match error {
@ -382,11 +385,12 @@ impl Handler {
let _ = send_back.send(Err(())); let _ = send_back.send(Err(()));
self.queued_events.push_back(ConnectionHandlerEvent::Custom( self.queued_events
Event::OutboundCircuitReqFailed { .push_back(ConnectionHandlerEvent::NotifyBehaviour(
error: non_fatal_error, Event::OutboundCircuitReqFailed {
}, error: non_fatal_error,
)); },
));
} }
} }
} }
@ -485,7 +489,7 @@ impl ConnectionHandler for Handler {
}); });
if let Some((src_peer_id, event)) = maybe_event { if let Some((src_peer_id, event)) = maybe_event {
self.circuit_deny_futs.remove(&src_peer_id); self.circuit_deny_futs.remove(&src_peer_id);
return Poll::Ready(ConnectionHandlerEvent::Custom(event)); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
} }
// Send errors to transport. // Send errors to transport.

View File

@ -468,32 +468,28 @@ where
match poll_substreams(&mut self.inbound_substreams, cx) { match poll_substreams(&mut self.inbound_substreams, cx) {
Poll::Ready(Ok((id, message))) => { Poll::Ready(Ok((id, message))) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(OutEvent::InboundEvent { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
id, OutEvent::InboundEvent { id, message },
message, ))
}))
} }
Poll::Ready(Err((id, error))) => { Poll::Ready(Err((id, error))) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(OutEvent::InboundError { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
id, OutEvent::InboundError { id, error },
error, ))
}))
} }
Poll::Pending => {} Poll::Pending => {}
} }
match poll_substreams(&mut self.outbound_substreams, cx) { match poll_substreams(&mut self.outbound_substreams, cx) {
Poll::Ready(Ok((id, message))) => { Poll::Ready(Ok((id, message))) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(OutEvent::OutboundEvent { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
id, OutEvent::OutboundEvent { id, message },
message, ))
}))
} }
Poll::Ready(Err((id, error))) => { Poll::Ready(Err((id, error))) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(OutEvent::OutboundError { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
id, OutEvent::OutboundError { id, error },
error, ))
}))
} }
Poll::Pending => {} Poll::Pending => {}
} }

View File

@ -296,7 +296,7 @@ where
> { > {
// Drain pending events. // Drain pending events.
if let Some(event) = self.pending_events.pop_front() { if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(ConnectionHandlerEvent::Custom(event)); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
} else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
self.pending_events.shrink_to_fit(); self.pending_events.shrink_to_fit();
} }
@ -307,7 +307,7 @@ where
Ok(((id, rq), rs_sender)) => { Ok(((id, rq), rs_sender)) => {
// We received an inbound request. // We received an inbound request.
self.keep_alive = KeepAlive::Yes; self.keep_alive = KeepAlive::Yes;
return Poll::Ready(ConnectionHandlerEvent::Custom(Event::Request { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Request {
request_id: id, request_id: id,
request: rq, request: rq,
sender: rs_sender, sender: rs_sender,

View File

@ -50,6 +50,8 @@
- Remove deprecated `NetworkBehaviourAction` type. - Remove deprecated `NetworkBehaviourAction` type.
See [PR 3919]. See [PR 3919].
- Rename `ConnectionHandlerEvent::Custom` to `ConnectionHandlerEvent::NotifyBehaviour`. See [PR 3955].
[PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605 [PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605
[PR 3651]: https://github.com/libp2p/rust-libp2p/pull/3651 [PR 3651]: https://github.com/libp2p/rust-libp2p/pull/3651
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
@ -62,6 +64,7 @@
[PR 3886]: https://github.com/libp2p/rust-libp2p/pull/3886 [PR 3886]: https://github.com/libp2p/rust-libp2p/pull/3886
[PR 3912]: https://github.com/libp2p/rust-libp2p/pull/3912 [PR 3912]: https://github.com/libp2p/rust-libp2p/pull/3912
[PR 3919]: https://github.com/libp2p/rust-libp2p/pull/3919 [PR 3919]: https://github.com/libp2p/rust-libp2p/pull/3919
[PR 3955]: https://github.com/libp2p/rust-libp2p/pull/3955
## 0.42.2 ## 0.42.2

View File

@ -263,7 +263,7 @@ where
requested_substreams.push(SubstreamRequested::new(user_data, timeout, upgrade)); requested_substreams.push(SubstreamRequested::new(user_data, timeout, upgrade));
continue; // Poll handler until exhausted. continue; // Poll handler until exhausted.
} }
Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) => {
return Poll::Ready(Ok(Event::Handler(event))); return Poll::Ready(Ok(Event::Handler(event)));
} }
Poll::Ready(ConnectionHandlerEvent::Close(err)) => { Poll::Ready(ConnectionHandlerEvent::Close(err)) => {

View File

@ -101,7 +101,7 @@ use std::{cmp::Ordering, error, fmt, io, task::Context, task::Poll, time::Durati
pub trait ConnectionHandler: Send + 'static { pub trait ConnectionHandler: Send + 'static {
/// A type representing the message(s) a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) can send to a [`ConnectionHandler`] via [`ToSwarm::NotifyHandler`](crate::behaviour::ToSwarm::NotifyHandler) /// A type representing the message(s) a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) can send to a [`ConnectionHandler`] via [`ToSwarm::NotifyHandler`](crate::behaviour::ToSwarm::NotifyHandler)
type FromBehaviour: fmt::Debug + Send + 'static; type FromBehaviour: fmt::Debug + Send + 'static;
/// A type representing message(s) a [`ConnectionHandler`] can send to a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) via [`ConnectionHandlerEvent::Custom`]. /// A type representing message(s) a [`ConnectionHandler`] can send to a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) via [`ConnectionHandlerEvent::NotifyBehaviour`].
type ToBehaviour: fmt::Debug + Send + 'static; type ToBehaviour: fmt::Debug + Send + 'static;
/// The type of errors returned by [`ConnectionHandler::poll`]. /// The type of errors returned by [`ConnectionHandler::poll`].
type Error: error::Error + fmt::Debug + Send + 'static; type Error: error::Error + fmt::Debug + Send + 'static;
@ -508,8 +508,8 @@ pub enum ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom,
/// We learned something about the protocols supported by the remote. /// We learned something about the protocols supported by the remote.
ReportRemoteProtocols(ProtocolSupport), ReportRemoteProtocols(ProtocolSupport),
/// Other event. /// Event that is sent to a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour).
Custom(TCustom), NotifyBehaviour(TCustom),
} }
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
@ -539,7 +539,9 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
protocol: protocol.map_info(map), protocol: protocol.map_info(map),
} }
} }
ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(val), ConnectionHandlerEvent::NotifyBehaviour(val) => {
ConnectionHandlerEvent::NotifyBehaviour(val)
}
ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val), ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val),
ConnectionHandlerEvent::ReportRemoteProtocols(support) => { ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
ConnectionHandlerEvent::ReportRemoteProtocols(support) ConnectionHandlerEvent::ReportRemoteProtocols(support)
@ -562,7 +564,9 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
protocol: protocol.map_upgrade(map), protocol: protocol.map_upgrade(map),
} }
} }
ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(val), ConnectionHandlerEvent::NotifyBehaviour(val) => {
ConnectionHandlerEvent::NotifyBehaviour(val)
}
ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val), ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val),
ConnectionHandlerEvent::ReportRemoteProtocols(support) => { ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
ConnectionHandlerEvent::ReportRemoteProtocols(support) ConnectionHandlerEvent::ReportRemoteProtocols(support)
@ -582,7 +586,9 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }
} }
ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(map(val)), ConnectionHandlerEvent::NotifyBehaviour(val) => {
ConnectionHandlerEvent::NotifyBehaviour(map(val))
}
ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val), ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val),
ConnectionHandlerEvent::ReportRemoteProtocols(support) => { ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
ConnectionHandlerEvent::ReportRemoteProtocols(support) ConnectionHandlerEvent::ReportRemoteProtocols(support)
@ -602,7 +608,9 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }
} }
ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(val), ConnectionHandlerEvent::NotifyBehaviour(val) => {
ConnectionHandlerEvent::NotifyBehaviour(val)
}
ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(map(val)), ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(map(val)),
ConnectionHandlerEvent::ReportRemoteProtocols(support) => { ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
ConnectionHandlerEvent::ReportRemoteProtocols(support) ConnectionHandlerEvent::ReportRemoteProtocols(support)

View File

@ -76,7 +76,9 @@ where
>, >,
> { > {
self.inner.poll(cx).map(|ev| match ev { self.inner.poll(cx).map(|ev| match ev {
ConnectionHandlerEvent::Custom(ev) => ConnectionHandlerEvent::Custom((self.map)(ev)), ConnectionHandlerEvent::NotifyBehaviour(ev) => {
ConnectionHandlerEvent::NotifyBehaviour((self.map)(ev))
}
ConnectionHandlerEvent::Close(err) => ConnectionHandlerEvent::Close(err), ConnectionHandlerEvent::Close(err) => ConnectionHandlerEvent::Close(err),
ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }

View File

@ -157,7 +157,9 @@ where
} }
if !self.events_out.is_empty() { if !self.events_out.is_empty() {
return Poll::Ready(ConnectionHandlerEvent::Custom(self.events_out.remove(0))); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
self.events_out.remove(0),
));
} else { } else {
self.events_out.shrink_to_fit(); self.events_out.shrink_to_fit();
} }

View File

@ -227,8 +227,8 @@ where
>, >,
> { > {
match self.proto1.poll(cx) { match self.proto1.poll(cx) {
Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(Either::Left(event))); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Either::Left(event)));
} }
Poll::Ready(ConnectionHandlerEvent::Close(event)) => { Poll::Ready(ConnectionHandlerEvent::Close(event)) => {
return Poll::Ready(ConnectionHandlerEvent::Close(Either::Left(event))); return Poll::Ready(ConnectionHandlerEvent::Close(Either::Left(event)));
@ -247,8 +247,10 @@ where
}; };
match self.proto2.poll(cx) { match self.proto2.poll(cx) {
Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(Either::Right(event))); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Either::Right(
event,
)));
} }
Poll::Ready(ConnectionHandlerEvent::Close(event)) => { Poll::Ready(ConnectionHandlerEvent::Close(event)) => {
return Poll::Ready(ConnectionHandlerEvent::Close(Either::Right(event))); return Poll::Ready(ConnectionHandlerEvent::Close(Either::Right(event)));