Rewrite floodsub to use the ProtocolsHandler (#603)

* Move lib.rs to protocol.rs

* Rewrite floodsub for ProtocolsHandler

* Add a FloodsubBehaviour

* Fix closing floodsub after a message

* Address concern

* Make it conform to the protocol

* Make it really conformant

* Address concerns
This commit is contained in:
Pierre Krieger 2018-11-13 14:46:57 +01:00 committed by GitHub
parent b3b5a4bdfd
commit f8ccb0af36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 758 additions and 633 deletions

View File

@ -6,16 +6,13 @@ license = "MIT"
[dependencies] [dependencies]
bs58 = "0.2.0" bs58 = "0.2.0"
byteorder = "1.2.1"
bytes = "0.4" bytes = "0.4"
cuckoofilter = "0.3.2"
fnv = "1.0" fnv = "1.0"
futures = "0.1" futures = "0.1"
libp2p-core = { path = "../../core" } libp2p-core = { path = "../../core" }
log = "0.4.1"
multiaddr = { path = "../../misc/multiaddr" }
parking_lot = "0.6"
protobuf = "2.0.2" protobuf = "2.0.2"
smallvec = "0.6" smallvec = "0.6.5"
tokio-codec = "0.1" tokio-codec = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
unsigned-varint = { version = "0.2.1", features = ["codec"] } unsigned-varint = { version = "0.2.1", features = ["codec"] }

View File

@ -0,0 +1,238 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use libp2p_core::nodes::{NodeHandlerEndpoint, ProtocolsHandler, ProtocolsHandlerEvent};
use libp2p_core::ConnectionUpgrade;
use protocol::{FloodsubCodec, FloodsubConfig, FloodsubRpc};
use smallvec::SmallVec;
use std::{fmt, io};
use tokio_codec::Framed;
use tokio_io::{AsyncRead, AsyncWrite};
/// Protocol handler that handles communications with the remote for the fileshare protocol.
///
/// The handler will automatically open a substream with the remote for each request we make.
///
/// It also handles requests made by the remote.
pub struct FloodsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
/// Configuration for the Kademlia protocol.
config: FloodsubConfig,
/// If true, we are trying to shut down the existing Kademlia substream and should refuse any
/// incoming connection.
shutting_down: bool,
/// The active substreams.
// TODO: add a limit to the number of allowed substreams
substreams: Vec<SubstreamState<TSubstream>>,
/// Queue of values that we want to send to the remote.
send_queue: SmallVec<[FloodsubRpc; 16]>,
}
/// State of an active substream, opened either by us or by the remote.
enum SubstreamState<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
/// Waiting for a message from the remote.
WaitingInput(Framed<TSubstream, FloodsubCodec>),
/// Waiting to send a message to the remote.
PendingSend(Framed<TSubstream, FloodsubCodec>, FloodsubRpc),
/// Waiting to send a message to the remote.
/// Waiting to flush the substream so that the data arrives to the remote.
PendingFlush(Framed<TSubstream, FloodsubCodec>),
/// The substream is being closed.
Closing(Framed<TSubstream, FloodsubCodec>),
}
impl<TSubstream> SubstreamState<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
/// Consumes this state and produces the substream, if relevant.
fn into_substream(self) -> Option<Framed<TSubstream, FloodsubCodec>> {
match self {
SubstreamState::WaitingInput(substream) => Some(substream),
SubstreamState::PendingSend(substream, _) => Some(substream),
SubstreamState::PendingFlush(substream) => Some(substream),
SubstreamState::Closing(substream) => Some(substream),
}
}
}
impl<TSubstream> FloodsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
/// Builds a new `FloodsubHandler`.
pub fn new() -> Self {
FloodsubHandler {
config: FloodsubConfig::new(),
shutting_down: false,
substreams: Vec::new(),
send_queue: SmallVec::new(),
}
}
}
impl<TSubstream> ProtocolsHandler for FloodsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + 'static,
{
type InEvent = FloodsubRpc;
type OutEvent = FloodsubRpc;
type Substream = TSubstream;
type Protocol = FloodsubConfig;
type OutboundOpenInfo = FloodsubRpc;
#[inline]
fn listen_protocol(&self) -> Self::Protocol {
self.config.clone()
}
fn inject_fully_negotiated(
&mut self,
protocol: <Self::Protocol as ConnectionUpgrade<TSubstream>>::Output,
endpoint: NodeHandlerEndpoint<Self::OutboundOpenInfo>,
) {
if self.shutting_down {
return;
}
match endpoint {
NodeHandlerEndpoint::Dialer(message) => {
self.substreams
.push(SubstreamState::PendingSend(protocol, message));
}
NodeHandlerEndpoint::Listener => {
self.substreams.push(SubstreamState::WaitingInput(protocol));
}
}
}
#[inline]
fn inject_event(&mut self, message: FloodsubRpc) {
self.send_queue.push(message);
}
#[inline]
fn inject_inbound_closed(&mut self) {}
#[inline]
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: io::Error) {}
#[inline]
fn shutdown(&mut self) {
self.shutting_down = true;
for n in (0..self.substreams.len()).rev() {
let mut substream = self.substreams.swap_remove(n);
if let Some(substream) = substream.into_substream() {
self.substreams.push(SubstreamState::Closing(substream));
}
}
}
fn poll(
&mut self,
) -> Poll<
Option<ProtocolsHandlerEvent<Self::Protocol, Self::OutboundOpenInfo, Self::OutEvent>>,
io::Error,
> {
if !self.send_queue.is_empty() {
let message = self.send_queue.remove(0);
return Ok(Async::Ready(Some(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
info: message,
upgrade: self.config.clone(),
},
)));
}
for n in (0..self.substreams.len()).rev() {
let mut substream = self.substreams.swap_remove(n);
loop {
substream = match substream {
SubstreamState::WaitingInput(mut substream) => match substream.poll() {
Ok(Async::Ready(Some(message))) => {
self.substreams
.push(SubstreamState::WaitingInput(substream));
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(message))));
}
Ok(Async::Ready(None)) => SubstreamState::Closing(substream),
Ok(Async::NotReady) => {
self.substreams
.push(SubstreamState::WaitingInput(substream));
return Ok(Async::NotReady);
}
Err(_) => SubstreamState::Closing(substream),
},
SubstreamState::PendingSend(mut substream, message) => {
match substream.start_send(message)? {
AsyncSink::Ready => SubstreamState::PendingFlush(substream),
AsyncSink::NotReady(message) => {
self.substreams
.push(SubstreamState::PendingSend(substream, message));
return Ok(Async::NotReady);
}
}
}
SubstreamState::PendingFlush(mut substream) => {
match substream.poll_complete()? {
Async::Ready(()) => SubstreamState::Closing(substream),
Async::NotReady => {
self.substreams
.push(SubstreamState::PendingFlush(substream));
return Ok(Async::NotReady);
}
}
}
SubstreamState::Closing(mut substream) => match substream.close() {
Ok(Async::Ready(())) => break,
Ok(Async::NotReady) => {
self.substreams.push(SubstreamState::Closing(substream));
return Ok(Async::NotReady);
}
Err(_) => return Ok(Async::Ready(None)),
},
}
}
}
Ok(Async::NotReady)
}
}
impl<TSubstream> fmt::Debug for FloodsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("FloodsubHandler")
.field("shutting_down", &self.shutting_down)
.field("substreams", &self.substreams.len())
.field("send_queue", &self.send_queue.len())
.finish()
}
}

View File

@ -0,0 +1,277 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use cuckoofilter::CuckooFilter;
use futures::prelude::*;
use handler::FloodsubHandler;
use libp2p_core::nodes::{ConnectedPoint, NetworkBehavior, NetworkBehaviorAction};
use libp2p_core::{nodes::protocols_handler::ProtocolsHandler, PeerId};
use protocol::{FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
use smallvec::SmallVec;
use std::{collections::VecDeque, iter, marker::PhantomData};
use std::collections::hash_map::{DefaultHasher, HashMap};
use tokio_io::{AsyncRead, AsyncWrite};
use topic::{Topic, TopicHash};
/// Network behaviour that automatically identifies nodes periodically, and returns information
/// about them.
pub struct FloodsubBehaviour<TSubstream> {
/// Events that need to be produced outside when polling.
events: VecDeque<NetworkBehaviorAction<FloodsubRpc, FloodsubMessage>>,
/// Peer id of the local node. Used for the source of the messages that we publish.
local_peer_id: PeerId,
/// List of peers the network is connected to, and the topics that they're subscribed to.
// TODO: filter out peers that don't support floodsub, so that we avoid hammering them with
// opened substream
connected_peers: HashMap<PeerId, SmallVec<[TopicHash; 8]>>,
// List of topics we're subscribed to. Necessary in order to filter out messages that we
// erroneously receive.
subscribed_topics: SmallVec<[Topic; 16]>,
// Sequence number for the messages we send.
seq_no: usize,
// We keep track of the messages we received (in the format `hash(source ID, seq_no)`) so that
// we don't dispatch the same message twice if we receive it twice on the network.
received: CuckooFilter<DefaultHasher>,
/// Marker to pin the generics.
marker: PhantomData<TSubstream>,
}
impl<TSubstream> FloodsubBehaviour<TSubstream> {
/// Creates a `FloodsubBehaviour`.
pub fn new(local_peer_id: PeerId) -> Self {
FloodsubBehaviour {
events: VecDeque::new(),
local_peer_id,
connected_peers: HashMap::new(),
subscribed_topics: SmallVec::new(),
seq_no: 0,
received: CuckooFilter::new(),
marker: PhantomData,
}
}
}
impl<TSubstream> FloodsubBehaviour<TSubstream> {
/// Subscribes to a topic.
///
/// Returns true if the subscription worked. Returns false if we were already subscribed.
pub fn subscribe(&mut self, topic: Topic) -> bool {
if self.subscribed_topics.iter().any(|t| t.hash() == topic.hash()) {
return false;
}
for peer in self.connected_peers.keys() {
self.events.push_back(NetworkBehaviorAction::SendEvent {
peer_id: peer.clone(),
event: FloodsubRpc {
messages: Vec::new(),
subscriptions: vec![FloodsubSubscription {
topic: topic.hash().clone(),
action: FloodsubSubscriptionAction::Subscribe,
}],
},
});
}
self.subscribed_topics.push(topic);
true
}
/// Unsubscribes from a topic.
///
/// Returns true if we were subscribed to this topic.
pub fn unsubscribe(&mut self, topic: impl AsRef<TopicHash>) -> bool {
let topic = topic.as_ref();
let pos = match self.subscribed_topics.iter().position(|t| t.hash() == topic) {
Some(pos) => pos,
None => return false
};
self.subscribed_topics.remove(pos);
for peer in self.connected_peers.keys() {
self.events.push_back(NetworkBehaviorAction::SendEvent {
peer_id: peer.clone(),
event: FloodsubRpc {
messages: Vec::new(),
subscriptions: vec![FloodsubSubscription {
topic: topic.clone(),
action: FloodsubSubscriptionAction::Unsubscribe,
}],
},
});
}
true
}
/// Publishes a message to the network.
///
/// > **Note**: Doesn't do anything if we're not subscribed to the topic.
pub fn publish(&mut self, topic: impl Into<TopicHash>, data: impl Into<Vec<u8>>) {
self.publish_many(iter::once(topic), data)
}
/// Publishes a message to the network that has multiple topics.
///
/// > **Note**: Doesn't do anything if we're not subscribed to any of the topics.
pub fn publish_many(&mut self, topic: impl IntoIterator<Item = impl Into<TopicHash>>, data: impl Into<Vec<u8>>) {
let message = FloodsubMessage {
source: self.local_peer_id.clone(),
data: data.into(),
sequence_number: self.next_sequence_number(),
topics: topic.into_iter().map(|t| t.into().clone()).collect(),
};
// Don't publish the message if we're not subscribed ourselves to any of the topics.
if !self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.hash() == u)) {
return;
}
self.received.add(&message);
// Send to peers we know are subscribed to the topic.
for (peer_id, sub_topic) in self.connected_peers.iter() {
if !sub_topic.iter().any(|t| message.topics.iter().any(|u| t == u)) {
continue;
}
self.events.push_back(NetworkBehaviorAction::SendEvent {
peer_id: peer_id.clone(),
event: FloodsubRpc {
subscriptions: Vec::new(),
messages: vec![message.clone()],
}
});
}
}
/// Builds a unique sequence number to put in a `FloodsubMessage`.
fn next_sequence_number(&mut self) -> Vec<u8> {
let data = self.seq_no.to_string();
self.seq_no += 1;
data.into()
}
}
impl<TSubstream> NetworkBehavior for FloodsubBehaviour<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Send + Sync + 'static,
{
type ProtocolsHandler = FloodsubHandler<TSubstream>;
type OutEvent = FloodsubMessage;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
FloodsubHandler::new()
}
fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) {
// We need to send our subscriptions to the newly-connected node.
for topic in self.subscribed_topics.iter() {
self.events.push_back(NetworkBehaviorAction::SendEvent {
peer_id: id.clone(),
event: FloodsubRpc {
messages: Vec::new(),
subscriptions: vec![FloodsubSubscription {
topic: topic.hash().clone(),
action: FloodsubSubscriptionAction::Subscribe,
}],
},
});
}
self.connected_peers.insert(id.clone(), SmallVec::new());
}
fn inject_disconnected(&mut self, id: &PeerId, _: ConnectedPoint) {
let was_in = self.connected_peers.remove(id);
debug_assert!(was_in.is_some());
}
fn inject_node_event(
&mut self,
propagation_source: PeerId,
event: FloodsubRpc,
) {
// List of messages we're going to propagate on the network.
let mut rpcs_to_dispatch: Vec<(PeerId, FloodsubRpc)> = Vec::new();
for message in event.messages {
// Use `self.received` to skip the messages that we have already received in the past.
// Note that this can false positive.
if !self.received.test_and_add(&message) {
continue;
}
// Add the message to be dispatched to the user.
if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.hash() == u)) {
self.events.push_back(NetworkBehaviorAction::GenerateEvent(message.clone()));
}
// Propagate the message to everyone else who is subscribed to any of the topics.
for (peer_id, sub_topics) in self.connected_peers.iter() {
if peer_id == &propagation_source {
continue;
}
if !sub_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
continue;
}
if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) {
rpcs_to_dispatch[pos].1.messages.push(message.clone());
} else {
rpcs_to_dispatch.push((peer_id.clone(), FloodsubRpc {
subscriptions: Vec::new(),
messages: vec![message.clone()],
}));
}
}
}
for (peer_id, rpc) in rpcs_to_dispatch {
self.events.push_back(NetworkBehaviorAction::SendEvent {
peer_id,
event: rpc,
});
}
}
fn poll(
&mut self,
) -> Async<
NetworkBehaviorAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
if let Some(event) = self.events.pop_front() {
return Async::Ready(event);
}
Async::NotReady
}
}

View File

@ -19,643 +19,24 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
extern crate bs58; extern crate bs58;
extern crate byteorder;
extern crate bytes; extern crate bytes;
extern crate cuckoofilter;
extern crate fnv; extern crate fnv;
extern crate futures; extern crate futures;
extern crate libp2p_core; extern crate libp2p_core;
#[macro_use]
extern crate log;
extern crate multiaddr;
extern crate parking_lot;
extern crate protobuf; extern crate protobuf;
extern crate smallvec; extern crate smallvec;
extern crate tokio_codec; extern crate tokio_codec;
extern crate tokio_io; extern crate tokio_io;
extern crate unsigned_varint; extern crate unsigned_varint;
mod handler;
mod layer;
mod protocol;
mod rpc_proto; mod rpc_proto;
mod topic; mod topic;
pub use self::handler::FloodsubHandler;
pub use self::layer::FloodsubBehaviour;
pub use self::protocol::*; // TODO: exact reexports
pub use self::topic::{Topic, TopicBuilder, TopicHash}; pub use self::topic::{Topic, TopicBuilder, TopicHash};
use byteorder::{BigEndian, WriteBytesExt};
use bytes::{Bytes, BytesMut};
use fnv::{FnvHashMap, FnvHashSet, FnvHasher};
use futures::sync::mpsc;
use futures::{future, Future, Poll, Sink, Stream};
use libp2p_core::{ConnectionUpgrade, Endpoint, PeerId};
use log::Level;
use multiaddr::{Protocol, Multiaddr};
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use protobuf::Message as ProtobufMessage;
use smallvec::SmallVec;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio_codec::Framed;
use tokio_io::{AsyncRead, AsyncWrite};
use unsigned_varint::codec;
/// Implementation of the `ConnectionUpgrade` for the floodsub protocol.
#[derive(Debug, Clone)]
pub struct FloodSubUpgrade {
inner: Arc<Inner>,
}
impl FloodSubUpgrade {
/// Builds a new `FloodSubUpgrade`. Also returns a `FloodSubReceiver` that will stream incoming
/// messages for the floodsub system.
pub fn new(my_id: PeerId) -> (FloodSubUpgrade, FloodSubReceiver) {
let (output_tx, output_rx) = mpsc::unbounded();
let inner = Arc::new(Inner {
peer_id: my_id.into_bytes(),
output_tx: output_tx,
remote_connections: RwLock::new(FnvHashMap::default()),
subscribed_topics: RwLock::new(Vec::new()),
seq_no: AtomicUsize::new(0),
received: Mutex::new(FnvHashSet::default()),
});
let upgrade = FloodSubUpgrade { inner: inner };
let receiver = FloodSubReceiver { inner: output_rx };
(upgrade, receiver)
}
}
impl<C> ConnectionUpgrade<C> for FloodSubUpgrade
where
C: AsyncRead + AsyncWrite + Send + 'static,
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = ();
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
iter::once(("/floodsub/1.0.0".into(), ()))
}
type Output = FloodSubFuture;
type Future = Box<Future<Item = Self::Output, Error = IoError> + Send>;
#[inline]
fn upgrade(
self,
socket: C,
_: Self::UpgradeIdentifier,
_: Endpoint,
) -> Self::Future {
debug!("Upgrading connection as floodsub");
let future = {
// FIXME: WRONG
let remote_addr: Multiaddr = "/ip4/127.0.0.1/tcp/5000".parse().unwrap();
// Whenever a new node connects, we send to it a message containing the topics we are
// already subscribed to.
let init_msg: Vec<u8> = {
let subscribed_topics = self.inner.subscribed_topics.read();
let mut proto = rpc_proto::RPC::new();
for topic in subscribed_topics.iter() {
let mut subscription = rpc_proto::RPC_SubOpts::new();
subscription.set_subscribe(true);
subscription.set_topicid(topic.hash().clone().into_string());
proto.mut_subscriptions().push(subscription);
}
proto
.write_to_bytes()
.expect("programmer error: the protobuf message should always be valid")
};
// Split the socket into writing and reading parts.
let (floodsub_sink, floodsub_stream) = Framed::new(socket, codec::UviBytes::default())
.sink_map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
.split();
// Build the channel that will be used to communicate outgoing message to this remote.
let (input_tx, input_rx) = mpsc::unbounded();
input_tx
.unbounded_send(init_msg.into())
.expect("newly-created channel should always be open");
self.inner.remote_connections.write().insert(
remote_addr.clone(),
RemoteInfo {
sender: input_tx,
subscribed_topics: RwLock::new(FnvHashSet::default()),
},
);
// Combine the socket read and the outgoing messages input, so that we can wake up when
// either happens.
let messages = input_rx
.map(|m| (m, MessageSource::FromChannel))
.map_err(|_| unreachable!("channel streams should never produce an error"))
.select(floodsub_stream.map(|m| (m, MessageSource::FromSocket)));
#[derive(Debug)]
enum MessageSource {
FromSocket,
FromChannel,
}
let inner = self.inner.clone();
let future = future::loop_fn(
(floodsub_sink, messages),
move |(floodsub_sink, messages)| {
let inner = inner.clone();
let remote_addr = remote_addr.clone();
messages
.into_future()
.map_err(|(err, _)| err)
.and_then(move |(input, rest)| {
match input {
Some((bytes, MessageSource::FromSocket)) => {
// Received a packet from the remote.
let fut = match handle_packet_received(bytes, inner, &remote_addr) {
Ok(()) => {
future::ok(future::Loop::Continue((floodsub_sink, rest)))
}
Err(err) => future::err(err),
};
Box::new(fut) as Box<_>
}
Some((bytes, MessageSource::FromChannel)) => {
// Received a packet from the channel.
// Need to send a message to remote.
trace!("Effectively sending message to remote");
let future = floodsub_sink.send(bytes).map(|floodsub_sink| {
future::Loop::Continue((floodsub_sink, rest))
});
Box::new(future) as Box<_>
}
None => {
// Both the connection stream and `rx` are empty, so we break
// the loop.
trace!("Pubsub future clean finish");
// TODO: what if multiple connections?
inner.remote_connections.write().remove(&remote_addr);
let future = future::ok(future::Loop::Break(()));
Box::new(future) as Box<Future<Item = _, Error = _> + Send>
}
}
})
},
);
future::ok(FloodSubFuture {
inner: Box::new(future) as Box<_>,
})
};
Box::new(future) as Box<_>
}
}
/// Allows one to control the behaviour of the floodsub system.
#[derive(Clone)]
pub struct FloodSubController {
inner: Arc<Inner>,
}
struct Inner {
// Our local peer ID multihash, to pass as the source.
peer_id: Vec<u8>,
// Channel where to send the messages that should be dispatched to the user.
output_tx: mpsc::UnboundedSender<Message>,
// Active connections with a remote.
remote_connections: RwLock<FnvHashMap<Multiaddr, RemoteInfo>>,
// List of topics we're subscribed to. Necessary in order to filter out messages that we
// erroneously receive.
subscribed_topics: RwLock<Vec<Topic>>,
// Sequence number for the messages we send.
seq_no: AtomicUsize,
// We keep track of the messages we received (in the format `(remote ID, seq_no)`) so that we
// don't dispatch the same message twice if we receive it twice on the network.
// TODO: the `HashSet` will keep growing indefinitely :-/
received: Mutex<FnvHashSet<u64>>,
}
struct RemoteInfo {
// Sender to send data over the socket to that host.
sender: mpsc::UnboundedSender<BytesMut>,
// Topics the remote is registered to.
subscribed_topics: RwLock<FnvHashSet<TopicHash>>,
}
impl fmt::Debug for Inner {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Inner")
.field("peer_id", &self.peer_id)
.field(
"num_remote_connections",
&self.remote_connections.read().len(),
)
.field("subscribed_topics", &*self.subscribed_topics.read())
.field("seq_no", &self.seq_no)
.field("received", &self.received)
.finish()
}
}
impl FloodSubController {
/// Builds a new controller for floodsub.
#[inline]
pub fn new(upgrade: &FloodSubUpgrade) -> Self {
FloodSubController {
inner: upgrade.inner.clone(),
}
}
/// Subscribe to a topic. When a node on the network sends a message for that topic, we will
/// likely receive it.
///
/// It is not guaranteed that we receive every single message published on the network.
#[inline]
pub fn subscribe(&self, topic: &Topic) {
// This function exists for convenience.
self.subscribe_many(iter::once(topic));
}
/// Same as `subscribe`, but subscribes to multiple topics at once.
///
/// Since this results in a single packet sent to the remotes, it is preferable to use this
/// method when subscribing to multiple topics at once rather than call `subscribe` multiple
/// times.
#[inline]
pub fn subscribe_many<'a, I>(&self, topics: I)
where
I: IntoIterator<Item = &'a Topic>,
I::IntoIter: Clone,
{
// This function exists for convenience.
self.sub_unsub_multi(topics.into_iter().map::<_, fn(_) -> _>(|t| (t, true)))
}
/// Unsubscribe from a topic. We will no longer receive any message for this topic.
///
/// If a message was sent to us before we are able to notify that we don't want messages
/// anymore, then the message will be filtered out locally.
#[inline]
pub fn unsubscribe(&self, topic: &Topic) {
// This function exists for convenience.
self.unsubscribe_many(iter::once(topic));
}
/// Same as `unsubscribe` but unsubscribes from multiple topics at once.
///
/// Since this results in a single packet sent to the remotes, it is preferable to use this
/// method when unsubscribing from multiple topics at once rather than call `unsubscribe`
/// multiple times.
#[inline]
pub fn unsubscribe_many<'a, I>(&self, topics: I)
where
I: IntoIterator<Item = &'a Topic>,
I::IntoIter: Clone,
{
// This function exists for convenience.
self.sub_unsub_multi(topics.into_iter().map::<_, fn(_) -> _>(|t| (t, false)));
}
// Inner implementation. The iterator should produce a boolean that is true if we subscribe and
// false if we unsubscribe.
fn sub_unsub_multi<'a, I>(&self, topics: I)
where
I: IntoIterator<Item = (&'a Topic, bool)>,
I::IntoIter: Clone,
{
let mut proto = rpc_proto::RPC::new();
let topics = topics.into_iter();
if log_enabled!(Level::Debug) {
debug!("Queuing sub/unsub message; sub = {:?}; unsub = {:?}",
topics.clone().filter(|t| t.1)
.map(|t| t.0.hash().clone().into_string())
.collect::<Vec<_>>(),
topics.clone().filter(|t| !t.1)
.map(|t| t.0.hash().clone().into_string())
.collect::<Vec<_>>());
}
let mut subscribed_topics = self.inner.subscribed_topics.write();
for (topic, subscribe) in topics {
let mut subscription = rpc_proto::RPC_SubOpts::new();
subscription.set_subscribe(subscribe);
subscription.set_topicid(topic.hash().clone().into_string());
proto.mut_subscriptions().push(subscription);
if subscribe {
subscribed_topics.push(topic.clone());
} else {
subscribed_topics.retain(|t| t.hash() != topic.hash())
}
}
self.broadcast(proto, |_| true);
}
/// Publishes a message on the network for the specified topic
#[inline]
pub fn publish(&self, topic: &Topic, data: Vec<u8>) {
// This function exists for convenience.
self.publish_many(iter::once(topic), data)
}
/// Publishes a message on the network for the specified topics.
///
/// Since this results in a single packet sent to the remotes, it is preferable to use this
/// method when publishing multiple messages at once rather than call `publish` multiple
/// times.
pub fn publish_many<'a, I>(&self, topics: I, data: Vec<u8>)
where
I: IntoIterator<Item = &'a Topic>,
{
let topics = topics.into_iter().collect::<Vec<_>>();
debug!("Queueing publish message; topics = {:?}; data_len = {:?}",
topics.iter().map(|t| t.hash().clone().into_string()).collect::<Vec<_>>(),
data.len());
// Build the `Vec<u8>` containing our sequence number for this message.
let seq_no_bytes = {
let mut seqno_bytes = Vec::new();
let seqn = self.inner.seq_no.fetch_add(1, Ordering::Relaxed);
seqno_bytes
.write_u64::<BigEndian>(seqn as u64)
.expect("writing to a Vec never fails");
seqno_bytes
};
// TODO: should handle encryption/authentication of the message
let mut msg = rpc_proto::Message::new();
msg.set_data(data);
msg.set_from(self.inner.peer_id.clone());
msg.set_seqno(seq_no_bytes.clone());
msg.set_topicIDs(
topics
.iter()
.map(|t| t.hash().clone().into_string())
.collect(),
);
let mut proto = rpc_proto::RPC::new();
proto.mut_publish().push(msg);
// Insert into `received` so that we ignore the message if a remote sends it back to us.
self.inner
.received
.lock()
.insert(hash((self.inner.peer_id.clone(), seq_no_bytes)));
self.broadcast(proto, |r_top| {
topics.iter().any(|t| r_top.iter().any(|to| to == t.hash()))
});
}
// Internal function that dispatches an `RPC` protobuf struct to all the connected remotes
// for which `filter` returns true.
fn broadcast<F>(&self, message: rpc_proto::RPC, mut filter: F)
where
F: FnMut(&FnvHashSet<TopicHash>) -> bool,
{
let bytes = message
.write_to_bytes()
.expect("protobuf message is always valid");
let remote_connections = self.inner.remote_connections.upgradable_read();
// Number of remotes we dispatched to, for logging purposes.
let mut num_dispatched = 0;
// Will store the addresses of remotes which we failed to send a message to and which
// must be removed from the active connections.
// We use a smallvec of 6 elements because it is unlikely that we lost connection to more
// than 6 elements at once.
let mut failed_to_send: SmallVec<[_; 6]> = SmallVec::new();
for (remote_addr, remote) in remote_connections.iter() {
if !filter(&remote.subscribed_topics.read()) {
continue;
}
num_dispatched += 1;
match remote.sender.unbounded_send(bytes.clone().into()) {
Ok(_) => (),
Err(_) => {
trace!("Failed to dispatch message to {} because channel was closed",
remote_addr);
failed_to_send.push(remote_addr.clone());
}
}
}
// Remove the remotes which we failed to send a message to.
if !failed_to_send.is_empty() {
// If we fail to upgrade the read lock to a write lock, just ignore `failed_to_send`.
if let Ok(mut remote_connections) = RwLockUpgradableReadGuard::try_upgrade(remote_connections) {
for failed_to_send in failed_to_send {
remote_connections.remove(&failed_to_send);
}
}
}
debug!("Message queued for {} remotes", num_dispatched);
}
}
/// Implementation of `Stream` that provides messages for the subscribed topics you subscribed to.
pub struct FloodSubReceiver {
inner: mpsc::UnboundedReceiver<Message>,
}
impl Stream for FloodSubReceiver {
type Item = Message;
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.inner
.poll()
.map_err(|_| unreachable!("UnboundedReceiver cannot err"))
}
}
impl fmt::Debug for FloodSubReceiver {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("FloodSubReceiver").finish()
}
}
/// A message received by the floodsub system.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Message {
/// Remote that sent the message.
pub source: Multiaddr,
/// Content of the message. Its meaning is out of scope of this library.
pub data: Vec<u8>,
/// List of topics of this message.
///
/// Each message can belong to multiple topics at once.
pub topics: Vec<TopicHash>,
}
/// Implementation of `Future` that must be driven to completion in order for floodsub to work.
#[must_use = "futures do nothing unless polled"]
pub struct FloodSubFuture {
inner: Box<Future<Item = (), Error = IoError> + Send>,
}
impl Future for FloodSubFuture {
type Item = ();
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
impl fmt::Debug for FloodSubFuture {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("FloodSubFuture").finish()
}
}
// Handles when a packet is received on a connection.
//
// - `bytes` contains the raw data.
// - `remote_addr` is the address of the sender.
fn handle_packet_received(
bytes: BytesMut,
inner: Arc<Inner>,
remote_addr: &Multiaddr,
) -> Result<(), IoError> {
trace!("Received packet from {}", remote_addr);
// Parsing attempt.
let mut input = match protobuf::parse_from_bytes::<rpc_proto::RPC>(&bytes) {
Ok(msg) => msg,
Err(err) => {
debug!("Failed to parse protobuf message; err = {:?}", err);
return Err(err.into());
}
};
// Update the topics the remote is subscribed to.
if !input.get_subscriptions().is_empty() {
let remote_connec = inner.remote_connections.write();
// TODO: what if multiple entries?
let remote = &remote_connec[remote_addr];
let mut topics = remote.subscribed_topics.write();
for subscription in input.mut_subscriptions().iter_mut() {
let topic = TopicHash::from_raw(subscription.take_topicid());
let subscribe = subscription.get_subscribe();
if subscribe {
trace!("Remote {} subscribed to {:?}", remote_addr, topic); topics.insert(topic);
} else {
trace!("Remote {} unsubscribed from {:?}", remote_addr, topic);
topics.remove(&topic);
}
}
}
// Handle the messages coming from the remote.
for publish in input.mut_publish().iter_mut() {
let from = publish.take_from();
// We maintain a list of the messages that have already been
// processed so that we don't process the same message twice.
// Each message is identified by the `(from, seqno)` tuple.
if !inner
.received
.lock()
.insert(hash((from.clone(), publish.take_seqno())))
{
trace!("Skipping message because we had already received it; payload = {} bytes",
publish.get_data().len());
continue;
}
let peer_id = match PeerId::from_bytes(from.clone()) {
Ok(id) => id,
Err(err) => {
trace!("Parsing PeerId failed: {:?}. Skipping.", err);
continue
}
};
let from: Multiaddr = Protocol::P2p(peer_id.into()).into();
let topics = publish
.take_topicIDs()
.into_iter()
.map(|h| TopicHash::from_raw(h))
.collect::<Vec<_>>();
trace!("Processing message for topics {:?}; payload = {} bytes",
topics,
publish.get_data().len());
// TODO: should check encryption/authentication of the message
// Broadcast the message to all the other remotes.
{
let remote_connections = inner.remote_connections.read();
for (addr, info) in remote_connections.iter() {
let st = info.subscribed_topics.read();
if !topics.iter().any(|t| st.contains(t)) {
continue;
}
// TODO: don't send back to the remote that just sent it
trace!("Broadcasting received message to {}", addr);
let _ = info.sender.unbounded_send(bytes.clone());
}
}
// Send the message locally if relevant.
let dispatch_locally = {
let subscribed_topics = inner.subscribed_topics.read();
topics
.iter()
.any(|t| subscribed_topics.iter().any(|topic| topic.hash() == t))
};
if dispatch_locally {
// Ignore if channel is closed.
trace!("Dispatching message locally");
let _ = inner.output_tx.unbounded_send(Message {
source: from,
data: publish.take_data(),
topics: topics,
});
} else {
trace!("Message not dispatched locally as we are not subscribed to any of the topics");
}
}
Ok(())
}
// Shortcut function that hashes a value.
#[inline]
fn hash<V: Hash>(value: V) -> u64 {
let mut h = FnvHasher::default();
value.hash(&mut h);
h.finish()
}

View File

@ -0,0 +1,209 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::{BufMut, Bytes, BytesMut};
use futures::future;
use libp2p_core::{ConnectionUpgrade, Endpoint, PeerId};
use protobuf::Message as ProtobufMessage;
use rpc_proto;
use std::{io, iter};
use tokio_codec::{Decoder, Encoder, Framed};
use tokio_io::{AsyncRead, AsyncWrite};
use topic::TopicHash;
use unsigned_varint::codec;
/// Implementation of the `ConnectionUpgrade` for the floodsub protocol.
#[derive(Debug, Clone)]
pub struct FloodsubConfig {}
impl FloodsubConfig {
/// Builds a new `FloodsubConfig`.
#[inline]
pub fn new() -> FloodsubConfig {
FloodsubConfig {}
}
}
impl<TSocket> ConnectionUpgrade<TSocket> for FloodsubConfig
where
TSocket: AsyncRead + AsyncWrite,
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = ();
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
iter::once(("/floodsub/1.0.0".into(), ()))
}
type Output = Framed<TSocket, FloodsubCodec>;
type Future = future::FutureResult<Self::Output, io::Error>;
#[inline]
fn upgrade(self, socket: TSocket, _: Self::UpgradeIdentifier, _: Endpoint) -> Self::Future {
future::ok(Framed::new(
socket,
FloodsubCodec {
length_prefix: Default::default(),
},
))
}
}
/// Implementation of `tokio_codec::Codec`.
pub struct FloodsubCodec {
/// The codec for encoding/decoding the length prefix of messages.
length_prefix: codec::UviBytes,
}
impl Encoder for FloodsubCodec {
type Item = FloodsubRpc;
type Error = io::Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let mut proto = rpc_proto::RPC::new();
for message in item.messages.into_iter() {
let mut msg = rpc_proto::Message::new();
msg.set_from(message.source.into_bytes());
msg.set_data(message.data);
msg.set_seqno(message.sequence_number);
msg.set_topicIDs(
message
.topics
.into_iter()
.map(TopicHash::into_string)
.collect(),
);
proto.mut_publish().push(msg);
}
for topic in item.subscriptions.into_iter() {
let mut subscription = rpc_proto::RPC_SubOpts::new();
subscription.set_subscribe(topic.action == FloodsubSubscriptionAction::Subscribe);
subscription.set_topicid(topic.topic.into_string());
proto.mut_subscriptions().push(subscription);
}
let msg_size = proto.compute_size();
// Reserve enough space for the data and the length. The length has a maximum of 32 bits,
// which means that 5 bytes is enough for the variable-length integer.
dst.reserve(msg_size as usize + 5);
proto
.write_length_delimited_to_writer(&mut dst.by_ref().writer())
.expect(
"there is no situation in which the protobuf message can be invalid, and \
writing to a BytesMut never fails as we reserved enough space beforehand",
);
Ok(())
}
}
impl Decoder for FloodsubCodec {
type Item = FloodsubRpc;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let packet = match self.length_prefix.decode(src)? {
Some(p) => p,
None => return Ok(None),
};
let mut rpc: rpc_proto::RPC = protobuf::parse_from_bytes(&packet)?;
let mut messages = Vec::with_capacity(rpc.get_publish().len());
for mut publish in rpc.take_publish().into_iter() {
messages.push(FloodsubMessage {
source: PeerId::from_bytes(publish.take_from()).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "Invalid peer ID in message")
})?,
data: publish.take_data(),
sequence_number: publish.take_seqno(),
topics: publish
.take_topicIDs()
.into_iter()
.map(|topic| TopicHash::from_raw(topic))
.collect(),
});
}
Ok(Some(FloodsubRpc {
messages,
subscriptions: rpc
.take_subscriptions()
.into_iter()
.map(|mut sub| FloodsubSubscription {
action: if sub.get_subscribe() {
FloodsubSubscriptionAction::Subscribe
} else {
FloodsubSubscriptionAction::Unsubscribe
},
topic: TopicHash::from_raw(sub.take_topicid()),
})
.collect(),
}))
}
}
/// An RPC received by the floodsub system.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FloodsubRpc {
/// List of messages that were part of this RPC query.
pub messages: Vec<FloodsubMessage>,
/// List of subscriptions.
pub subscriptions: Vec<FloodsubSubscription>,
}
/// A message received by the floodsub system.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FloodsubMessage {
/// Id of the peer that published this message.
pub source: PeerId,
/// Content of the message. Its meaning is out of scope of this library.
pub data: Vec<u8>,
/// An incrementing sequence number.
pub sequence_number: Vec<u8>,
/// List of topics this message belongs to.
///
/// Each message can belong to multiple topics at once.
pub topics: Vec<TopicHash>,
}
/// A subscription received by the floodsub system.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FloodsubSubscription {
/// Action to perform.
pub action: FloodsubSubscriptionAction,
/// The topic from which to subscribe or unsubscribe.
pub topic: TopicHash,
}
/// Action that a subscription wants to perform.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FloodsubSubscriptionAction {
/// The remote wants to subscribe to the given topic.
Subscribe,
/// The remote wants to unsubscribe from the given topic.
Unsubscribe,
}

View File

@ -59,6 +59,27 @@ impl Topic {
} }
} }
impl AsRef<TopicHash> for Topic {
#[inline]
fn as_ref(&self) -> &TopicHash {
&self.hash
}
}
impl From<Topic> for TopicHash {
#[inline]
fn from(topic: Topic) -> TopicHash {
topic.hash
}
}
impl<'a> From<&'a Topic> for TopicHash {
#[inline]
fn from(topic: &'a Topic) -> TopicHash {
topic.hash.clone()
}
}
/// Builder for a `TopicHash`. /// Builder for a `TopicHash`.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct TopicBuilder { pub struct TopicBuilder {
@ -78,15 +99,17 @@ impl TopicBuilder {
/// Turns the builder into an actual `Topic`. /// Turns the builder into an actual `Topic`.
pub fn build(self) -> Topic { pub fn build(self) -> Topic {
let bytes = self.builder let bytes = self
.builder
.write_to_bytes() .write_to_bytes()
.expect("protobuf message is always valid"); .expect("protobuf message is always valid");
// TODO: https://github.com/libp2p/rust-libp2p/issues/473
let hash = TopicHash { let hash = TopicHash {
hash: bs58::encode(&bytes).into_string(), hash: bs58::encode(&bytes).into_string(),
}; };
Topic { Topic {
descriptor: self.builder, descriptor: self.builder,
hash: hash, hash,
} }
} }
} }