Addressing #473 ... if I understood the ticket right, we want to pass… (#1395)

* Addressing #473 ... if I understood the ticket right, we want to pass through whatever the application provides as a topic identifier, leaving hashing (or not hashing) up to the application.

* Remove TopicDescriptor and use Topic newtype everywhere

* PR feedback

Use From<Topic> instead of Into<String>
Use impl Into<Topic> instead of Topic in public API

Co-authored-by: Peat Bakke <peat@peat.org>
This commit is contained in:
Rüdiger Klaehn
2020-01-27 15:23:01 +01:00
committed by Pierre Krieger
parent 2ef7c40cda
commit 3b50cbd1b8
6 changed files with 46 additions and 142 deletions

View File

@ -75,7 +75,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let transport = libp2p::build_development_transport(local_key)?; let transport = libp2p::build_development_transport(local_key)?;
// Create a Floodsub topic // Create a Floodsub topic
let floodsub_topic = floodsub::TopicBuilder::new("chat").build(); let floodsub_topic = floodsub::Topic::new("chat");
// We create a custom network behaviour that combines floodsub and mDNS. // We create a custom network behaviour that combines floodsub and mDNS.
// In the future, we want to improve libp2p to make this easier to do. // In the future, we want to improve libp2p to make this easier to do.
@ -150,7 +150,7 @@ fn main() -> Result<(), Box<dyn Error>> {
task::block_on(future::poll_fn(move |cx: &mut Context| { task::block_on(future::poll_fn(move |cx: &mut Context| {
loop { loop {
match stdin.try_poll_next_unpin(cx)? { match stdin.try_poll_next_unpin(cx)? {
Poll::Ready(Some(line)) => swarm.floodsub.publish(&floodsub_topic, line.as_bytes()), Poll::Ready(Some(line)) => swarm.floodsub.publish(floodsub_topic.clone(), line.as_bytes()),
Poll::Ready(None) => panic!("Stdin closed"), Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break Poll::Pending => break
} }

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction}; use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
use crate::topic::{Topic, TopicHash}; use crate::topic::Topic;
use cuckoofilter::CuckooFilter; use cuckoofilter::CuckooFilter;
use fnv::FnvHashSet; use fnv::FnvHashSet;
use futures::prelude::*; use futures::prelude::*;
@ -52,7 +52,7 @@ pub struct Floodsub<TSubstream> {
/// List of peers the network is connected to, and the topics that they're subscribed to. /// List of peers the network is connected to, and the topics that they're subscribed to.
// TODO: filter out peers that don't support floodsub, so that we avoid hammering them with // TODO: filter out peers that don't support floodsub, so that we avoid hammering them with
// opened substreams // opened substreams
connected_peers: HashMap<PeerId, SmallVec<[TopicHash; 8]>>, connected_peers: HashMap<PeerId, SmallVec<[Topic; 8]>>,
// List of topics we're subscribed to. Necessary to filter out messages that we receive // List of topics we're subscribed to. Necessary to filter out messages that we receive
// erroneously. // erroneously.
@ -85,13 +85,13 @@ impl<TSubstream> Floodsub<TSubstream> {
pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) { pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) {
// Send our topics to this node if we're already connected to it. // Send our topics to this node if we're already connected to it.
if self.connected_peers.contains_key(&peer_id) { if self.connected_peers.contains_key(&peer_id) {
for topic in self.subscribed_topics.iter() { for topic in self.subscribed_topics.iter().cloned() {
self.events.push_back(NetworkBehaviourAction::SendEvent { self.events.push_back(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(), peer_id: peer_id.clone(),
event: FloodsubRpc { event: FloodsubRpc {
messages: Vec::new(), messages: Vec::new(),
subscriptions: vec![FloodsubSubscription { subscriptions: vec![FloodsubSubscription {
topic: topic.hash().clone(), topic,
action: FloodsubSubscriptionAction::Subscribe, action: FloodsubSubscriptionAction::Subscribe,
}], }],
}, },
@ -116,7 +116,7 @@ impl<TSubstream> Floodsub<TSubstream> {
/// ///
/// Returns true if the subscription worked. Returns false if we were already subscribed. /// Returns true if the subscription worked. Returns false if we were already subscribed.
pub fn subscribe(&mut self, topic: Topic) -> bool { pub fn subscribe(&mut self, topic: Topic) -> bool {
if self.subscribed_topics.iter().any(|t| t.hash() == topic.hash()) { if self.subscribed_topics.iter().any(|t| t.id() == topic.id()) {
return false; return false;
} }
@ -126,7 +126,7 @@ impl<TSubstream> Floodsub<TSubstream> {
event: FloodsubRpc { event: FloodsubRpc {
messages: Vec::new(), messages: Vec::new(),
subscriptions: vec![FloodsubSubscription { subscriptions: vec![FloodsubSubscription {
topic: topic.hash().clone(), topic: topic.clone(),
action: FloodsubSubscriptionAction::Subscribe, action: FloodsubSubscriptionAction::Subscribe,
}], }],
}, },
@ -139,12 +139,11 @@ impl<TSubstream> Floodsub<TSubstream> {
/// Unsubscribes from a topic. /// Unsubscribes from a topic.
/// ///
/// Note that this only requires a `TopicHash` and not a full `Topic`. /// Note that this only requires the topic name.
/// ///
/// Returns true if we were subscribed to this topic. /// Returns true if we were subscribed to this topic.
pub fn unsubscribe(&mut self, topic: impl AsRef<TopicHash>) -> bool { pub fn unsubscribe(&mut self, topic: Topic) -> bool {
let topic = topic.as_ref(); let pos = match self.subscribed_topics.iter().position(|t| *t == topic) {
let pos = match self.subscribed_topics.iter().position(|t| t.hash() == topic) {
Some(pos) => pos, Some(pos) => pos,
None => return false None => return false
}; };
@ -168,12 +167,12 @@ impl<TSubstream> Floodsub<TSubstream> {
} }
/// Publishes a message to the network, if we're subscribed to the topic only. /// Publishes a message to the network, if we're subscribed to the topic only.
pub fn publish(&mut self, topic: impl Into<TopicHash>, data: impl Into<Vec<u8>>) { pub fn publish(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
self.publish_many(iter::once(topic), data) self.publish_many(iter::once(topic), data)
} }
/// Publishes a message to the network, even if we're not subscribed to the topic. /// Publishes a message to the network, even if we're not subscribed to the topic.
pub fn publish_any(&mut self, topic: impl Into<TopicHash>, data: impl Into<Vec<u8>>) { pub fn publish_any(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
self.publish_many_any(iter::once(topic), data) self.publish_many_any(iter::once(topic), data)
} }
@ -181,16 +180,16 @@ impl<TSubstream> Floodsub<TSubstream> {
/// ///
/// ///
/// > **Note**: Doesn't do anything if we're not subscribed to any of the topics. /// > **Note**: Doesn't do anything if we're not subscribed to any of the topics.
pub fn publish_many(&mut self, topic: impl IntoIterator<Item = impl Into<TopicHash>>, data: impl Into<Vec<u8>>) { pub fn publish_many(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>) {
self.publish_many_inner(topic, data, true) self.publish_many_inner(topic, data, true)
} }
/// Publishes a message with multiple topics to the network, even if we're not subscribed to any of the topics. /// Publishes a message with multiple topics to the network, even if we're not subscribed to any of the topics.
pub fn publish_many_any(&mut self, topic: impl IntoIterator<Item = impl Into<TopicHash>>, data: impl Into<Vec<u8>>) { pub fn publish_many_any(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>) {
self.publish_many_inner(topic, data, false) self.publish_many_inner(topic, data, false)
} }
fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = impl Into<TopicHash>>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) { fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) {
let message = FloodsubMessage { let message = FloodsubMessage {
source: self.local_peer_id.clone(), source: self.local_peer_id.clone(),
data: data.into(), data: data.into(),
@ -198,10 +197,10 @@ impl<TSubstream> Floodsub<TSubstream> {
// with packets with the predetermined sequence numbers and absorb our legitimate // with packets with the predetermined sequence numbers and absorb our legitimate
// messages. We therefore use a random number. // messages. We therefore use a random number.
sequence_number: rand::random::<[u8; 20]>().to_vec(), sequence_number: rand::random::<[u8; 20]>().to_vec(),
topics: topic.into_iter().map(|t| t.into().clone()).collect(), topics: topic.into_iter().map(Into::into).collect(),
}; };
let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.hash() == u)); let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u));
if self_subscribed { if self_subscribed {
self.received.add(&message); self.received.add(&message);
} }
@ -246,13 +245,13 @@ where
fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) { fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) {
// We need to send our subscriptions to the newly-connected node. // We need to send our subscriptions to the newly-connected node.
if self.target_peers.contains(&id) { if self.target_peers.contains(&id) {
for topic in self.subscribed_topics.iter() { for topic in self.subscribed_topics.iter().cloned() {
self.events.push_back(NetworkBehaviourAction::SendEvent { self.events.push_back(NetworkBehaviourAction::SendEvent {
peer_id: id.clone(), peer_id: id.clone(),
event: FloodsubRpc { event: FloodsubRpc {
messages: Vec::new(), messages: Vec::new(),
subscriptions: vec![FloodsubSubscription { subscriptions: vec![FloodsubSubscription {
topic: topic.hash().clone(), topic,
action: FloodsubSubscriptionAction::Subscribe, action: FloodsubSubscriptionAction::Subscribe,
}], }],
}, },
@ -323,7 +322,7 @@ where
} }
// Add the message to be dispatched to the user. // Add the message to be dispatched to the user.
if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.hash() == u)) { if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
let event = FloodsubEvent::Message(message.clone()); let event = FloodsubEvent::Message(message.clone());
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
} }
@ -408,7 +407,7 @@ pub enum FloodsubEvent {
/// Remote that has subscribed. /// Remote that has subscribed.
peer_id: PeerId, peer_id: PeerId,
/// The topic it has subscribed to. /// The topic it has subscribed to.
topic: TopicHash, topic: Topic,
}, },
/// A remote unsubscribed from a topic. /// A remote unsubscribed from a topic.
@ -416,6 +415,6 @@ pub enum FloodsubEvent {
/// Remote that has unsubscribed. /// Remote that has unsubscribed.
peer_id: PeerId, peer_id: PeerId,
/// The topic it has subscribed from. /// The topic it has subscribed from.
topic: TopicHash, topic: Topic,
}, },
} }

View File

@ -32,4 +32,4 @@ mod rpc_proto {
pub use self::layer::{Floodsub, FloodsubEvent}; pub use self::layer::{Floodsub, FloodsubEvent};
pub use self::protocol::{FloodsubMessage, FloodsubRpc}; pub use self::protocol::{FloodsubMessage, FloodsubRpc};
pub use self::topic::{Topic, TopicBuilder, TopicHash}; pub use self::topic::Topic;

View File

@ -19,11 +19,11 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::rpc_proto; use crate::rpc_proto;
use crate::topic::TopicHash; use crate::topic::Topic;
use futures::prelude::*;
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId, upgrade}; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId, upgrade};
use prost::Message; use prost::Message;
use std::{error, fmt, io, iter, pin::Pin}; use std::{error, fmt, io, iter, pin::Pin};
use futures::{Future, io::{AsyncRead, AsyncWrite}};
/// Implementation of `ConnectionUpgrade` for the floodsub protocol. /// Implementation of `ConnectionUpgrade` for the floodsub protocol.
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
@ -68,7 +68,7 @@ where
sequence_number: publish.seqno.unwrap_or_default(), sequence_number: publish.seqno.unwrap_or_default(),
topics: publish.topic_ids topics: publish.topic_ids
.into_iter() .into_iter()
.map(TopicHash::from_raw) .map(Topic::new)
.collect(), .collect(),
}); });
} }
@ -83,7 +83,7 @@ where
} else { } else {
FloodsubSubscriptionAction::Unsubscribe FloodsubSubscriptionAction::Unsubscribe
}, },
topic: TopicHash::from_raw(sub.topic_id.unwrap_or_default()), topic: Topic::new(sub.topic_id.unwrap_or_default()),
}) })
.collect(), .collect(),
}) })
@ -184,7 +184,7 @@ impl FloodsubRpc {
seqno: Some(msg.sequence_number), seqno: Some(msg.sequence_number),
topic_ids: msg.topics topic_ids: msg.topics
.into_iter() .into_iter()
.map(TopicHash::into_string) .map(|topic| topic.into())
.collect() .collect()
} }
}) })
@ -194,7 +194,7 @@ impl FloodsubRpc {
.map(|topic| { .map(|topic| {
rpc_proto::rpc::SubOpts { rpc_proto::rpc::SubOpts {
subscribe: Some(topic.action == FloodsubSubscriptionAction::Subscribe), subscribe: Some(topic.action == FloodsubSubscriptionAction::Subscribe),
topic_id: Some(topic.topic.into_string()) topic_id: Some(topic.topic.into())
} }
}) })
.collect() .collect()
@ -221,7 +221,7 @@ pub struct FloodsubMessage {
/// List of topics this message belongs to. /// List of topics this message belongs to.
/// ///
/// Each message can belong to multiple topics at once. /// Each message can belong to multiple topics at once.
pub topics: Vec<TopicHash>, pub topics: Vec<Topic>,
} }
/// A subscription received by the floodsub system. /// A subscription received by the floodsub system.
@ -230,7 +230,7 @@ pub struct FloodsubSubscription {
/// Action to perform. /// Action to perform.
pub action: FloodsubSubscriptionAction, pub action: FloodsubSubscriptionAction,
/// The topic from which to subscribe or unsubscribe. /// The topic from which to subscribe or unsubscribe.
pub topic: TopicHash, pub topic: Topic,
} }
/// Action that a subscription wants to perform. /// Action that a subscription wants to perform.

View File

@ -18,32 +18,3 @@ message Message {
optional bytes seqno = 3; optional bytes seqno = 3;
repeated string topic_ids = 4; repeated string topic_ids = 4;
} }
// topicID = hash(topicDescriptor); (not the topic.name)
message TopicDescriptor {
optional string name = 1;
optional AuthOpts auth = 2;
optional EncOpts enc = 3;
message AuthOpts {
optional AuthMode mode = 1;
repeated bytes keys = 2; // root keys to trust
enum AuthMode {
NONE = 0; // no authentication, anyone can publish
KEY = 1; // only messages signed by keys in the topic descriptor are accepted
WOT = 2; // web of trust, certificates can allow publisher set to grow
}
}
message EncOpts {
optional EncMode mode = 1;
repeated bytes key_hashes = 2; // the hashes of the shared keys used (salted)
enum EncMode {
NONE = 0; // no encryption, anyone can read
SHAREDKEY = 1; // messages are encrypted with shared key
WOT = 2; // web of trust, certificates can allow publisher set to grow
}
}
}

View File

@ -18,93 +18,27 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use bs58;
use crate::rpc_proto;
use prost::Message;
/// Represents the hash of a topic.
///
/// Instead of a using the topic as a whole, the API of floodsub uses a hash of the topic. You only
/// have to build the hash once, then use it everywhere.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicHash {
hash: String,
}
impl TopicHash {
/// Builds a new `TopicHash` from the given hash.
pub fn from_raw(hash: String) -> TopicHash {
TopicHash { hash }
}
pub fn into_string(self) -> String {
self.hash
}
}
/// Built topic. /// Built topic.
#[derive(Debug, Clone)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Topic { pub struct Topic(String);
descriptor: rpc_proto::TopicDescriptor,
hash: TopicHash,
}
impl Topic { impl Topic {
/// Returns the hash of the topic. /// Returns the id of the topic.
pub fn hash(&self) -> &TopicHash { #[inline]
&self.hash pub fn id(&self) -> &str {
&self.0
} }
}
impl AsRef<TopicHash> for Topic { pub fn new<S>(name: S) -> Topic
fn as_ref(&self) -> &TopicHash {
&self.hash
}
}
impl From<Topic> for TopicHash {
fn from(topic: Topic) -> TopicHash {
topic.hash
}
}
impl<'a> From<&'a Topic> for TopicHash {
fn from(topic: &'a Topic) -> TopicHash {
topic.hash.clone()
}
}
/// Builder for a `TopicHash`.
#[derive(Debug, Clone)]
pub struct TopicBuilder {
builder: rpc_proto::TopicDescriptor,
}
impl TopicBuilder {
pub fn new<S>(name: S) -> TopicBuilder
where where
S: Into<String>, S: Into<String>,
{ {
TopicBuilder { Topic(name.into())
builder: rpc_proto::TopicDescriptor { }
name: Some(name.into()), }
auth: None,
enc: None impl From<Topic> for String {
} fn from(topic: Topic) -> String {
} topic.0
}
/// Turns the builder into an actual `Topic`.
pub fn build(self) -> Topic {
let mut buf = Vec::with_capacity(self.builder.encoded_len());
self.builder.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
// TODO: https://github.com/libp2p/rust-libp2p/issues/473
let hash = TopicHash {
hash: bs58::encode(&buf).into_string(),
};
Topic {
descriptor: self.builder,
hash,
}
} }
} }