mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-28 17:21:34 +00:00
Allow customizing the Kademlia maximum packet size (#1502)
* Allow customizing the Kademlia maximum packet size * Address concern
This commit is contained in:
@ -52,8 +52,8 @@ pub struct Kademlia<TStore> {
|
|||||||
/// The Kademlia routing table.
|
/// The Kademlia routing table.
|
||||||
kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,
|
kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,
|
||||||
|
|
||||||
/// An optional protocol name override to segregate DHTs in the network.
|
/// Configuration of the wire protocol.
|
||||||
protocol_name_override: Option<Cow<'static, [u8]>>,
|
protocol_config: KademliaProtocolConfig,
|
||||||
|
|
||||||
/// The currently active (i.e. in-progress) queries.
|
/// The currently active (i.e. in-progress) queries.
|
||||||
queries: QueryPool<QueryInner>,
|
queries: QueryPool<QueryInner>,
|
||||||
@ -94,7 +94,7 @@ pub struct Kademlia<TStore> {
|
|||||||
pub struct KademliaConfig {
|
pub struct KademliaConfig {
|
||||||
kbucket_pending_timeout: Duration,
|
kbucket_pending_timeout: Duration,
|
||||||
query_config: QueryConfig,
|
query_config: QueryConfig,
|
||||||
protocol_name_override: Option<Cow<'static, [u8]>>,
|
protocol_config: KademliaProtocolConfig,
|
||||||
record_ttl: Option<Duration>,
|
record_ttl: Option<Duration>,
|
||||||
record_replication_interval: Option<Duration>,
|
record_replication_interval: Option<Duration>,
|
||||||
record_publication_interval: Option<Duration>,
|
record_publication_interval: Option<Duration>,
|
||||||
@ -108,7 +108,7 @@ impl Default for KademliaConfig {
|
|||||||
KademliaConfig {
|
KademliaConfig {
|
||||||
kbucket_pending_timeout: Duration::from_secs(60),
|
kbucket_pending_timeout: Duration::from_secs(60),
|
||||||
query_config: QueryConfig::default(),
|
query_config: QueryConfig::default(),
|
||||||
protocol_name_override: None,
|
protocol_config: Default::default(),
|
||||||
record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
|
record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
|
||||||
record_replication_interval: Some(Duration::from_secs(60 * 60)),
|
record_replication_interval: Some(Duration::from_secs(60 * 60)),
|
||||||
record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
|
record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
|
||||||
@ -125,7 +125,7 @@ impl KademliaConfig {
|
|||||||
/// Kademlia nodes only communicate with other nodes using the same protocol name. Using a
|
/// Kademlia nodes only communicate with other nodes using the same protocol name. Using a
|
||||||
/// custom name therefore allows to segregate the DHT from others, if that is desired.
|
/// custom name therefore allows to segregate the DHT from others, if that is desired.
|
||||||
pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) -> &mut Self {
|
pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) -> &mut Self {
|
||||||
self.protocol_name_override = Some(name.into());
|
self.protocol_config.set_protocol_name(name);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -228,6 +228,14 @@ impl KademliaConfig {
|
|||||||
self.connection_idle_timeout = duration;
|
self.connection_idle_timeout = duration;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Modifies the maximum allowed size of individual Kademlia packets.
|
||||||
|
///
|
||||||
|
/// It might be necessary to increase this value if trying to put large records.
|
||||||
|
pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
|
||||||
|
self.protocol_config.set_max_packet_size(size);
|
||||||
|
self
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TStore> Kademlia<TStore>
|
impl<TStore> Kademlia<TStore>
|
||||||
@ -241,9 +249,7 @@ where
|
|||||||
|
|
||||||
/// Get the protocol name of this kademlia instance.
|
/// Get the protocol name of this kademlia instance.
|
||||||
pub fn protocol_name(&self) -> &[u8] {
|
pub fn protocol_name(&self) -> &[u8] {
|
||||||
self.protocol_name_override
|
self.protocol_config.protocol_name()
|
||||||
.as_ref()
|
|
||||||
.map_or(crate::protocol::DEFAULT_PROTO_NAME.as_ref(), AsRef::as_ref)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates a new `Kademlia` network behaviour with the given configuration.
|
/// Creates a new `Kademlia` network behaviour with the given configuration.
|
||||||
@ -267,7 +273,7 @@ where
|
|||||||
Kademlia {
|
Kademlia {
|
||||||
store,
|
store,
|
||||||
kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
|
kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
|
||||||
protocol_name_override: config.protocol_name_override,
|
protocol_config: config.protocol_config,
|
||||||
queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
|
queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
|
||||||
queries: QueryPool::new(config.query_config),
|
queries: QueryPool::new(config.query_config),
|
||||||
connected_peers: Default::default(),
|
connected_peers: Default::default(),
|
||||||
@ -1064,13 +1070,8 @@ where
|
|||||||
type OutEvent = KademliaEvent;
|
type OutEvent = KademliaEvent;
|
||||||
|
|
||||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||||
let mut protocol_config = KademliaProtocolConfig::default();
|
|
||||||
if let Some(name) = self.protocol_name_override.as_ref() {
|
|
||||||
protocol_config = protocol_config.with_protocol_name(name.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
KademliaHandler::new(KademliaHandlerConfig {
|
KademliaHandler::new(KademliaHandlerConfig {
|
||||||
protocol_config,
|
protocol_config: self.protocol_config.clone(),
|
||||||
allow_listening: true,
|
allow_listening: true,
|
||||||
idle_timeout: self.connection_idle_timeout,
|
idle_timeout: self.connection_idle_timeout,
|
||||||
})
|
})
|
||||||
|
@ -141,21 +141,33 @@ impl Into<proto::message::Peer> for KadPeer {
|
|||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct KademliaProtocolConfig {
|
pub struct KademliaProtocolConfig {
|
||||||
protocol_name: Cow<'static, [u8]>,
|
protocol_name: Cow<'static, [u8]>,
|
||||||
|
/// Maximum allowed size of a packet.
|
||||||
|
max_packet_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl KademliaProtocolConfig {
|
impl KademliaProtocolConfig {
|
||||||
|
/// Returns the configured protocol name.
|
||||||
|
pub fn protocol_name(&self) -> &[u8] {
|
||||||
|
&self.protocol_name
|
||||||
|
}
|
||||||
|
|
||||||
/// Modifies the protocol name used on the wire. Can be used to create incompatibilities
|
/// Modifies the protocol name used on the wire. Can be used to create incompatibilities
|
||||||
/// between networks on purpose.
|
/// between networks on purpose.
|
||||||
pub fn with_protocol_name(mut self, name: impl Into<Cow<'static, [u8]>>) -> Self {
|
pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) {
|
||||||
self.protocol_name = name.into();
|
self.protocol_name = name.into();
|
||||||
self
|
}
|
||||||
|
|
||||||
|
/// Modifies the maximum allowed size of a single Kademlia packet.
|
||||||
|
pub fn set_max_packet_size(&mut self, size: usize) {
|
||||||
|
self.max_packet_size = size;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for KademliaProtocolConfig {
|
impl Default for KademliaProtocolConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
KademliaProtocolConfig {
|
KademliaProtocolConfig {
|
||||||
protocol_name: Cow::Borrowed(DEFAULT_PROTO_NAME)
|
protocol_name: Cow::Borrowed(DEFAULT_PROTO_NAME),
|
||||||
|
max_packet_size: 4096,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -179,7 +191,7 @@ where
|
|||||||
|
|
||||||
fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future {
|
fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future {
|
||||||
let mut codec = UviBytes::default();
|
let mut codec = UviBytes::default();
|
||||||
codec.set_max_len(4096);
|
codec.set_max_len(self.max_packet_size);
|
||||||
|
|
||||||
future::ok(
|
future::ok(
|
||||||
Framed::new(incoming, codec)
|
Framed::new(incoming, codec)
|
||||||
@ -211,7 +223,7 @@ where
|
|||||||
|
|
||||||
fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future {
|
fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future {
|
||||||
let mut codec = UviBytes::default();
|
let mut codec = UviBytes::default();
|
||||||
codec.set_max_len(4096);
|
codec.set_max_len(self.max_packet_size);
|
||||||
|
|
||||||
future::ok(
|
future::ok(
|
||||||
Framed::new(incoming, codec)
|
Framed::new(incoming, codec)
|
||||||
|
Reference in New Issue
Block a user