diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index dffabe6f..523b1a94 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -218,6 +218,25 @@ where propagation_source: PeerId, event: FloodsubRpc, ) { + // Update connected peers topics + for subscription in event.subscriptions { + let mut remote_peer_topics = self.connected_peers + .get_mut(&propagation_source) + .expect("connected_peers is kept in sync with the peers we are connected to; we are guaranteed to only receive events from connected peers ; qed"); + match subscription.action { + FloodsubSubscriptionAction::Subscribe => { + if !remote_peer_topics.contains(&subscription.topic) { + remote_peer_topics.push(subscription.topic); + } + } + FloodsubSubscriptionAction::Unsubscribe => { + if let Some(pos) = remote_peer_topics.iter().position(|t| t == &subscription.topic ) { + remote_peer_topics.remove(pos); + } + } + } + } + // List of messages we're going to propagate on the network. let mut rpcs_to_dispatch: Vec<(PeerId, FloodsubRpc)> = Vec::new();