mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-24 07:11:38 +00:00
feat: allow sent messages seen as subscribed (#1520)
* feat: allow sent messages seen as subscribed minor feature to allow mimicing the behaviour expected by ipfs api tests. * refactor: rename per review comments * refactor: rename Floodsub::options to config * chore: update changelog * Update CHANGELOG.md Co-Authored-By: Max Inden <mail@max-inden.de> Co-authored-by: Max Inden <mail@max-inden.de> Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
@ -1,5 +1,7 @@
|
|||||||
# Version ???
|
# Version ???
|
||||||
|
|
||||||
|
- `libp2p-floodsub`: Allow sent messages seen as subscribed.
|
||||||
|
[PR 1520](https://github.com/libp2p/rust-libp2p/pull/1520)
|
||||||
|
|
||||||
# Version 0.17.0 (2020-04-02)
|
# Version 0.17.0 (2020-04-02)
|
||||||
|
|
||||||
|
@ -18,8 +18,9 @@
|
|||||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
// DEALINGS IN THE SOFTWARE.
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
|
use crate::protocol::{FloodsubProtocol, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
|
||||||
use crate::topic::Topic;
|
use crate::topic::Topic;
|
||||||
|
use crate::FloodsubConfig;
|
||||||
use cuckoofilter::CuckooFilter;
|
use cuckoofilter::CuckooFilter;
|
||||||
use fnv::FnvHashSet;
|
use fnv::FnvHashSet;
|
||||||
use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId};
|
use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId};
|
||||||
@ -43,8 +44,7 @@ pub struct Floodsub {
|
|||||||
/// Events that need to be yielded to the outside when polling.
|
/// Events that need to be yielded to the outside when polling.
|
||||||
events: VecDeque<NetworkBehaviourAction<FloodsubRpc, FloodsubEvent>>,
|
events: VecDeque<NetworkBehaviourAction<FloodsubRpc, FloodsubEvent>>,
|
||||||
|
|
||||||
/// Peer id of the local node. Used for the source of the messages that we publish.
|
config: FloodsubConfig,
|
||||||
local_peer_id: PeerId,
|
|
||||||
|
|
||||||
/// List of peers to send messages to.
|
/// List of peers to send messages to.
|
||||||
target_peers: FnvHashSet<PeerId>,
|
target_peers: FnvHashSet<PeerId>,
|
||||||
@ -64,11 +64,16 @@ pub struct Floodsub {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Floodsub {
|
impl Floodsub {
|
||||||
/// Creates a `Floodsub`.
|
/// Creates a `Floodsub` with default configuration.
|
||||||
pub fn new(local_peer_id: PeerId) -> Self {
|
pub fn new(local_peer_id: PeerId) -> Self {
|
||||||
|
Self::from_config(FloodsubConfig::new(local_peer_id))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a `Floodsub` with the given configuration.
|
||||||
|
pub fn from_config(config: FloodsubConfig) -> Self {
|
||||||
Floodsub {
|
Floodsub {
|
||||||
events: VecDeque::new(),
|
events: VecDeque::new(),
|
||||||
local_peer_id,
|
config,
|
||||||
target_peers: FnvHashSet::default(),
|
target_peers: FnvHashSet::default(),
|
||||||
connected_peers: HashMap::new(),
|
connected_peers: HashMap::new(),
|
||||||
subscribed_topics: SmallVec::new(),
|
subscribed_topics: SmallVec::new(),
|
||||||
@ -190,7 +195,7 @@ impl Floodsub {
|
|||||||
|
|
||||||
fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) {
|
fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) {
|
||||||
let message = FloodsubMessage {
|
let message = FloodsubMessage {
|
||||||
source: self.local_peer_id.clone(),
|
source: self.config.local_peer_id.clone(),
|
||||||
data: data.into(),
|
data: data.into(),
|
||||||
// If the sequence numbers are predictable, then an attacker could flood the network
|
// If the sequence numbers are predictable, then an attacker could flood the network
|
||||||
// with packets with the predetermined sequence numbers and absorb our legitimate
|
// with packets with the predetermined sequence numbers and absorb our legitimate
|
||||||
@ -202,6 +207,10 @@ impl Floodsub {
|
|||||||
let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u));
|
let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u));
|
||||||
if self_subscribed {
|
if self_subscribed {
|
||||||
self.received.add(&message);
|
self.received.add(&message);
|
||||||
|
if self.config.subscribe_local_messages {
|
||||||
|
self.events.push_back(
|
||||||
|
NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Message(message.clone())));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Don't publish the message if we have to check subscriptions
|
// Don't publish the message if we have to check subscriptions
|
||||||
// and we're not subscribed ourselves to any of the topics.
|
// and we're not subscribed ourselves to any of the topics.
|
||||||
@ -228,7 +237,7 @@ impl Floodsub {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl NetworkBehaviour for Floodsub {
|
impl NetworkBehaviour for Floodsub {
|
||||||
type ProtocolsHandler = OneShotHandler<FloodsubConfig, FloodsubRpc, InnerMessage>;
|
type ProtocolsHandler = OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>;
|
||||||
type OutEvent = FloodsubEvent;
|
type OutEvent = FloodsubEvent;
|
||||||
|
|
||||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||||
|
@ -21,6 +21,8 @@
|
|||||||
//! Implements the floodsub protocol, see also the:
|
//! Implements the floodsub protocol, see also the:
|
||||||
//! [spec](https://github.com/libp2p/specs/tree/master/pubsub).
|
//! [spec](https://github.com/libp2p/specs/tree/master/pubsub).
|
||||||
|
|
||||||
|
use libp2p_core::PeerId;
|
||||||
|
|
||||||
pub mod protocol;
|
pub mod protocol;
|
||||||
|
|
||||||
mod layer;
|
mod layer;
|
||||||
@ -33,3 +35,22 @@ mod rpc_proto {
|
|||||||
pub use self::layer::{Floodsub, FloodsubEvent};
|
pub use self::layer::{Floodsub, FloodsubEvent};
|
||||||
pub use self::protocol::{FloodsubMessage, FloodsubRpc};
|
pub use self::protocol::{FloodsubMessage, FloodsubRpc};
|
||||||
pub use self::topic::Topic;
|
pub use self::topic::Topic;
|
||||||
|
|
||||||
|
/// Configuration options for the Floodsub protocol.
|
||||||
|
pub struct FloodsubConfig {
|
||||||
|
/// Peer id of the local node. Used for the source of the messages that we publish.
|
||||||
|
pub local_peer_id: PeerId,
|
||||||
|
|
||||||
|
/// `true` if messages published by local node should be propagated as messages received from
|
||||||
|
/// the network, `false` by default.
|
||||||
|
pub subscribe_local_messages: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FloodsubConfig {
|
||||||
|
pub fn new(local_peer_id: PeerId) -> Self {
|
||||||
|
Self {
|
||||||
|
local_peer_id,
|
||||||
|
subscribe_local_messages: false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -27,16 +27,16 @@ use futures::{Future, io::{AsyncRead, AsyncWrite}};
|
|||||||
|
|
||||||
/// Implementation of `ConnectionUpgrade` for the floodsub protocol.
|
/// Implementation of `ConnectionUpgrade` for the floodsub protocol.
|
||||||
#[derive(Debug, Clone, Default)]
|
#[derive(Debug, Clone, Default)]
|
||||||
pub struct FloodsubConfig {}
|
pub struct FloodsubProtocol {}
|
||||||
|
|
||||||
impl FloodsubConfig {
|
impl FloodsubProtocol {
|
||||||
/// Builds a new `FloodsubConfig`.
|
/// Builds a new `FloodsubProtocol`.
|
||||||
pub fn new() -> FloodsubConfig {
|
pub fn new() -> FloodsubProtocol {
|
||||||
FloodsubConfig {}
|
FloodsubProtocol {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UpgradeInfo for FloodsubConfig {
|
impl UpgradeInfo for FloodsubProtocol {
|
||||||
type Info = &'static [u8];
|
type Info = &'static [u8];
|
||||||
type InfoIter = iter::Once<Self::Info>;
|
type InfoIter = iter::Once<Self::Info>;
|
||||||
|
|
||||||
@ -45,7 +45,7 @@ impl UpgradeInfo for FloodsubConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TSocket> InboundUpgrade<TSocket> for FloodsubConfig
|
impl<TSocket> InboundUpgrade<TSocket> for FloodsubProtocol
|
||||||
where
|
where
|
||||||
TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
|
TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user