diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index bf47938a..cdc64300 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -6,16 +6,13 @@ license = "MIT" [dependencies] bs58 = "0.2.0" -byteorder = "1.2.1" bytes = "0.4" +cuckoofilter = "0.3.2" fnv = "1.0" futures = "0.1" libp2p-core = { path = "../../core" } -log = "0.4.1" -multiaddr = { path = "../../misc/multiaddr" } -parking_lot = "0.6" protobuf = "2.0.2" -smallvec = "0.6" +smallvec = "0.6.5" tokio-codec = "0.1" tokio-io = "0.1" unsigned-varint = { version = "0.2.1", features = ["codec"] } diff --git a/protocols/floodsub/src/handler.rs b/protocols/floodsub/src/handler.rs new file mode 100644 index 00000000..0b5325ca --- /dev/null +++ b/protocols/floodsub/src/handler.rs @@ -0,0 +1,238 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::prelude::*; +use libp2p_core::nodes::{NodeHandlerEndpoint, ProtocolsHandler, ProtocolsHandlerEvent}; +use libp2p_core::ConnectionUpgrade; +use protocol::{FloodsubCodec, FloodsubConfig, FloodsubRpc}; +use smallvec::SmallVec; +use std::{fmt, io}; +use tokio_codec::Framed; +use tokio_io::{AsyncRead, AsyncWrite}; + +/// Protocol handler that handles communications with the remote for the fileshare protocol. +/// +/// The handler will automatically open a substream with the remote for each request we make. +/// +/// It also handles requests made by the remote. +pub struct FloodsubHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ + /// Configuration for the Kademlia protocol. + config: FloodsubConfig, + + /// If true, we are trying to shut down the existing Kademlia substream and should refuse any + /// incoming connection. + shutting_down: bool, + + /// The active substreams. + // TODO: add a limit to the number of allowed substreams + substreams: Vec>, + + /// Queue of values that we want to send to the remote. + send_queue: SmallVec<[FloodsubRpc; 16]>, +} + +/// State of an active substream, opened either by us or by the remote. +enum SubstreamState +where + TSubstream: AsyncRead + AsyncWrite, +{ + /// Waiting for a message from the remote. + WaitingInput(Framed), + /// Waiting to send a message to the remote. + PendingSend(Framed, FloodsubRpc), + /// Waiting to send a message to the remote. + /// Waiting to flush the substream so that the data arrives to the remote. + PendingFlush(Framed), + /// The substream is being closed. + Closing(Framed), +} + +impl SubstreamState +where + TSubstream: AsyncRead + AsyncWrite, +{ + /// Consumes this state and produces the substream, if relevant. + fn into_substream(self) -> Option> { + match self { + SubstreamState::WaitingInput(substream) => Some(substream), + SubstreamState::PendingSend(substream, _) => Some(substream), + SubstreamState::PendingFlush(substream) => Some(substream), + SubstreamState::Closing(substream) => Some(substream), + } + } +} + +impl FloodsubHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ + /// Builds a new `FloodsubHandler`. + pub fn new() -> Self { + FloodsubHandler { + config: FloodsubConfig::new(), + shutting_down: false, + substreams: Vec::new(), + send_queue: SmallVec::new(), + } + } +} + +impl ProtocolsHandler for FloodsubHandler +where + TSubstream: AsyncRead + AsyncWrite + 'static, +{ + type InEvent = FloodsubRpc; + type OutEvent = FloodsubRpc; + type Substream = TSubstream; + type Protocol = FloodsubConfig; + type OutboundOpenInfo = FloodsubRpc; + + #[inline] + fn listen_protocol(&self) -> Self::Protocol { + self.config.clone() + } + + fn inject_fully_negotiated( + &mut self, + protocol: >::Output, + endpoint: NodeHandlerEndpoint, + ) { + if self.shutting_down { + return; + } + + match endpoint { + NodeHandlerEndpoint::Dialer(message) => { + self.substreams + .push(SubstreamState::PendingSend(protocol, message)); + } + NodeHandlerEndpoint::Listener => { + self.substreams.push(SubstreamState::WaitingInput(protocol)); + } + } + } + + #[inline] + fn inject_event(&mut self, message: FloodsubRpc) { + self.send_queue.push(message); + } + + #[inline] + fn inject_inbound_closed(&mut self) {} + + #[inline] + fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: io::Error) {} + + #[inline] + fn shutdown(&mut self) { + self.shutting_down = true; + for n in (0..self.substreams.len()).rev() { + let mut substream = self.substreams.swap_remove(n); + if let Some(substream) = substream.into_substream() { + self.substreams.push(SubstreamState::Closing(substream)); + } + } + } + + fn poll( + &mut self, + ) -> Poll< + Option>, + io::Error, + > { + if !self.send_queue.is_empty() { + let message = self.send_queue.remove(0); + return Ok(Async::Ready(Some( + ProtocolsHandlerEvent::OutboundSubstreamRequest { + info: message, + upgrade: self.config.clone(), + }, + ))); + } + + for n in (0..self.substreams.len()).rev() { + let mut substream = self.substreams.swap_remove(n); + loop { + substream = match substream { + SubstreamState::WaitingInput(mut substream) => match substream.poll() { + Ok(Async::Ready(Some(message))) => { + self.substreams + .push(SubstreamState::WaitingInput(substream)); + return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(message)))); + } + Ok(Async::Ready(None)) => SubstreamState::Closing(substream), + Ok(Async::NotReady) => { + self.substreams + .push(SubstreamState::WaitingInput(substream)); + return Ok(Async::NotReady); + } + Err(_) => SubstreamState::Closing(substream), + }, + SubstreamState::PendingSend(mut substream, message) => { + match substream.start_send(message)? { + AsyncSink::Ready => SubstreamState::PendingFlush(substream), + AsyncSink::NotReady(message) => { + self.substreams + .push(SubstreamState::PendingSend(substream, message)); + return Ok(Async::NotReady); + } + } + } + SubstreamState::PendingFlush(mut substream) => { + match substream.poll_complete()? { + Async::Ready(()) => SubstreamState::Closing(substream), + Async::NotReady => { + self.substreams + .push(SubstreamState::PendingFlush(substream)); + return Ok(Async::NotReady); + } + } + } + SubstreamState::Closing(mut substream) => match substream.close() { + Ok(Async::Ready(())) => break, + Ok(Async::NotReady) => { + self.substreams.push(SubstreamState::Closing(substream)); + return Ok(Async::NotReady); + } + Err(_) => return Ok(Async::Ready(None)), + }, + } + } + } + + Ok(Async::NotReady) + } +} + +impl fmt::Debug for FloodsubHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.debug_struct("FloodsubHandler") + .field("shutting_down", &self.shutting_down) + .field("substreams", &self.substreams.len()) + .field("send_queue", &self.send_queue.len()) + .finish() + } +} diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs new file mode 100644 index 00000000..fe7725fa --- /dev/null +++ b/protocols/floodsub/src/layer.rs @@ -0,0 +1,277 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use cuckoofilter::CuckooFilter; +use futures::prelude::*; +use handler::FloodsubHandler; +use libp2p_core::nodes::{ConnectedPoint, NetworkBehavior, NetworkBehaviorAction}; +use libp2p_core::{nodes::protocols_handler::ProtocolsHandler, PeerId}; +use protocol::{FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction}; +use smallvec::SmallVec; +use std::{collections::VecDeque, iter, marker::PhantomData}; +use std::collections::hash_map::{DefaultHasher, HashMap}; +use tokio_io::{AsyncRead, AsyncWrite}; +use topic::{Topic, TopicHash}; + +/// Network behaviour that automatically identifies nodes periodically, and returns information +/// about them. +pub struct FloodsubBehaviour { + /// Events that need to be produced outside when polling. + events: VecDeque>, + + /// Peer id of the local node. Used for the source of the messages that we publish. + local_peer_id: PeerId, + + /// List of peers the network is connected to, and the topics that they're subscribed to. + // TODO: filter out peers that don't support floodsub, so that we avoid hammering them with + // opened substream + connected_peers: HashMap>, + + // List of topics we're subscribed to. Necessary in order to filter out messages that we + // erroneously receive. + subscribed_topics: SmallVec<[Topic; 16]>, + + // Sequence number for the messages we send. + seq_no: usize, + + // We keep track of the messages we received (in the format `hash(source ID, seq_no)`) so that + // we don't dispatch the same message twice if we receive it twice on the network. + received: CuckooFilter, + + /// Marker to pin the generics. + marker: PhantomData, +} + +impl FloodsubBehaviour { + /// Creates a `FloodsubBehaviour`. + pub fn new(local_peer_id: PeerId) -> Self { + FloodsubBehaviour { + events: VecDeque::new(), + local_peer_id, + connected_peers: HashMap::new(), + subscribed_topics: SmallVec::new(), + seq_no: 0, + received: CuckooFilter::new(), + marker: PhantomData, + } + } +} + +impl FloodsubBehaviour { + /// Subscribes to a topic. + /// + /// Returns true if the subscription worked. Returns false if we were already subscribed. + pub fn subscribe(&mut self, topic: Topic) -> bool { + if self.subscribed_topics.iter().any(|t| t.hash() == topic.hash()) { + return false; + } + + for peer in self.connected_peers.keys() { + self.events.push_back(NetworkBehaviorAction::SendEvent { + peer_id: peer.clone(), + event: FloodsubRpc { + messages: Vec::new(), + subscriptions: vec![FloodsubSubscription { + topic: topic.hash().clone(), + action: FloodsubSubscriptionAction::Subscribe, + }], + }, + }); + } + + self.subscribed_topics.push(topic); + true + } + + /// Unsubscribes from a topic. + /// + /// Returns true if we were subscribed to this topic. + pub fn unsubscribe(&mut self, topic: impl AsRef) -> bool { + let topic = topic.as_ref(); + let pos = match self.subscribed_topics.iter().position(|t| t.hash() == topic) { + Some(pos) => pos, + None => return false + }; + + self.subscribed_topics.remove(pos); + + for peer in self.connected_peers.keys() { + self.events.push_back(NetworkBehaviorAction::SendEvent { + peer_id: peer.clone(), + event: FloodsubRpc { + messages: Vec::new(), + subscriptions: vec![FloodsubSubscription { + topic: topic.clone(), + action: FloodsubSubscriptionAction::Unsubscribe, + }], + }, + }); + } + + true + } + + /// Publishes a message to the network. + /// + /// > **Note**: Doesn't do anything if we're not subscribed to the topic. + pub fn publish(&mut self, topic: impl Into, data: impl Into>) { + self.publish_many(iter::once(topic), data) + } + + /// Publishes a message to the network that has multiple topics. + /// + /// > **Note**: Doesn't do anything if we're not subscribed to any of the topics. + pub fn publish_many(&mut self, topic: impl IntoIterator>, data: impl Into>) { + let message = FloodsubMessage { + source: self.local_peer_id.clone(), + data: data.into(), + sequence_number: self.next_sequence_number(), + topics: topic.into_iter().map(|t| t.into().clone()).collect(), + }; + + // Don't publish the message if we're not subscribed ourselves to any of the topics. + if !self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.hash() == u)) { + return; + } + + self.received.add(&message); + + // Send to peers we know are subscribed to the topic. + for (peer_id, sub_topic) in self.connected_peers.iter() { + if !sub_topic.iter().any(|t| message.topics.iter().any(|u| t == u)) { + continue; + } + + self.events.push_back(NetworkBehaviorAction::SendEvent { + peer_id: peer_id.clone(), + event: FloodsubRpc { + subscriptions: Vec::new(), + messages: vec![message.clone()], + } + }); + } + } + + /// Builds a unique sequence number to put in a `FloodsubMessage`. + fn next_sequence_number(&mut self) -> Vec { + let data = self.seq_no.to_string(); + self.seq_no += 1; + data.into() + } +} + +impl NetworkBehavior for FloodsubBehaviour +where + TSubstream: AsyncRead + AsyncWrite + Send + Sync + 'static, +{ + type ProtocolsHandler = FloodsubHandler; + type OutEvent = FloodsubMessage; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + FloodsubHandler::new() + } + + fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) { + // We need to send our subscriptions to the newly-connected node. + for topic in self.subscribed_topics.iter() { + self.events.push_back(NetworkBehaviorAction::SendEvent { + peer_id: id.clone(), + event: FloodsubRpc { + messages: Vec::new(), + subscriptions: vec![FloodsubSubscription { + topic: topic.hash().clone(), + action: FloodsubSubscriptionAction::Subscribe, + }], + }, + }); + } + + self.connected_peers.insert(id.clone(), SmallVec::new()); + } + + fn inject_disconnected(&mut self, id: &PeerId, _: ConnectedPoint) { + let was_in = self.connected_peers.remove(id); + debug_assert!(was_in.is_some()); + } + + fn inject_node_event( + &mut self, + propagation_source: PeerId, + event: FloodsubRpc, + ) { + // List of messages we're going to propagate on the network. + let mut rpcs_to_dispatch: Vec<(PeerId, FloodsubRpc)> = Vec::new(); + + for message in event.messages { + // Use `self.received` to skip the messages that we have already received in the past. + // Note that this can false positive. + if !self.received.test_and_add(&message) { + continue; + } + + // Add the message to be dispatched to the user. + if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.hash() == u)) { + self.events.push_back(NetworkBehaviorAction::GenerateEvent(message.clone())); + } + + // Propagate the message to everyone else who is subscribed to any of the topics. + for (peer_id, sub_topics) in self.connected_peers.iter() { + if peer_id == &propagation_source { + continue; + } + + if !sub_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) { + continue; + } + + if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) { + rpcs_to_dispatch[pos].1.messages.push(message.clone()); + } else { + rpcs_to_dispatch.push((peer_id.clone(), FloodsubRpc { + subscriptions: Vec::new(), + messages: vec![message.clone()], + })); + } + } + } + + for (peer_id, rpc) in rpcs_to_dispatch { + self.events.push_back(NetworkBehaviorAction::SendEvent { + peer_id, + event: rpc, + }); + } + } + + fn poll( + &mut self, + ) -> Async< + NetworkBehaviorAction< + ::InEvent, + Self::OutEvent, + >, + > { + if let Some(event) = self.events.pop_front() { + return Async::Ready(event); + } + + Async::NotReady + } +} diff --git a/protocols/floodsub/src/lib.rs b/protocols/floodsub/src/lib.rs index 48e04ea3..4b58d717 100644 --- a/protocols/floodsub/src/lib.rs +++ b/protocols/floodsub/src/lib.rs @@ -19,643 +19,24 @@ // DEALINGS IN THE SOFTWARE. extern crate bs58; -extern crate byteorder; extern crate bytes; +extern crate cuckoofilter; extern crate fnv; extern crate futures; extern crate libp2p_core; -#[macro_use] -extern crate log; -extern crate multiaddr; -extern crate parking_lot; extern crate protobuf; extern crate smallvec; extern crate tokio_codec; extern crate tokio_io; extern crate unsigned_varint; +mod handler; +mod layer; +mod protocol; mod rpc_proto; mod topic; +pub use self::handler::FloodsubHandler; +pub use self::layer::FloodsubBehaviour; +pub use self::protocol::*; // TODO: exact reexports pub use self::topic::{Topic, TopicBuilder, TopicHash}; - -use byteorder::{BigEndian, WriteBytesExt}; -use bytes::{Bytes, BytesMut}; -use fnv::{FnvHashMap, FnvHashSet, FnvHasher}; -use futures::sync::mpsc; -use futures::{future, Future, Poll, Sink, Stream}; -use libp2p_core::{ConnectionUpgrade, Endpoint, PeerId}; -use log::Level; -use multiaddr::{Protocol, Multiaddr}; -use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; -use protobuf::Message as ProtobufMessage; -use smallvec::SmallVec; -use std::fmt; -use std::hash::{Hash, Hasher}; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use std::iter; -use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; -use tokio_codec::Framed; -use tokio_io::{AsyncRead, AsyncWrite}; -use unsigned_varint::codec; - -/// Implementation of the `ConnectionUpgrade` for the floodsub protocol. -#[derive(Debug, Clone)] -pub struct FloodSubUpgrade { - inner: Arc, -} - -impl FloodSubUpgrade { - /// Builds a new `FloodSubUpgrade`. Also returns a `FloodSubReceiver` that will stream incoming - /// messages for the floodsub system. - pub fn new(my_id: PeerId) -> (FloodSubUpgrade, FloodSubReceiver) { - let (output_tx, output_rx) = mpsc::unbounded(); - - let inner = Arc::new(Inner { - peer_id: my_id.into_bytes(), - output_tx: output_tx, - remote_connections: RwLock::new(FnvHashMap::default()), - subscribed_topics: RwLock::new(Vec::new()), - seq_no: AtomicUsize::new(0), - received: Mutex::new(FnvHashSet::default()), - }); - - let upgrade = FloodSubUpgrade { inner: inner }; - - let receiver = FloodSubReceiver { inner: output_rx }; - - (upgrade, receiver) - } -} - -impl ConnectionUpgrade for FloodSubUpgrade -where - C: AsyncRead + AsyncWrite + Send + 'static, -{ - type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; - type UpgradeIdentifier = (); - - #[inline] - fn protocol_names(&self) -> Self::NamesIter { - iter::once(("/floodsub/1.0.0".into(), ())) - } - - type Output = FloodSubFuture; - type Future = Box + Send>; - - #[inline] - fn upgrade( - self, - socket: C, - _: Self::UpgradeIdentifier, - _: Endpoint, - ) -> Self::Future { - debug!("Upgrading connection as floodsub"); - - let future = { - // FIXME: WRONG - let remote_addr: Multiaddr = "/ip4/127.0.0.1/tcp/5000".parse().unwrap(); - - // Whenever a new node connects, we send to it a message containing the topics we are - // already subscribed to. - let init_msg: Vec = { - let subscribed_topics = self.inner.subscribed_topics.read(); - let mut proto = rpc_proto::RPC::new(); - - for topic in subscribed_topics.iter() { - let mut subscription = rpc_proto::RPC_SubOpts::new(); - subscription.set_subscribe(true); - subscription.set_topicid(topic.hash().clone().into_string()); - proto.mut_subscriptions().push(subscription); - } - - proto - .write_to_bytes() - .expect("programmer error: the protobuf message should always be valid") - }; - - // Split the socket into writing and reading parts. - let (floodsub_sink, floodsub_stream) = Framed::new(socket, codec::UviBytes::default()) - .sink_map_err(|err| IoError::new(IoErrorKind::InvalidData, err)) - .map_err(|err| IoError::new(IoErrorKind::InvalidData, err)) - .split(); - - // Build the channel that will be used to communicate outgoing message to this remote. - let (input_tx, input_rx) = mpsc::unbounded(); - input_tx - .unbounded_send(init_msg.into()) - .expect("newly-created channel should always be open"); - self.inner.remote_connections.write().insert( - remote_addr.clone(), - RemoteInfo { - sender: input_tx, - subscribed_topics: RwLock::new(FnvHashSet::default()), - }, - ); - - // Combine the socket read and the outgoing messages input, so that we can wake up when - // either happens. - let messages = input_rx - .map(|m| (m, MessageSource::FromChannel)) - .map_err(|_| unreachable!("channel streams should never produce an error")) - .select(floodsub_stream.map(|m| (m, MessageSource::FromSocket))); - - #[derive(Debug)] - enum MessageSource { - FromSocket, - FromChannel, - } - - let inner = self.inner.clone(); - let future = future::loop_fn( - (floodsub_sink, messages), - move |(floodsub_sink, messages)| { - let inner = inner.clone(); - let remote_addr = remote_addr.clone(); - - messages - .into_future() - .map_err(|(err, _)| err) - .and_then(move |(input, rest)| { - match input { - Some((bytes, MessageSource::FromSocket)) => { - // Received a packet from the remote. - let fut = match handle_packet_received(bytes, inner, &remote_addr) { - Ok(()) => { - future::ok(future::Loop::Continue((floodsub_sink, rest))) - } - Err(err) => future::err(err), - }; - Box::new(fut) as Box<_> - } - - Some((bytes, MessageSource::FromChannel)) => { - // Received a packet from the channel. - // Need to send a message to remote. - trace!("Effectively sending message to remote"); - let future = floodsub_sink.send(bytes).map(|floodsub_sink| { - future::Loop::Continue((floodsub_sink, rest)) - }); - Box::new(future) as Box<_> - } - - None => { - // Both the connection stream and `rx` are empty, so we break - // the loop. - trace!("Pubsub future clean finish"); - // TODO: what if multiple connections? - inner.remote_connections.write().remove(&remote_addr); - let future = future::ok(future::Loop::Break(())); - Box::new(future) as Box + Send> - } - } - }) - }, - ); - - future::ok(FloodSubFuture { - inner: Box::new(future) as Box<_>, - }) - }; - - Box::new(future) as Box<_> - } -} - -/// Allows one to control the behaviour of the floodsub system. -#[derive(Clone)] -pub struct FloodSubController { - inner: Arc, -} - -struct Inner { - // Our local peer ID multihash, to pass as the source. - peer_id: Vec, - - // Channel where to send the messages that should be dispatched to the user. - output_tx: mpsc::UnboundedSender, - - // Active connections with a remote. - remote_connections: RwLock>, - - // List of topics we're subscribed to. Necessary in order to filter out messages that we - // erroneously receive. - subscribed_topics: RwLock>, - - // Sequence number for the messages we send. - seq_no: AtomicUsize, - - // We keep track of the messages we received (in the format `(remote ID, seq_no)`) so that we - // don't dispatch the same message twice if we receive it twice on the network. - // TODO: the `HashSet` will keep growing indefinitely :-/ - received: Mutex>, -} - -struct RemoteInfo { - // Sender to send data over the socket to that host. - sender: mpsc::UnboundedSender, - // Topics the remote is registered to. - subscribed_topics: RwLock>, -} - -impl fmt::Debug for Inner { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("Inner") - .field("peer_id", &self.peer_id) - .field( - "num_remote_connections", - &self.remote_connections.read().len(), - ) - .field("subscribed_topics", &*self.subscribed_topics.read()) - .field("seq_no", &self.seq_no) - .field("received", &self.received) - .finish() - } -} - -impl FloodSubController { - /// Builds a new controller for floodsub. - #[inline] - pub fn new(upgrade: &FloodSubUpgrade) -> Self { - FloodSubController { - inner: upgrade.inner.clone(), - } - } - - /// Subscribe to a topic. When a node on the network sends a message for that topic, we will - /// likely receive it. - /// - /// It is not guaranteed that we receive every single message published on the network. - #[inline] - pub fn subscribe(&self, topic: &Topic) { - // This function exists for convenience. - self.subscribe_many(iter::once(topic)); - } - - /// Same as `subscribe`, but subscribes to multiple topics at once. - /// - /// Since this results in a single packet sent to the remotes, it is preferable to use this - /// method when subscribing to multiple topics at once rather than call `subscribe` multiple - /// times. - #[inline] - pub fn subscribe_many<'a, I>(&self, topics: I) - where - I: IntoIterator, - I::IntoIter: Clone, - { - // This function exists for convenience. - self.sub_unsub_multi(topics.into_iter().map::<_, fn(_) -> _>(|t| (t, true))) - } - - /// Unsubscribe from a topic. We will no longer receive any message for this topic. - /// - /// If a message was sent to us before we are able to notify that we don't want messages - /// anymore, then the message will be filtered out locally. - #[inline] - pub fn unsubscribe(&self, topic: &Topic) { - // This function exists for convenience. - self.unsubscribe_many(iter::once(topic)); - } - - /// Same as `unsubscribe` but unsubscribes from multiple topics at once. - /// - /// Since this results in a single packet sent to the remotes, it is preferable to use this - /// method when unsubscribing from multiple topics at once rather than call `unsubscribe` - /// multiple times. - #[inline] - pub fn unsubscribe_many<'a, I>(&self, topics: I) - where - I: IntoIterator, - I::IntoIter: Clone, - { - // This function exists for convenience. - self.sub_unsub_multi(topics.into_iter().map::<_, fn(_) -> _>(|t| (t, false))); - } - - // Inner implementation. The iterator should produce a boolean that is true if we subscribe and - // false if we unsubscribe. - fn sub_unsub_multi<'a, I>(&self, topics: I) - where - I: IntoIterator, - I::IntoIter: Clone, - { - let mut proto = rpc_proto::RPC::new(); - - let topics = topics.into_iter(); - - if log_enabled!(Level::Debug) { - debug!("Queuing sub/unsub message; sub = {:?}; unsub = {:?}", - topics.clone().filter(|t| t.1) - .map(|t| t.0.hash().clone().into_string()) - .collect::>(), - topics.clone().filter(|t| !t.1) - .map(|t| t.0.hash().clone().into_string()) - .collect::>()); - } - - let mut subscribed_topics = self.inner.subscribed_topics.write(); - for (topic, subscribe) in topics { - let mut subscription = rpc_proto::RPC_SubOpts::new(); - subscription.set_subscribe(subscribe); - subscription.set_topicid(topic.hash().clone().into_string()); - proto.mut_subscriptions().push(subscription); - - if subscribe { - subscribed_topics.push(topic.clone()); - } else { - subscribed_topics.retain(|t| t.hash() != topic.hash()) - } - } - - self.broadcast(proto, |_| true); - } - - /// Publishes a message on the network for the specified topic - #[inline] - pub fn publish(&self, topic: &Topic, data: Vec) { - // This function exists for convenience. - self.publish_many(iter::once(topic), data) - } - - /// Publishes a message on the network for the specified topics. - /// - /// Since this results in a single packet sent to the remotes, it is preferable to use this - /// method when publishing multiple messages at once rather than call `publish` multiple - /// times. - pub fn publish_many<'a, I>(&self, topics: I, data: Vec) - where - I: IntoIterator, - { - let topics = topics.into_iter().collect::>(); - - debug!("Queueing publish message; topics = {:?}; data_len = {:?}", - topics.iter().map(|t| t.hash().clone().into_string()).collect::>(), - data.len()); - - // Build the `Vec` containing our sequence number for this message. - let seq_no_bytes = { - let mut seqno_bytes = Vec::new(); - let seqn = self.inner.seq_no.fetch_add(1, Ordering::Relaxed); - seqno_bytes - .write_u64::(seqn as u64) - .expect("writing to a Vec never fails"); - seqno_bytes - }; - - // TODO: should handle encryption/authentication of the message - - let mut msg = rpc_proto::Message::new(); - msg.set_data(data); - msg.set_from(self.inner.peer_id.clone()); - msg.set_seqno(seq_no_bytes.clone()); - msg.set_topicIDs( - topics - .iter() - .map(|t| t.hash().clone().into_string()) - .collect(), - ); - - let mut proto = rpc_proto::RPC::new(); - proto.mut_publish().push(msg); - - // Insert into `received` so that we ignore the message if a remote sends it back to us. - self.inner - .received - .lock() - .insert(hash((self.inner.peer_id.clone(), seq_no_bytes))); - - self.broadcast(proto, |r_top| { - topics.iter().any(|t| r_top.iter().any(|to| to == t.hash())) - }); - } - - // Internal function that dispatches an `RPC` protobuf struct to all the connected remotes - // for which `filter` returns true. - fn broadcast(&self, message: rpc_proto::RPC, mut filter: F) - where - F: FnMut(&FnvHashSet) -> bool, - { - let bytes = message - .write_to_bytes() - .expect("protobuf message is always valid"); - - let remote_connections = self.inner.remote_connections.upgradable_read(); - - // Number of remotes we dispatched to, for logging purposes. - let mut num_dispatched = 0; - // Will store the addresses of remotes which we failed to send a message to and which - // must be removed from the active connections. - // We use a smallvec of 6 elements because it is unlikely that we lost connection to more - // than 6 elements at once. - let mut failed_to_send: SmallVec<[_; 6]> = SmallVec::new(); - for (remote_addr, remote) in remote_connections.iter() { - if !filter(&remote.subscribed_topics.read()) { - continue; - } - - num_dispatched += 1; - match remote.sender.unbounded_send(bytes.clone().into()) { - Ok(_) => (), - Err(_) => { - trace!("Failed to dispatch message to {} because channel was closed", - remote_addr); - failed_to_send.push(remote_addr.clone()); - } - } - } - - // Remove the remotes which we failed to send a message to. - if !failed_to_send.is_empty() { - // If we fail to upgrade the read lock to a write lock, just ignore `failed_to_send`. - if let Ok(mut remote_connections) = RwLockUpgradableReadGuard::try_upgrade(remote_connections) { - for failed_to_send in failed_to_send { - remote_connections.remove(&failed_to_send); - } - } - } - - debug!("Message queued for {} remotes", num_dispatched); - } -} - -/// Implementation of `Stream` that provides messages for the subscribed topics you subscribed to. -pub struct FloodSubReceiver { - inner: mpsc::UnboundedReceiver, -} - -impl Stream for FloodSubReceiver { - type Item = Message; - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll, Self::Error> { - self.inner - .poll() - .map_err(|_| unreachable!("UnboundedReceiver cannot err")) - } -} - -impl fmt::Debug for FloodSubReceiver { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("FloodSubReceiver").finish() - } -} - -/// A message received by the floodsub system. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct Message { - /// Remote that sent the message. - pub source: Multiaddr, - - /// Content of the message. Its meaning is out of scope of this library. - pub data: Vec, - - /// List of topics of this message. - /// - /// Each message can belong to multiple topics at once. - pub topics: Vec, -} - -/// Implementation of `Future` that must be driven to completion in order for floodsub to work. -#[must_use = "futures do nothing unless polled"] -pub struct FloodSubFuture { - inner: Box + Send>, -} - -impl Future for FloodSubFuture { - type Item = (); - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll { - self.inner.poll() - } -} - -impl fmt::Debug for FloodSubFuture { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("FloodSubFuture").finish() - } -} - -// Handles when a packet is received on a connection. -// -// - `bytes` contains the raw data. -// - `remote_addr` is the address of the sender. -fn handle_packet_received( - bytes: BytesMut, - inner: Arc, - remote_addr: &Multiaddr, -) -> Result<(), IoError> { - trace!("Received packet from {}", remote_addr); - - // Parsing attempt. - let mut input = match protobuf::parse_from_bytes::(&bytes) { - Ok(msg) => msg, - Err(err) => { - debug!("Failed to parse protobuf message; err = {:?}", err); - return Err(err.into()); - } - }; - - // Update the topics the remote is subscribed to. - if !input.get_subscriptions().is_empty() { - let remote_connec = inner.remote_connections.write(); - // TODO: what if multiple entries? - let remote = &remote_connec[remote_addr]; - let mut topics = remote.subscribed_topics.write(); - for subscription in input.mut_subscriptions().iter_mut() { - let topic = TopicHash::from_raw(subscription.take_topicid()); - let subscribe = subscription.get_subscribe(); - if subscribe { - trace!("Remote {} subscribed to {:?}", remote_addr, topic); topics.insert(topic); - } else { - trace!("Remote {} unsubscribed from {:?}", remote_addr, topic); - topics.remove(&topic); - } - } - } - - // Handle the messages coming from the remote. - for publish in input.mut_publish().iter_mut() { - let from = publish.take_from(); - // We maintain a list of the messages that have already been - // processed so that we don't process the same message twice. - // Each message is identified by the `(from, seqno)` tuple. - if !inner - .received - .lock() - .insert(hash((from.clone(), publish.take_seqno()))) - { - trace!("Skipping message because we had already received it; payload = {} bytes", - publish.get_data().len()); - continue; - } - - let peer_id = match PeerId::from_bytes(from.clone()) { - Ok(id) => id, - Err(err) => { - trace!("Parsing PeerId failed: {:?}. Skipping.", err); - continue - } - }; - - let from: Multiaddr = Protocol::P2p(peer_id.into()).into(); - - let topics = publish - .take_topicIDs() - .into_iter() - .map(|h| TopicHash::from_raw(h)) - .collect::>(); - - trace!("Processing message for topics {:?}; payload = {} bytes", - topics, - publish.get_data().len()); - - // TODO: should check encryption/authentication of the message - - // Broadcast the message to all the other remotes. - { - let remote_connections = inner.remote_connections.read(); - for (addr, info) in remote_connections.iter() { - let st = info.subscribed_topics.read(); - if !topics.iter().any(|t| st.contains(t)) { - continue; - } - // TODO: don't send back to the remote that just sent it - trace!("Broadcasting received message to {}", addr); - let _ = info.sender.unbounded_send(bytes.clone()); - } - } - - // Send the message locally if relevant. - let dispatch_locally = { - let subscribed_topics = inner.subscribed_topics.read(); - topics - .iter() - .any(|t| subscribed_topics.iter().any(|topic| topic.hash() == t)) - }; - if dispatch_locally { - // Ignore if channel is closed. - trace!("Dispatching message locally"); - let _ = inner.output_tx.unbounded_send(Message { - source: from, - data: publish.take_data(), - topics: topics, - }); - } else { - trace!("Message not dispatched locally as we are not subscribed to any of the topics"); - } - } - - Ok(()) -} - -// Shortcut function that hashes a value. -#[inline] -fn hash(value: V) -> u64 { - let mut h = FnvHasher::default(); - value.hash(&mut h); - h.finish() -} diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs new file mode 100644 index 00000000..484d252d --- /dev/null +++ b/protocols/floodsub/src/protocol.rs @@ -0,0 +1,209 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use bytes::{BufMut, Bytes, BytesMut}; +use futures::future; +use libp2p_core::{ConnectionUpgrade, Endpoint, PeerId}; +use protobuf::Message as ProtobufMessage; +use rpc_proto; +use std::{io, iter}; +use tokio_codec::{Decoder, Encoder, Framed}; +use tokio_io::{AsyncRead, AsyncWrite}; +use topic::TopicHash; +use unsigned_varint::codec; + +/// Implementation of the `ConnectionUpgrade` for the floodsub protocol. +#[derive(Debug, Clone)] +pub struct FloodsubConfig {} + +impl FloodsubConfig { + /// Builds a new `FloodsubConfig`. + #[inline] + pub fn new() -> FloodsubConfig { + FloodsubConfig {} + } +} + +impl ConnectionUpgrade for FloodsubConfig +where + TSocket: AsyncRead + AsyncWrite, +{ + type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; + type UpgradeIdentifier = (); + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + iter::once(("/floodsub/1.0.0".into(), ())) + } + + type Output = Framed; + type Future = future::FutureResult; + + #[inline] + fn upgrade(self, socket: TSocket, _: Self::UpgradeIdentifier, _: Endpoint) -> Self::Future { + future::ok(Framed::new( + socket, + FloodsubCodec { + length_prefix: Default::default(), + }, + )) + } +} + +/// Implementation of `tokio_codec::Codec`. +pub struct FloodsubCodec { + /// The codec for encoding/decoding the length prefix of messages. + length_prefix: codec::UviBytes, +} + +impl Encoder for FloodsubCodec { + type Item = FloodsubRpc; + type Error = io::Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + let mut proto = rpc_proto::RPC::new(); + + for message in item.messages.into_iter() { + let mut msg = rpc_proto::Message::new(); + msg.set_from(message.source.into_bytes()); + msg.set_data(message.data); + msg.set_seqno(message.sequence_number); + msg.set_topicIDs( + message + .topics + .into_iter() + .map(TopicHash::into_string) + .collect(), + ); + proto.mut_publish().push(msg); + } + + for topic in item.subscriptions.into_iter() { + let mut subscription = rpc_proto::RPC_SubOpts::new(); + subscription.set_subscribe(topic.action == FloodsubSubscriptionAction::Subscribe); + subscription.set_topicid(topic.topic.into_string()); + proto.mut_subscriptions().push(subscription); + } + + let msg_size = proto.compute_size(); + // Reserve enough space for the data and the length. The length has a maximum of 32 bits, + // which means that 5 bytes is enough for the variable-length integer. + dst.reserve(msg_size as usize + 5); + + proto + .write_length_delimited_to_writer(&mut dst.by_ref().writer()) + .expect( + "there is no situation in which the protobuf message can be invalid, and \ + writing to a BytesMut never fails as we reserved enough space beforehand", + ); + Ok(()) + } +} + +impl Decoder for FloodsubCodec { + type Item = FloodsubRpc; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let packet = match self.length_prefix.decode(src)? { + Some(p) => p, + None => return Ok(None), + }; + + let mut rpc: rpc_proto::RPC = protobuf::parse_from_bytes(&packet)?; + + let mut messages = Vec::with_capacity(rpc.get_publish().len()); + for mut publish in rpc.take_publish().into_iter() { + messages.push(FloodsubMessage { + source: PeerId::from_bytes(publish.take_from()).map_err(|_| { + io::Error::new(io::ErrorKind::InvalidData, "Invalid peer ID in message") + })?, + data: publish.take_data(), + sequence_number: publish.take_seqno(), + topics: publish + .take_topicIDs() + .into_iter() + .map(|topic| TopicHash::from_raw(topic)) + .collect(), + }); + } + + Ok(Some(FloodsubRpc { + messages, + subscriptions: rpc + .take_subscriptions() + .into_iter() + .map(|mut sub| FloodsubSubscription { + action: if sub.get_subscribe() { + FloodsubSubscriptionAction::Subscribe + } else { + FloodsubSubscriptionAction::Unsubscribe + }, + topic: TopicHash::from_raw(sub.take_topicid()), + }) + .collect(), + })) + } +} + +/// An RPC received by the floodsub system. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct FloodsubRpc { + /// List of messages that were part of this RPC query. + pub messages: Vec, + /// List of subscriptions. + pub subscriptions: Vec, +} + +/// A message received by the floodsub system. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct FloodsubMessage { + /// Id of the peer that published this message. + pub source: PeerId, + + /// Content of the message. Its meaning is out of scope of this library. + pub data: Vec, + + /// An incrementing sequence number. + pub sequence_number: Vec, + + /// List of topics this message belongs to. + /// + /// Each message can belong to multiple topics at once. + pub topics: Vec, +} + +/// A subscription received by the floodsub system. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct FloodsubSubscription { + /// Action to perform. + pub action: FloodsubSubscriptionAction, + /// The topic from which to subscribe or unsubscribe. + pub topic: TopicHash, +} + +/// Action that a subscription wants to perform. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum FloodsubSubscriptionAction { + /// The remote wants to subscribe to the given topic. + Subscribe, + /// The remote wants to unsubscribe from the given topic. + Unsubscribe, +} diff --git a/protocols/floodsub/src/topic.rs b/protocols/floodsub/src/topic.rs index fc8d280d..b26e6f15 100644 --- a/protocols/floodsub/src/topic.rs +++ b/protocols/floodsub/src/topic.rs @@ -59,6 +59,27 @@ impl Topic { } } +impl AsRef for Topic { + #[inline] + fn as_ref(&self) -> &TopicHash { + &self.hash + } +} + +impl From for TopicHash { + #[inline] + fn from(topic: Topic) -> TopicHash { + topic.hash + } +} + +impl<'a> From<&'a Topic> for TopicHash { + #[inline] + fn from(topic: &'a Topic) -> TopicHash { + topic.hash.clone() + } +} + /// Builder for a `TopicHash`. #[derive(Debug, Clone)] pub struct TopicBuilder { @@ -78,15 +99,17 @@ impl TopicBuilder { /// Turns the builder into an actual `Topic`. pub fn build(self) -> Topic { - let bytes = self.builder + let bytes = self + .builder .write_to_bytes() .expect("protobuf message is always valid"); + // TODO: https://github.com/libp2p/rust-libp2p/issues/473 let hash = TopicHash { hash: bs58::encode(&bytes).into_string(), }; Topic { descriptor: self.builder, - hash: hash, + hash, } } }