refactor(autonat): buffer entire actions instead of events (#3241)

Previously, we used to buffer events separately and emit actions directly. That is unnecessary. We can have a single place where we return from the `poll` loop and shove all actions into the same buffer.
This commit is contained in:
Thomas Eizinger
2022-12-16 03:22:55 +11:00
committed by GitHub
parent 856e7516ef
commit f6f42968e2
3 changed files with 98 additions and 68 deletions

View File

@ -209,7 +209,12 @@ pub struct Behaviour {
last_probe: Option<Instant>, last_probe: Option<Instant>,
pending_out_events: VecDeque<<Self as NetworkBehaviour>::OutEvent>, pending_actions: VecDeque<
NetworkBehaviourAction<
<Self as NetworkBehaviour>::OutEvent,
<Self as NetworkBehaviour>::ConnectionHandler,
>,
>,
probe_id: ProbeId, probe_id: ProbeId,
@ -237,7 +242,7 @@ impl Behaviour {
throttled_servers: Vec::new(), throttled_servers: Vec::new(),
throttled_clients: Vec::new(), throttled_clients: Vec::new(),
last_probe: None, last_probe: None,
pending_out_events: VecDeque::new(), pending_actions: VecDeque::new(),
probe_id: ProbeId(0), probe_id: ProbeId(0),
listen_addresses: Default::default(), listen_addresses: Default::default(),
external_addresses: Default::default(), external_addresses: Default::default(),
@ -334,8 +339,10 @@ impl Behaviour {
role_override: Endpoint::Dialer, role_override: Endpoint::Dialer,
} => { } => {
if let Some(event) = self.as_server().on_outbound_connection(&peer, address) { if let Some(event) = self.as_server().on_outbound_connection(&peer, address) {
self.pending_out_events self.pending_actions
.push_back(Event::InboundProbe(event)); .push_back(NetworkBehaviourAction::GenerateEvent(Event::InboundProbe(
event,
)));
} }
} }
ConnectedPoint::Dialer { ConnectedPoint::Dialer {
@ -395,8 +402,10 @@ impl Behaviour {
error, error,
})); }));
if let Some(event) = self.as_server().on_outbound_dial_error(peer_id, error) { if let Some(event) = self.as_server().on_outbound_dial_error(peer_id, error) {
self.pending_out_events self.pending_actions
.push_back(Event::InboundProbe(event)); .push_back(NetworkBehaviourAction::GenerateEvent(Event::InboundProbe(
event,
)));
} }
} }
@ -431,14 +440,13 @@ impl NetworkBehaviour for Behaviour {
fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters) -> Poll<Action> { fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters) -> Poll<Action> {
loop { loop {
if let Some(event) = self.pending_out_events.pop_front() { if let Some(event) = self.pending_actions.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); return Poll::Ready(event);
} }
let mut is_inner_pending = false;
match self.inner.poll(cx, params) { match self.inner.poll(cx, params) {
Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
let (mut events, action) = match event { let actions = match event {
request_response::Event::Message { request_response::Event::Message {
message: request_response::Message::Response { .. }, message: request_response::Message::Response { .. },
.. ..
@ -453,24 +461,32 @@ impl NetworkBehaviour for Behaviour {
| request_response::Event::InboundFailure { .. } => { | request_response::Event::InboundFailure { .. } => {
self.as_server().handle_event(params, event) self.as_server().handle_event(params, event)
} }
request_response::Event::ResponseSent { .. } => (VecDeque::new(), None), request_response::Event::ResponseSent { .. } => VecDeque::new(),
}; };
self.pending_out_events.append(&mut events);
if let Some(action) = action { self.pending_actions.extend(actions);
return Poll::Ready(action); continue;
}
} }
Poll::Ready(action) => return Poll::Ready(action.map_out(|_| unreachable!())), Poll::Ready(action) => {
Poll::Pending => is_inner_pending = true, self.pending_actions
.push_back(action.map_out(|_| unreachable!()));
continue;
}
Poll::Pending => {}
} }
match self.as_client().poll_auto_probe(cx) { match self.as_client().poll_auto_probe(cx) {
Poll::Ready(event) => self Poll::Ready(event) => {
.pending_out_events self.pending_actions
.push_back(Event::OutboundProbe(event)), .push_back(NetworkBehaviourAction::GenerateEvent(Event::OutboundProbe(
Poll::Pending if is_inner_pending => return Poll::Pending, event,
)));
continue;
}
Poll::Pending => {} Poll::Pending => {}
} }
return Poll::Pending;
} }
} }
@ -558,7 +574,7 @@ trait HandleInnerEvent {
&mut self, &mut self,
params: &mut impl PollParameters, params: &mut impl PollParameters,
event: request_response::Event<DialRequest, DialResponse>, event: request_response::Event<DialRequest, DialResponse>,
) -> (VecDeque<Event>, Option<Action>); ) -> VecDeque<Action>;
} }
trait GlobalIp { trait GlobalIp {

View File

@ -109,9 +109,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> {
&mut self, &mut self,
params: &mut impl PollParameters, params: &mut impl PollParameters,
event: request_response::Event<DialRequest, DialResponse>, event: request_response::Event<DialRequest, DialResponse>,
) -> (VecDeque<Event>, Option<Action>) { ) -> VecDeque<Action> {
let mut events = VecDeque::new();
let mut action = None;
match event { match event {
request_response::Event::Message { request_response::Event::Message {
peer, peer,
@ -140,13 +138,20 @@ impl<'a> HandleInnerEvent for AsClient<'a> {
error: OutboundProbeError::Response(e), error: OutboundProbeError::Response(e),
}, },
}; };
events.push_back(Event::OutboundProbe(event));
let mut actions = VecDeque::with_capacity(3);
actions.push_back(NetworkBehaviourAction::GenerateEvent(Event::OutboundProbe(
event,
)));
if let Some(old) = self.handle_reported_status(response.result.clone().into()) { if let Some(old) = self.handle_reported_status(response.result.clone().into()) {
events.push_back(Event::StatusChanged { actions.push_back(NetworkBehaviourAction::GenerateEvent(
old, Event::StatusChanged {
new: self.nat_status.clone(), old,
}); new: self.nat_status.clone(),
},
));
} }
if let Ok(address) = response.result { if let Ok(address) = response.result {
@ -158,12 +163,14 @@ impl<'a> HandleInnerEvent for AsClient<'a> {
.find_map(|r| (r.addr == address).then_some(r.score)) .find_map(|r| (r.addr == address).then_some(r.score))
.unwrap_or(AddressScore::Finite(0)); .unwrap_or(AddressScore::Finite(0));
if let AddressScore::Finite(finite_score) = score { if let AddressScore::Finite(finite_score) = score {
action = Some(NetworkBehaviourAction::ReportObservedAddr { actions.push_back(NetworkBehaviourAction::ReportObservedAddr {
address, address,
score: AddressScore::Finite(finite_score + 1), score: AddressScore::Finite(finite_score + 1),
}); });
} }
} }
actions
} }
request_response::Event::OutboundFailure { request_response::Event::OutboundFailure {
peer, peer,
@ -180,17 +187,18 @@ impl<'a> HandleInnerEvent for AsClient<'a> {
.remove(&request_id) .remove(&request_id)
.unwrap_or_else(|| self.probe_id.next()); .unwrap_or_else(|| self.probe_id.next());
events.push_back(Event::OutboundProbe(OutboundProbeEvent::Error {
probe_id,
peer: Some(peer),
error: OutboundProbeError::OutboundRequest(error),
}));
self.schedule_probe.reset(Duration::ZERO); self.schedule_probe.reset(Duration::ZERO);
VecDeque::from([NetworkBehaviourAction::GenerateEvent(Event::OutboundProbe(
OutboundProbeEvent::Error {
probe_id,
peer: Some(peer),
error: OutboundProbeError::OutboundRequest(error),
},
))])
} }
_ => {} _ => VecDeque::default(),
} }
(events, action)
} }
} }

View File

@ -98,9 +98,7 @@ impl<'a> HandleInnerEvent for AsServer<'a> {
&mut self, &mut self,
_params: &mut impl PollParameters, _params: &mut impl PollParameters,
event: request_response::Event<DialRequest, DialResponse>, event: request_response::Event<DialRequest, DialResponse>,
) -> (VecDeque<Event>, Option<Action>) { ) -> VecDeque<Action> {
let mut events = VecDeque::new();
let mut action = None;
match event { match event {
request_response::Event::Message { request_response::Event::Message {
peer, peer,
@ -124,20 +122,25 @@ impl<'a> HandleInnerEvent for AsServer<'a> {
.insert(peer, (probe_id, request_id, addrs.clone(), channel)); .insert(peer, (probe_id, request_id, addrs.clone(), channel));
self.throttled_clients.push((peer, Instant::now())); self.throttled_clients.push((peer, Instant::now()));
events.push_back(Event::InboundProbe(InboundProbeEvent::Request { VecDeque::from([
probe_id, NetworkBehaviourAction::GenerateEvent(Event::InboundProbe(
peer, InboundProbeEvent::Request {
addresses: addrs.clone(), probe_id,
})); peer,
addresses: addrs.clone(),
action = Some(NetworkBehaviourAction::Dial { },
opts: DialOpts::peer_id(peer) )),
.condition(PeerCondition::Always) NetworkBehaviourAction::Dial {
.override_dial_concurrency_factor(NonZeroU8::new(1).expect("1 > 0")) opts: DialOpts::peer_id(peer)
.addresses(addrs) .condition(PeerCondition::Always)
.build(), .override_dial_concurrency_factor(
handler: self.inner.new_handler(), NonZeroU8::new(1).expect("1 > 0"),
}); )
.addresses(addrs)
.build(),
handler: self.inner.new_handler(),
},
])
} }
Err((status_text, error)) => { Err((status_text, error)) => {
log::debug!( log::debug!(
@ -152,11 +155,13 @@ impl<'a> HandleInnerEvent for AsServer<'a> {
}; };
let _ = self.inner.send_response(channel, response); let _ = self.inner.send_response(channel, response);
events.push_back(Event::InboundProbe(InboundProbeEvent::Error { VecDeque::from([NetworkBehaviourAction::GenerateEvent(
probe_id, Event::InboundProbe(InboundProbeEvent::Error {
peer, probe_id,
error: InboundProbeError::Response(error), peer,
})); error: InboundProbeError::Response(error),
}),
)])
} }
} }
} }
@ -178,15 +183,16 @@ impl<'a> HandleInnerEvent for AsServer<'a> {
_ => self.probe_id.next(), _ => self.probe_id.next(),
}; };
events.push_back(Event::InboundProbe(InboundProbeEvent::Error { VecDeque::from([NetworkBehaviourAction::GenerateEvent(Event::InboundProbe(
probe_id, InboundProbeEvent::Error {
peer, probe_id,
error: InboundProbeError::InboundRequest(error), peer,
})); error: InboundProbeError::InboundRequest(error),
},
))])
} }
_ => {} _ => VecDeque::new(),
} }
(events, action)
} }
} }