diff --git a/examples/chat.rs b/examples/chat.rs index 34e34751..8ce44858 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -90,10 +90,12 @@ fn main() { } } - impl libp2p::core::swarm::NetworkBehaviourEventProcess for MyBehaviour { + impl libp2p::core::swarm::NetworkBehaviourEventProcess for MyBehaviour { // Called when `floodsub` produces an event. - fn inject_event(&mut self, message: libp2p::floodsub::FloodsubMessage) { - println!("Received: '{:?}' from {:?}", String::from_utf8_lossy(&message.data), message.source); + fn inject_event(&mut self, message: libp2p::floodsub::FloodsubEvent) { + if let libp2p::floodsub::FloodsubEvent::Message(message) = message { + println!("Received: '{:?}' from {:?}", String::from_utf8_lossy(&message.data), message.source); + } } } diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 664be25a..7a4a1d2e 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -35,7 +35,7 @@ use topic::{Topic, TopicHash}; /// about them. pub struct Floodsub { /// Events that need to be yielded to the outside when polling. - events: VecDeque>, + events: VecDeque>, /// Peer id of the local node. Used for the source of the messages that we publish. local_peer_id: PeerId, @@ -177,7 +177,7 @@ where TSubstream: AsyncRead + AsyncWrite, { type ProtocolsHandler = FloodsubHandler; - type OutEvent = FloodsubMessage; + type OutEvent = FloodsubEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { FloodsubHandler::new() @@ -219,13 +219,21 @@ where match subscription.action { FloodsubSubscriptionAction::Subscribe => { if !remote_peer_topics.contains(&subscription.topic) { - remote_peer_topics.push(subscription.topic); + remote_peer_topics.push(subscription.topic.clone()); } + self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Subscribed { + peer_id: propagation_source.clone(), + topic: subscription.topic, + })); } FloodsubSubscriptionAction::Unsubscribe => { if let Some(pos) = remote_peer_topics.iter().position(|t| t == &subscription.topic ) { remote_peer_topics.remove(pos); } + self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Unsubscribed { + peer_id: propagation_source.clone(), + topic: subscription.topic, + })); } } } @@ -242,7 +250,8 @@ where // Add the message to be dispatched to the user. if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.hash() == u)) { - self.events.push_back(NetworkBehaviourAction::GenerateEvent(message.clone())); + let event = FloodsubEvent::Message(message.clone()); + self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); } // Propagate the message to everyone else who is subscribed to any of the topics. @@ -290,3 +299,26 @@ where Async::NotReady } } + +/// Event that can happen on the floodsub behaviour. +#[derive(Debug)] +pub enum FloodsubEvent { + /// A message has been received. + Message(FloodsubMessage), + + /// A remote subscribed to a topic. + Subscribed { + /// Remote that has subscribed. + peer_id: PeerId, + /// The topic it has subscribed to. + topic: TopicHash, + }, + + /// A remote unsubscribed from a topic. + Unsubscribed { + /// Remote that has unsubscribed. + peer_id: PeerId, + /// The topic it has subscribed from. + topic: TopicHash, + }, +} diff --git a/protocols/floodsub/src/lib.rs b/protocols/floodsub/src/lib.rs index 4405ccaf..07325ad2 100644 --- a/protocols/floodsub/src/lib.rs +++ b/protocols/floodsub/src/lib.rs @@ -41,6 +41,6 @@ mod layer; mod rpc_proto; mod topic; -pub use self::layer::Floodsub; +pub use self::layer::{Floodsub, FloodsubEvent}; pub use self::protocol::{FloodsubMessage, FloodsubRpc}; pub use self::topic::{Topic, TopicBuilder, TopicHash};