mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-20 13:26:34 +00:00
protocols/gossipsub: Add IWANT and memcache misses metrics (#2518)
Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
@ -787,6 +787,9 @@ where
|
|||||||
"Message not in cache. Ignoring forwarding. Message Id: {}",
|
"Message not in cache. Ignoring forwarding. Message Id: {}",
|
||||||
msg_id
|
msg_id
|
||||||
);
|
);
|
||||||
|
if let Some(metrics) = self.metrics.as_mut() {
|
||||||
|
metrics.memcache_miss();
|
||||||
|
}
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -1214,8 +1217,7 @@ where
|
|||||||
|
|
||||||
trace!("Handling IHAVE for peer: {:?}", peer_id);
|
trace!("Handling IHAVE for peer: {:?}", peer_id);
|
||||||
|
|
||||||
// use a hashmap to avoid duplicates efficiently
|
let mut iwant_ids = HashSet::new();
|
||||||
let mut iwant_ids = HashMap::new();
|
|
||||||
|
|
||||||
for (topic, ids) in ihave_msgs {
|
for (topic, ids) in ihave_msgs {
|
||||||
// only process the message if we are subscribed
|
// only process the message if we are subscribed
|
||||||
@ -1236,7 +1238,12 @@ where
|
|||||||
.unwrap_or(true)
|
.unwrap_or(true)
|
||||||
{
|
{
|
||||||
// have not seen this message and are not currently requesting it
|
// have not seen this message and are not currently requesting it
|
||||||
iwant_ids.insert(id, topic.clone());
|
if iwant_ids.insert(id) {
|
||||||
|
// Register the IWANT metric
|
||||||
|
if let Some(metrics) = self.metrics.as_mut() {
|
||||||
|
metrics.register_iwant(&topic);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1258,37 +1265,37 @@ where
|
|||||||
);
|
);
|
||||||
|
|
||||||
// Ask in random order
|
// Ask in random order
|
||||||
let mut iwant_ids_vec: Vec<_> = iwant_ids.keys().collect();
|
let mut iwant_ids_vec: Vec<_> = iwant_ids.into_iter().collect();
|
||||||
let mut rng = thread_rng();
|
let mut rng = thread_rng();
|
||||||
iwant_ids_vec.partial_shuffle(&mut rng, iask as usize);
|
iwant_ids_vec.partial_shuffle(&mut rng, iask as usize);
|
||||||
|
|
||||||
iwant_ids_vec.truncate(iask as usize);
|
iwant_ids_vec.truncate(iask as usize);
|
||||||
*iasked += iask;
|
*iasked += iask;
|
||||||
|
|
||||||
let mut message_ids = Vec::new();
|
for message_id in &iwant_ids_vec {
|
||||||
for message_id in iwant_ids_vec {
|
|
||||||
// Add all messages to the pending list
|
// Add all messages to the pending list
|
||||||
self.pending_iwant_msgs.insert(message_id.clone());
|
self.pending_iwant_msgs.insert(message_id.clone());
|
||||||
message_ids.push(message_id.clone());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
|
if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
|
||||||
gossip_promises.add_promise(
|
gossip_promises.add_promise(
|
||||||
*peer_id,
|
*peer_id,
|
||||||
&message_ids,
|
&iwant_ids_vec,
|
||||||
Instant::now() + self.config.iwant_followup_time(),
|
Instant::now() + self.config.iwant_followup_time(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
trace!(
|
trace!(
|
||||||
"IHAVE: Asking for the following messages from {}: {:?}",
|
"IHAVE: Asking for the following messages from {}: {:?}",
|
||||||
peer_id,
|
peer_id,
|
||||||
message_ids
|
iwant_ids_vec
|
||||||
);
|
);
|
||||||
|
|
||||||
Self::control_pool_add(
|
Self::control_pool_add(
|
||||||
&mut self.control_pool,
|
&mut self.control_pool,
|
||||||
*peer_id,
|
*peer_id,
|
||||||
GossipsubControlAction::IWant { message_ids },
|
GossipsubControlAction::IWant {
|
||||||
|
message_ids: iwant_ids_vec,
|
||||||
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
trace!("Completed IHAVE handling for peer: {:?}", peer_id);
|
trace!("Completed IHAVE handling for peer: {:?}", peer_id);
|
||||||
|
Reference in New Issue
Block a user