Fix the concerns of #603 (#635)

This commit is contained in:
Pierre Krieger
2018-11-14 14:07:54 +01:00
committed by GitHub
parent 513b97b015
commit 24ccefbbc6
3 changed files with 19 additions and 20 deletions

View File

@ -27,7 +27,7 @@ use std::{fmt, io};
use tokio_codec::Framed; use tokio_codec::Framed;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
/// Protocol handler that handles communications with the remote for the fileshare protocol. /// Protocol handler that handles communication with the remote for the floodsub protocol.
/// ///
/// The handler will automatically open a substream with the remote for each request we make. /// The handler will automatically open a substream with the remote for each request we make.
/// ///
@ -36,10 +36,10 @@ pub struct FloodsubHandler<TSubstream>
where where
TSubstream: AsyncRead + AsyncWrite, TSubstream: AsyncRead + AsyncWrite,
{ {
/// Configuration for the Kademlia protocol. /// Configuration for the floodsub protocol.
config: FloodsubConfig, config: FloodsubConfig,
/// If true, we are trying to shut down the existing Kademlia substream and should refuse any /// If true, we are trying to shut down the existing floodsub substream and should refuse any
/// incoming connection. /// incoming connection.
shutting_down: bool, shutting_down: bool,
@ -60,7 +60,6 @@ where
WaitingInput(Framed<TSubstream, FloodsubCodec>), WaitingInput(Framed<TSubstream, FloodsubCodec>),
/// Waiting to send a message to the remote. /// Waiting to send a message to the remote.
PendingSend(Framed<TSubstream, FloodsubCodec>, FloodsubRpc), PendingSend(Framed<TSubstream, FloodsubCodec>, FloodsubRpc),
/// Waiting to send a message to the remote.
/// Waiting to flush the substream so that the data arrives to the remote. /// Waiting to flush the substream so that the data arrives to the remote.
PendingFlush(Framed<TSubstream, FloodsubCodec>), PendingFlush(Framed<TSubstream, FloodsubCodec>),
/// The substream is being closed. /// The substream is being closed.
@ -71,13 +70,13 @@ impl<TSubstream> SubstreamState<TSubstream>
where where
TSubstream: AsyncRead + AsyncWrite, TSubstream: AsyncRead + AsyncWrite,
{ {
/// Consumes this state and produces the substream, if relevant. /// Consumes this state and produces the substream.
fn into_substream(self) -> Option<Framed<TSubstream, FloodsubCodec>> { fn into_substream(self) -> Framed<TSubstream, FloodsubCodec> {
match self { match self {
SubstreamState::WaitingInput(substream) => Some(substream), SubstreamState::WaitingInput(substream) => substream,
SubstreamState::PendingSend(substream, _) => Some(substream), SubstreamState::PendingSend(substream, _) => substream,
SubstreamState::PendingFlush(substream) => Some(substream), SubstreamState::PendingFlush(substream) => substream,
SubstreamState::Closing(substream) => Some(substream), SubstreamState::Closing(substream) => substream,
} }
} }
} }
@ -148,9 +147,7 @@ where
self.shutting_down = true; self.shutting_down = true;
for n in (0..self.substreams.len()).rev() { for n in (0..self.substreams.len()).rev() {
let mut substream = self.substreams.swap_remove(n); let mut substream = self.substreams.swap_remove(n);
if let Some(substream) = substream.into_substream() { self.substreams.push(SubstreamState::Closing(substream.into_substream()));
self.substreams.push(SubstreamState::Closing(substream));
}
} }
} }

View File

@ -33,7 +33,7 @@ use topic::{Topic, TopicHash};
/// Network behaviour that automatically identifies nodes periodically, and returns information /// Network behaviour that automatically identifies nodes periodically, and returns information
/// about them. /// about them.
pub struct FloodsubBehaviour<TSubstream> { pub struct FloodsubBehaviour<TSubstream> {
/// Events that need to be produced outside when polling. /// Events that need to be yielded to the outside when polling.
events: VecDeque<NetworkBehaviorAction<FloodsubRpc, FloodsubMessage>>, events: VecDeque<NetworkBehaviorAction<FloodsubRpc, FloodsubMessage>>,
/// Peer id of the local node. Used for the source of the messages that we publish. /// Peer id of the local node. Used for the source of the messages that we publish.
@ -44,8 +44,8 @@ pub struct FloodsubBehaviour<TSubstream> {
// opened substream // opened substream
connected_peers: HashMap<PeerId, SmallVec<[TopicHash; 8]>>, connected_peers: HashMap<PeerId, SmallVec<[TopicHash; 8]>>,
// List of topics we're subscribed to. Necessary in order to filter out messages that we // List of topics we're subscribed to. Necessary to filter out messages that we receive
// erroneously receive. // erroneously.
subscribed_topics: SmallVec<[Topic; 16]>, subscribed_topics: SmallVec<[Topic; 16]>,
// Sequence number for the messages we send. // Sequence number for the messages we send.
@ -102,6 +102,8 @@ impl<TSubstream> FloodsubBehaviour<TSubstream> {
/// Unsubscribes from a topic. /// Unsubscribes from a topic.
/// ///
/// Note that this only requires a `TopicHash` and not a full `Topic`.
///
/// Returns true if we were subscribed to this topic. /// Returns true if we were subscribed to this topic.
pub fn unsubscribe(&mut self, topic: impl AsRef<TopicHash>) -> bool { pub fn unsubscribe(&mut self, topic: impl AsRef<TopicHash>) -> bool {
let topic = topic.as_ref(); let topic = topic.as_ref();
@ -135,7 +137,7 @@ impl<TSubstream> FloodsubBehaviour<TSubstream> {
self.publish_many(iter::once(topic), data) self.publish_many(iter::once(topic), data)
} }
/// Publishes a message to the network that has multiple topics. /// Publishes a message with multiple topics to the network.
/// ///
/// > **Note**: Doesn't do anything if we're not subscribed to any of the topics. /// > **Note**: Doesn't do anything if we're not subscribed to any of the topics.
pub fn publish_many(&mut self, topic: impl IntoIterator<Item = impl Into<TopicHash>>, data: impl Into<Vec<u8>>) { pub fn publish_many(&mut self, topic: impl IntoIterator<Item = impl Into<TopicHash>>, data: impl Into<Vec<u8>>) {
@ -232,12 +234,12 @@ where
} }
// Propagate the message to everyone else who is subscribed to any of the topics. // Propagate the message to everyone else who is subscribed to any of the topics.
for (peer_id, sub_topics) in self.connected_peers.iter() { for (peer_id, subscr_topics) in self.connected_peers.iter() {
if peer_id == &propagation_source { if peer_id == &propagation_source {
continue; continue;
} }
if !sub_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) { if !subscr_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
continue; continue;
} }

View File

@ -29,7 +29,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
use topic::TopicHash; use topic::TopicHash;
use unsigned_varint::codec; use unsigned_varint::codec;
/// Implementation of the `ConnectionUpgrade` for the floodsub protocol. /// Implementation of `ConnectionUpgrade` for the floodsub protocol.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct FloodsubConfig {} pub struct FloodsubConfig {}