Merge branch 'master' into rand-feature

This commit is contained in:
Ivan Boldyrev
2023-10-14 11:24:57 +04:00
28 changed files with 240 additions and 241 deletions

View File

@ -87,6 +87,7 @@ struct PeriodicJob<T> {
}
impl<T> PeriodicJob<T> {
#[cfg(test)]
fn is_running(&self) -> bool {
match self.state {
PeriodicJobState::Running(..) => true,
@ -96,6 +97,7 @@ impl<T> PeriodicJob<T> {
/// Cuts short the remaining delay, if the job is currently waiting
/// for the delay to expire.
#[cfg(test)]
fn asap(&mut self) {
if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state {
let new_deadline = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
@ -169,6 +171,7 @@ impl PutRecordJob {
}
/// Checks whether the job is currently running.
#[cfg(test)]
pub(crate) fn is_running(&self) -> bool {
self.inner.is_running()
}
@ -177,6 +180,7 @@ impl PutRecordJob {
/// for the delay to expire.
///
/// The job is guaranteed to run on the next invocation of `poll`.
#[cfg(test)]
pub(crate) fn asap(&mut self, publish: bool) {
if publish {
self.next_publish = Some(Instant::now().checked_sub(Duration::from_secs(1)).unwrap())
@ -273,6 +277,7 @@ impl AddProviderJob {
}
/// Checks whether the job is currently running.
#[cfg(test)]
pub(crate) fn is_running(&self) -> bool {
self.inner.is_running()
}
@ -281,6 +286,7 @@ impl AddProviderJob {
/// for the delay to expire.
///
/// The job is guaranteed to run on the next invocation of `poll`.
#[cfg(test)]
pub(crate) fn asap(&mut self) {
self.inner.asap()
}

View File

@ -54,10 +54,6 @@ pub enum NodeStatus {
}
impl<TKey, TVal> PendingNode<TKey, TVal> {
pub(crate) fn key(&self) -> &TKey {
&self.node.key
}
pub(crate) fn status(&self) -> NodeStatus {
self.status
}
@ -70,6 +66,7 @@ impl<TKey, TVal> PendingNode<TKey, TVal> {
Instant::now() >= self.replace
}
#[cfg(test)]
pub(crate) fn set_ready_at(&mut self, t: Instant) {
self.replace = t;
}
@ -191,11 +188,6 @@ where
.filter(|p| p.node.key.as_ref() == key.as_ref())
}
/// Returns a reference to a node in the bucket.
pub(crate) fn get(&self, key: &TKey) -> Option<&Node<TKey, TVal>> {
self.position(key).map(|p| &self.nodes[p.0])
}
/// Returns an iterator over the nodes in the bucket, together with their status.
pub(crate) fn iter(&self) -> impl Iterator<Item = (&Node<TKey, TVal>, NodeStatus)> {
self.nodes
@ -398,22 +390,19 @@ where
}
}
/// Checks whether the given position refers to a connected node.
pub(crate) fn is_connected(&self, pos: Position) -> bool {
self.status(pos) == NodeStatus::Connected
}
/// Gets the number of entries currently in the bucket.
pub(crate) fn num_entries(&self) -> usize {
self.nodes.len()
}
/// Gets the number of entries in the bucket that are considered connected.
#[cfg(test)]
pub(crate) fn num_connected(&self) -> usize {
self.first_connected_pos.map_or(0, |i| self.nodes.len() - i)
}
/// Gets the number of entries in the bucket that are considered disconnected.
#[cfg(test)]
pub(crate) fn num_disconnected(&self) -> usize {
self.nodes.len() - self.num_connected()
}

View File

@ -135,20 +135,6 @@ where
}
}
/// Returns the key of the entry.
///
/// Returns `None` if the `Key` used to construct this `Entry` is not a valid
/// key for an entry in a bucket, which is the case for the `local_key` of
/// the `KBucketsTable` referring to the local node.
pub(crate) fn key(&self) -> Option<&TKey> {
match self {
Entry::Present(entry, _) => Some(entry.key()),
Entry::Pending(entry, _) => Some(entry.key()),
Entry::Absent(entry) => Some(entry.key()),
Entry::SelfEntry => None,
}
}
/// Returns the value associated with the entry.
///
/// Returns `None` if the entry is absent from any bucket or refers to the
@ -175,11 +161,6 @@ where
PresentEntry(EntryRef { bucket, key })
}
/// Returns the key of the entry.
pub(crate) fn key(&self) -> &TKey {
self.0.key
}
/// Returns the value associated with the key.
pub(crate) fn value(&mut self) -> &mut TVal {
&mut self
@ -218,11 +199,6 @@ where
PendingEntry(EntryRef { bucket, key })
}
/// Returns the key of the entry.
pub(crate) fn key(&self) -> &TKey {
self.0.key
}
/// Returns the value associated with the key.
pub(crate) fn value(&mut self) -> &mut TVal {
self.0
@ -262,11 +238,6 @@ where
AbsentEntry(EntryRef { bucket, key })
}
/// Returns the key of the entry.
pub(crate) fn key(&self) -> &TKey {
self.0.key
}
/// Attempts to insert the entry into a bucket.
pub(crate) fn insert(self, value: TVal, status: NodeStatus) -> InsertResult<TKey> {
self.0.bucket.insert(

View File

@ -33,9 +33,6 @@
//! existing nodes in the kademlia network cannot obtain the listen addresses
//! of nodes querying them, and thus will not be able to add them to their routing table.
// TODO: we allow dead_code for now because this library contains a lot of unused code that will
// be useful later for record store
#![allow(dead_code)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
mod record_priv;

View File

@ -326,15 +326,6 @@ impl<TInner> Query<TInner> {
}
}
/// Checks whether the query is currently waiting for a result from `peer`.
pub(crate) fn is_waiting(&self, peer: &PeerId) -> bool {
match &self.peer_iter {
QueryPeerIter::Closest(iter) => iter.is_waiting(peer),
QueryPeerIter::ClosestDisjoint(iter) => iter.is_waiting(peer),
QueryPeerIter::Fixed(iter) => iter.is_waiting(peer),
}
}
/// Advances the state of the underlying peer iterator.
fn next(&mut self, now: Instant) -> PeersIterState<'_> {
let state = match &mut self.peer_iter {

View File

@ -788,6 +788,7 @@ mod tests {
QuickCheck::new().tests(10).quickcheck(prop as fn(_))
}
#[test]
fn stalled_at_capacity() {
fn prop(mut iter: ClosestPeersIter) {
iter.state = State::Stalled;

View File

@ -31,7 +31,6 @@ use std::{
/// Wraps around a set of [`ClosestPeersIter`], enforcing a disjoint discovery
/// path per configured parallelism according to the S/Kademlia paper.
pub(crate) struct ClosestDisjointPeersIter {
config: ClosestPeersIterConfig,
target: KeyBytes,
/// The set of wrapped [`ClosestPeersIter`].
@ -51,6 +50,7 @@ pub(crate) struct ClosestDisjointPeersIter {
impl ClosestDisjointPeersIter {
/// Creates a new iterator with a default configuration.
#[cfg(test)]
pub(crate) fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
where
I: IntoIterator<Item = Key<PeerId>>,
@ -88,7 +88,6 @@ impl ClosestDisjointPeersIter {
let iters_len = iters.len();
ClosestDisjointPeersIter {
config,
target: target.into(),
iters,
iter_order: (0..iters_len)
@ -190,10 +189,6 @@ impl ClosestDisjointPeersIter {
updated
}
pub(crate) fn is_waiting(&self, peer: &PeerId) -> bool {
self.iters.iter().any(|i| i.is_waiting(peer))
}
pub(crate) fn next(&mut self, now: Instant) -> PeersIterState<'_> {
let mut state = None;

View File

@ -115,10 +115,6 @@ impl FixedPeersIter {
false
}
pub(crate) fn is_waiting(&self, peer: &PeerId) -> bool {
self.peers.get(peer) == Some(&PeerState::Waiting)
}
pub(crate) fn finish(&mut self) {
if let State::Waiting { .. } = self.state {
self.state = State::Finished

View File

@ -2,8 +2,10 @@ use libp2p_identify as identify;
use libp2p_identity as identity;
use libp2p_kad::store::MemoryStore;
use libp2p_kad::{Behaviour, Config, Event, Mode};
use libp2p_swarm::Swarm;
use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_swarm_test::SwarmExt;
use Event::*;
use MyBehaviourEvent::*;
#[async_std::test]
async fn server_gets_added_to_routing_table_by_client() {
@ -16,16 +18,16 @@ async fn server_gets_added_to_routing_table_by_client() {
client.connect(&mut server).await;
let server_peer_id = *server.local_peer_id();
async_std::task::spawn(server.loop_on_next());
match libp2p_swarm_test::drive(&mut client, &mut server).await {
(
[MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_), MyBehaviourEvent::Kad(Event::RoutingUpdated { peer, .. })],
[MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_)],
) => {
assert_eq!(peer, server_peer_id)
}
other => panic!("Unexpected events: {other:?}"),
}
let peer = client
.wait(|e| match e {
SwarmEvent::Behaviour(Kad(RoutingUpdated { peer, .. })) => Some(peer),
_ => None,
})
.await;
assert_eq!(peer, server_peer_id);
}
#[async_std::test]
@ -41,12 +43,10 @@ async fn two_servers_add_each_other_to_routing_table() {
let server1_peer_id = *server1.local_peer_id();
let server2_peer_id = *server2.local_peer_id();
use Event::*;
use MyBehaviourEvent::*;
match libp2p_swarm_test::drive(&mut server1, &mut server2).await {
(
[Identify(_), Identify(_), Kad(RoutingUpdated { peer: peer1, .. })],
[Identify(_), Identify(_), Kad(RoutingUpdated { peer: peer1, .. })]
| [Identify(_), Kad(RoutingUpdated { peer: peer1, .. }), Identify(_)],
[Identify(_), Identify(_)],
) => {
assert_eq!(peer1, server2_peer_id);
@ -57,19 +57,16 @@ async fn two_servers_add_each_other_to_routing_table() {
server1.listen().await;
server2.connect(&mut server1).await;
match libp2p_swarm_test::drive(&mut server2, &mut server1).await {
(
[Identify(_), Kad(RoutingUpdated { peer: peer2, .. }), Identify(_)],
[Identify(_), Identify(_)],
)
| (
[Identify(_), Identify(_), Kad(RoutingUpdated { peer: peer2, .. })],
[Identify(_), Identify(_)],
) => {
assert_eq!(peer2, server1_peer_id);
}
other => panic!("Unexpected events: {other:?}"),
}
async_std::task::spawn(server1.loop_on_next());
let peer = server2
.wait(|e| match e {
SwarmEvent::Behaviour(Kad(RoutingUpdated { peer, .. })) => Some(peer),
_ => None,
})
.await;
assert_eq!(peer, server1_peer_id);
}
#[async_std::test]
@ -85,17 +82,12 @@ async fn adding_an_external_addresses_activates_server_mode_on_existing_connecti
// Remove memory address to simulate a server that doesn't know its external address.
server.remove_external_address(&memory_addr);
client.dial(memory_addr.clone()).unwrap();
use MyBehaviourEvent::*;
// Do the usual identify send/receive dance.
match libp2p_swarm_test::drive(&mut client, &mut server).await {
([Identify(_), Identify(_)], [Identify(_), Identify(_)]) => {}
other => panic!("Unexpected events: {other:?}"),
}
use Event::*;
// Server learns its external address (this could be through AutoNAT or some other mechanism).
server.add_external_address(memory_addr);
@ -125,34 +117,38 @@ async fn set_client_to_server_mode() {
let server_peer_id = *server.local_peer_id();
match libp2p_swarm_test::drive(&mut client, &mut server).await {
(
[MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_), MyBehaviourEvent::Kad(Event::RoutingUpdated { peer, .. })],
[MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(identify::Event::Received { info, .. })],
) => {
assert_eq!(peer, server_peer_id);
assert!(info
.protocols
.iter()
.all(|proto| libp2p_kad::PROTOCOL_NAME.ne(proto)))
}
other => panic!("Unexpected events: {other:?}"),
}
let client_event = client.wait(|e| match e {
SwarmEvent::Behaviour(Kad(RoutingUpdated { peer, .. })) => Some(peer),
_ => None,
});
let server_event = server.wait(|e| match e {
SwarmEvent::Behaviour(Identify(identify::Event::Received { info, .. })) => Some(info),
_ => None,
});
let (peer, info) = futures::future::join(client_event, server_event).await;
assert_eq!(peer, server_peer_id);
assert!(info
.protocols
.iter()
.all(|proto| libp2p_kad::PROTOCOL_NAME.ne(proto)));
client.behaviour_mut().kad.set_mode(Some(Mode::Server));
match libp2p_swarm_test::drive(&mut client, &mut server).await {
(
[MyBehaviourEvent::Identify(_)],
[MyBehaviourEvent::Identify(identify::Event::Received { info, .. }), MyBehaviourEvent::Kad(_)],
) => {
assert!(info
.protocols
.iter()
.any(|proto| libp2p_kad::PROTOCOL_NAME.eq(proto)))
}
other => panic!("Unexpected events: {other:?}"),
}
async_std::task::spawn(client.loop_on_next());
let info = server
.wait(|e| match e {
SwarmEvent::Behaviour(Identify(identify::Event::Received { info, .. })) => Some(info),
_ => None,
})
.await;
assert!(info
.protocols
.iter()
.any(|proto| libp2p_kad::PROTOCOL_NAME.eq(proto)));
}
#[derive(libp2p_swarm::NetworkBehaviour)]

View File

@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"]
async-io = { version = "1.13.0", optional = true }
data-encoding = "2.4.0"
futures = "0.3.28"
if-watch = "3.0.1"
if-watch = "3.1.0"
libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
libp2p-identity = { workspace = true }