Merge branch 'master' into weighted_bucket

# Conflicts:
#	protocols/kad/src/behaviour.rs
#	protocols/kad/src/behaviour/test.rs
This commit is contained in:
folex
2020-03-25 17:52:38 +03:00
19 changed files with 253 additions and 309 deletions

View File

@ -1,114 +0,0 @@
version: 2
workflows:
version: 2
build:
jobs:
- test
- test-wasm
- check-rustdoc-links
- integration-test
jobs:
test:
machine:
enabled: true
steps:
- checkout
- run:
name: Enable ipv6
command: |
cat <<'EOF' | sudo tee /etc/docker/daemon.json
{
"ipv6": true,
"fixed-cidr-v6": "2001:db8:1::/64"
}
EOF
sudo service docker restart
- restore_cache:
key: test-cache
- run:
name: Prepare docker container for building
command: docker build --pull --no-cache -t rust-libp2p -f .circleci/images/rust-libp2p/Dockerfile .
- run:
name: Run tests, inside a docker image, with no feature
command: docker run --rm -v "/cache/cargo/registry:/usr/local/cargo/registry" -v "/cache/target:/app/target" -it rust-libp2p cargo test --all --no-default-features
- run:
name: Run tests, inside a docker image, with all features
command: docker run --rm -v "/cache/cargo/registry:/usr/local/cargo/registry" -v "/cache/target:/app/target" -it rust-libp2p cargo test --all --all-features
- save_cache:
key: test-cache
paths:
- "/cache"
check-rustdoc-links:
docker:
- image: rust:latest
steps:
- checkout
- restore_cache:
key: test-rustdoc-cache-{{ epoch }}
- run:
name: Install nightly Rust
# TODO: intra-doc links are available on nightly only
# see https://doc.rust-lang.org/nightly/rustdoc/lints.html#intra_doc_link_resolution_failure
command: rustup default nightly
- run:
name: Print Rust version
command: |
rustc --version
- run:
name: Check Rustdoc links
command: RUSTDOCFLAGS="--deny intra_doc_link_resolution_failure" cargo +nightly doc --verbose --workspace --no-deps --document-private-items
- save_cache:
key: test-rustdoc-cache-{{ epoch }}
paths:
- ./target
- /usr/local/cargo
test-wasm:
docker:
- image: parity/rust-builder:latest
steps:
- checkout
- restore_cache:
keys:
- test-wasm-cache-{{ epoch }}
- run:
name: Print Rust version
command: |
rustc --version
- run:
name: Build for wasm32
# TODO: also run tests but with --no-run; important to detect linking errors
command: |
sccache -s
cargo web build
sccache -s
- save_cache:
key: test-wasm-cache-{{ epoch }}
paths:
- ./target
- /usr/local/cargo
- /root/.cache/sccache
integration-test:
docker:
- image: rust
- image: ipfs/go-ipfs
steps:
- checkout
- restore_cache:
key: integration-test-cache-{{ epoch }}
- run:
name: Print Rust version
command: |
rustc --version
- run:
command: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad
- save_cache:
key: integration-test-cache-{{ epoch }}
paths:
- "~/.cargo"
- "./target"

View File

@ -1,5 +0,0 @@
FROM rust
RUN mkdir /app
WORKDIR /app
COPY . /app

View File

@ -3,7 +3,7 @@ name: Continuous integration
on:
pull_request:
push:
branches:
branches:
- master
jobs:
@ -88,3 +88,28 @@ jobs:
run: rustup default nightly
- name: Check rustdoc links
run: RUSTDOCFLAGS="--deny intra_doc_link_resolution_failure" cargo +nightly doc --verbose --workspace --no-deps --document-private-items
integration-test:
name: Integration tests
runs-on: ubuntu-latest
container:
image: rust
steps:
- uses: actions/checkout@v1
- name: Cache cargo registry
uses: actions/cache@v1
with:
path: ~/.cargo/registry
key: cargo-registry-${{ hashFiles('Cargo.toml') }}
- name: Cache cargo index
uses: actions/cache@v1
with:
path: ~/.cargo/git
key: cargo-index-${{ hashFiles('Cargo.toml') }}
- name: Cache cargo build
uses: actions/cache@v1
with:
path: target
key: cargo-build-target-${{ hashFiles('Cargo.toml') }}
- name: Run ipfs-kad example
run: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad

View File

@ -42,7 +42,6 @@ ring = { version = "0.16.9", features = ["alloc", "std"], default-features = fal
async-std = "1.0"
libp2p-mplex = { version = "0.16.0", path = "../muxers/mplex" }
libp2p-secio = { version = "0.16.0", path = "../protocols/secio" }
libp2p-swarm = { version = "0.16.0", path = "../swarm" }
libp2p-tcp = { version = "0.16.0", path = "../transports/tcp" }
quickcheck = "0.9.0"
wasm-timer = "0.2"

View File

@ -148,6 +148,8 @@ where
Closed {
/// The ID of the listener that closed.
listener_id: ListenerId,
/// The addresses that the listener was listening on.
addresses: Vec<Multiaddr>,
/// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
/// if the stream produced an error.
reason: Result<(), TTrans::Error>,
@ -283,12 +285,14 @@ where
Poll::Ready(None) => {
return Poll::Ready(ListenersEvent::Closed {
listener_id: *listener_project.id,
addresses: listener_project.addresses.drain(..).collect(),
reason: Ok(()),
})
}
Poll::Ready(Some(Err(err))) => {
return Poll::Ready(ListenersEvent::Closed {
listener_id: *listener_project.id,
addresses: listener_project.addresses.drain(..).collect(),
reason: Err(err),
})
}
@ -351,9 +355,10 @@ where
.field("listener_id", listener_id)
.field("local_addr", local_addr)
.finish(),
ListenersEvent::Closed { listener_id, reason } => f
ListenersEvent::Closed { listener_id, addresses, reason } => f
.debug_struct("ListenersEvent::Closed")
.field("listener_id", listener_id)
.field("addresses", addresses)
.field("reason", reason)
.finish(),
ListenersEvent::Error { listener_id, error } => f

View File

@ -871,7 +871,7 @@ impl PoolLimits {
{
if let Some(limit) = limit {
let current = current();
if limit >= current {
if current >= limit {
return Err(ConnectionLimit { limit, current })
}
}

View File

@ -356,8 +356,8 @@ where
Poll::Ready(ListenersEvent::AddressExpired { listener_id, listen_addr }) => {
return Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr })
}
Poll::Ready(ListenersEvent::Closed { listener_id, reason }) => {
return Poll::Ready(NetworkEvent::ListenerClosed { listener_id, reason })
Poll::Ready(ListenersEvent::Closed { listener_id, addresses, reason }) => {
return Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason })
}
Poll::Ready(ListenersEvent::Error { listener_id, error }) => {
return Poll::Ready(NetworkEvent::ListenerError { listener_id, error })

View File

@ -54,6 +54,8 @@ where
ListenerClosed {
/// The listener ID that closed.
listener_id: ListenerId,
/// The addresses that the listener was listening on.
addresses: Vec<Multiaddr>,
/// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
/// if the stream produced an error.
reason: Result<(), TTrans::Error>,
@ -182,9 +184,10 @@ where
.field("listen_addr", listen_addr)
.finish()
}
NetworkEvent::ListenerClosed { listener_id, reason } => {
NetworkEvent::ListenerClosed { listener_id, addresses, reason } => {
f.debug_struct("ListenerClosed")
.field("listener_id", listener_id)
.field("addresses", addresses)
.field("reason", reason)
.finish()
}
@ -342,4 +345,3 @@ where
self.info().to_connected_point()
}
}

View File

@ -27,7 +27,7 @@ use std::{convert::TryFrom, borrow::Borrow, fmt, hash, str::FromStr};
/// Public keys with byte-lengths smaller than `MAX_INLINE_KEY_LENGTH` will be
/// automatically used as the peer id using an identity multihash.
const _MAX_INLINE_KEY_LENGTH: usize = 42;
const MAX_INLINE_KEY_LENGTH: usize = 42;
/// Identifier of a peer of the network.
///
@ -70,11 +70,11 @@ impl PeerId {
// will switch to not hashing the key (i.e. the correct behaviour).
// In other words, rust-libp2p 0.16 is compatible with all versions of rust-libp2p.
// Rust-libp2p 0.12 and below is **NOT** compatible with rust-libp2p 0.17 and above.
let (hash_algorithm, canonical_algorithm): (_, Option<Code>) = /*if key_enc.len() <= MAX_INLINE_KEY_LENGTH {
(multihash::Hash::Identity, Some(multihash::Hash::SHA2256))
} else {*/
(Code::Sha2_256, None);
//};
let (hash_algorithm, canonical_algorithm) = if key_enc.len() <= MAX_INLINE_KEY_LENGTH {
(Code::Identity, Some(Code::Sha2_256))
} else {
(Code::Sha2_256, None)
};
let canonical = canonical_algorithm.map(|alg|
alg.hasher().expect("SHA2-256 hasher is always supported").digest(&key_enc));

View File

@ -32,65 +32,17 @@ use libp2p_core::{
network::NetworkEvent,
upgrade,
};
use libp2p_swarm::{
NegotiatedSubstream,
ProtocolsHandler,
KeepAlive,
SubstreamProtocol,
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr,
protocols_handler::NodeHandlerWrapperBuilder
};
use rand::seq::SliceRandom;
use std::{io, task::Context, task::Poll};
use std::{io, task::Poll};
use util::TestHandler;
// TODO: replace with DummyProtocolsHandler after https://github.com/servo/rust-smallvec/issues/139 ?
#[derive(Default)]
struct TestHandler;
impl ProtocolsHandler for TestHandler {
type InEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139)
type OutEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139)
type Error = io::Error;
type InboundProtocol = upgrade::DeniedUpgrade;
type OutboundProtocol = upgrade::DeniedUpgrade;
type OutboundOpenInfo = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139)
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
SubstreamProtocol::new(upgrade::DeniedUpgrade)
}
fn inject_fully_negotiated_inbound(
&mut self,
_: <Self::InboundProtocol as upgrade::InboundUpgrade<NegotiatedSubstream>>::Output
) { panic!() }
fn inject_fully_negotiated_outbound(
&mut self,
_: <Self::OutboundProtocol as upgrade::OutboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::OutboundOpenInfo
) { panic!() }
fn inject_event(&mut self, _: Self::InEvent) {
panic!()
}
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as upgrade::OutboundUpgrade<NegotiatedSubstream>>::Error>) {
}
fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::No }
fn poll(&mut self, _: &mut Context) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>> {
Poll::Pending
}
}
type TestNetwork<TTrans> = Network<TTrans, (), (), TestHandler>;
#[test]
fn deny_incoming_connec() {
// Checks whether refusing an incoming connection on a swarm triggers the correct events.
let mut swarm1: Network<_, _, _, NodeHandlerWrapperBuilder<TestHandler>, _, _> = {
let mut swarm1 = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
@ -98,7 +50,7 @@ fn deny_incoming_connec() {
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into(), Default::default())
TestNetwork::new(transport, local_public_key.into(), Default::default())
};
let mut swarm2 = {
@ -109,7 +61,7 @@ fn deny_incoming_connec() {
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into(), Default::default())
TestNetwork::new(transport, local_public_key.into(), Default::default())
};
swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
@ -125,7 +77,7 @@ fn deny_incoming_connec() {
swarm2
.peer(swarm1.local_peer_id().clone())
.into_disconnected().unwrap()
.connect(address.clone(), Vec::new(), TestHandler::default().into_node_handler_builder())
.connect(address.clone(), Vec::new(), TestHandler())
.unwrap();
async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
@ -180,7 +132,7 @@ fn dial_self() {
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
})
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into(), Default::default())
TestNetwork::new(transport, local_public_key.into(), Default::default())
};
swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
@ -195,7 +147,7 @@ fn dial_self() {
}))
.unwrap();
swarm.dial(&local_address, TestHandler::default().into_node_handler_builder()).unwrap();
swarm.dial(&local_address, TestHandler()).unwrap();
let mut got_dial_err = false;
let mut got_inc_err = false;
@ -226,7 +178,7 @@ fn dial_self() {
},
Poll::Ready(NetworkEvent::IncomingConnection(inc)) => {
assert_eq!(*inc.local_addr(), local_address);
inc.accept(TestHandler::default().into_node_handler_builder()).unwrap();
inc.accept(TestHandler()).unwrap();
},
Poll::Ready(ev) => {
panic!("Unexpected event: {:?}", ev)
@ -242,7 +194,7 @@ fn dial_self_by_id() {
// Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first
// place.
let mut swarm: Network<_, _, _, NodeHandlerWrapperBuilder<TestHandler>> = {
let mut swarm = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
@ -250,7 +202,7 @@ fn dial_self_by_id() {
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into(), Default::default())
TestNetwork::new(transport, local_public_key.into(), Default::default())
};
let peer_id = swarm.local_peer_id().clone();
@ -269,7 +221,7 @@ fn multiple_addresses_err() {
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into(), Default::default())
TestNetwork::new(transport, local_public_key.into(), Default::default())
};
let mut addresses = Vec::new();
@ -287,7 +239,7 @@ fn multiple_addresses_err() {
let target = PeerId::random();
swarm.peer(target.clone())
.into_disconnected().unwrap()
.connect(first, rest, TestHandler::default().into_node_handler_builder())
.connect(first, rest, TestHandler())
.unwrap();
async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {

View File

@ -3,7 +3,38 @@
use futures::prelude::*;
use libp2p_core::muxing::StreamMuxer;
use std::{pin::Pin, task::Context, task::Poll};
use libp2p_core::{
connection::{
ConnectionHandler,
ConnectionHandlerEvent,
Substream,
SubstreamEndpoint,
},
muxing::StreamMuxerBox,
};
use std::{io, pin::Pin, task::Context, task::Poll};
pub struct TestHandler();
impl ConnectionHandler for TestHandler {
type InEvent = ();
type OutEvent = ();
type Error = io::Error;
type Substream = Substream<StreamMuxerBox>;
type OutboundOpenInfo = ();
fn inject_substream(&mut self, _: Self::Substream, _: SubstreamEndpoint<Self::OutboundOpenInfo>)
{}
fn inject_event(&mut self, _: Self::InEvent)
{}
fn poll(&mut self, _: &mut Context)
-> Poll<Result<ConnectionHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>>
{
Poll::Ready(Ok(ConnectionHandlerEvent::Custom(())))
}
}
pub struct CloseMuxer<M> {
state: CloseMuxerState<M>,

View File

@ -14,4 +14,4 @@ futures = "0.3.1"
libp2p-core = { version = "0.16.0", path = "../../core" }
parking_lot = "0.10"
thiserror = "1.0"
yamux = "0.4.4"
yamux = "0.4.5"

View File

@ -24,10 +24,10 @@ mod test;
use crate::K_VALUE;
use crate::addresses::{Addresses, Remove};
use crate::handler::{KademliaHandler, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn};
use crate::handler::{KademliaHandler, KademliaHandlerConfig, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn};
use crate::jobs::*;
use crate::kbucket::{self, KBucketsTable, NodeStatus};
use crate::protocol::{KadConnectionType, KadPeer};
use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
use crate::contact::Contact;
@ -50,16 +50,13 @@ use wasm_timer::Instant;
use libp2p_core::identity::ed25519::{Keypair, PublicKey};
use trust_graph::TrustGraph;
// TODO: how Kademlia knows hers PeerId? It's stored in KBucketsTable
// TODO: add there hers PublicKey, and exchange it on the network
/// Network behaviour that handles Kademlia.
pub struct Kademlia<TStore> {
/// The Kademlia routing table.
kbuckets: KBucketsTable<kbucket::Key<PeerId>, Contact>,
/// An optional protocol name override to segregate DHTs in the network.
protocol_name_override: Option<Cow<'static, [u8]>>,
/// Configuration of the wire protocol.
protocol_config: KademliaProtocolConfig,
/// The currently active (i.e. in-progress) queries.
queries: QueryPool<QueryInner>,
@ -83,6 +80,9 @@ pub struct Kademlia<TStore> {
/// The TTL of provider records.
provider_record_ttl: Option<Duration>,
/// How long to keep connections alive when they're idle.
connection_idle_timeout: Duration,
/// Queued events to return when the behaviour is being polled.
queued_events: VecDeque<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>>,
@ -99,12 +99,13 @@ pub struct Kademlia<TStore> {
pub struct KademliaConfig {
kbucket_pending_timeout: Duration,
query_config: QueryConfig,
protocol_name_override: Option<Cow<'static, [u8]>>,
protocol_config: KademliaProtocolConfig,
record_ttl: Option<Duration>,
record_replication_interval: Option<Duration>,
record_publication_interval: Option<Duration>,
provider_record_ttl: Option<Duration>,
provider_publication_interval: Option<Duration>,
connection_idle_timeout: Duration,
}
impl Default for KademliaConfig {
@ -112,12 +113,13 @@ impl Default for KademliaConfig {
KademliaConfig {
kbucket_pending_timeout: Duration::from_secs(60),
query_config: QueryConfig::default(),
protocol_name_override: None,
protocol_config: Default::default(),
record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
record_replication_interval: Some(Duration::from_secs(60 * 60)),
record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
connection_idle_timeout: Duration::from_secs(10),
}
}
}
@ -128,7 +130,7 @@ impl KademliaConfig {
/// Kademlia nodes only communicate with other nodes using the same protocol name. Using a
/// custom name therefore allows to segregate the DHT from others, if that is desired.
pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) -> &mut Self {
self.protocol_name_override = Some(name.into());
self.protocol_config.set_protocol_name(name);
self
}
@ -225,6 +227,20 @@ impl KademliaConfig {
self.provider_publication_interval = interval;
self
}
/// Sets the amount of time to keep connections alive when they're idle.
pub fn set_connection_idle_timeout(&mut self, duration: Duration) -> &mut Self {
self.connection_idle_timeout = duration;
self
}
/// Modifies the maximum allowed size of individual Kademlia packets.
///
/// It might be necessary to increase this value if trying to put large records.
pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
self.protocol_config.set_max_packet_size(size);
self
}
}
impl<TStore> Kademlia<TStore>
@ -238,9 +254,7 @@ where
/// Get the protocol name of this kademlia instance.
pub fn protocol_name(&self) -> &[u8] {
self.protocol_name_override
.as_ref()
.map_or(crate::protocol::DEFAULT_PROTO_NAME.as_ref(), AsRef::as_ref)
self.protocol_config.protocol_name()
}
/// Creates a new `Kademlia` network behaviour with the given configuration.
@ -264,7 +278,7 @@ where
Kademlia {
store,
kbuckets: KBucketsTable::new(kp, local_key, config.kbucket_pending_timeout),
protocol_name_override: config.protocol_name_override,
protocol_config: config.protocol_config,
queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
queries: QueryPool::new(config.query_config),
connected_peers: Default::default(),
@ -272,7 +286,8 @@ where
put_record_job,
record_ttl: config.record_ttl,
provider_record_ttl: config.provider_record_ttl,
trust
connection_idle_timeout: config.connection_idle_timeout,
trust,
}
}
@ -977,9 +992,7 @@ where
let num_between = self.kbuckets.count_nodes_between(&target);
let k = self.queries.config().replication_factor.get();
let num_beyond_k = (usize::max(k, num_between) - k) as u32;
let expiration = self.record_ttl.map(|ttl|
now + Duration::from_secs(ttl.as_secs() >> num_beyond_k)
);
let expiration = self.record_ttl.map(|ttl| now + exp_decrease(ttl, num_beyond_k));
// The smaller TTL prevails. Only if neither TTL is set is the record
// stored "forever".
record.expires = record.expires.or(expiration).min(expiration);
@ -1002,31 +1015,41 @@ where
// overridden as it avoids having to load the existing record in the
// first place.
// The record is cloned because of the weird libp2p protocol requirement
// to send back the value in the response, although this is a waste of
// resources.
match self.store.put(record.clone()) {
Ok(()) => {
debug!("Record stored: {:?}; {} bytes", record.key, record.value.len());
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source,
handler: NotifyHandler::One(connection),
event: KademliaHandlerIn::PutRecordRes {
key: record.key,
value: record.value,
request_id,
},
})
}
Err(e) => {
info!("Record not stored: {:?}", e);
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source,
handler: NotifyHandler::One(connection),
event: KademliaHandlerIn::Reset(request_id)
})
if !record.is_expired(now) {
// The record is cloned because of the weird libp2p protocol
// requirement to send back the value in the response, although this
// is a waste of resources.
match self.store.put(record.clone()) {
Ok(()) => debug!("Record stored: {:?}; {} bytes", record.key, record.value.len()),
Err(e) => {
info!("Record not stored: {:?}", e);
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source,
handler: NotifyHandler::One(connection),
event: KademliaHandlerIn::Reset(request_id)
});
return
}
}
}
// The remote receives a [`KademliaHandlerIn::PutRecordRes`] even in the
// case where the record is discarded due to being expired. Given that
// the remote sent the local node a [`KademliaHandlerEvent::PutRecord`]
// request, the remote perceives the local node as one node among the k
// closest nodes to the target. In addition returning
// [`KademliaHandlerIn::PutRecordRes`] does not reveal any internal
// information to a possibly malicious remote node.
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source,
handler: NotifyHandler::One(connection),
event: KademliaHandlerIn::PutRecordRes {
key: record.key,
value: record.value,
request_id,
},
})
}
/// Processes a provider record received from a peer.
@ -1053,6 +1076,11 @@ where
}
}
/// Exponentially decrease the given duration (base 2).
fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0))
}
impl<TStore> NetworkBehaviour for Kademlia<TStore>
where
for<'a> TStore: RecordStore<'a>,
@ -1062,11 +1090,11 @@ where
type OutEvent = KademliaEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
let mut handler = KademliaHandler::dial_and_listen();
if let Some(name) = self.protocol_name_override.as_ref() {
handler = handler.with_protocol_name(name.clone());
}
handler
KademliaHandler::new(KademliaHandlerConfig {
protocol_config: self.protocol_config.clone(),
allow_listening: true,
idle_timeout: self.connection_idle_timeout,
})
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
@ -1356,7 +1384,7 @@ where
fn poll(&mut self, cx: &mut Context, parameters: &mut impl PollParameters) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
<KademliaHandler<QueryId> as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
@ -1967,4 +1995,3 @@ impl QueryInfo {
}
}
}

View File

@ -18,7 +18,6 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
/*
#![cfg(test)]
use super::*;
@ -683,4 +682,15 @@ fn exceed_jobs_max_queries() {
})
)
}
*/
#[test]
fn exp_decr_expiration_overflow() {
fn prop_no_panic(ttl: Duration, factor: u32) {
exp_decrease(ttl, factor);
}
// Right shifting a u64 by >63 results in a panic.
prop_no_panic(KademliaConfig::default().record_ttl.unwrap(), 64);
quickcheck(prop_no_panic as fn(_, _))
}

View File

@ -37,7 +37,7 @@ use libp2p_core::{
upgrade::{self, InboundUpgrade, OutboundUpgrade}
};
use log::trace;
use std::{borrow::Cow, error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration};
use std::{error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration};
use wasm_timer::Instant;
/// Protocol handler that handles Kademlia communications with the remote.
@ -48,10 +48,7 @@ use wasm_timer::Instant;
/// It also handles requests made by the remote.
pub struct KademliaHandler<TUserData> {
/// Configuration for the Kademlia protocol.
config: KademliaProtocolConfig,
/// If false, we always refuse incoming Kademlia substreams.
allow_listening: bool,
config: KademliaHandlerConfig,
/// Next unique ID of a connection.
next_connec_unique_id: UniqueConnecId,
@ -63,6 +60,19 @@ pub struct KademliaHandler<TUserData> {
keep_alive: KeepAlive,
}
/// Configuration of a [`KademliaHandler`].
#[derive(Debug, Clone)]
pub struct KademliaHandlerConfig {
/// Configuration of the wire protocol.
pub protocol_config: KademliaProtocolConfig,
/// If false, we deny incoming requests.
pub allow_listening: bool,
/// Time after which we close an idle connection.
pub idle_timeout: Duration,
}
/// State of an active substream, opened either by us or by the remote.
enum SubstreamState<TUserData> {
/// We haven't started opening the outgoing substream yet.
@ -369,42 +379,22 @@ pub struct KademliaRequestId {
struct UniqueConnecId(u64);
impl<TUserData> KademliaHandler<TUserData> {
/// Create a `KademliaHandler` that only allows sending messages to the remote but denying
/// incoming connections.
pub fn dial_only() -> Self {
KademliaHandler::with_allow_listening(false)
}
/// Create a [`KademliaHandler`] using the given configuration.
pub fn new(config: KademliaHandlerConfig) -> Self {
let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout);
/// Create a `KademliaHandler` that only allows sending messages but also receive incoming
/// requests.
///
/// The `Default` trait implementation wraps around this function.
pub fn dial_and_listen() -> Self {
KademliaHandler::with_allow_listening(true)
}
fn with_allow_listening(allow_listening: bool) -> Self {
KademliaHandler {
config: Default::default(),
allow_listening,
config,
next_connec_unique_id: UniqueConnecId(0),
substreams: Vec::new(),
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(10)),
keep_alive,
}
}
/// Modifies the protocol name used on the wire. Can be used to create incompatibilities
/// between networks on purpose.
pub fn with_protocol_name(mut self, name: impl Into<Cow<'static, [u8]>>) -> Self {
self.config = self.config.with_protocol_name(name);
self
}
}
impl<TUserData> Default for KademliaHandler<TUserData> {
#[inline]
fn default() -> Self {
KademliaHandler::dial_and_listen()
KademliaHandler::new(Default::default())
}
}
@ -422,8 +412,8 @@ where
#[inline]
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
if self.allow_listening {
SubstreamProtocol::new(self.config.clone()).map_upgrade(upgrade::EitherUpgrade::A)
if self.config.allow_listening {
SubstreamProtocol::new(self.config.protocol_config.clone()).map_upgrade(upgrade::EitherUpgrade::A)
} else {
SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade))
}
@ -449,7 +439,7 @@ where
EitherOutput::Second(p) => void::unreachable(p),
};
debug_assert!(self.allow_listening);
debug_assert!(self.config.allow_listening);
let connec_unique_id = self.next_connec_unique_id;
self.next_connec_unique_id.0 += 1;
self.substreams
@ -635,7 +625,7 @@ where
let mut substream = self.substreams.swap_remove(n);
loop {
match advance_substream(substream, self.config.clone(), cx) {
match advance_substream(substream, self.config.protocol_config.clone(), cx) {
(Some(new_state), Some(event), _) => {
self.substreams.push(new_state);
return Poll::Ready(event);
@ -672,6 +662,16 @@ where
}
}
impl Default for KademliaHandlerConfig {
fn default() -> Self {
KademliaHandlerConfig {
protocol_config: Default::default(),
allow_listening: true,
idle_timeout: Duration::from_secs(10),
}
}
}
/// Advances one substream.
///
/// Returns the new state for that substream, an event to generate, and whether the substream

View File

@ -153,21 +153,33 @@ impl Into<proto::message::Peer> for KadPeer {
#[derive(Debug, Clone)]
pub struct KademliaProtocolConfig {
protocol_name: Cow<'static, [u8]>,
/// Maximum allowed size of a packet.
max_packet_size: usize,
}
impl KademliaProtocolConfig {
/// Returns the configured protocol name.
pub fn protocol_name(&self) -> &[u8] {
&self.protocol_name
}
/// Modifies the protocol name used on the wire. Can be used to create incompatibilities
/// between networks on purpose.
pub fn with_protocol_name(mut self, name: impl Into<Cow<'static, [u8]>>) -> Self {
pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) {
self.protocol_name = name.into();
self
}
/// Modifies the maximum allowed size of a single Kademlia packet.
pub fn set_max_packet_size(&mut self, size: usize) {
self.max_packet_size = size;
}
}
impl Default for KademliaProtocolConfig {
fn default() -> Self {
KademliaProtocolConfig {
protocol_name: Cow::Borrowed(DEFAULT_PROTO_NAME)
protocol_name: Cow::Borrowed(DEFAULT_PROTO_NAME),
max_packet_size: 4096,
}
}
}
@ -191,7 +203,7 @@ where
fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future {
let mut codec = UviBytes::default();
codec.set_max_len(4096);
codec.set_max_len(self.max_packet_size);
future::ok(
Framed::new(incoming, codec)
@ -223,7 +235,7 @@ where
fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future {
let mut codec = UviBytes::default();
codec.set_max_len(4096);
codec.set_max_len(self.max_packet_size);
future::ok(
Framed::new(incoming, codec)

View File

@ -20,7 +20,7 @@
mod memory;
pub use memory::MemoryStore;
pub use memory::{MemoryStore, MemoryStoreConfig};
use crate::K_VALUE;
use super::*;

View File

@ -127,7 +127,7 @@ impl MdnsService {
Self::new_inner(false)
}
/// Same as `new`, but we don't send automatically send queries on the network.
/// Same as `new`, but we don't automatically send queries on the network.
pub fn silent() -> io::Result<MdnsService> {
Self::new_inner(true)
}

View File

@ -445,8 +445,11 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
this.behaviour.inject_expired_listen_addr(&listen_addr);
return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr));
}
Poll::Ready(NetworkEvent::ListenerClosed { listener_id, reason }) => {
Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => {
log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
for addr in addresses.iter() {
this.behaviour.inject_expired_listen_addr(addr);
}
this.behaviour.inject_listener_closed(listener_id);
}
Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) =>
@ -678,9 +681,9 @@ where
/// Notify all of the given connections of a peer of an event.
///
/// Returns `Some` with the given event and a new list of connections if
/// at least one of the given connections is not currently able to receive the event
/// but is not closing, in which case the current task is scheduled to be woken up.
/// The returned connections are those which are not closing.
/// at least one of the given connections is currently not able to receive
/// the event, in which case the current task is scheduled to be woken up and
/// the returned connections are those which still need to receive the event.
///
/// Returns `None` if all connections are either closing or the event
/// was successfully sent to all handlers whose connections are not closing,
@ -704,26 +707,23 @@ where
}
}
{
let mut pending = SmallVec::new();
for id in ids.iter() {
if let Some(mut conn) = peer.connection(*id) { // (*)
if conn.poll_ready_notify_handler(cx).is_pending() {
pending.push(*id)
}
let mut pending = SmallVec::new();
for id in ids.into_iter() {
if let Some(mut conn) = peer.connection(id) {
match conn.poll_ready_notify_handler(cx) {
Poll::Pending => pending.push(id),
Poll::Ready(Ok(())) => {
// Can now only fail due to the connection suddenly closing,
// which we ignore.
let _ = conn.notify_handler(event.clone());
},
Poll::Ready(Err(())) => {} // connection is closing
}
}
if !pending.is_empty() {
return Some((event, pending))
}
}
for id in ids.into_iter() {
if let Some(mut conn) = peer.connection(id) {
// All connections were ready. Can now only fail due
// to a connection suddenly closing, which we ignore.
let _ = conn.notify_handler(event.clone());
}
if !pending.is_empty() {
return Some((event, pending))
}
None