Floodsub now produces FloodsubEvent (#823)

This commit is contained in:
Pierre Krieger
2019-01-07 13:42:47 +01:00
committed by GitHub
parent 64b388ddd1
commit b1f2ddd4b5
3 changed files with 42 additions and 8 deletions

View File

@ -90,12 +90,14 @@ fn main() {
} }
} }
impl<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite> libp2p::core::swarm::NetworkBehaviourEventProcess<libp2p::floodsub::FloodsubMessage> for MyBehaviour<TSubstream> { impl<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite> libp2p::core::swarm::NetworkBehaviourEventProcess<libp2p::floodsub::FloodsubEvent> for MyBehaviour<TSubstream> {
// Called when `floodsub` produces an event. // Called when `floodsub` produces an event.
fn inject_event(&mut self, message: libp2p::floodsub::FloodsubMessage) { fn inject_event(&mut self, message: libp2p::floodsub::FloodsubEvent) {
if let libp2p::floodsub::FloodsubEvent::Message(message) = message {
println!("Received: '{:?}' from {:?}", String::from_utf8_lossy(&message.data), message.source); println!("Received: '{:?}' from {:?}", String::from_utf8_lossy(&message.data), message.source);
} }
} }
}
// Create a Swarm to manage peers and events // Create a Swarm to manage peers and events
let mut swarm = { let mut swarm = {

View File

@ -35,7 +35,7 @@ use topic::{Topic, TopicHash};
/// about them. /// about them.
pub struct Floodsub<TSubstream> { pub struct Floodsub<TSubstream> {
/// Events that need to be yielded to the outside when polling. /// Events that need to be yielded to the outside when polling.
events: VecDeque<NetworkBehaviourAction<FloodsubRpc, FloodsubMessage>>, events: VecDeque<NetworkBehaviourAction<FloodsubRpc, FloodsubEvent>>,
/// Peer id of the local node. Used for the source of the messages that we publish. /// Peer id of the local node. Used for the source of the messages that we publish.
local_peer_id: PeerId, local_peer_id: PeerId,
@ -177,7 +177,7 @@ where
TSubstream: AsyncRead + AsyncWrite, TSubstream: AsyncRead + AsyncWrite,
{ {
type ProtocolsHandler = FloodsubHandler<TSubstream>; type ProtocolsHandler = FloodsubHandler<TSubstream>;
type OutEvent = FloodsubMessage; type OutEvent = FloodsubEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler { fn new_handler(&mut self) -> Self::ProtocolsHandler {
FloodsubHandler::new() FloodsubHandler::new()
@ -219,13 +219,21 @@ where
match subscription.action { match subscription.action {
FloodsubSubscriptionAction::Subscribe => { FloodsubSubscriptionAction::Subscribe => {
if !remote_peer_topics.contains(&subscription.topic) { if !remote_peer_topics.contains(&subscription.topic) {
remote_peer_topics.push(subscription.topic); remote_peer_topics.push(subscription.topic.clone());
} }
self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Subscribed {
peer_id: propagation_source.clone(),
topic: subscription.topic,
}));
} }
FloodsubSubscriptionAction::Unsubscribe => { FloodsubSubscriptionAction::Unsubscribe => {
if let Some(pos) = remote_peer_topics.iter().position(|t| t == &subscription.topic ) { if let Some(pos) = remote_peer_topics.iter().position(|t| t == &subscription.topic ) {
remote_peer_topics.remove(pos); remote_peer_topics.remove(pos);
} }
self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Unsubscribed {
peer_id: propagation_source.clone(),
topic: subscription.topic,
}));
} }
} }
} }
@ -242,7 +250,8 @@ where
// Add the message to be dispatched to the user. // Add the message to be dispatched to the user.
if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.hash() == u)) { if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.hash() == u)) {
self.events.push_back(NetworkBehaviourAction::GenerateEvent(message.clone())); let event = FloodsubEvent::Message(message.clone());
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
} }
// Propagate the message to everyone else who is subscribed to any of the topics. // Propagate the message to everyone else who is subscribed to any of the topics.
@ -290,3 +299,26 @@ where
Async::NotReady Async::NotReady
} }
} }
/// Event that can happen on the floodsub behaviour.
#[derive(Debug)]
pub enum FloodsubEvent {
/// A message has been received.
Message(FloodsubMessage),
/// A remote subscribed to a topic.
Subscribed {
/// Remote that has subscribed.
peer_id: PeerId,
/// The topic it has subscribed to.
topic: TopicHash,
},
/// A remote unsubscribed from a topic.
Unsubscribed {
/// Remote that has unsubscribed.
peer_id: PeerId,
/// The topic it has subscribed from.
topic: TopicHash,
},
}

View File

@ -41,6 +41,6 @@ mod layer;
mod rpc_proto; mod rpc_proto;
mod topic; mod topic;
pub use self::layer::Floodsub; pub use self::layer::{Floodsub, FloodsubEvent};
pub use self::protocol::{FloodsubMessage, FloodsubRpc}; pub use self::protocol::{FloodsubMessage, FloodsubRpc};
pub use self::topic::{Topic, TopicBuilder, TopicHash}; pub use self::topic::{Topic, TopicBuilder, TopicHash};