diff --git a/.circleci/config.yml b/.circleci/config.yml deleted file mode 100644 index f2c31c40..00000000 --- a/.circleci/config.yml +++ /dev/null @@ -1,114 +0,0 @@ -version: 2 - -workflows: - version: 2 - build: - jobs: - - test - - test-wasm - - check-rustdoc-links - - integration-test - -jobs: - test: - machine: - enabled: true - steps: - - checkout - - run: - name: Enable ipv6 - command: | - cat <<'EOF' | sudo tee /etc/docker/daemon.json - { - "ipv6": true, - "fixed-cidr-v6": "2001:db8:1::/64" - } - EOF - sudo service docker restart - - restore_cache: - key: test-cache - - run: - name: Prepare docker container for building - command: docker build --pull --no-cache -t rust-libp2p -f .circleci/images/rust-libp2p/Dockerfile . - - run: - name: Run tests, inside a docker image, with no feature - command: docker run --rm -v "/cache/cargo/registry:/usr/local/cargo/registry" -v "/cache/target:/app/target" -it rust-libp2p cargo test --all --no-default-features - - run: - name: Run tests, inside a docker image, with all features - command: docker run --rm -v "/cache/cargo/registry:/usr/local/cargo/registry" -v "/cache/target:/app/target" -it rust-libp2p cargo test --all --all-features - - save_cache: - key: test-cache - paths: - - "/cache" - - check-rustdoc-links: - docker: - - image: rust:latest - steps: - - checkout - - restore_cache: - key: test-rustdoc-cache-{{ epoch }} - - run: - name: Install nightly Rust - # TODO: intra-doc links are available on nightly only - # see https://doc.rust-lang.org/nightly/rustdoc/lints.html#intra_doc_link_resolution_failure - command: rustup default nightly - - run: - name: Print Rust version - command: | - rustc --version - - run: - name: Check Rustdoc links - command: RUSTDOCFLAGS="--deny intra_doc_link_resolution_failure" cargo +nightly doc --verbose --workspace --no-deps --document-private-items - - save_cache: - key: test-rustdoc-cache-{{ epoch }} - paths: - - ./target - - /usr/local/cargo - - test-wasm: - docker: - - image: parity/rust-builder:latest - steps: - - checkout - - restore_cache: - keys: - - test-wasm-cache-{{ epoch }} - - run: - name: Print Rust version - command: | - rustc --version - - run: - name: Build for wasm32 - # TODO: also run tests but with --no-run; important to detect linking errors - command: | - sccache -s - cargo web build - sccache -s - - save_cache: - key: test-wasm-cache-{{ epoch }} - paths: - - ./target - - /usr/local/cargo - - /root/.cache/sccache - - integration-test: - docker: - - image: rust - - image: ipfs/go-ipfs - steps: - - checkout - - restore_cache: - key: integration-test-cache-{{ epoch }} - - run: - name: Print Rust version - command: | - rustc --version - - run: - command: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad - - save_cache: - key: integration-test-cache-{{ epoch }} - paths: - - "~/.cargo" - - "./target" - diff --git a/.circleci/images/rust-libp2p/Dockerfile b/.circleci/images/rust-libp2p/Dockerfile deleted file mode 100644 index 7fa62ecb..00000000 --- a/.circleci/images/rust-libp2p/Dockerfile +++ /dev/null @@ -1,5 +0,0 @@ -FROM rust - -RUN mkdir /app -WORKDIR /app -COPY . /app \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9f8f3f1c..e708f594 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,7 +3,7 @@ name: Continuous integration on: pull_request: push: - branches: + branches: - master jobs: @@ -88,3 +88,28 @@ jobs: run: rustup default nightly - name: Check rustdoc links run: RUSTDOCFLAGS="--deny intra_doc_link_resolution_failure" cargo +nightly doc --verbose --workspace --no-deps --document-private-items + + integration-test: + name: Integration tests + runs-on: ubuntu-latest + container: + image: rust + steps: + - uses: actions/checkout@v1 + - name: Cache cargo registry + uses: actions/cache@v1 + with: + path: ~/.cargo/registry + key: cargo-registry-${{ hashFiles('Cargo.toml') }} + - name: Cache cargo index + uses: actions/cache@v1 + with: + path: ~/.cargo/git + key: cargo-index-${{ hashFiles('Cargo.toml') }} + - name: Cache cargo build + uses: actions/cache@v1 + with: + path: target + key: cargo-build-target-${{ hashFiles('Cargo.toml') }} + - name: Run ipfs-kad example + run: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad diff --git a/core/Cargo.toml b/core/Cargo.toml index f5085534..254828c9 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -42,7 +42,6 @@ ring = { version = "0.16.9", features = ["alloc", "std"], default-features = fal async-std = "1.0" libp2p-mplex = { version = "0.16.0", path = "../muxers/mplex" } libp2p-secio = { version = "0.16.0", path = "../protocols/secio" } -libp2p-swarm = { version = "0.16.0", path = "../swarm" } libp2p-tcp = { version = "0.16.0", path = "../transports/tcp" } quickcheck = "0.9.0" wasm-timer = "0.2" diff --git a/core/src/connection/listeners.rs b/core/src/connection/listeners.rs index c703095d..b905b444 100644 --- a/core/src/connection/listeners.rs +++ b/core/src/connection/listeners.rs @@ -148,6 +148,8 @@ where Closed { /// The ID of the listener that closed. listener_id: ListenerId, + /// The addresses that the listener was listening on. + addresses: Vec, /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err` /// if the stream produced an error. reason: Result<(), TTrans::Error>, @@ -283,12 +285,14 @@ where Poll::Ready(None) => { return Poll::Ready(ListenersEvent::Closed { listener_id: *listener_project.id, + addresses: listener_project.addresses.drain(..).collect(), reason: Ok(()), }) } Poll::Ready(Some(Err(err))) => { return Poll::Ready(ListenersEvent::Closed { listener_id: *listener_project.id, + addresses: listener_project.addresses.drain(..).collect(), reason: Err(err), }) } @@ -351,9 +355,10 @@ where .field("listener_id", listener_id) .field("local_addr", local_addr) .finish(), - ListenersEvent::Closed { listener_id, reason } => f + ListenersEvent::Closed { listener_id, addresses, reason } => f .debug_struct("ListenersEvent::Closed") .field("listener_id", listener_id) + .field("addresses", addresses) .field("reason", reason) .finish(), ListenersEvent::Error { listener_id, error } => f diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index 7f92422d..fcfee926 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -871,7 +871,7 @@ impl PoolLimits { { if let Some(limit) = limit { let current = current(); - if limit >= current { + if current >= limit { return Err(ConnectionLimit { limit, current }) } } diff --git a/core/src/network.rs b/core/src/network.rs index 8063d912..94b7df2d 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -356,8 +356,8 @@ where Poll::Ready(ListenersEvent::AddressExpired { listener_id, listen_addr }) => { return Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) } - Poll::Ready(ListenersEvent::Closed { listener_id, reason }) => { - return Poll::Ready(NetworkEvent::ListenerClosed { listener_id, reason }) + Poll::Ready(ListenersEvent::Closed { listener_id, addresses, reason }) => { + return Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) } Poll::Ready(ListenersEvent::Error { listener_id, error }) => { return Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) diff --git a/core/src/network/event.rs b/core/src/network/event.rs index 692ba6ea..835b6e64 100644 --- a/core/src/network/event.rs +++ b/core/src/network/event.rs @@ -54,6 +54,8 @@ where ListenerClosed { /// The listener ID that closed. listener_id: ListenerId, + /// The addresses that the listener was listening on. + addresses: Vec, /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err` /// if the stream produced an error. reason: Result<(), TTrans::Error>, @@ -182,9 +184,10 @@ where .field("listen_addr", listen_addr) .finish() } - NetworkEvent::ListenerClosed { listener_id, reason } => { + NetworkEvent::ListenerClosed { listener_id, addresses, reason } => { f.debug_struct("ListenerClosed") .field("listener_id", listener_id) + .field("addresses", addresses) .field("reason", reason) .finish() } @@ -342,4 +345,3 @@ where self.info().to_connected_point() } } - diff --git a/core/src/peer_id.rs b/core/src/peer_id.rs index 95b7312c..a4a90096 100644 --- a/core/src/peer_id.rs +++ b/core/src/peer_id.rs @@ -27,7 +27,7 @@ use std::{convert::TryFrom, borrow::Borrow, fmt, hash, str::FromStr}; /// Public keys with byte-lengths smaller than `MAX_INLINE_KEY_LENGTH` will be /// automatically used as the peer id using an identity multihash. -const _MAX_INLINE_KEY_LENGTH: usize = 42; +const MAX_INLINE_KEY_LENGTH: usize = 42; /// Identifier of a peer of the network. /// @@ -70,11 +70,11 @@ impl PeerId { // will switch to not hashing the key (i.e. the correct behaviour). // In other words, rust-libp2p 0.16 is compatible with all versions of rust-libp2p. // Rust-libp2p 0.12 and below is **NOT** compatible with rust-libp2p 0.17 and above. - let (hash_algorithm, canonical_algorithm): (_, Option) = /*if key_enc.len() <= MAX_INLINE_KEY_LENGTH { - (multihash::Hash::Identity, Some(multihash::Hash::SHA2256)) - } else {*/ - (Code::Sha2_256, None); - //}; + let (hash_algorithm, canonical_algorithm) = if key_enc.len() <= MAX_INLINE_KEY_LENGTH { + (Code::Identity, Some(Code::Sha2_256)) + } else { + (Code::Sha2_256, None) + }; let canonical = canonical_algorithm.map(|alg| alg.hasher().expect("SHA2-256 hasher is always supported").digest(&key_enc)); diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index 10e7f4b9..e4603f34 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -32,65 +32,17 @@ use libp2p_core::{ network::NetworkEvent, upgrade, }; -use libp2p_swarm::{ - NegotiatedSubstream, - ProtocolsHandler, - KeepAlive, - SubstreamProtocol, - ProtocolsHandlerEvent, - ProtocolsHandlerUpgrErr, - protocols_handler::NodeHandlerWrapperBuilder -}; use rand::seq::SliceRandom; -use std::{io, task::Context, task::Poll}; +use std::{io, task::Poll}; +use util::TestHandler; -// TODO: replace with DummyProtocolsHandler after https://github.com/servo/rust-smallvec/issues/139 ? -#[derive(Default)] -struct TestHandler; - -impl ProtocolsHandler for TestHandler { - type InEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139) - type OutEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139) - type Error = io::Error; - type InboundProtocol = upgrade::DeniedUpgrade; - type OutboundProtocol = upgrade::DeniedUpgrade; - type OutboundOpenInfo = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139) - - fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(upgrade::DeniedUpgrade) - } - - fn inject_fully_negotiated_inbound( - &mut self, - _: >::Output - ) { panic!() } - - fn inject_fully_negotiated_outbound( - &mut self, - _: >::Output, - _: Self::OutboundOpenInfo - ) { panic!() } - - fn inject_event(&mut self, _: Self::InEvent) { - panic!() - } - - fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) { - - } - - fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::No } - - fn poll(&mut self, _: &mut Context) -> Poll> { - Poll::Pending - } -} +type TestNetwork = Network; #[test] fn deny_incoming_connec() { // Checks whether refusing an incoming connection on a swarm triggers the correct events. - let mut swarm1: Network<_, _, _, NodeHandlerWrapperBuilder, _, _> = { + let mut swarm1 = { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() @@ -98,7 +50,7 @@ fn deny_incoming_connec() { .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()) .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer))); - Network::new(transport, local_public_key.into(), Default::default()) + TestNetwork::new(transport, local_public_key.into(), Default::default()) }; let mut swarm2 = { @@ -109,7 +61,7 @@ fn deny_incoming_connec() { .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()) .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer))); - Network::new(transport, local_public_key.into(), Default::default()) + TestNetwork::new(transport, local_public_key.into(), Default::default()) }; swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); @@ -125,7 +77,7 @@ fn deny_incoming_connec() { swarm2 .peer(swarm1.local_peer_id().clone()) .into_disconnected().unwrap() - .connect(address.clone(), Vec::new(), TestHandler::default().into_node_handler_builder()) + .connect(address.clone(), Vec::new(), TestHandler()) .unwrap(); async_std::task::block_on(future::poll_fn(|cx| -> Poll> { @@ -180,7 +132,7 @@ fn dial_self() { util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) }) .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer))); - Network::new(transport, local_public_key.into(), Default::default()) + TestNetwork::new(transport, local_public_key.into(), Default::default()) }; swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); @@ -195,7 +147,7 @@ fn dial_self() { })) .unwrap(); - swarm.dial(&local_address, TestHandler::default().into_node_handler_builder()).unwrap(); + swarm.dial(&local_address, TestHandler()).unwrap(); let mut got_dial_err = false; let mut got_inc_err = false; @@ -226,7 +178,7 @@ fn dial_self() { }, Poll::Ready(NetworkEvent::IncomingConnection(inc)) => { assert_eq!(*inc.local_addr(), local_address); - inc.accept(TestHandler::default().into_node_handler_builder()).unwrap(); + inc.accept(TestHandler()).unwrap(); }, Poll::Ready(ev) => { panic!("Unexpected event: {:?}", ev) @@ -242,7 +194,7 @@ fn dial_self_by_id() { // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first // place. - let mut swarm: Network<_, _, _, NodeHandlerWrapperBuilder> = { + let mut swarm = { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() @@ -250,7 +202,7 @@ fn dial_self_by_id() { .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()) .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer))); - Network::new(transport, local_public_key.into(), Default::default()) + TestNetwork::new(transport, local_public_key.into(), Default::default()) }; let peer_id = swarm.local_peer_id().clone(); @@ -269,7 +221,7 @@ fn multiple_addresses_err() { .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()) .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer))); - Network::new(transport, local_public_key.into(), Default::default()) + TestNetwork::new(transport, local_public_key.into(), Default::default()) }; let mut addresses = Vec::new(); @@ -287,7 +239,7 @@ fn multiple_addresses_err() { let target = PeerId::random(); swarm.peer(target.clone()) .into_disconnected().unwrap() - .connect(first, rest, TestHandler::default().into_node_handler_builder()) + .connect(first, rest, TestHandler()) .unwrap(); async_std::task::block_on(future::poll_fn(|cx| -> Poll> { diff --git a/core/tests/util.rs b/core/tests/util.rs index 395e0d9c..95accf1b 100644 --- a/core/tests/util.rs +++ b/core/tests/util.rs @@ -3,7 +3,38 @@ use futures::prelude::*; use libp2p_core::muxing::StreamMuxer; -use std::{pin::Pin, task::Context, task::Poll}; +use libp2p_core::{ + connection::{ + ConnectionHandler, + ConnectionHandlerEvent, + Substream, + SubstreamEndpoint, + }, + muxing::StreamMuxerBox, +}; +use std::{io, pin::Pin, task::Context, task::Poll}; + +pub struct TestHandler(); + +impl ConnectionHandler for TestHandler { + type InEvent = (); + type OutEvent = (); + type Error = io::Error; + type Substream = Substream; + type OutboundOpenInfo = (); + + fn inject_substream(&mut self, _: Self::Substream, _: SubstreamEndpoint) + {} + + fn inject_event(&mut self, _: Self::InEvent) + {} + + fn poll(&mut self, _: &mut Context) + -> Poll, Self::Error>> + { + Poll::Ready(Ok(ConnectionHandlerEvent::Custom(()))) + } +} pub struct CloseMuxer { state: CloseMuxerState, diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index 790d0636..6c59f932 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -14,4 +14,4 @@ futures = "0.3.1" libp2p-core = { version = "0.16.0", path = "../../core" } parking_lot = "0.10" thiserror = "1.0" -yamux = "0.4.4" +yamux = "0.4.5" diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 888bd742..3e644c2d 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -24,10 +24,10 @@ mod test; use crate::K_VALUE; use crate::addresses::{Addresses, Remove}; -use crate::handler::{KademliaHandler, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn}; +use crate::handler::{KademliaHandler, KademliaHandlerConfig, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn}; use crate::jobs::*; use crate::kbucket::{self, KBucketsTable, NodeStatus}; -use crate::protocol::{KadConnectionType, KadPeer}; +use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer}; use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState}; use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord}; use crate::contact::Contact; @@ -50,16 +50,13 @@ use wasm_timer::Instant; use libp2p_core::identity::ed25519::{Keypair, PublicKey}; use trust_graph::TrustGraph; -// TODO: how Kademlia knows hers PeerId? It's stored in KBucketsTable -// TODO: add there hers PublicKey, and exchange it on the network - /// Network behaviour that handles Kademlia. pub struct Kademlia { /// The Kademlia routing table. kbuckets: KBucketsTable, Contact>, - /// An optional protocol name override to segregate DHTs in the network. - protocol_name_override: Option>, + /// Configuration of the wire protocol. + protocol_config: KademliaProtocolConfig, /// The currently active (i.e. in-progress) queries. queries: QueryPool, @@ -83,6 +80,9 @@ pub struct Kademlia { /// The TTL of provider records. provider_record_ttl: Option, + /// How long to keep connections alive when they're idle. + connection_idle_timeout: Duration, + /// Queued events to return when the behaviour is being polled. queued_events: VecDeque, KademliaEvent>>, @@ -99,12 +99,13 @@ pub struct Kademlia { pub struct KademliaConfig { kbucket_pending_timeout: Duration, query_config: QueryConfig, - protocol_name_override: Option>, + protocol_config: KademliaProtocolConfig, record_ttl: Option, record_replication_interval: Option, record_publication_interval: Option, provider_record_ttl: Option, provider_publication_interval: Option, + connection_idle_timeout: Duration, } impl Default for KademliaConfig { @@ -112,12 +113,13 @@ impl Default for KademliaConfig { KademliaConfig { kbucket_pending_timeout: Duration::from_secs(60), query_config: QueryConfig::default(), - protocol_name_override: None, + protocol_config: Default::default(), record_ttl: Some(Duration::from_secs(36 * 60 * 60)), record_replication_interval: Some(Duration::from_secs(60 * 60)), record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)), provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)), provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)), + connection_idle_timeout: Duration::from_secs(10), } } } @@ -128,7 +130,7 @@ impl KademliaConfig { /// Kademlia nodes only communicate with other nodes using the same protocol name. Using a /// custom name therefore allows to segregate the DHT from others, if that is desired. pub fn set_protocol_name(&mut self, name: impl Into>) -> &mut Self { - self.protocol_name_override = Some(name.into()); + self.protocol_config.set_protocol_name(name); self } @@ -225,6 +227,20 @@ impl KademliaConfig { self.provider_publication_interval = interval; self } + + /// Sets the amount of time to keep connections alive when they're idle. + pub fn set_connection_idle_timeout(&mut self, duration: Duration) -> &mut Self { + self.connection_idle_timeout = duration; + self + } + + /// Modifies the maximum allowed size of individual Kademlia packets. + /// + /// It might be necessary to increase this value if trying to put large records. + pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self { + self.protocol_config.set_max_packet_size(size); + self + } } impl Kademlia @@ -238,9 +254,7 @@ where /// Get the protocol name of this kademlia instance. pub fn protocol_name(&self) -> &[u8] { - self.protocol_name_override - .as_ref() - .map_or(crate::protocol::DEFAULT_PROTO_NAME.as_ref(), AsRef::as_ref) + self.protocol_config.protocol_name() } /// Creates a new `Kademlia` network behaviour with the given configuration. @@ -264,7 +278,7 @@ where Kademlia { store, kbuckets: KBucketsTable::new(kp, local_key, config.kbucket_pending_timeout), - protocol_name_override: config.protocol_name_override, + protocol_config: config.protocol_config, queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()), queries: QueryPool::new(config.query_config), connected_peers: Default::default(), @@ -272,7 +286,8 @@ where put_record_job, record_ttl: config.record_ttl, provider_record_ttl: config.provider_record_ttl, - trust + connection_idle_timeout: config.connection_idle_timeout, + trust, } } @@ -977,9 +992,7 @@ where let num_between = self.kbuckets.count_nodes_between(&target); let k = self.queries.config().replication_factor.get(); let num_beyond_k = (usize::max(k, num_between) - k) as u32; - let expiration = self.record_ttl.map(|ttl| - now + Duration::from_secs(ttl.as_secs() >> num_beyond_k) - ); + let expiration = self.record_ttl.map(|ttl| now + exp_decrease(ttl, num_beyond_k)); // The smaller TTL prevails. Only if neither TTL is set is the record // stored "forever". record.expires = record.expires.or(expiration).min(expiration); @@ -1002,31 +1015,41 @@ where // overridden as it avoids having to load the existing record in the // first place. - // The record is cloned because of the weird libp2p protocol requirement - // to send back the value in the response, although this is a waste of - // resources. - match self.store.put(record.clone()) { - Ok(()) => { - debug!("Record stored: {:?}; {} bytes", record.key, record.value.len()); - self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::PutRecordRes { - key: record.key, - value: record.value, - request_id, - }, - }) - } - Err(e) => { - info!("Record not stored: {:?}", e); - self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::Reset(request_id) - }) + if !record.is_expired(now) { + // The record is cloned because of the weird libp2p protocol + // requirement to send back the value in the response, although this + // is a waste of resources. + match self.store.put(record.clone()) { + Ok(()) => debug!("Record stored: {:?}; {} bytes", record.key, record.value.len()), + Err(e) => { + info!("Record not stored: {:?}", e); + self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: source, + handler: NotifyHandler::One(connection), + event: KademliaHandlerIn::Reset(request_id) + }); + + return + } } } + + // The remote receives a [`KademliaHandlerIn::PutRecordRes`] even in the + // case where the record is discarded due to being expired. Given that + // the remote sent the local node a [`KademliaHandlerEvent::PutRecord`] + // request, the remote perceives the local node as one node among the k + // closest nodes to the target. In addition returning + // [`KademliaHandlerIn::PutRecordRes`] does not reveal any internal + // information to a possibly malicious remote node. + self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: source, + handler: NotifyHandler::One(connection), + event: KademliaHandlerIn::PutRecordRes { + key: record.key, + value: record.value, + request_id, + }, + }) } /// Processes a provider record received from a peer. @@ -1053,6 +1076,11 @@ where } } +/// Exponentially decrease the given duration (base 2). +fn exp_decrease(ttl: Duration, exp: u32) -> Duration { + Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0)) +} + impl NetworkBehaviour for Kademlia where for<'a> TStore: RecordStore<'a>, @@ -1062,11 +1090,11 @@ where type OutEvent = KademliaEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { - let mut handler = KademliaHandler::dial_and_listen(); - if let Some(name) = self.protocol_name_override.as_ref() { - handler = handler.with_protocol_name(name.clone()); - } - handler + KademliaHandler::new(KademliaHandlerConfig { + protocol_config: self.protocol_config.clone(), + allow_listening: true, + idle_timeout: self.connection_idle_timeout, + }) } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { @@ -1356,7 +1384,7 @@ where fn poll(&mut self, cx: &mut Context, parameters: &mut impl PollParameters) -> Poll< NetworkBehaviourAction< - ::InEvent, + as ProtocolsHandler>::InEvent, Self::OutEvent, >, > { @@ -1967,4 +1995,3 @@ impl QueryInfo { } } } - diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index d3a0f7e4..22436811 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -18,7 +18,6 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -/* #![cfg(test)] use super::*; @@ -683,4 +682,15 @@ fn exceed_jobs_max_queries() { }) ) } -*/ \ No newline at end of file + +#[test] +fn exp_decr_expiration_overflow() { + fn prop_no_panic(ttl: Duration, factor: u32) { + exp_decrease(ttl, factor); + } + + // Right shifting a u64 by >63 results in a panic. + prop_no_panic(KademliaConfig::default().record_ttl.unwrap(), 64); + + quickcheck(prop_no_panic as fn(_, _)) +} diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index ac2430f0..57f3c55b 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -37,7 +37,7 @@ use libp2p_core::{ upgrade::{self, InboundUpgrade, OutboundUpgrade} }; use log::trace; -use std::{borrow::Cow, error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration}; +use std::{error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration}; use wasm_timer::Instant; /// Protocol handler that handles Kademlia communications with the remote. @@ -48,10 +48,7 @@ use wasm_timer::Instant; /// It also handles requests made by the remote. pub struct KademliaHandler { /// Configuration for the Kademlia protocol. - config: KademliaProtocolConfig, - - /// If false, we always refuse incoming Kademlia substreams. - allow_listening: bool, + config: KademliaHandlerConfig, /// Next unique ID of a connection. next_connec_unique_id: UniqueConnecId, @@ -63,6 +60,19 @@ pub struct KademliaHandler { keep_alive: KeepAlive, } +/// Configuration of a [`KademliaHandler`]. +#[derive(Debug, Clone)] +pub struct KademliaHandlerConfig { + /// Configuration of the wire protocol. + pub protocol_config: KademliaProtocolConfig, + + /// If false, we deny incoming requests. + pub allow_listening: bool, + + /// Time after which we close an idle connection. + pub idle_timeout: Duration, +} + /// State of an active substream, opened either by us or by the remote. enum SubstreamState { /// We haven't started opening the outgoing substream yet. @@ -369,42 +379,22 @@ pub struct KademliaRequestId { struct UniqueConnecId(u64); impl KademliaHandler { - /// Create a `KademliaHandler` that only allows sending messages to the remote but denying - /// incoming connections. - pub fn dial_only() -> Self { - KademliaHandler::with_allow_listening(false) - } + /// Create a [`KademliaHandler`] using the given configuration. + pub fn new(config: KademliaHandlerConfig) -> Self { + let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout); - /// Create a `KademliaHandler` that only allows sending messages but also receive incoming - /// requests. - /// - /// The `Default` trait implementation wraps around this function. - pub fn dial_and_listen() -> Self { - KademliaHandler::with_allow_listening(true) - } - - fn with_allow_listening(allow_listening: bool) -> Self { KademliaHandler { - config: Default::default(), - allow_listening, + config, next_connec_unique_id: UniqueConnecId(0), substreams: Vec::new(), - keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(10)), + keep_alive, } } - - /// Modifies the protocol name used on the wire. Can be used to create incompatibilities - /// between networks on purpose. - pub fn with_protocol_name(mut self, name: impl Into>) -> Self { - self.config = self.config.with_protocol_name(name); - self - } } impl Default for KademliaHandler { - #[inline] fn default() -> Self { - KademliaHandler::dial_and_listen() + KademliaHandler::new(Default::default()) } } @@ -422,8 +412,8 @@ where #[inline] fn listen_protocol(&self) -> SubstreamProtocol { - if self.allow_listening { - SubstreamProtocol::new(self.config.clone()).map_upgrade(upgrade::EitherUpgrade::A) + if self.config.allow_listening { + SubstreamProtocol::new(self.config.protocol_config.clone()).map_upgrade(upgrade::EitherUpgrade::A) } else { SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade)) } @@ -449,7 +439,7 @@ where EitherOutput::Second(p) => void::unreachable(p), }; - debug_assert!(self.allow_listening); + debug_assert!(self.config.allow_listening); let connec_unique_id = self.next_connec_unique_id; self.next_connec_unique_id.0 += 1; self.substreams @@ -635,7 +625,7 @@ where let mut substream = self.substreams.swap_remove(n); loop { - match advance_substream(substream, self.config.clone(), cx) { + match advance_substream(substream, self.config.protocol_config.clone(), cx) { (Some(new_state), Some(event), _) => { self.substreams.push(new_state); return Poll::Ready(event); @@ -672,6 +662,16 @@ where } } +impl Default for KademliaHandlerConfig { + fn default() -> Self { + KademliaHandlerConfig { + protocol_config: Default::default(), + allow_listening: true, + idle_timeout: Duration::from_secs(10), + } + } +} + /// Advances one substream. /// /// Returns the new state for that substream, an event to generate, and whether the substream diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 55bd8ea5..0639544f 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -153,21 +153,33 @@ impl Into for KadPeer { #[derive(Debug, Clone)] pub struct KademliaProtocolConfig { protocol_name: Cow<'static, [u8]>, + /// Maximum allowed size of a packet. + max_packet_size: usize, } impl KademliaProtocolConfig { + /// Returns the configured protocol name. + pub fn protocol_name(&self) -> &[u8] { + &self.protocol_name + } + /// Modifies the protocol name used on the wire. Can be used to create incompatibilities /// between networks on purpose. - pub fn with_protocol_name(mut self, name: impl Into>) -> Self { + pub fn set_protocol_name(&mut self, name: impl Into>) { self.protocol_name = name.into(); - self + } + + /// Modifies the maximum allowed size of a single Kademlia packet. + pub fn set_max_packet_size(&mut self, size: usize) { + self.max_packet_size = size; } } impl Default for KademliaProtocolConfig { fn default() -> Self { KademliaProtocolConfig { - protocol_name: Cow::Borrowed(DEFAULT_PROTO_NAME) + protocol_name: Cow::Borrowed(DEFAULT_PROTO_NAME), + max_packet_size: 4096, } } } @@ -191,7 +203,7 @@ where fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future { let mut codec = UviBytes::default(); - codec.set_max_len(4096); + codec.set_max_len(self.max_packet_size); future::ok( Framed::new(incoming, codec) @@ -223,7 +235,7 @@ where fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future { let mut codec = UviBytes::default(); - codec.set_max_len(4096); + codec.set_max_len(self.max_packet_size); future::ok( Framed::new(incoming, codec) diff --git a/protocols/kad/src/record/store.rs b/protocols/kad/src/record/store.rs index 30e023fd..c050b35e 100644 --- a/protocols/kad/src/record/store.rs +++ b/protocols/kad/src/record/store.rs @@ -20,7 +20,7 @@ mod memory; -pub use memory::MemoryStore; +pub use memory::{MemoryStore, MemoryStoreConfig}; use crate::K_VALUE; use super::*; diff --git a/protocols/mdns/src/service.rs b/protocols/mdns/src/service.rs index cf48e68e..3f8f3dfc 100644 --- a/protocols/mdns/src/service.rs +++ b/protocols/mdns/src/service.rs @@ -127,7 +127,7 @@ impl MdnsService { Self::new_inner(false) } - /// Same as `new`, but we don't send automatically send queries on the network. + /// Same as `new`, but we don't automatically send queries on the network. pub fn silent() -> io::Result { Self::new_inner(true) } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 8ef9eecb..6e92aab7 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -445,8 +445,11 @@ where TBehaviour: NetworkBehaviour, this.behaviour.inject_expired_listen_addr(&listen_addr); return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr)); } - Poll::Ready(NetworkEvent::ListenerClosed { listener_id, reason }) => { + Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => { log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason); + for addr in addresses.iter() { + this.behaviour.inject_expired_listen_addr(addr); + } this.behaviour.inject_listener_closed(listener_id); } Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) => @@ -678,9 +681,9 @@ where /// Notify all of the given connections of a peer of an event. /// /// Returns `Some` with the given event and a new list of connections if -/// at least one of the given connections is not currently able to receive the event -/// but is not closing, in which case the current task is scheduled to be woken up. -/// The returned connections are those which are not closing. +/// at least one of the given connections is currently not able to receive +/// the event, in which case the current task is scheduled to be woken up and +/// the returned connections are those which still need to receive the event. /// /// Returns `None` if all connections are either closing or the event /// was successfully sent to all handlers whose connections are not closing, @@ -704,26 +707,23 @@ where } } - { - let mut pending = SmallVec::new(); - for id in ids.iter() { - if let Some(mut conn) = peer.connection(*id) { // (*) - if conn.poll_ready_notify_handler(cx).is_pending() { - pending.push(*id) - } + let mut pending = SmallVec::new(); + for id in ids.into_iter() { + if let Some(mut conn) = peer.connection(id) { + match conn.poll_ready_notify_handler(cx) { + Poll::Pending => pending.push(id), + Poll::Ready(Ok(())) => { + // Can now only fail due to the connection suddenly closing, + // which we ignore. + let _ = conn.notify_handler(event.clone()); + }, + Poll::Ready(Err(())) => {} // connection is closing } } - if !pending.is_empty() { - return Some((event, pending)) - } } - for id in ids.into_iter() { - if let Some(mut conn) = peer.connection(id) { - // All connections were ready. Can now only fail due - // to a connection suddenly closing, which we ignore. - let _ = conn.notify_handler(event.clone()); - } + if !pending.is_empty() { + return Some((event, pending)) } None