swarm/behaviour: Replace inject_* with on_event (#3011)

This commit is contained in:
João Oliveira
2022-11-17 09:28:40 +00:00
committed by GitHub
parent a714864885
commit 3df3c88f3d
38 changed files with 2482 additions and 1652 deletions

View File

@ -38,10 +38,12 @@ use rand::{seq::SliceRandom, thread_rng};
use libp2p_core::{
connection::ConnectionId, identity::Keypair, multiaddr::Protocol::Ip4,
multiaddr::Protocol::Ip6, ConnectedPoint, Multiaddr, PeerId,
multiaddr::Protocol::Ip6, Multiaddr, PeerId,
};
use libp2p_swarm::{
dial_opts::DialOpts, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction,
behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
dial_opts::DialOpts,
ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction,
NotifyHandler, PollParameters,
};
use wasm_timer::Instant;
@ -3028,6 +3030,261 @@ where
Ok(rpc_list)
}
fn on_connection_established(
&mut self,
ConnectionEstablished {
peer_id,
connection_id,
endpoint,
other_established,
..
}: ConnectionEstablished,
) {
// Diverging from the go implementation we only want to consider a peer as outbound peer
// if its first connection is outbound.
if endpoint.is_dialer() && other_established == 0 && !self.px_peers.contains(&peer_id) {
// The first connection is outbound and it is not a peer from peer exchange => mark
// it as outbound peer
self.outbound_peers.insert(peer_id);
}
// Add the IP to the peer scoring system
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
peer_score.add_ip(&peer_id, ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer_id,
endpoint
)
}
}
// By default we assume a peer is only a floodsub peer.
//
// The protocol negotiation occurs once a message is sent/received. Once this happens we
// update the type of peer that this is in order to determine which kind of routing should
// occur.
self.connected_peers
.entry(peer_id)
.or_insert(PeerConnections {
kind: PeerKind::Floodsub,
connections: vec![],
})
.connections
.push(connection_id);
if other_established == 0 {
// Ignore connections from blacklisted peers.
if self.blacklisted_peers.contains(&peer_id) {
debug!("Ignoring connection from blacklisted peer: {}", peer_id);
} else {
debug!("New peer connected: {}", peer_id);
// We need to send our subscriptions to the newly-connected node.
let mut subscriptions = vec![];
for topic_hash in self.mesh.keys() {
subscriptions.push(GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
});
}
if !subscriptions.is_empty() {
// send our subscriptions to the peer
if self
.send_message(
peer_id,
GossipsubRpc {
messages: Vec::new(),
subscriptions,
control_msgs: Vec::new(),
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to send subscriptions, message too large");
}
}
}
// Insert an empty set of the topics of this peer until known.
self.peer_topics.insert(peer_id, Default::default());
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.add_peer(peer_id);
}
}
}
fn on_connection_closed(
&mut self,
ConnectionClosed {
peer_id,
connection_id,
endpoint,
remaining_established,
..
}: ConnectionClosed<<Self as NetworkBehaviour>::ConnectionHandler>,
) {
// Remove IP from peer scoring system
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
peer_score.remove_ip(&peer_id, &ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer_id,
endpoint
)
}
}
if remaining_established != 0 {
// Remove the connection from the list
if let Some(connections) = self.connected_peers.get_mut(&peer_id) {
let index = connections
.connections
.iter()
.position(|v| v == &connection_id)
.expect("Previously established connection to peer must be present");
connections.connections.remove(index);
// If there are more connections and this peer is in a mesh, inform the first connection
// handler.
if !connections.connections.is_empty() {
if let Some(topics) = self.peer_topics.get(&peer_id) {
for topic in topics {
if let Some(mesh_peers) = self.mesh.get(topic) {
if mesh_peers.contains(&peer_id) {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event: Arc::new(GossipsubHandlerIn::JoinedMesh),
handler: NotifyHandler::One(connections.connections[0]),
});
break;
}
}
}
}
}
}
} else {
// remove from mesh, topic_peers, peer_topic and the fanout
debug!("Peer disconnected: {}", peer_id);
{
let topics = match self.peer_topics.get(&peer_id) {
Some(topics) => topics,
None => {
debug_assert!(
self.blacklisted_peers.contains(&peer_id),
"Disconnected node not in connected list"
);
return;
}
};
// remove peer from all mappings
for topic in topics {
// check the mesh for the topic
if let Some(mesh_peers) = self.mesh.get_mut(topic) {
// check if the peer is in the mesh and remove it
if mesh_peers.remove(&peer_id) {
if let Some(m) = self.metrics.as_mut() {
m.peers_removed(topic, Churn::Dc, 1);
m.set_mesh_peers(topic, mesh_peers.len());
}
};
}
// remove from topic_peers
if let Some(peer_list) = self.topic_peers.get_mut(topic) {
if !peer_list.remove(&peer_id) {
// debugging purposes
warn!(
"Disconnected node: {} not in topic_peers peer list",
peer_id
);
}
if let Some(m) = self.metrics.as_mut() {
m.set_topic_peers(topic, peer_list.len())
}
} else {
warn!(
"Disconnected node: {} with topic: {:?} not in topic_peers",
&peer_id, &topic
);
}
// remove from fanout
self.fanout
.get_mut(topic)
.map(|peers| peers.remove(&peer_id));
}
}
// Forget px and outbound status for this peer
self.px_peers.remove(&peer_id);
self.outbound_peers.remove(&peer_id);
// Remove peer from peer_topics and connected_peers
// NOTE: It is possible the peer has already been removed from all mappings if it does not
// support the protocol.
self.peer_topics.remove(&peer_id);
// If metrics are enabled, register the disconnection of a peer based on its protocol.
if let Some(metrics) = self.metrics.as_mut() {
let peer_kind = &self
.connected_peers
.get(&peer_id)
.expect("Connected peer must be registered")
.kind;
metrics.peer_protocol_disconnected(peer_kind.clone());
}
self.connected_peers.remove(&peer_id);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.remove_peer(&peer_id);
}
}
}
fn on_address_change(
&mut self,
AddressChange {
peer_id,
old: endpoint_old,
new: endpoint_new,
..
}: AddressChange,
) {
// Exchange IP in peer scoring system
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
peer_score.remove_ip(&peer_id, &ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
&peer_id,
endpoint_old
)
}
if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
peer_score.add_ip(&peer_id, ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer_id,
endpoint_new
)
}
}
}
}
fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
@ -3058,260 +3315,12 @@ where
GossipsubHandler::new(protocol_config, self.config.idle_timeout())
}
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
_: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
// Diverging from the go implementation we only want to consider a peer as outbound peer
// if its first connection is outbound.
if endpoint.is_dialer() && other_established == 0 && !self.px_peers.contains(peer_id) {
// The first connection is outbound and it is not a peer from peer exchange => mark
// it as outbound peer
self.outbound_peers.insert(*peer_id);
}
// Add the IP to the peer scoring system
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
peer_score.add_ip(peer_id, ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer_id,
endpoint
)
}
}
// By default we assume a peer is only a floodsub peer.
//
// The protocol negotiation occurs once a message is sent/received. Once this happens we
// update the type of peer that this is in order to determine which kind of routing should
// occur.
self.connected_peers
.entry(*peer_id)
.or_insert(PeerConnections {
kind: PeerKind::Floodsub,
connections: vec![],
})
.connections
.push(*connection_id);
if other_established == 0 {
// Ignore connections from blacklisted peers.
if self.blacklisted_peers.contains(peer_id) {
debug!("Ignoring connection from blacklisted peer: {}", peer_id);
} else {
debug!("New peer connected: {}", peer_id);
// We need to send our subscriptions to the newly-connected node.
let mut subscriptions = vec![];
for topic_hash in self.mesh.keys() {
subscriptions.push(GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
});
}
if !subscriptions.is_empty() {
// send our subscriptions to the peer
if self
.send_message(
*peer_id,
GossipsubRpc {
messages: Vec::new(),
subscriptions,
control_msgs: Vec::new(),
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to send subscriptions, message too large");
}
}
}
// Insert an empty set of the topics of this peer until known.
self.peer_topics.insert(*peer_id, Default::default());
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.add_peer(*peer_id);
}
}
}
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
_: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
remaining_established: usize,
) {
// Remove IP from peer scoring system
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
peer_score.remove_ip(peer_id, &ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer_id,
endpoint
)
}
}
if remaining_established != 0 {
// Remove the connection from the list
if let Some(connections) = self.connected_peers.get_mut(peer_id) {
let index = connections
.connections
.iter()
.position(|v| v == connection_id)
.expect("Previously established connection to peer must be present");
connections.connections.remove(index);
// If there are more connections and this peer is in a mesh, inform the first connection
// handler.
if !connections.connections.is_empty() {
if let Some(topics) = self.peer_topics.get(peer_id) {
for topic in topics {
if let Some(mesh_peers) = self.mesh.get(topic) {
if mesh_peers.contains(peer_id) {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: *peer_id,
event: Arc::new(GossipsubHandlerIn::JoinedMesh),
handler: NotifyHandler::One(connections.connections[0]),
});
break;
}
}
}
}
}
}
} else {
// remove from mesh, topic_peers, peer_topic and the fanout
debug!("Peer disconnected: {}", peer_id);
{
let topics = match self.peer_topics.get(peer_id) {
Some(topics) => topics,
None => {
debug_assert!(
self.blacklisted_peers.contains(peer_id),
"Disconnected node not in connected list"
);
return;
}
};
// remove peer from all mappings
for topic in topics {
// check the mesh for the topic
if let Some(mesh_peers) = self.mesh.get_mut(topic) {
// check if the peer is in the mesh and remove it
if mesh_peers.remove(peer_id) {
if let Some(m) = self.metrics.as_mut() {
m.peers_removed(topic, Churn::Dc, 1);
m.set_mesh_peers(topic, mesh_peers.len());
}
};
}
// remove from topic_peers
if let Some(peer_list) = self.topic_peers.get_mut(topic) {
if !peer_list.remove(peer_id) {
// debugging purposes
warn!(
"Disconnected node: {} not in topic_peers peer list",
peer_id
);
}
if let Some(m) = self.metrics.as_mut() {
m.set_topic_peers(topic, peer_list.len())
}
} else {
warn!(
"Disconnected node: {} with topic: {:?} not in topic_peers",
&peer_id, &topic
);
}
// remove from fanout
self.fanout
.get_mut(topic)
.map(|peers| peers.remove(peer_id));
}
}
// Forget px and outbound status for this peer
self.px_peers.remove(peer_id);
self.outbound_peers.remove(peer_id);
// Remove peer from peer_topics and connected_peers
// NOTE: It is possible the peer has already been removed from all mappings if it does not
// support the protocol.
self.peer_topics.remove(peer_id);
// If metrics are enabled, register the disconnection of a peer based on its protocol.
if let Some(metrics) = self.metrics.as_mut() {
let peer_kind = &self
.connected_peers
.get(peer_id)
.expect("Connected peer must be registered")
.kind;
metrics.peer_protocol_disconnected(peer_kind.clone());
}
self.connected_peers.remove(peer_id);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.remove_peer(peer_id);
}
}
}
fn inject_address_change(
&mut self,
peer: &PeerId,
_: &ConnectionId,
endpoint_old: &ConnectedPoint,
endpoint_new: &ConnectedPoint,
) {
// Exchange IP in peer scoring system
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
peer_score.remove_ip(peer, &ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer,
endpoint_old
)
}
if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
peer_score.add_ip(peer, ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer,
endpoint_new
)
}
}
}
fn inject_event(
fn on_connection_handler_event(
&mut self,
propagation_source: PeerId,
_: ConnectionId,
handler_event: HandlerEvent,
_connection_id: ConnectionId,
handler_event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as
ConnectionHandler>::OutEvent,
) {
match handler_event {
HandlerEvent::PeerKind(kind) => {
@ -3333,7 +3342,8 @@ where
));
} else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) {
// Only change the value if the old value is Floodsub (the default set in
// inject_connection_established). All other PeerKind changes are ignored.
// `NetworkBehaviour::on_event` with FromSwarm::ConnectionEstablished).
// All other PeerKind changes are ignored.
debug!(
"New peer type found: {} for peer: {}",
kind, propagation_source
@ -3458,6 +3468,27 @@ where
Poll::Pending
}
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(connection_established) => {
self.on_connection_established(connection_established)
}
FromSwarm::ConnectionClosed(connection_closed) => {
self.on_connection_closed(connection_closed)
}
FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
FromSwarm::DialFailure(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddr(_)
| FromSwarm::ExpiredExternalAddr(_) => {}
}
}
}
/// This is called when peers are added to any mesh. It checks if the peer existed