mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-27 16:51:34 +00:00
Allow changing the Kademlia protocol name (#1118)
* Allow changing the Kademlia protocol name * Expose the method to the behaviour * Address review
This commit is contained in:
@ -30,7 +30,7 @@ use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourActio
|
|||||||
use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId};
|
use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId};
|
||||||
use multihash::Multihash;
|
use multihash::Multihash;
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use std::{error, marker::PhantomData, num::NonZeroUsize, time::Duration};
|
use std::{borrow::Cow, error, marker::PhantomData, num::NonZeroUsize, time::Duration};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
use wasm_timer::{Instant, Interval};
|
use wasm_timer::{Instant, Interval};
|
||||||
|
|
||||||
@ -41,6 +41,9 @@ pub struct Kademlia<TSubstream> {
|
|||||||
/// Storage for the nodes. Contains the known multiaddresses for this node.
|
/// Storage for the nodes. Contains the known multiaddresses for this node.
|
||||||
kbuckets: KBucketsTable<KadHash, Addresses>,
|
kbuckets: KBucketsTable<KadHash, Addresses>,
|
||||||
|
|
||||||
|
/// If `Some`, we overwrite the Kademlia protocol name with this one.
|
||||||
|
protocol_name_override: Option<Cow<'static, [u8]>>,
|
||||||
|
|
||||||
/// All the iterative queries we are currently performing, with their ID. The last parameter
|
/// All the iterative queries we are currently performing, with their ID. The last parameter
|
||||||
/// is the list of accumulated providers for `GET_PROVIDERS` queries.
|
/// is the list of accumulated providers for `GET_PROVIDERS` queries.
|
||||||
active_queries: FnvHashMap<QueryId, QueryState<QueryInfo, PeerId>>,
|
active_queries: FnvHashMap<QueryId, QueryState<QueryInfo, PeerId>>,
|
||||||
@ -189,6 +192,16 @@ impl<TSubstream> Kademlia<TSubstream> {
|
|||||||
Self::new_inner(local_peer_id)
|
Self::new_inner(local_peer_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The same as `new`, but using a custom protocol name.
|
||||||
|
///
|
||||||
|
/// Kademlia nodes only communicate with other nodes using the same protocol name. Using a
|
||||||
|
/// custom name therefore allows to segregate the DHT from others, if that is desired.
|
||||||
|
pub fn with_protocol_name(local_peer_id: PeerId, name: impl Into<Cow<'static, [u8]>>) -> Self {
|
||||||
|
let mut me = Kademlia::new_inner(local_peer_id);
|
||||||
|
me.protocol_name_override = Some(name.into());
|
||||||
|
me
|
||||||
|
}
|
||||||
|
|
||||||
/// Creates a `Kademlia`.
|
/// Creates a `Kademlia`.
|
||||||
///
|
///
|
||||||
/// Contrary to `new`, doesn't perform the initialization queries that store our local ID into
|
/// Contrary to `new`, doesn't perform the initialization queries that store our local ID into
|
||||||
@ -251,6 +264,7 @@ impl<TSubstream> Kademlia<TSubstream> {
|
|||||||
|
|
||||||
Kademlia {
|
Kademlia {
|
||||||
kbuckets: KBucketsTable::new(local_peer_id.into(), Duration::from_secs(60)), // TODO: constant
|
kbuckets: KBucketsTable::new(local_peer_id.into(), Duration::from_secs(60)), // TODO: constant
|
||||||
|
protocol_name_override: None,
|
||||||
queued_events: SmallVec::new(),
|
queued_events: SmallVec::new(),
|
||||||
active_queries: Default::default(),
|
active_queries: Default::default(),
|
||||||
connected_peers: Default::default(),
|
connected_peers: Default::default(),
|
||||||
@ -386,7 +400,11 @@ where
|
|||||||
type OutEvent = KademliaOut;
|
type OutEvent = KademliaOut;
|
||||||
|
|
||||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||||
KademliaHandler::dial_and_listen()
|
let mut handler = KademliaHandler::dial_and_listen();
|
||||||
|
if let Some(name) = self.protocol_name_override.as_ref() {
|
||||||
|
handler = handler.with_protocol_name(name.clone());
|
||||||
|
}
|
||||||
|
handler
|
||||||
}
|
}
|
||||||
|
|
||||||
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
|
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
|
||||||
|
@ -32,7 +32,7 @@ use libp2p_core::protocols_handler::{
|
|||||||
};
|
};
|
||||||
use libp2p_core::{upgrade, either::EitherOutput, InboundUpgrade, OutboundUpgrade, PeerId, upgrade::Negotiated};
|
use libp2p_core::{upgrade, either::EitherOutput, InboundUpgrade, OutboundUpgrade, PeerId, upgrade::Negotiated};
|
||||||
use multihash::Multihash;
|
use multihash::Multihash;
|
||||||
use std::{error, fmt, io, time::Duration};
|
use std::{borrow::Cow, error, fmt, io, time::Duration};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
use wasm_timer::Instant;
|
use wasm_timer::Instant;
|
||||||
|
|
||||||
@ -303,7 +303,6 @@ where
|
|||||||
{
|
{
|
||||||
/// Create a `KademliaHandler` that only allows sending messages to the remote but denying
|
/// Create a `KademliaHandler` that only allows sending messages to the remote but denying
|
||||||
/// incoming connections.
|
/// incoming connections.
|
||||||
#[inline]
|
|
||||||
pub fn dial_only() -> Self {
|
pub fn dial_only() -> Self {
|
||||||
KademliaHandler::with_allow_listening(false)
|
KademliaHandler::with_allow_listening(false)
|
||||||
}
|
}
|
||||||
@ -312,7 +311,6 @@ where
|
|||||||
/// requests.
|
/// requests.
|
||||||
///
|
///
|
||||||
/// The `Default` trait implementation wraps around this function.
|
/// The `Default` trait implementation wraps around this function.
|
||||||
#[inline]
|
|
||||||
pub fn dial_and_listen() -> Self {
|
pub fn dial_and_listen() -> Self {
|
||||||
KademliaHandler::with_allow_listening(true)
|
KademliaHandler::with_allow_listening(true)
|
||||||
}
|
}
|
||||||
@ -326,6 +324,13 @@ where
|
|||||||
keep_alive: KeepAlive::Yes,
|
keep_alive: KeepAlive::Yes,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Modifies the protocol name used on the wire. Can be used to create incompatibilities
|
||||||
|
/// between networks on purpose.
|
||||||
|
pub fn with_protocol_name(mut self, name: impl Into<Cow<'static, [u8]>>) -> Self {
|
||||||
|
self.config = self.config.with_protocol_name(name);
|
||||||
|
self
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TSubstream, TUserData> Default for KademliaHandler<TSubstream, TUserData>
|
impl<TSubstream, TUserData> Default for KademliaHandler<TSubstream, TUserData>
|
||||||
@ -355,7 +360,7 @@ where
|
|||||||
#[inline]
|
#[inline]
|
||||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
|
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
|
||||||
if self.allow_listening {
|
if self.allow_listening {
|
||||||
SubstreamProtocol::new(self.config).map_upgrade(upgrade::EitherUpgrade::A)
|
SubstreamProtocol::new(self.config.clone()).map_upgrade(upgrade::EitherUpgrade::A)
|
||||||
} else {
|
} else {
|
||||||
SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade))
|
SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade))
|
||||||
}
|
}
|
||||||
@ -493,7 +498,7 @@ where
|
|||||||
let mut substream = self.substreams.swap_remove(n);
|
let mut substream = self.substreams.swap_remove(n);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match advance_substream(substream, self.config) {
|
match advance_substream(substream, self.config.clone()) {
|
||||||
(Some(new_state), Some(event), _) => {
|
(Some(new_state), Some(event), _) => {
|
||||||
self.substreams.push(new_state);
|
self.substreams.push(new_state);
|
||||||
return Ok(Async::Ready(event));
|
return Ok(Async::Ready(event));
|
||||||
|
@ -34,7 +34,7 @@ use libp2p_core::{Multiaddr, PeerId};
|
|||||||
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated};
|
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated};
|
||||||
use multihash::Multihash;
|
use multihash::Multihash;
|
||||||
use protobuf::{self, Message};
|
use protobuf::{self, Message};
|
||||||
use std::convert::TryFrom;
|
use std::{borrow::Cow, convert::TryFrom};
|
||||||
use std::{io, iter};
|
use std::{io, iter};
|
||||||
use tokio_codec::Framed;
|
use tokio_codec::Framed;
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
@ -138,16 +138,34 @@ impl Into<proto::Message_Peer> for KadPeer {
|
|||||||
// TODO: if, as suspected, we can confirm with Protocol Labs that each open Kademlia substream does
|
// TODO: if, as suspected, we can confirm with Protocol Labs that each open Kademlia substream does
|
||||||
// only one request, then we can change the output of the `InboundUpgrade` and
|
// only one request, then we can change the output of the `InboundUpgrade` and
|
||||||
// `OutboundUpgrade` to be just a single message
|
// `OutboundUpgrade` to be just a single message
|
||||||
#[derive(Debug, Default, Copy, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct KademliaProtocolConfig;
|
pub struct KademliaProtocolConfig {
|
||||||
|
protocol_name: Cow<'static, [u8]>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl KademliaProtocolConfig {
|
||||||
|
/// Modifies the protocol name used on the wire. Can be used to create incompatibilities
|
||||||
|
/// between networks on purpose.
|
||||||
|
pub fn with_protocol_name(mut self, name: impl Into<Cow<'static, [u8]>>) -> Self {
|
||||||
|
self.protocol_name = name.into();
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for KademliaProtocolConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
KademliaProtocolConfig {
|
||||||
|
protocol_name: Cow::Borrowed(b"/ipfs/kad/1.0.0"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl UpgradeInfo for KademliaProtocolConfig {
|
impl UpgradeInfo for KademliaProtocolConfig {
|
||||||
type Info = &'static [u8];
|
type Info = Cow<'static, [u8]>;
|
||||||
type InfoIter = iter::Once<Self::Info>;
|
type InfoIter = iter::Once<Self::Info>;
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn protocol_info(&self) -> Self::InfoIter {
|
fn protocol_info(&self) -> Self::InfoIter {
|
||||||
iter::once(b"/ipfs/kad/1.0.0")
|
iter::once(self.protocol_name.clone())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user