swarm/src/lib: Prioritize Behaviour over Pool and Pool over Listeners (#2627)

Have the main event loop (`Swarm::poll_next_event`) prioritize:

1. Work on `NetworkBehaviour` over work on `Pool`, thus prioritizing
   local work over work coming from a remote.

2. Work on `Pool` over work on `ListenersStream`, thus prioritizing work
   on existing connections over upgrading new incoming connections.

Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
This commit is contained in:
Max Inden
2022-05-05 20:15:24 +02:00
committed by GitHub
parent bbd2f8f009
commit afc5b8d8cd
2 changed files with 410 additions and 387 deletions

View File

@ -665,6 +665,348 @@ where
&mut self.behaviour
}
fn handle_pool_event(
&mut self,
event: PoolEvent<THandler<TBehaviour>, transport::Boxed<(PeerId, StreamMuxerBox)>>,
) -> Option<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>> {
match event {
PoolEvent::ConnectionEstablished {
peer_id,
id,
endpoint,
other_established_connection_ids,
concurrent_dial_errors,
} => {
if self.banned_peers.contains(&peer_id) {
// Mark the connection for the banned peer as banned, thus withholding any
// future events from the connection to the behaviour.
self.banned_peer_connections.insert(id);
self.pool.disconnect(peer_id);
return Some(SwarmEvent::BannedPeer { peer_id, endpoint });
} else {
let num_established = NonZeroU32::new(
u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
)
.expect("n + 1 is always non-zero; qed");
let non_banned_established = other_established_connection_ids
.into_iter()
.filter(|conn_id| !self.banned_peer_connections.contains(conn_id))
.count();
log::debug!(
"Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}",
peer_id,
endpoint,
num_established,
non_banned_established + 1,
);
let failed_addresses = concurrent_dial_errors
.as_ref()
.map(|es| es.iter().map(|(a, _)| a).cloned().collect());
self.behaviour.inject_connection_established(
&peer_id,
&id,
&endpoint,
failed_addresses.as_ref(),
non_banned_established,
);
return Some(SwarmEvent::ConnectionEstablished {
peer_id,
num_established,
endpoint,
concurrent_dial_errors,
});
}
}
PoolEvent::PendingOutboundConnectionError {
id: _,
error,
handler,
peer,
} => {
let error = error.into();
self.behaviour.inject_dial_failure(peer, handler, &error);
if let Some(peer) = peer {
log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,);
} else {
log::debug!("Connection attempt to unknown peer failed with {:?}", error);
}
return Some(SwarmEvent::OutgoingConnectionError {
peer_id: peer,
error,
});
}
PoolEvent::PendingInboundConnectionError {
id: _,
send_back_addr,
local_addr,
error,
handler,
} => {
log::debug!("Incoming connection failed: {:?}", error);
self.behaviour
.inject_listen_failure(&local_addr, &send_back_addr, handler);
return Some(SwarmEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error,
});
}
PoolEvent::ConnectionClosed {
id,
connected,
error,
remaining_established_connection_ids,
handler,
..
} => {
if let Some(error) = error.as_ref() {
log::debug!(
"Connection closed with error {:?}: {:?}; Total (peer): {}.",
error,
connected,
remaining_established_connection_ids.len()
);
} else {
log::debug!(
"Connection closed: {:?}; Total (peer): {}.",
connected,
remaining_established_connection_ids.len()
);
}
let peer_id = connected.peer_id;
let endpoint = connected.endpoint;
let num_established =
u32::try_from(remaining_established_connection_ids.len()).unwrap();
let conn_was_reported = !self.banned_peer_connections.remove(&id);
if conn_was_reported {
let remaining_non_banned = remaining_established_connection_ids
.into_iter()
.filter(|conn_id| !self.banned_peer_connections.contains(conn_id))
.count();
self.behaviour.inject_connection_closed(
&peer_id,
&id,
&endpoint,
handler,
remaining_non_banned,
);
}
return Some(SwarmEvent::ConnectionClosed {
peer_id,
endpoint,
cause: error,
num_established,
});
}
PoolEvent::ConnectionEvent { peer_id, id, event } => {
if self.banned_peer_connections.contains(&id) {
log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id);
} else {
self.behaviour.inject_event(peer_id, id, event);
}
}
PoolEvent::AddressChange {
peer_id,
id,
new_endpoint,
old_endpoint,
} => {
if !self.banned_peer_connections.contains(&id) {
self.behaviour.inject_address_change(
&peer_id,
&id,
&old_endpoint,
&new_endpoint,
);
}
}
}
None
}
fn handle_listeners_event(
&mut self,
event: ListenersEvent<transport::Boxed<(PeerId, StreamMuxerBox)>>,
) -> Option<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>> {
match event {
ListenersEvent::Incoming {
listener_id: _,
upgrade,
local_addr,
send_back_addr,
} => {
let handler = self.behaviour.new_handler();
match self.pool.add_incoming(
upgrade,
handler,
IncomingInfo {
local_addr: &local_addr,
send_back_addr: &send_back_addr,
},
) {
Ok(_connection_id) => {
return Some(SwarmEvent::IncomingConnection {
local_addr,
send_back_addr,
});
}
Err((connection_limit, handler)) => {
self.behaviour
.inject_listen_failure(&local_addr, &send_back_addr, handler);
log::warn!("Incoming connection rejected: {:?}", connection_limit);
}
};
}
ListenersEvent::NewAddress {
listener_id,
listen_addr,
} => {
log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr);
if !self.listened_addrs.contains(&listen_addr) {
self.listened_addrs.push(listen_addr.clone())
}
self.behaviour
.inject_new_listen_addr(listener_id, &listen_addr);
return Some(SwarmEvent::NewListenAddr {
listener_id,
address: listen_addr,
});
}
ListenersEvent::AddressExpired {
listener_id,
listen_addr,
} => {
log::debug!(
"Listener {:?}; Expired address {:?}.",
listener_id,
listen_addr
);
self.listened_addrs.retain(|a| a != &listen_addr);
self.behaviour
.inject_expired_listen_addr(listener_id, &listen_addr);
return Some(SwarmEvent::ExpiredListenAddr {
listener_id,
address: listen_addr,
});
}
ListenersEvent::Closed {
listener_id,
addresses,
reason,
} => {
log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
for addr in addresses.iter() {
self.behaviour.inject_expired_listen_addr(listener_id, addr);
}
self.behaviour.inject_listener_closed(
listener_id,
match &reason {
Ok(()) => Ok(()),
Err(err) => Err(err),
},
);
return Some(SwarmEvent::ListenerClosed {
listener_id,
addresses,
reason,
});
}
ListenersEvent::Error { listener_id, error } => {
self.behaviour.inject_listener_error(listener_id, &error);
return Some(SwarmEvent::ListenerError { listener_id, error });
}
}
None
}
fn handle_behaviour_event(
&mut self,
event: NetworkBehaviourAction<TBehaviour::OutEvent, TBehaviour::ConnectionHandler>,
) -> Option<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>> {
match event {
NetworkBehaviourAction::GenerateEvent(event) => {
return Some(SwarmEvent::Behaviour(event))
}
NetworkBehaviourAction::Dial { opts, handler } => {
let peer_id = opts.get_peer_id();
if let Ok(()) = self.dial_with_handler(opts, handler) {
if let Some(peer_id) = peer_id {
return Some(SwarmEvent::Dialing(peer_id));
}
}
}
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event,
} => {
assert!(self.pending_event.is_none());
let handler = match handler {
NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
NotifyHandler::Any => {
let ids = self
.pool
.iter_established_connections_of_peer(&peer_id)
.collect();
PendingNotifyHandler::Any(ids)
}
};
self.pending_event = Some((peer_id, handler, event));
}
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
// Maps the given `observed_addr`, representing an address of the local
// node observed by a remote peer, onto the locally known listen addresses
// to yield one or more addresses of the local node that may be publicly
// reachable.
//
// I.e. self method incorporates the view of other peers into the listen
// addresses seen by the local node to account for possible IP and port
// mappings performed by intermediate network devices in an effort to
// obtain addresses for the local peer that are also reachable for peers
// other than the peer who reported the `observed_addr`.
//
// The translation is transport-specific. See [`Transport::address_translation`].
let translated_addresses = {
let transport = self.listeners.transport();
let mut addrs: Vec<_> = self
.listeners
.listen_addrs()
.filter_map(move |server| transport.address_translation(server, &address))
.collect();
// remove duplicates
addrs.sort_unstable();
addrs.dedup();
addrs
};
for addr in translated_addresses {
self.add_external_address(addr, score);
}
}
NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
} => match connection {
CloseConnection::One(connection_id) => {
if let Some(conn) = self.pool.get_established(connection_id) {
conn.start_close();
}
}
CloseConnection::All => {
self.pool.disconnect(peer_id);
}
},
}
None
}
/// Internal function used by everything event-related.
///
/// Polls the `Swarm` for the next event.
@ -676,412 +1018,92 @@ where
// across a `Deref`.
let this = &mut *self;
// This loop polls the components below in a prioritized order.
//
// 1. [`NetworkBehaviour`]
// 2. Connection [`Pool`]
// 3. [`ListenersStream`]
//
// (1) is polled before (2) to prioritize local work over work coming from a remote.
//
// (2) is polled before (3) to prioritize existing connections over upgrading new incoming connections.
loop {
let mut listeners_not_ready = false;
let mut connections_not_ready = false;
// Poll the listener(s) for new connections.
match ListenersStream::poll(Pin::new(&mut this.listeners), cx) {
Poll::Pending => {
listeners_not_ready = true;
}
Poll::Ready(ListenersEvent::Incoming {
listener_id: _,
upgrade,
local_addr,
send_back_addr,
}) => {
let handler = this.behaviour.new_handler();
match this.pool.add_incoming(
upgrade,
handler,
IncomingInfo {
local_addr: &local_addr,
send_back_addr: &send_back_addr,
},
) {
Ok(_connection_id) => {
return Poll::Ready(SwarmEvent::IncomingConnection {
local_addr,
send_back_addr,
});
match this.pending_event.take() {
// Try to deliver the pending event emitted by the [`NetworkBehaviour`] in the previous
// iteration to the connection handler(s).
Some((peer_id, handler, event)) => match handler {
PendingNotifyHandler::One(conn_id) => {
match this.pool.get_established(conn_id) {
Some(mut conn) => match notify_one(&mut conn, event, cx) {
None => continue,
Some(event) => {
this.pending_event = Some((peer_id, handler, event));
}
},
None => continue,
}
Err((connection_limit, handler)) => {
this.behaviour.inject_listen_failure(
&local_addr,
&send_back_addr,
handler,
);
log::warn!("Incoming connection rejected: {:?}", connection_limit);
}
PendingNotifyHandler::Any(ids) => {
match notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx) {
None => continue,
Some((event, ids)) => {
let handler = PendingNotifyHandler::Any(ids);
this.pending_event = Some((peer_id, handler, event));
}
}
}
},
// No pending event. Allow the [`NetworkBehaviour`] to make progress.
None => {
let behaviour_poll = {
let mut parameters = SwarmPollParameters {
local_peer_id: &this.local_peer_id,
supported_protocols: &this.supported_protocols,
listened_addrs: &this.listened_addrs,
external_addrs: &this.external_addrs,
};
this.behaviour.poll(cx, &mut parameters)
};
}
Poll::Ready(ListenersEvent::NewAddress {
listener_id,
listen_addr,
}) => {
log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr);
if !this.listened_addrs.contains(&listen_addr) {
this.listened_addrs.push(listen_addr.clone())
match behaviour_poll {
Poll::Pending => {}
Poll::Ready(behaviour_event) => {
if let Some(swarm_event) = this.handle_behaviour_event(behaviour_event)
{
return Poll::Ready(swarm_event);
}
continue;
}
}
this.behaviour
.inject_new_listen_addr(listener_id, &listen_addr);
return Poll::Ready(SwarmEvent::NewListenAddr {
listener_id,
address: listen_addr,
});
}
Poll::Ready(ListenersEvent::AddressExpired {
listener_id,
listen_addr,
}) => {
log::debug!(
"Listener {:?}; Expired address {:?}.",
listener_id,
listen_addr
);
this.listened_addrs.retain(|a| a != &listen_addr);
this.behaviour
.inject_expired_listen_addr(listener_id, &listen_addr);
return Poll::Ready(SwarmEvent::ExpiredListenAddr {
listener_id,
address: listen_addr,
});
}
Poll::Ready(ListenersEvent::Closed {
listener_id,
addresses,
reason,
}) => {
log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
for addr in addresses.iter() {
this.behaviour.inject_expired_listen_addr(listener_id, addr);
}
this.behaviour.inject_listener_closed(
listener_id,
match &reason {
Ok(()) => Ok(()),
Err(err) => Err(err),
},
);
return Poll::Ready(SwarmEvent::ListenerClosed {
listener_id,
addresses,
reason,
});
}
Poll::Ready(ListenersEvent::Error { listener_id, error }) => {
this.behaviour.inject_listener_error(listener_id, &error);
return Poll::Ready(SwarmEvent::ListenerError { listener_id, error });
}
}
// Poll the known peers.
match this.pool.poll(cx) {
Poll::Pending => {
connections_not_ready = true;
}
Poll::Ready(PoolEvent::ConnectionEstablished {
id,
peer_id,
endpoint,
other_established_connection_ids,
concurrent_dial_errors,
}) => {
if this.banned_peers.contains(&peer_id) {
// Mark the connection for the banned peer as banned, thus withholding any
// future events from the connection to the behaviour.
this.banned_peer_connections.insert(id);
this.pool.disconnect(peer_id);
return Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint });
} else {
let num_established = NonZeroU32::new(
u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
)
.expect("n + 1 is always non-zero; qed");
let non_banned_established = other_established_connection_ids
.into_iter()
.filter(|conn_id| !this.banned_peer_connections.contains(conn_id))
.count();
log::debug!(
"Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}",
peer_id,
endpoint,
num_established,
non_banned_established + 1,
);
let failed_addresses = concurrent_dial_errors
.as_ref()
.map(|es| es.iter().map(|(a, _)| a).cloned().collect());
this.behaviour.inject_connection_established(
&peer_id,
&id,
&endpoint,
failed_addresses.as_ref(),
non_banned_established,
);
return Poll::Ready(SwarmEvent::ConnectionEstablished {
peer_id,
num_established,
endpoint,
concurrent_dial_errors,
});
}
}
Poll::Ready(PoolEvent::PendingOutboundConnectionError {
id: _,
error,
handler,
peer,
}) => {
let error = error.into();
this.behaviour.inject_dial_failure(peer, handler, &error);
if let Some(peer) = peer {
log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,);
} else {
log::debug!("Connection attempt to unknown peer failed with {:?}", error);
Poll::Pending => {}
Poll::Ready(pool_event) => {
if let Some(swarm_event) = this.handle_pool_event(pool_event) {
return Poll::Ready(swarm_event);
}
return Poll::Ready(SwarmEvent::OutgoingConnectionError {
peer_id: peer,
error,
});
}
Poll::Ready(PoolEvent::PendingInboundConnectionError {
id: _,
send_back_addr,
local_addr,
error,
handler,
}) => {
log::debug!("Incoming connection failed: {:?}", error);
this.behaviour
.inject_listen_failure(&local_addr, &send_back_addr, handler);
return Poll::Ready(SwarmEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error,
});
}
Poll::Ready(PoolEvent::ConnectionClosed {
id,
connected,
error,
remaining_established_connection_ids,
handler,
..
}) => {
if let Some(error) = error.as_ref() {
log::debug!(
"Connection closed with error {:?}: {:?}; Total (peer): {}.",
error,
connected,
remaining_established_connection_ids.len()
);
} else {
log::debug!(
"Connection closed: {:?}; Total (peer): {}.",
connected,
remaining_established_connection_ids.len()
);
}
let peer_id = connected.peer_id;
let endpoint = connected.endpoint;
let num_established =
u32::try_from(remaining_established_connection_ids.len()).unwrap();
let conn_was_reported = !this.banned_peer_connections.remove(&id);
if conn_was_reported {
let remaining_non_banned = remaining_established_connection_ids
.into_iter()
.filter(|conn_id| !this.banned_peer_connections.contains(conn_id))
.count();
this.behaviour.inject_connection_closed(
&peer_id,
&id,
&endpoint,
handler,
remaining_non_banned,
);
}
return Poll::Ready(SwarmEvent::ConnectionClosed {
peer_id,
endpoint,
cause: error,
num_established,
});
}
Poll::Ready(PoolEvent::ConnectionEvent { id, peer_id, event }) => {
if this.banned_peer_connections.contains(&id) {
log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id);
} else {
this.behaviour.inject_event(peer_id, id, event);
}
}
Poll::Ready(PoolEvent::AddressChange {
id,
peer_id,
new_endpoint,
old_endpoint,
}) => {
if !this.banned_peer_connections.contains(&id) {
this.behaviour.inject_address_change(
&peer_id,
&id,
&old_endpoint,
&new_endpoint,
);
}
continue;
}
};
// After the network had a chance to make progress, try to deliver
// the pending event emitted by the behaviour in the previous iteration
// to the connection handler(s). The pending event must be delivered
// before polling the behaviour again. If the targeted peer
// meanwhie disconnected, the event is discarded.
if let Some((peer_id, handler, event)) = this.pending_event.take() {
match handler {
PendingNotifyHandler::One(conn_id) => {
if let Some(mut conn) = this.pool.get_established(conn_id) {
if let Some(event) = notify_one(&mut conn, event, cx) {
this.pending_event = Some((peer_id, handler, event));
if listeners_not_ready && connections_not_ready {
return Poll::Pending;
} else {
continue;
}
}
}
}
PendingNotifyHandler::Any(ids) => {
if let Some((event, ids)) =
notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx)
{
let handler = PendingNotifyHandler::Any(ids);
this.pending_event = Some((peer_id, handler, event));
if listeners_not_ready && connections_not_ready {
return Poll::Pending;
} else {
continue;
}
}
// Poll the listener(s) for new connections.
match ListenersStream::poll(Pin::new(&mut this.listeners), cx) {
Poll::Pending => {}
Poll::Ready(listeners_event) => {
if let Some(swarm_event) = this.handle_listeners_event(listeners_event) {
return Poll::Ready(swarm_event);
}
continue;
}
}
debug_assert!(this.pending_event.is_none());
let behaviour_poll = {
let mut parameters = SwarmPollParameters {
local_peer_id: &this.local_peer_id,
supported_protocols: &this.supported_protocols,
listened_addrs: &this.listened_addrs,
external_addrs: &this.external_addrs,
};
this.behaviour.poll(cx, &mut parameters)
};
match behaviour_poll {
Poll::Pending if listeners_not_ready && connections_not_ready => {
return Poll::Pending
}
Poll::Pending => (),
Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
return Poll::Ready(SwarmEvent::Behaviour(event))
}
Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) => {
let peer_id = opts.get_peer_id();
if let Ok(()) = this.dial_with_handler(opts, handler) {
if let Some(peer_id) = peer_id {
return Poll::Ready(SwarmEvent::Dialing(peer_id));
}
}
}
Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event,
}) => match handler {
NotifyHandler::One(connection) => {
if let Some(mut conn) = this.pool.get_established(connection) {
if let Some(event) = notify_one(&mut conn, event, cx) {
let handler = PendingNotifyHandler::One(connection);
this.pending_event = Some((peer_id, handler, event));
if listeners_not_ready && connections_not_ready {
return Poll::Pending;
} else {
continue;
}
}
}
}
NotifyHandler::Any => {
let ids = this
.pool
.iter_established_connections_of_peer(&peer_id)
.collect();
if let Some((event, ids)) =
notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx)
{
let handler = PendingNotifyHandler::Any(ids);
this.pending_event = Some((peer_id, handler, event));
if listeners_not_ready && connections_not_ready {
return Poll::Pending;
} else {
continue;
}
}
}
},
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => {
// Maps the given `observed_addr`, representing an address of the local
// node observed by a remote peer, onto the locally known listen addresses
// to yield one or more addresses of the local node that may be publicly
// reachable.
//
// I.e. this method incorporates the view of other peers into the listen
// addresses seen by the local node to account for possible IP and port
// mappings performed by intermediate network devices in an effort to
// obtain addresses for the local peer that are also reachable for peers
// other than the peer who reported the `observed_addr`.
//
// The translation is transport-specific. See [`Transport::address_translation`].
let translated_addresses = {
let transport = this.listeners.transport();
let mut addrs: Vec<_> = this
.listeners
.listen_addrs()
.filter_map(move |server| {
transport.address_translation(server, &address)
})
.collect();
// remove duplicates
addrs.sort_unstable();
addrs.dedup();
addrs
};
for addr in translated_addresses {
this.add_external_address(addr, score);
}
}
Poll::Ready(NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
}) => match connection {
CloseConnection::One(connection_id) => {
if let Some(conn) = this.pool.get_established(connection_id) {
conn.start_close();
}
}
CloseConnection::All => {
this.pool.disconnect(peer_id);
}
},
}
return Poll::Pending;
}
}
}