swarm/src/behaviour: Merge inject_* paired methods (#2445)

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Divma
2022-02-09 10:08:28 -05:00
committed by GitHub
parent 5a95a46cd3
commit dc8433e3fc
30 changed files with 583 additions and 481 deletions

View File

@ -3055,147 +3055,21 @@ where
)
}
fn inject_connected(&mut self, peer_id: &PeerId) {
// Ignore connections from blacklisted peers.
if self.blacklisted_peers.contains(peer_id) {
debug!("Ignoring connection from blacklisted peer: {}", peer_id);
} else {
debug!("New peer connected: {}", peer_id);
// We need to send our subscriptions to the newly-connected node.
let mut subscriptions = vec![];
for topic_hash in self.mesh.keys() {
subscriptions.push(GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
});
}
if !subscriptions.is_empty() {
// send our subscriptions to the peer
if self
.send_message(
*peer_id,
GossipsubRpc {
messages: Vec::new(),
subscriptions,
control_msgs: Vec::new(),
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to send subscriptions, message too large");
}
}
}
// Insert an empty set of the topics of this peer until known.
self.peer_topics.insert(*peer_id, Default::default());
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.add_peer(*peer_id);
}
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
// remove from mesh, topic_peers, peer_topic and the fanout
debug!("Peer disconnected: {}", peer_id);
{
let topics = match self.peer_topics.get(peer_id) {
Some(topics) => (topics),
None => {
debug_assert!(
self.blacklisted_peers.contains(peer_id),
"Disconnected node not in connected list"
);
return;
}
};
// remove peer from all mappings
for topic in topics {
// check the mesh for the topic
if let Some(mesh_peers) = self.mesh.get_mut(topic) {
// check if the peer is in the mesh and remove it
if mesh_peers.remove(peer_id) {
if let Some(m) = self.metrics.as_mut() {
m.peers_removed(topic, Churn::Dc, 1);
m.set_mesh_peers(topic, mesh_peers.len());
}
};
}
// remove from topic_peers
if let Some(peer_list) = self.topic_peers.get_mut(topic) {
if !peer_list.remove(peer_id) {
// debugging purposes
warn!(
"Disconnected node: {} not in topic_peers peer list",
peer_id
);
}
if let Some(m) = self.metrics.as_mut() {
m.set_topic_peers(topic, peer_list.len())
}
} else {
warn!(
"Disconnected node: {} with topic: {:?} not in topic_peers",
&peer_id, &topic
);
}
// remove from fanout
self.fanout
.get_mut(topic)
.map(|peers| peers.remove(peer_id));
}
}
// Forget px and outbound status for this peer
self.px_peers.remove(peer_id);
self.outbound_peers.remove(peer_id);
// Remove peer from peer_topics and connected_peers
// NOTE: It is possible the peer has already been removed from all mappings if it does not
// support the protocol.
self.peer_topics.remove(peer_id);
// If metrics are enabled, register the disconnection of a peer based on its protocol.
if let Some(metrics) = self.metrics.as_mut() {
let peer_kind = &self
.connected_peers
.get(peer_id)
.expect("Connected peer must be registered")
.kind;
metrics.peer_protocol_disconnected(peer_kind.clone());
}
self.connected_peers.remove(peer_id);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.remove_peer(peer_id);
}
}
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
_: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
// Check if the peer is an outbound peer
if let ConnectedPoint::Dialer { .. } = endpoint {
// Diverging from the go implementation we only want to consider a peer as outbound peer
// if its first connection is outbound. To check if this connection is the first we
// check if the peer isn't connected yet. This only works because the
// `inject_connection_established` event for the first connection gets called immediately
// before `inject_connected` gets called.
if !self.peer_topics.contains_key(peer_id) && !self.px_peers.contains(peer_id) {
// The first connection is outbound and it is not a peer from peer exchange => mark
// it as outbound peer
self.outbound_peers.insert(*peer_id);
}
// Diverging from the go implementation we only want to consider a peer as outbound peer
// if its first connection is outbound.
if endpoint.is_dialer() && other_established == 0 && !self.px_peers.contains(peer_id) {
// The first connection is outbound and it is not a peer from peer exchange => mark
// it as outbound peer
self.outbound_peers.insert(*peer_id);
}
// Add the IP to the peer scoring system
@ -3224,6 +3098,48 @@ where
})
.connections
.push(*connection_id);
if other_established == 0 {
// Ignore connections from blacklisted peers.
if self.blacklisted_peers.contains(peer_id) {
debug!("Ignoring connection from blacklisted peer: {}", peer_id);
} else {
debug!("New peer connected: {}", peer_id);
// We need to send our subscriptions to the newly-connected node.
let mut subscriptions = vec![];
for topic_hash in self.mesh.keys() {
subscriptions.push(GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
});
}
if !subscriptions.is_empty() {
// send our subscriptions to the peer
if self
.send_message(
*peer_id,
GossipsubRpc {
messages: Vec::new(),
subscriptions,
control_msgs: Vec::new(),
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to send subscriptions, message too large");
}
}
}
// Insert an empty set of the topics of this peer until known.
self.peer_topics.insert(*peer_id, Default::default());
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.add_peer(*peer_id);
}
}
}
fn inject_connection_closed(
@ -3232,6 +3148,7 @@ where
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
_: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
remaining_established: usize,
) {
// Remove IP from peer scoring system
if let Some((peer_score, ..)) = &mut self.peer_score {
@ -3246,35 +3163,114 @@ where
}
}
// Remove the connection from the list
// If there are no connections left, inject_disconnected will remove the mapping entirely.
if let Some(connections) = self.connected_peers.get_mut(peer_id) {
let index = connections
.connections
.iter()
.position(|v| v == connection_id)
.expect("Previously established connection to peer must be present");
connections.connections.remove(index);
if remaining_established != 0 {
// Remove the connection from the list
if let Some(connections) = self.connected_peers.get_mut(peer_id) {
let index = connections
.connections
.iter()
.position(|v| v == connection_id)
.expect("Previously established connection to peer must be present");
connections.connections.remove(index);
// If there are more connections and this peer is in a mesh, inform the first connection
// handler.
if !connections.connections.is_empty() {
if let Some(topics) = self.peer_topics.get(peer_id) {
for topic in topics {
if let Some(mesh_peers) = self.mesh.get(topic) {
if mesh_peers.contains(peer_id) {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: *peer_id,
event: Arc::new(GossipsubHandlerIn::JoinedMesh),
handler: NotifyHandler::One(connections.connections[0]),
});
break;
// If there are more connections and this peer is in a mesh, inform the first connection
// handler.
if !connections.connections.is_empty() {
if let Some(topics) = self.peer_topics.get(peer_id) {
for topic in topics {
if let Some(mesh_peers) = self.mesh.get(topic) {
if mesh_peers.contains(peer_id) {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: *peer_id,
event: Arc::new(GossipsubHandlerIn::JoinedMesh),
handler: NotifyHandler::One(connections.connections[0]),
});
break;
}
}
}
}
}
}
} else {
// remove from mesh, topic_peers, peer_topic and the fanout
debug!("Peer disconnected: {}", peer_id);
{
let topics = match self.peer_topics.get(peer_id) {
Some(topics) => (topics),
None => {
debug_assert!(
self.blacklisted_peers.contains(peer_id),
"Disconnected node not in connected list"
);
return;
}
};
// remove peer from all mappings
for topic in topics {
// check the mesh for the topic
if let Some(mesh_peers) = self.mesh.get_mut(topic) {
// check if the peer is in the mesh and remove it
if mesh_peers.remove(peer_id) {
if let Some(m) = self.metrics.as_mut() {
m.peers_removed(topic, Churn::Dc, 1);
m.set_mesh_peers(topic, mesh_peers.len());
}
};
}
// remove from topic_peers
if let Some(peer_list) = self.topic_peers.get_mut(topic) {
if !peer_list.remove(peer_id) {
// debugging purposes
warn!(
"Disconnected node: {} not in topic_peers peer list",
peer_id
);
}
if let Some(m) = self.metrics.as_mut() {
m.set_topic_peers(topic, peer_list.len())
}
} else {
warn!(
"Disconnected node: {} with topic: {:?} not in topic_peers",
&peer_id, &topic
);
}
// remove from fanout
self.fanout
.get_mut(topic)
.map(|peers| peers.remove(peer_id));
}
}
// Forget px and outbound status for this peer
self.px_peers.remove(peer_id);
self.outbound_peers.remove(peer_id);
// Remove peer from peer_topics and connected_peers
// NOTE: It is possible the peer has already been removed from all mappings if it does not
// support the protocol.
self.peer_topics.remove(peer_id);
// If metrics are enabled, register the disconnection of a peer based on its protocol.
if let Some(metrics) = self.metrics.as_mut() {
let peer_kind = &self
.connected_peers
.get(peer_id)
.expect("Connected peer must be registered")
.kind;
metrics.peer_protocol_disconnected(peer_kind.clone());
}
self.connected_peers.remove(peer_id);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.remove_peer(peer_id);
}
}
}