refactor(kad): rename to follow naming convention across repository

Renamed the following

`kad::Kademlia` -> `kad::Behaviour`
`kad::KademliaEvent` -> `kad::Event`
`kad::KademliaBucketInserts` -> `kad::BucketInserts`
`kad::KademliaStoreInserts` -> `kad::StoreInserts`
`kad::KademliaConfig` -> `kad::Config`
`kad::KademliaCaching` -> `kad::Caching`
`kad::KademliaEvent` -> `kad::Event`
`kad::KademliaConnectionType` -> `kad::ConnectionType`
`KademliaHandler` -> `Handler`
`KademliaHandlerEvent` -> `HandlerEvent`
`KademliaProtocolConfig` -> `ProtocolConfig`
`KademliaHandlerIn` -> `HandlerIn`
`KademliaRequestId` -> `RequestId`
`KademliaHandlerQueryErr` -> `HandlerQueryErr`

Resolves: #4485

Pull-Request: #4547.
This commit is contained in:
Panagiotis Ganelis
2023-09-27 22:25:22 +02:00
committed by GitHub
parent fffd47b69f
commit c8b5f49ec2
19 changed files with 525 additions and 530 deletions

2
Cargo.lock generated
View File

@ -2661,7 +2661,7 @@ dependencies = [
[[package]]
name = "libp2p-kad"
version = "0.44.5"
version = "0.44.6"
dependencies = [
"arrayvec",
"async-std",

View File

@ -83,7 +83,7 @@ libp2p-floodsub = { version = "0.43.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.45.1", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.43.1", path = "protocols/identify" }
libp2p-identity = { version = "0.2.3" }
libp2p-kad = { version = "0.44.5", path = "protocols/kad" }
libp2p-kad = { version = "0.44.6", path = "protocols/kad" }
libp2p-mdns = { version = "0.44.0", path = "protocols/mdns" }
libp2p-memory-connection-limits = { version = "0.1.0", path = "misc/memory-connection-limits" }
libp2p-metrics = { version = "0.13.1", path = "misc/metrics" }

View File

@ -23,12 +23,9 @@
use async_std::io;
use futures::{prelude::*, select};
use libp2p::core::upgrade::Version;
use libp2p::kad;
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::Mode;
use libp2p::kad::{
record::Key, AddProviderOk, GetProvidersOk, GetRecordOk, Kademlia, KademliaEvent, PeerRecord,
PutRecordOk, QueryResult, Quorum, Record,
};
use libp2p::{
identity, mdns, noise,
swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent},
@ -54,18 +51,18 @@ async fn main() -> Result<(), Box<dyn Error>> {
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "MyBehaviourEvent")]
struct MyBehaviour {
kademlia: Kademlia<MemoryStore>,
kademlia: kad::Behaviour<MemoryStore>,
mdns: mdns::async_io::Behaviour,
}
#[allow(clippy::large_enum_variant)]
enum MyBehaviourEvent {
Kademlia(KademliaEvent),
Kademlia(kad::Event),
Mdns(mdns::Event),
}
impl From<KademliaEvent> for MyBehaviourEvent {
fn from(event: KademliaEvent) -> Self {
impl From<kad::Event> for MyBehaviourEvent {
fn from(event: kad::Event) -> Self {
MyBehaviourEvent::Kademlia(event)
}
}
@ -80,7 +77,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let mut swarm = {
// Create a Kademlia behaviour.
let store = MemoryStore::new(local_peer_id);
let kademlia = Kademlia::new(local_peer_id, store);
let kademlia = kad::Behaviour::new(local_peer_id, store);
let mdns = mdns::async_io::Behaviour::new(mdns::Config::default(), local_peer_id)?;
let behaviour = MyBehaviour { kademlia, mdns };
SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id).build()
@ -107,9 +104,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
}
}
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryProgressed { result, ..})) => {
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { result, ..})) => {
match result {
QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { key, providers, .. })) => {
kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders { key, providers, .. })) => {
for peer in providers {
println!(
"Peer {peer:?} provides key {:?}",
@ -117,12 +114,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
);
}
}
QueryResult::GetProviders(Err(err)) => {
kad::QueryResult::GetProviders(Err(err)) => {
eprintln!("Failed to get providers: {err:?}");
}
QueryResult::GetRecord(Ok(
GetRecordOk::FoundRecord(PeerRecord {
record: Record { key, value, .. },
kad::QueryResult::GetRecord(Ok(
kad::GetRecordOk::FoundRecord(kad::PeerRecord {
record: kad::Record { key, value, .. },
..
})
)) => {
@ -132,26 +129,26 @@ async fn main() -> Result<(), Box<dyn Error>> {
std::str::from_utf8(&value).unwrap(),
);
}
QueryResult::GetRecord(Ok(_)) => {}
QueryResult::GetRecord(Err(err)) => {
kad::QueryResult::GetRecord(Ok(_)) => {}
kad::QueryResult::GetRecord(Err(err)) => {
eprintln!("Failed to get record: {err:?}");
}
QueryResult::PutRecord(Ok(PutRecordOk { key })) => {
kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
println!(
"Successfully put record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
QueryResult::PutRecord(Err(err)) => {
kad::QueryResult::PutRecord(Err(err)) => {
eprintln!("Failed to put record: {err:?}");
}
QueryResult::StartProviding(Ok(AddProviderOk { key })) => {
kad::QueryResult::StartProviding(Ok(kad::AddProviderOk { key })) => {
println!(
"Successfully put provider record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
QueryResult::StartProviding(Err(err)) => {
kad::QueryResult::StartProviding(Err(err)) => {
eprintln!("Failed to put provider record: {err:?}");
}
_ => {}
@ -163,14 +160,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
}
fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
fn handle_input_line(kademlia: &mut kad::Behaviour<MemoryStore>, line: String) {
let mut args = line.split(' ');
match args.next() {
Some("GET") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
Some(key) => kad::record::Key::new(&key),
None => {
eprintln!("Expected key");
return;
@ -182,7 +179,7 @@ fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
Some("GET_PROVIDERS") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
Some(key) => kad::record::Key::new(&key),
None => {
eprintln!("Expected key");
return;
@ -194,7 +191,7 @@ fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
Some("PUT") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
Some(key) => kad::record::Key::new(&key),
None => {
eprintln!("Expected key");
return;
@ -210,20 +207,20 @@ fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
}
}
};
let record = Record {
let record = kad::Record {
key,
value,
publisher: None,
expires: None,
};
kademlia
.put_record(record, Quorum::One)
.put_record(record, kad::Quorum::One)
.expect("Failed to store record locally.");
}
Some("PUT_PROVIDER") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
Some(key) => kad::record::Key::new(&key),
None => {
eprintln!("Expected key");
return;

View File

@ -5,11 +5,7 @@ use futures::prelude::*;
use libp2p::{
core::Multiaddr,
identity,
kad::{
self, record::store::MemoryStore, GetProvidersOk, Kademlia, KademliaEvent, QueryId,
QueryResult,
},
identity, kad,
multiaddr::Protocol,
noise,
request_response::{self, ProtocolSupport, RequestId, ResponseChannel},
@ -56,7 +52,7 @@ pub(crate) async fn new(
let mut swarm = SwarmBuilder::with_async_std_executor(
transport,
ComposedBehaviour {
kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)),
kademlia: kad::Behaviour::new(peer_id, kad::record::store::MemoryStore::new(peer_id)),
request_response: request_response::cbor::Behaviour::new(
[(
StreamProtocol::new("/file-exchange/1"),
@ -179,8 +175,8 @@ pub(crate) struct EventLoop {
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<Event>,
pending_dial: HashMap<PeerId, oneshot::Sender<Result<(), Box<dyn Error + Send>>>>,
pending_start_providing: HashMap<QueryId, oneshot::Sender<()>>,
pending_get_providers: HashMap<QueryId, oneshot::Sender<HashSet<PeerId>>>,
pending_start_providing: HashMap<kad::QueryId, oneshot::Sender<()>>,
pending_get_providers: HashMap<kad::QueryId, oneshot::Sender<HashSet<PeerId>>>,
pending_request_file:
HashMap<RequestId, oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>>,
}
@ -221,9 +217,9 @@ impl EventLoop {
) {
match event {
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
kad::Event::OutboundQueryProgressed {
id,
result: QueryResult::StartProviding(_),
result: kad::QueryResult::StartProviding(_),
..
},
)) => {
@ -234,11 +230,12 @@ impl EventLoop {
let _ = sender.send(());
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
kad::Event::OutboundQueryProgressed {
id,
result:
QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
providers, ..
kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders {
providers,
..
})),
..
},
@ -256,11 +253,11 @@ impl EventLoop {
}
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
kad::Event::OutboundQueryProgressed {
result:
QueryResult::GetProviders(Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
..
})),
kad::QueryResult::GetProviders(Ok(
kad::GetProvidersOk::FinishedWithNoAdditionalRecord { .. },
)),
..
},
)) => {}
@ -412,13 +409,13 @@ impl EventLoop {
#[behaviour(to_swarm = "ComposedEvent")]
struct ComposedBehaviour {
request_response: request_response::cbor::Behaviour<FileRequest, FileResponse>,
kademlia: Kademlia<MemoryStore>,
kademlia: kad::Behaviour<kad::record::store::MemoryStore>,
}
#[derive(Debug)]
enum ComposedEvent {
RequestResponse(request_response::Event<FileRequest, FileResponse>),
Kademlia(KademliaEvent),
Kademlia(kad::Event),
}
impl From<request_response::Event<FileRequest, FileResponse>> for ComposedEvent {
@ -427,8 +424,8 @@ impl From<request_response::Event<FileRequest, FileResponse>> for ComposedEvent
}
}
impl From<KademliaEvent> for ComposedEvent {
fn from(event: KademliaEvent) -> Self {
impl From<kad::Event> for ComposedEvent {
fn from(event: kad::Event) -> Self {
ComposedEvent::Kademlia(event)
}
}

View File

@ -21,8 +21,8 @@
#![doc = include_str!("../README.md")]
use futures::StreamExt;
use libp2p::kad;
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{GetClosestPeersError, Kademlia, KademliaConfig, KademliaEvent, QueryResult};
use libp2p::{
development_transport, identity,
swarm::{SwarmBuilder, SwarmEvent},
@ -51,10 +51,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a swarm to manage peers and events.
let mut swarm = {
// Create a Kademlia behaviour.
let mut cfg = KademliaConfig::default();
let mut cfg = kad::Config::default();
cfg.set_query_timeout(Duration::from_secs(5 * 60));
let store = MemoryStore::new(local_peer_id);
let mut behaviour = Kademlia::with_config(local_peer_id, store, cfg);
let mut behaviour = kad::Behaviour::with_config(local_peer_id, store, cfg);
// Add the bootnodes to the local routing table. `libp2p-dns` built
// into the `transport` resolves the `dnsaddr` when Kademlia tries
@ -78,8 +78,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
loop {
let event = swarm.select_next_some().await;
if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetClosestPeers(result),
if let SwarmEvent::Behaviour(kad::Event::OutboundQueryProgressed {
result: kad::QueryResult::GetClosestPeers(result),
..
}) = event
{
@ -93,7 +93,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
println!("Query finished with no closest peers.")
}
}
Err(GetClosestPeersError::Timeout { peers, .. }) => {
Err(kad::GetClosestPeersError::Timeout { peers, .. }) => {
if !peers.is_empty() {
println!("Query timed out with closest peers: {peers:#?}")
} else {

View File

@ -159,10 +159,10 @@ impl Metrics {
}
}
impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
fn record(&self, event: &libp2p_kad::KademliaEvent) {
impl super::Recorder<libp2p_kad::Event> for Metrics {
fn record(&self, event: &libp2p_kad::Event) {
match event {
libp2p_kad::KademliaEvent::OutboundQueryProgressed { result, stats, .. } => {
libp2p_kad::Event::OutboundQueryProgressed { result, stats, .. } => {
self.query_result_num_requests
.get_or_create(&result.into())
.observe(stats.num_requests().into());
@ -217,7 +217,7 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
_ => {}
}
}
libp2p_kad::KademliaEvent::RoutingUpdated {
libp2p_kad::Event::RoutingUpdated {
is_new_peer,
old_peer,
bucket_range: (low, _high),
@ -250,7 +250,7 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
}
}
libp2p_kad::KademliaEvent::InboundRequest { request } => {
libp2p_kad::Event::InboundRequest { request } => {
self.inbound_requests.get_or_create(&request.into()).inc();
}
_ => {}

View File

@ -118,8 +118,8 @@ impl Recorder<libp2p_identify::Event> for Metrics {
}
#[cfg(feature = "kad")]
impl Recorder<libp2p_kad::KademliaEvent> for Metrics {
fn record(&self, event: &libp2p_kad::KademliaEvent) {
impl Recorder<libp2p_kad::Event> for Metrics {
fn record(&self, event: &libp2p_kad::Event) {
self.kad.record(event)
}
}

View File

@ -1,6 +1,6 @@
use libp2p::autonat;
use libp2p::identify;
use libp2p::kad::{record::store::MemoryStore, Kademlia, KademliaConfig};
use libp2p::kad;
use libp2p::ping;
use libp2p::relay;
use libp2p::swarm::behaviour::toggle::Toggle;
@ -20,7 +20,7 @@ pub(crate) struct Behaviour {
relay: relay::Behaviour,
ping: ping::Behaviour,
identify: identify::Behaviour,
pub(crate) kademlia: Toggle<Kademlia<MemoryStore>>,
pub(crate) kademlia: Toggle<kad::Behaviour<kad::record::store::MemoryStore>>,
autonat: Toggle<autonat::Behaviour>,
}
@ -31,15 +31,15 @@ impl Behaviour {
enable_autonat: bool,
) -> Self {
let kademlia = if enable_kademlia {
let mut kademlia_config = KademliaConfig::default();
let mut kademlia_config = kad::Config::default();
// Instantly remove records and provider records.
//
// TODO: Replace hack with option to disable both.
kademlia_config.set_record_ttl(Some(Duration::from_secs(0)));
kademlia_config.set_provider_record_ttl(Some(Duration::from_secs(0)));
let mut kademlia = Kademlia::with_config(
let mut kademlia = kad::Behaviour::with_config(
pub_key.to_peer_id(),
MemoryStore::new(pub_key.to_peer_id()),
kad::record::store::MemoryStore::new(pub_key.to_peer_id()),
kademlia_config,
);
let bootaddr = Multiaddr::from_str("/dnsaddr/bootstrap.libp2p.io").unwrap();

View File

@ -1,3 +1,9 @@
## 0.44.6 - unreleased
- Rename `Kademlia` symbols to follow naming convention.
See [PR 4547].
[PR 4547]: https://github.com/libp2p/rust-libp2p/pull/4547
## 0.44.5
- Migrate to `quick-protobuf-codec` crate for codec logic.
See [PR 4501].

View File

@ -3,7 +3,7 @@ name = "libp2p-kad"
edition = "2021"
rust-version = { workspace = true }
description = "Kademlia protocol for libp2p"
version = "0.44.5"
version = "0.44.6"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"

File diff suppressed because it is too large Load Diff

View File

@ -48,13 +48,13 @@ use std::{
u64,
};
type TestSwarm = Swarm<Kademlia<MemoryStore>>;
type TestSwarm = Swarm<Behaviour<MemoryStore>>;
fn build_node() -> (Multiaddr, TestSwarm) {
build_node_with_config(Default::default())
}
fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) {
fn build_node_with_config(cfg: Config) -> (Multiaddr, TestSwarm) {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = MemoryTransport::default()
@ -65,7 +65,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) {
let local_id = local_public_key.to_peer_id();
let store = MemoryStore::new(local_id);
let behaviour = Kademlia::with_config(local_id, store, cfg);
let behaviour = Behaviour::with_config(local_id, store, cfg);
let mut swarm = SwarmBuilder::without_executor(transport, behaviour, local_id).build();
@ -82,7 +82,7 @@ fn build_nodes(num: usize) -> Vec<(Multiaddr, TestSwarm)> {
}
/// Builds swarms, each listening on a port. Does *not* connect the nodes together.
fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> Vec<(Multiaddr, TestSwarm)> {
fn build_nodes_with_config(num: usize, cfg: Config) -> Vec<(Multiaddr, TestSwarm)> {
(0..num)
.map(|_| build_node_with_config(cfg.clone()))
.collect()
@ -95,7 +95,7 @@ fn build_connected_nodes(total: usize, step: usize) -> Vec<(Multiaddr, TestSwarm
fn build_connected_nodes_with_config(
total: usize,
step: usize,
cfg: KademliaConfig,
cfg: Config,
) -> Vec<(Multiaddr, TestSwarm)> {
let mut swarms = build_nodes_with_config(total, cfg);
let swarm_ids: Vec<_> = swarms
@ -121,7 +121,7 @@ fn build_connected_nodes_with_config(
fn build_fully_connected_nodes_with_config(
total: usize,
cfg: KademliaConfig,
cfg: Config,
) -> Vec<(Multiaddr, TestSwarm)> {
let mut swarms = build_nodes_with_config(total, cfg);
let swarm_addr_and_peer_id: Vec<_> = swarms
@ -166,7 +166,7 @@ fn bootstrap() {
// or smaller than K_VALUE.
let num_group = rng.gen_range(1..(num_total % K_VALUE.get()) + 2);
let mut cfg = KademliaConfig::default();
let mut cfg = Config::default();
if rng.gen() {
cfg.disjoint_query_paths(true);
}
@ -190,7 +190,7 @@ fn bootstrap() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
Event::OutboundQueryProgressed {
id,
result: QueryResult::Bootstrap(Ok(ok)),
..
@ -280,7 +280,7 @@ fn query_iter() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
Event::OutboundQueryProgressed {
id,
result: QueryResult::GetClosestPeers(Ok(ok)),
..
@ -338,12 +338,10 @@ fn unresponsive_not_returned_direct() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed {
result: QueryResult::GetClosestPeers(Ok(ok)),
..
},
))) => {
}))) => {
assert_eq!(&ok.key[..], search_target.to_bytes().as_slice());
assert_eq!(ok.peers.len(), 0);
return Poll::Ready(());
@ -398,12 +396,10 @@ fn unresponsive_not_returned_indirect() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed {
result: QueryResult::GetClosestPeers(Ok(ok)),
..
},
))) => {
}))) => {
assert_eq!(&ok.key[..], search_target.to_bytes().as_slice());
assert_eq!(ok.peers.len(), 1);
assert_eq!(ok.peers[0], first_peer_id);
@ -453,13 +449,11 @@ fn get_record_not_found() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed {
id,
result: QueryResult::GetRecord(Err(e)),
..
},
))) => {
}))) => {
assert_eq!(id, qid);
if let GetRecordError::NotFound { key, closest_peers } = e {
assert_eq!(key, target_key);
@ -495,14 +489,14 @@ fn put_record() {
// At least 4 nodes, 1 under test + 3 bootnodes.
let num_total = usize::max(4, replication_factor.get() * 2);
let mut config = KademliaConfig::default();
let mut config = Config::default();
config.set_replication_factor(replication_factor);
if rng.gen() {
config.disjoint_query_paths(true);
}
if filter_records {
config.set_record_filtering(KademliaStoreInserts::FilterBoth);
config.set_record_filtering(StoreInserts::FilterBoth);
}
let mut swarms = {
@ -574,7 +568,7 @@ fn put_record() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
Event::OutboundQueryProgressed {
id,
result: QueryResult::PutRecord(res),
stats,
@ -582,7 +576,7 @@ fn put_record() {
},
)))
| Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
Event::OutboundQueryProgressed {
id,
result: QueryResult::RepublishRecord(res),
stats,
@ -605,16 +599,14 @@ fn put_record() {
}
}
}
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::InboundRequest {
Poll::Ready(Some(SwarmEvent::Behaviour(Event::InboundRequest {
request: InboundRequest::PutRecord { record, .. },
},
))) => {
}))) => {
if !drop_records {
if let Some(record) = record {
assert_eq!(
swarm.behaviour().record_filtering,
KademliaStoreInserts::FilterBoth
StoreInserts::FilterBoth
);
// Accept the record
swarm
@ -625,7 +617,7 @@ fn put_record() {
} else {
assert_eq!(
swarm.behaviour().record_filtering,
KademliaStoreInserts::Unfiltered
StoreInserts::Unfiltered
);
}
}
@ -684,7 +676,7 @@ fn put_record() {
})
.collect::<HashSet<_>>();
if swarms[0].behaviour().record_filtering != KademliaStoreInserts::Unfiltered
if swarms[0].behaviour().record_filtering != StoreInserts::Unfiltered
&& drop_records
{
assert_eq!(actual.len(), 0);
@ -765,14 +757,12 @@ fn get_record() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed {
id,
result: QueryResult::GetRecord(Ok(r)),
step: ProgressStep { count, last },
..
},
))) => {
}))) => {
assert_eq!(id, qid);
if usize::from(count) == 1 {
assert!(!last);
@ -829,14 +819,12 @@ fn get_record_many() {
swarm.behaviour_mut().query_mut(&qid).unwrap().finish();
}
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed {
id,
result: QueryResult::GetRecord(Ok(r)),
step: ProgressStep { count: _, last },
..
},
))) => {
}))) => {
assert_eq!(id, qid);
if let GetRecordOk::FoundRecord(r) = r {
assert_eq!(r.record, record);
@ -870,7 +858,7 @@ fn add_provider() {
// At least 4 nodes, 1 under test + 3 bootnodes.
let num_total = usize::max(4, replication_factor.get() * 2);
let mut config = KademliaConfig::default();
let mut config = Config::default();
config.set_replication_factor(replication_factor);
if rng.gen() {
config.disjoint_query_paths(true);
@ -924,14 +912,14 @@ fn add_provider() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
Event::OutboundQueryProgressed {
id,
result: QueryResult::StartProviding(res),
..
},
)))
| Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
Event::OutboundQueryProgressed {
id,
result: QueryResult::RepublishProvider(res),
..
@ -1062,7 +1050,7 @@ fn exceed_jobs_max_queries() {
loop {
if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) {
match e {
SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed {
SwarmEvent::Behaviour(Event::OutboundQueryProgressed {
result: QueryResult::GetClosestPeers(Ok(r)),
..
}) => break assert!(r.peers.is_empty()),
@ -1085,14 +1073,14 @@ fn exp_decr_expiration_overflow() {
}
// Right shifting a u64 by >63 results in a panic.
prop_no_panic(KademliaConfig::default().record_ttl.unwrap(), 64);
prop_no_panic(Config::default().record_ttl.unwrap(), 64);
quickcheck(prop_no_panic as fn(_, _))
}
#[test]
fn disjoint_query_does_not_finish_before_all_paths_did() {
let mut config = KademliaConfig::default();
let mut config = Config::default();
config.disjoint_query_paths(true);
// I.e. setting the amount disjoint paths to be explored to 2.
config.set_parallelism(NonZeroUsize::new(2).unwrap());
@ -1140,13 +1128,11 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
for (i, swarm) in [&mut alice, &mut trudy].iter_mut().enumerate() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed {
result: QueryResult::GetRecord(result),
step,
..
},
))) => {
}))) => {
if i != 0 {
panic!("Expected `QueryResult` from Alice.")
}
@ -1197,13 +1183,11 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
for (i, swarm) in [&mut alice, &mut bob].iter_mut().enumerate() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed {
result: QueryResult::GetRecord(result),
step,
..
},
))) => {
}))) => {
if i != 0 {
panic!("Expected `QueryResult` from Alice.")
}
@ -1241,11 +1225,11 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
}
/// Tests that peers are not automatically inserted into
/// the routing table with `KademliaBucketInserts::Manual`.
/// the routing table with `BucketInserts::Manual`.
#[test]
fn manual_bucket_inserts() {
let mut cfg = KademliaConfig::default();
cfg.set_kbucket_inserts(KademliaBucketInserts::Manual);
let mut cfg = Config::default();
cfg.set_kbucket_inserts(BucketInserts::Manual);
// 1 -> 2 -> [3 -> ...]
let mut swarms = build_connected_nodes_with_config(3, 1, cfg);
// The peers and their addresses for which we expect `RoutablePeer` events.
@ -1271,7 +1255,7 @@ fn manual_bucket_inserts() {
for (_, swarm) in swarms.iter_mut() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::RoutablePeer {
Poll::Ready(Some(SwarmEvent::Behaviour(Event::RoutablePeer {
peer,
address,
}))) => {
@ -1303,7 +1287,7 @@ fn network_behaviour_on_address_change() {
let old_address: Multiaddr = Protocol::Memory(1).into();
let new_address: Multiaddr = Protocol::Memory(2).into();
let mut kademlia = Kademlia::new(local_peer_id, MemoryStore::new(local_peer_id));
let mut kademlia = Behaviour::new(local_peer_id, MemoryStore::new(local_peer_id));
let endpoint = ConnectedPoint::Dialer {
address: old_address.clone(),
@ -1337,7 +1321,7 @@ fn network_behaviour_on_address_change() {
kademlia.on_connection_handler_event(
remote_peer_id,
connection_id,
KademliaHandlerEvent::ProtocolConfirmed { endpoint },
HandlerEvent::ProtocolConfirmed { endpoint },
);
assert_eq!(
@ -1389,7 +1373,7 @@ fn get_providers_single() {
block_on(async {
match single_swarm.next().await.unwrap() {
SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed {
SwarmEvent::Behaviour(Event::OutboundQueryProgressed {
result: QueryResult::StartProviding(Ok(_)),
..
}) => {}
@ -1403,7 +1387,7 @@ fn get_providers_single() {
block_on(async {
loop {
match single_swarm.next().await.unwrap() {
SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed {
SwarmEvent::Behaviour(Event::OutboundQueryProgressed {
id,
result: QueryResult::GetProviders(Ok(ok)),
step: index,
@ -1469,7 +1453,7 @@ fn get_providers_limit<const N: usize>() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
Event::OutboundQueryProgressed {
id,
result: QueryResult::GetProviders(Ok(ok)),
step: index,

View File

@ -20,8 +20,7 @@
use crate::behaviour::Mode;
use crate::protocol::{
KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg,
KademliaProtocolConfig,
KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg, ProtocolConfig,
};
use crate::record_priv::{self, Record};
use crate::QueryId;
@ -54,9 +53,9 @@ const MAX_NUM_SUBSTREAMS: usize = 32;
/// make.
///
/// It also handles requests made by the remote.
pub struct KademliaHandler {
pub struct Handler {
/// Configuration of the wire protocol.
protocol_config: KademliaProtocolConfig,
protocol_config: ProtocolConfig,
/// In client mode, we don't accept inbound substreams.
mode: Mode,
@ -126,7 +125,7 @@ enum OutboundSubstreamState {
// TODO: add timeout
WaitingAnswer(KadOutStreamSink<Stream>, QueryId),
/// An error happened on the substream and we should report the error to the user.
ReportError(KademliaHandlerQueryErr, QueryId),
ReportError(HandlerQueryErr, QueryId),
/// The substream is being closed.
Closing(KadOutStreamSink<Stream>),
/// The substream is complete and will not perform any more work.
@ -143,7 +142,7 @@ enum InboundSubstreamState {
connection_id: UniqueConnecId,
substream: KadInStreamSink<Stream>,
},
/// Waiting for the behaviour to send a [`KademliaHandlerIn`] event containing the response.
/// Waiting for the behaviour to send a [`HandlerIn`] event containing the response.
WaitingBehaviour(UniqueConnecId, KadInStreamSink<Stream>, Option<Waker>),
/// Waiting to send an answer back to the remote.
PendingSend(UniqueConnecId, KadInStreamSink<Stream>, KadResponseMsg),
@ -162,7 +161,7 @@ enum InboundSubstreamState {
impl InboundSubstreamState {
fn try_answer_with(
&mut self,
id: KademliaRequestId,
id: RequestId,
msg: KadResponseMsg,
) -> Result<(), KadResponseMsg> {
match std::mem::replace(
@ -214,7 +213,7 @@ impl InboundSubstreamState {
/// Event produced by the Kademlia handler.
#[derive(Debug)]
pub enum KademliaHandlerEvent {
pub enum HandlerEvent {
/// The configured protocol name has been confirmed by the peer through
/// a successfully negotiated substream or by learning the supported protocols of the remote.
ProtocolConfirmed { endpoint: ConnectedPoint },
@ -228,10 +227,10 @@ pub enum KademliaHandlerEvent {
/// The key for which to locate the closest nodes.
key: Vec<u8>,
/// Identifier of the request. Needs to be passed back when answering.
request_id: KademliaRequestId,
request_id: RequestId,
},
/// Response to an `KademliaHandlerIn::FindNodeReq`.
/// Response to an `HandlerIn::FindNodeReq`.
FindNodeRes {
/// Results of the request.
closer_peers: Vec<KadPeer>,
@ -245,10 +244,10 @@ pub enum KademliaHandlerEvent {
/// The key for which providers are requested.
key: record_priv::Key,
/// Identifier of the request. Needs to be passed back when answering.
request_id: KademliaRequestId,
request_id: RequestId,
},
/// Response to an `KademliaHandlerIn::GetProvidersReq`.
/// Response to an `HandlerIn::GetProvidersReq`.
GetProvidersRes {
/// Nodes closest to the key.
closer_peers: Vec<KadPeer>,
@ -261,7 +260,7 @@ pub enum KademliaHandlerEvent {
/// An error happened when performing a query.
QueryError {
/// The error that happened.
error: KademliaHandlerQueryErr,
error: HandlerQueryErr,
/// The user data passed to the query.
query_id: QueryId,
},
@ -279,10 +278,10 @@ pub enum KademliaHandlerEvent {
/// Key for which we should look in the dht
key: record_priv::Key,
/// Identifier of the request. Needs to be passed back when answering.
request_id: KademliaRequestId,
request_id: RequestId,
},
/// Response to a `KademliaHandlerIn::GetRecord`.
/// Response to a `HandlerIn::GetRecord`.
GetRecordRes {
/// The result is present if the key has been found
record: Option<Record>,
@ -296,7 +295,7 @@ pub enum KademliaHandlerEvent {
PutRecord {
record: Record,
/// Identifier of the request. Needs to be passed back when answering.
request_id: KademliaRequestId,
request_id: RequestId,
},
/// Response to a request to store a record.
@ -312,7 +311,7 @@ pub enum KademliaHandlerEvent {
/// Error that can happen when requesting an RPC query.
#[derive(Debug)]
pub enum KademliaHandlerQueryErr {
pub enum HandlerQueryErr {
/// Error while trying to perform the query.
Upgrade(StreamUpgradeError<io::Error>),
/// Received an answer that doesn't correspond to the request.
@ -321,44 +320,44 @@ pub enum KademliaHandlerQueryErr {
Io(io::Error),
}
impl fmt::Display for KademliaHandlerQueryErr {
impl fmt::Display for HandlerQueryErr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
KademliaHandlerQueryErr::Upgrade(err) => {
HandlerQueryErr::Upgrade(err) => {
write!(f, "Error while performing Kademlia query: {err}")
}
KademliaHandlerQueryErr::UnexpectedMessage => {
HandlerQueryErr::UnexpectedMessage => {
write!(
f,
"Remote answered our Kademlia RPC query with the wrong message type"
)
}
KademliaHandlerQueryErr::Io(err) => {
HandlerQueryErr::Io(err) => {
write!(f, "I/O error during a Kademlia RPC query: {err}")
}
}
}
}
impl error::Error for KademliaHandlerQueryErr {
impl error::Error for HandlerQueryErr {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
KademliaHandlerQueryErr::Upgrade(err) => Some(err),
KademliaHandlerQueryErr::UnexpectedMessage => None,
KademliaHandlerQueryErr::Io(err) => Some(err),
HandlerQueryErr::Upgrade(err) => Some(err),
HandlerQueryErr::UnexpectedMessage => None,
HandlerQueryErr::Io(err) => Some(err),
}
}
}
impl From<StreamUpgradeError<io::Error>> for KademliaHandlerQueryErr {
impl From<StreamUpgradeError<io::Error>> for HandlerQueryErr {
fn from(err: StreamUpgradeError<io::Error>) -> Self {
KademliaHandlerQueryErr::Upgrade(err)
HandlerQueryErr::Upgrade(err)
}
}
/// Event to send to the handler.
#[derive(Debug)]
pub enum KademliaHandlerIn {
pub enum HandlerIn {
/// Resets the (sub)stream associated with the given request ID,
/// thus signaling an error to the remote.
///
@ -366,7 +365,7 @@ pub enum KademliaHandlerIn {
/// can be used as an alternative to letting requests simply time
/// out on the remote peer, thus potentially avoiding some delay
/// for the query on the remote.
Reset(KademliaRequestId),
Reset(RequestId),
/// Change the connection to the specified mode.
ReconfigureMode { new_mode: Mode },
@ -387,7 +386,7 @@ pub enum KademliaHandlerIn {
/// Identifier of the request that was made by the remote.
///
/// It is a logic error to use an id of the handler of a different node.
request_id: KademliaRequestId,
request_id: RequestId,
},
/// Same as `FindNodeReq`, but should also return the entries of the local providers list for
@ -408,7 +407,7 @@ pub enum KademliaHandlerIn {
/// Identifier of the request that was made by the remote.
///
/// It is a logic error to use an id of the handler of a different node.
request_id: KademliaRequestId,
request_id: RequestId,
},
/// Indicates that this provider is known for this key.
@ -437,7 +436,7 @@ pub enum KademliaHandlerIn {
/// Nodes that are closer to the key we were searching for.
closer_peers: Vec<KadPeer>,
/// Identifier of the request that was made by the remote.
request_id: KademliaRequestId,
request_id: RequestId,
},
/// Put a value into the dht records.
@ -454,14 +453,14 @@ pub enum KademliaHandlerIn {
/// Value that was put.
value: Vec<u8>,
/// Identifier of the request that was made by the remote.
request_id: KademliaRequestId,
request_id: RequestId,
},
}
/// Unique identifier for a request. Must be passed back in order to answer a request from
/// the remote.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct KademliaRequestId {
pub struct RequestId {
/// Unique identifier for an incoming connection.
connec_unique_id: UniqueConnecId,
}
@ -470,9 +469,9 @@ pub struct KademliaRequestId {
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct UniqueConnecId(u64);
impl KademliaHandler {
impl Handler {
pub fn new(
protocol_config: KademliaProtocolConfig,
protocol_config: ProtocolConfig,
idle_timeout: Duration,
endpoint: ConnectedPoint,
remote_peer_id: PeerId,
@ -494,7 +493,7 @@ impl KademliaHandler {
let keep_alive = KeepAlive::Until(Instant::now() + idle_timeout);
KademliaHandler {
Handler {
protocol_config,
mode,
idle_timeout,
@ -612,12 +611,12 @@ impl KademliaHandler {
}
}
impl ConnectionHandler for KademliaHandler {
type FromBehaviour = KademliaHandlerIn;
type ToBehaviour = KademliaHandlerEvent;
impl ConnectionHandler for Handler {
type FromBehaviour = HandlerIn;
type ToBehaviour = HandlerEvent;
type Error = io::Error; // TODO: better error type?
type InboundProtocol = Either<KademliaProtocolConfig, upgrade::DeniedUpgrade>;
type OutboundProtocol = KademliaProtocolConfig;
type InboundProtocol = Either<ProtocolConfig, upgrade::DeniedUpgrade>;
type OutboundProtocol = ProtocolConfig;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();
@ -628,9 +627,9 @@ impl ConnectionHandler for KademliaHandler {
}
}
fn on_behaviour_event(&mut self, message: KademliaHandlerIn) {
fn on_behaviour_event(&mut self, message: HandlerIn) {
match message {
KademliaHandlerIn::Reset(request_id) => {
HandlerIn::Reset(request_id) => {
if let Some(state) = self
.inbound_substreams
.iter_mut()
@ -644,19 +643,19 @@ impl ConnectionHandler for KademliaHandler {
state.close();
}
}
KademliaHandlerIn::FindNodeReq { key, query_id } => {
HandlerIn::FindNodeReq { key, query_id } => {
let msg = KadRequestMsg::FindNode { key };
self.pending_messages.push_back((msg, Some(query_id)));
}
KademliaHandlerIn::FindNodeRes {
HandlerIn::FindNodeRes {
closer_peers,
request_id,
} => self.answer_pending_request(request_id, KadResponseMsg::FindNode { closer_peers }),
KademliaHandlerIn::GetProvidersReq { key, query_id } => {
HandlerIn::GetProvidersReq { key, query_id } => {
let msg = KadRequestMsg::GetProviders { key };
self.pending_messages.push_back((msg, Some(query_id)));
}
KademliaHandlerIn::GetProvidersRes {
HandlerIn::GetProvidersRes {
closer_peers,
provider_peers,
request_id,
@ -667,19 +666,19 @@ impl ConnectionHandler for KademliaHandler {
provider_peers,
},
),
KademliaHandlerIn::AddProvider { key, provider } => {
HandlerIn::AddProvider { key, provider } => {
let msg = KadRequestMsg::AddProvider { key, provider };
self.pending_messages.push_back((msg, None));
}
KademliaHandlerIn::GetRecord { key, query_id } => {
HandlerIn::GetRecord { key, query_id } => {
let msg = KadRequestMsg::GetValue { key };
self.pending_messages.push_back((msg, Some(query_id)));
}
KademliaHandlerIn::PutRecord { record, query_id } => {
HandlerIn::PutRecord { record, query_id } => {
let msg = KadRequestMsg::PutValue { record };
self.pending_messages.push_back((msg, Some(query_id)));
}
KademliaHandlerIn::GetRecordRes {
HandlerIn::GetRecordRes {
record,
closer_peers,
request_id,
@ -692,14 +691,14 @@ impl ConnectionHandler for KademliaHandler {
},
);
}
KademliaHandlerIn::PutRecordRes {
HandlerIn::PutRecordRes {
key,
request_id,
value,
} => {
self.answer_pending_request(request_id, KadResponseMsg::PutValue { key, value });
}
KademliaHandlerIn::ReconfigureMode { new_mode } => {
HandlerIn::ReconfigureMode { new_mode } => {
let peer = self.remote_peer_id;
match &self.endpoint {
@ -736,7 +735,7 @@ impl ConnectionHandler for KademliaHandler {
if let ProtocolStatus::Confirmed = self.protocol_status {
self.protocol_status = ProtocolStatus::Reported;
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::ProtocolConfirmed {
HandlerEvent::ProtocolConfirmed {
endpoint: self.endpoint.clone(),
},
));
@ -833,8 +832,8 @@ impl ConnectionHandler for KademliaHandler {
}
}
impl KademliaHandler {
fn answer_pending_request(&mut self, request_id: KademliaRequestId, mut msg: KadResponseMsg) {
impl Handler {
fn answer_pending_request(&mut self, request_id: RequestId, mut msg: KadResponseMsg) {
for state in self.inbound_substreams.iter_mut() {
match state.try_answer_with(request_id, msg) {
Ok(()) => return,
@ -849,7 +848,7 @@ impl KademliaHandler {
}
impl futures::Stream for OutboundSubstreamState {
type Item = ConnectionHandlerEvent<KademliaProtocolConfig, (), KademliaHandlerEvent, io::Error>;
type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
@ -866,8 +865,8 @@ impl futures::Stream for OutboundSubstreamState {
*this = OutboundSubstreamState::Done;
let event = query_id.map(|query_id| {
ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
HandlerEvent::QueryError {
error: HandlerQueryErr::Io(error),
query_id,
},
)
@ -883,12 +882,10 @@ impl futures::Stream for OutboundSubstreamState {
Poll::Ready(Err(error)) => {
*this = OutboundSubstreamState::Done;
let event = query_id.map(|query_id| {
ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::QueryError {
error: HandlerQueryErr::Io(error),
query_id,
},
)
})
});
return Poll::Ready(event);
@ -911,12 +908,10 @@ impl futures::Stream for OutboundSubstreamState {
Poll::Ready(Err(error)) => {
*this = OutboundSubstreamState::Done;
let event = query_id.map(|query_id| {
ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::QueryError {
error: HandlerQueryErr::Io(error),
query_id,
},
)
})
});
return Poll::Ready(event);
@ -939,8 +934,8 @@ impl futures::Stream for OutboundSubstreamState {
}
Poll::Ready(Some(Err(error))) => {
*this = OutboundSubstreamState::Done;
let event = KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
let event = HandlerEvent::QueryError {
error: HandlerQueryErr::Io(error),
query_id,
};
@ -950,10 +945,8 @@ impl futures::Stream for OutboundSubstreamState {
}
Poll::Ready(None) => {
*this = OutboundSubstreamState::Done;
let event = KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(
io::ErrorKind::UnexpectedEof.into(),
),
let event = HandlerEvent::QueryError {
error: HandlerQueryErr::Io(io::ErrorKind::UnexpectedEof.into()),
query_id,
};
@ -965,7 +958,7 @@ impl futures::Stream for OutboundSubstreamState {
}
OutboundSubstreamState::ReportError(error, query_id) => {
*this = OutboundSubstreamState::Done;
let event = KademliaHandlerEvent::QueryError { error, query_id };
let event = HandlerEvent::QueryError { error, query_id };
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(event)));
}
@ -987,7 +980,7 @@ impl futures::Stream for OutboundSubstreamState {
}
impl futures::Stream for InboundSubstreamState {
type Item = ConnectionHandlerEvent<KademliaProtocolConfig, (), KademliaHandlerEvent, io::Error>;
type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
@ -1013,9 +1006,9 @@ impl futures::Stream for InboundSubstreamState {
*this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::FindNodeReq {
HandlerEvent::FindNodeReq {
key,
request_id: KademliaRequestId {
request_id: RequestId {
connec_unique_id: connection_id,
},
},
@ -1025,9 +1018,9 @@ impl futures::Stream for InboundSubstreamState {
*this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::GetProvidersReq {
HandlerEvent::GetProvidersReq {
key,
request_id: KademliaRequestId {
request_id: RequestId {
connec_unique_id: connection_id,
},
},
@ -1040,16 +1033,16 @@ impl futures::Stream for InboundSubstreamState {
substream,
};
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::AddProvider { key, provider },
HandlerEvent::AddProvider { key, provider },
)));
}
Poll::Ready(Some(Ok(KadRequestMsg::GetValue { key }))) => {
*this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::GetRecord {
HandlerEvent::GetRecord {
key,
request_id: KademliaRequestId {
request_id: RequestId {
connec_unique_id: connection_id,
},
},
@ -1059,9 +1052,9 @@ impl futures::Stream for InboundSubstreamState {
*this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::PutRecord {
HandlerEvent::PutRecord {
record,
request_id: KademliaRequestId {
request_id: RequestId {
connec_unique_id: connection_id,
},
},
@ -1138,24 +1131,24 @@ impl futures::Stream for InboundSubstreamState {
}
/// Process a Kademlia message that's supposed to be a response to one of our requests.
fn process_kad_response(event: KadResponseMsg, query_id: QueryId) -> KademliaHandlerEvent {
fn process_kad_response(event: KadResponseMsg, query_id: QueryId) -> HandlerEvent {
// TODO: must check that the response corresponds to the request
match event {
KadResponseMsg::Pong => {
// We never send out pings.
KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::UnexpectedMessage,
HandlerEvent::QueryError {
error: HandlerQueryErr::UnexpectedMessage,
query_id,
}
}
KadResponseMsg::FindNode { closer_peers } => KademliaHandlerEvent::FindNodeRes {
KadResponseMsg::FindNode { closer_peers } => HandlerEvent::FindNodeRes {
closer_peers,
query_id,
},
KadResponseMsg::GetProviders {
closer_peers,
provider_peers,
} => KademliaHandlerEvent::GetProvidersRes {
} => HandlerEvent::GetProvidersRes {
closer_peers,
provider_peers,
query_id,
@ -1163,12 +1156,12 @@ fn process_kad_response(event: KadResponseMsg, query_id: QueryId) -> KademliaHan
KadResponseMsg::GetValue {
record,
closer_peers,
} => KademliaHandlerEvent::GetRecordRes {
} => HandlerEvent::GetRecordRes {
record,
closer_peers,
query_id,
},
KadResponseMsg::PutValue { key, value, .. } => KademliaHandlerEvent::PutRecordRes {
KadResponseMsg::PutValue { key, value, .. } => HandlerEvent::PutRecordRes {
key,
value,
query_id,

View File

@ -74,10 +74,10 @@ use std::vec;
/// The maximum number of queries towards which background jobs
/// are allowed to start new queries on an invocation of
/// `Kademlia::poll`.
/// `Behaviour::poll`.
pub(crate) const JOBS_MAX_QUERIES: usize = 100;
/// The maximum number of new queries started by a background job
/// per invocation of `Kademlia::poll`.
/// per invocation of `Behaviour::poll`.
pub(crate) const JOBS_MAX_NEW_QUERIES: usize = 10;
/// A background job run periodically.
#[derive(Debug)]

View File

@ -26,7 +26,7 @@
//! [Identify](https://github.com/libp2p/specs/tree/master/identify) protocol might be seen as a core protocol. Rust-libp2p
//! tries to stay as generic as possible, and does not make this assumption.
//! This means that the Identify protocol must be manually hooked up to Kademlia through calls
//! to [`Kademlia::add_address`].
//! to [`Behaviour::add_address`].
//! If you choose not to use the Identify protocol, and do not provide an alternative peer
//! discovery mechanism, a Kademlia node will not discover nodes beyond the network's
//! [boot nodes](https://docs.libp2p.io/concepts/glossary/#boot-node). Without the Identify protocol,
@ -73,11 +73,10 @@ pub use behaviour::{
QueryResult, QueryStats, RoutingUpdate,
};
pub use behaviour::{
Kademlia, KademliaBucketInserts, KademliaCaching, KademliaConfig, KademliaEvent,
KademliaStoreInserts, ProgressStep, Quorum,
Behaviour, BucketInserts, Caching, Config, Event, ProgressStep, Quorum, StoreInserts,
};
pub use kbucket::{Distance as KBucketDistance, EntryView, KBucketRef, Key as KBucketKey};
pub use protocol::KadConnectionType;
pub use protocol::ConnectionType;
pub use query::QueryId;
pub use record_priv::{store, Key as RecordKey, ProviderRecord, Record};
@ -115,3 +114,30 @@ pub const PROTOCOL_NAME: StreamProtocol = protocol::DEFAULT_PROTO_NAME;
/// Constant shared across tests for the [`Multihash`](libp2p_core::multihash::Multihash) type.
#[cfg(test)]
const SHA_256_MH: u64 = 0x12;
#[deprecated(note = "Import the `kad` module instead and refer to this type as `kad::Behaviour`.")]
pub type Kademlia<TStore> = Behaviour<TStore>;
#[deprecated(
note = "Import the `kad` module instead and refer to this type as `kad::BucketInserts`."
)]
pub type KademliaBucketInserts = BucketInserts;
#[deprecated(
note = "Import the `kad` module instead and refer to this type as `kad::StoreInserts`."
)]
pub type KademliaStoreInserts = StoreInserts;
#[deprecated(note = "Import the `kad` module instead and refer to this type as `kad::Config`.")]
pub type KademliaConfig = Config;
#[deprecated(note = "Import the `kad` module instead and refer to this type as `kad::Caching`.")]
pub type KademliaCaching = Caching;
#[deprecated(note = "Import the `kad` module instead and refer to this type as `kad::Event`.")]
pub type KademliaEvent = Event;
#[deprecated(
note = "Import the `kad` module instead and refer to this type as `kad::ConnectionType`."
)]
pub type KadConnectionType = ConnectionType;

View File

@ -20,7 +20,7 @@
//! The Kademlia connection protocol upgrade and associated message types.
//!
//! The connection protocol upgrade is provided by [`KademliaProtocolConfig`], with the
//! The connection protocol upgrade is provided by [`ProtocolConfig`], with the
//! request and response types [`KadRequestMsg`] and [`KadResponseMsg`], respectively.
//! The upgrade's output is a `Sink + Stream` of messages. The `Stream` component is used
//! to poll the underlying transport for incoming messages, and the `Sink` component
@ -46,7 +46,7 @@ pub(crate) const DEFAULT_PROTO_NAME: StreamProtocol = StreamProtocol::new("/ipfs
pub(crate) const DEFAULT_MAX_PACKET_SIZE: usize = 16 * 1024;
/// Status of our connection to a node reported by the Kademlia protocol.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
pub enum KadConnectionType {
pub enum ConnectionType {
/// Sender hasn't tried to connect to peer.
NotConnected = 0,
/// Sender is currently connected to peer.
@ -57,26 +57,26 @@ pub enum KadConnectionType {
CannotConnect = 3,
}
impl From<proto::ConnectionType> for KadConnectionType {
fn from(raw: proto::ConnectionType) -> KadConnectionType {
impl From<proto::ConnectionType> for ConnectionType {
fn from(raw: proto::ConnectionType) -> ConnectionType {
use proto::ConnectionType::*;
match raw {
NOT_CONNECTED => KadConnectionType::NotConnected,
CONNECTED => KadConnectionType::Connected,
CAN_CONNECT => KadConnectionType::CanConnect,
CANNOT_CONNECT => KadConnectionType::CannotConnect,
NOT_CONNECTED => ConnectionType::NotConnected,
CONNECTED => ConnectionType::Connected,
CAN_CONNECT => ConnectionType::CanConnect,
CANNOT_CONNECT => ConnectionType::CannotConnect,
}
}
}
impl From<KadConnectionType> for proto::ConnectionType {
fn from(val: KadConnectionType) -> Self {
impl From<ConnectionType> for proto::ConnectionType {
fn from(val: ConnectionType) -> Self {
use proto::ConnectionType::*;
match val {
KadConnectionType::NotConnected => NOT_CONNECTED,
KadConnectionType::Connected => CONNECTED,
KadConnectionType::CanConnect => CAN_CONNECT,
KadConnectionType::CannotConnect => CANNOT_CONNECT,
ConnectionType::NotConnected => NOT_CONNECTED,
ConnectionType::Connected => CONNECTED,
ConnectionType::CanConnect => CAN_CONNECT,
ConnectionType::CannotConnect => CANNOT_CONNECT,
}
}
}
@ -89,7 +89,7 @@ pub struct KadPeer {
/// The multiaddresses that the sender think can be used in order to reach the peer.
pub multiaddrs: Vec<Multiaddr>,
/// How the sender is connected to that remote.
pub connection_ty: KadConnectionType,
pub connection_ty: ConnectionType,
}
// Builds a `KadPeer` from a corresponding protobuf message.
@ -135,13 +135,13 @@ impl From<KadPeer> for proto::Peer {
// only one request, then we can change the output of the `InboundUpgrade` and
// `OutboundUpgrade` to be just a single message
#[derive(Debug, Clone)]
pub struct KademliaProtocolConfig {
pub struct ProtocolConfig {
protocol_names: Vec<StreamProtocol>,
/// Maximum allowed size of a packet.
max_packet_size: usize,
}
impl KademliaProtocolConfig {
impl ProtocolConfig {
/// Returns the configured protocol name.
pub fn protocol_names(&self) -> &[StreamProtocol] {
&self.protocol_names
@ -159,16 +159,16 @@ impl KademliaProtocolConfig {
}
}
impl Default for KademliaProtocolConfig {
impl Default for ProtocolConfig {
fn default() -> Self {
KademliaProtocolConfig {
ProtocolConfig {
protocol_names: iter::once(DEFAULT_PROTO_NAME).collect(),
max_packet_size: DEFAULT_MAX_PACKET_SIZE,
}
}
}
impl UpgradeInfo for KademliaProtocolConfig {
impl UpgradeInfo for ProtocolConfig {
type Info = StreamProtocol;
type InfoIter = std::vec::IntoIter<Self::Info>;
@ -213,7 +213,7 @@ pub(crate) type KadInStreamSink<S> = Framed<S, Codec<KadResponseMsg, KadRequestM
/// Sink of requests and stream of responses.
pub(crate) type KadOutStreamSink<S> = Framed<S, Codec<KadRequestMsg, KadResponseMsg>>;
impl<C> InboundUpgrade<C> for KademliaProtocolConfig
impl<C> InboundUpgrade<C> for ProtocolConfig
where
C: AsyncRead + AsyncWrite + Unpin,
{
@ -228,7 +228,7 @@ where
}
}
impl<C> OutboundUpgrade<C> for KademliaProtocolConfig
impl<C> OutboundUpgrade<C> for ProtocolConfig
where
C: AsyncRead + AsyncWrite + Unpin,
{
@ -624,7 +624,7 @@ mod tests {
use futures::{Future, Sink, Stream};
use libp2p_core::{PeerId, PublicKey, Transport};
use multihash::{encode, Hash};
use protocol::{KadConnectionType, KadPeer, KademliaProtocolConfig};
use protocol::{ConnectionType, KadPeer, ProtocolConfig};
use std::sync::mpsc;
use std::thread;
@ -641,7 +641,7 @@ mod tests {
closer_peers: vec![KadPeer {
node_id: PeerId::random(),
multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()],
connection_ty: KadConnectionType::Connected,
connection_ty: ConnectionType::Connected,
}],
});
test_one(KadMsg::GetProvidersReq {
@ -651,12 +651,12 @@ mod tests {
closer_peers: vec![KadPeer {
node_id: PeerId::random(),
multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()],
connection_ty: KadConnectionType::Connected,
connection_ty: ConnectionType::Connected,
}],
provider_peers: vec![KadPeer {
node_id: PeerId::random(),
multiaddrs: vec!["/ip4/200.201.202.203/tcp/1999".parse().unwrap()],
connection_ty: KadConnectionType::NotConnected,
connection_ty: ConnectionType::NotConnected,
}],
});
test_one(KadMsg::AddProvider {
@ -664,7 +664,7 @@ mod tests {
provider_peer: KadPeer {
node_id: PeerId::random(),
multiaddrs: vec!["/ip4/9.1.2.3/udp/23".parse().unwrap()],
connection_ty: KadConnectionType::Connected,
connection_ty: ConnectionType::Connected,
},
});
// TODO: all messages
@ -674,7 +674,7 @@ mod tests {
let (tx, rx) = mpsc::channel();
let bg_thread = thread::spawn(move || {
let transport = TcpTransport::default().with_upgrade(KademliaProtocolConfig);
let transport = TcpTransport::default().with_upgrade(ProtocolConfig);
let (listener, addr) = transport
.listen_on( "/ip4/127.0.0.1/tcp/0".parse().unwrap())
@ -694,7 +694,7 @@ mod tests {
let _ = rt.block_on(future).unwrap();
});
let transport = TcpTransport::default().with_upgrade(KademliaProtocolConfig);
let transport = TcpTransport::default().with_upgrade(ProtocolConfig);
let future = transport
.dial(rx.recv().unwrap())

View File

@ -230,19 +230,19 @@ pub struct QueryId(usize);
pub(crate) struct QueryConfig {
/// Timeout of a single query.
///
/// See [`crate::behaviour::KademliaConfig::set_query_timeout`] for details.
/// See [`crate::behaviour::Config::set_query_timeout`] for details.
pub(crate) timeout: Duration,
/// The replication factor to use.
///
/// See [`crate::behaviour::KademliaConfig::set_replication_factor`] for details.
/// See [`crate::behaviour::Config::set_replication_factor`] for details.
pub(crate) replication_factor: NonZeroUsize,
/// Allowed level of parallelism for iterative queries.
///
/// See [`crate::behaviour::KademliaConfig::set_parallelism`] for details.
/// See [`crate::behaviour::Config::set_parallelism`] for details.
pub(crate) parallelism: NonZeroUsize,
/// Whether to use disjoint paths on iterative lookups.
///
/// See [`crate::behaviour::KademliaConfig::disjoint_query_paths`] for details.
/// See [`crate::behaviour::Config::disjoint_query_paths`] for details.
pub(crate) disjoint_query_paths: bool,
}

View File

@ -1,7 +1,7 @@
use libp2p_identify as identify;
use libp2p_identity as identity;
use libp2p_kad::store::MemoryStore;
use libp2p_kad::{Kademlia, KademliaConfig, KademliaEvent, Mode};
use libp2p_kad::{Behaviour, Config, Event, Mode};
use libp2p_swarm::Swarm;
use libp2p_swarm_test::SwarmExt;
@ -19,7 +19,7 @@ async fn server_gets_added_to_routing_table_by_client() {
match libp2p_swarm_test::drive(&mut client, &mut server).await {
(
[MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_), MyBehaviourEvent::Kad(KademliaEvent::RoutingUpdated { peer, .. })],
[MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_), MyBehaviourEvent::Kad(Event::RoutingUpdated { peer, .. })],
[MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_)],
) => {
assert_eq!(peer, server_peer_id)
@ -41,7 +41,7 @@ async fn two_servers_add_each_other_to_routing_table() {
let server1_peer_id = *server1.local_peer_id();
let server2_peer_id = *server2.local_peer_id();
use KademliaEvent::*;
use Event::*;
use MyBehaviourEvent::*;
match libp2p_swarm_test::drive(&mut server1, &mut server2).await {
@ -94,7 +94,7 @@ async fn adding_an_external_addresses_activates_server_mode_on_existing_connecti
other => panic!("Unexpected events: {other:?}"),
}
use KademliaEvent::*;
use Event::*;
// Server learns its external address (this could be through AutoNAT or some other mechanism).
server.add_external_address(memory_addr);
@ -127,7 +127,7 @@ async fn set_client_to_server_mode() {
match libp2p_swarm_test::drive(&mut client, &mut server).await {
(
[MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_), MyBehaviourEvent::Kad(KademliaEvent::RoutingUpdated { peer, .. })],
[MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_), MyBehaviourEvent::Kad(Event::RoutingUpdated { peer, .. })],
[MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(identify::Event::Received { info, .. })],
) => {
assert_eq!(peer, server_peer_id);
@ -159,7 +159,7 @@ async fn set_client_to_server_mode() {
#[behaviour(prelude = "libp2p_swarm::derive_prelude")]
struct MyBehaviour {
identify: identify::Behaviour,
kad: Kademlia<MemoryStore>,
kad: Behaviour<MemoryStore>,
}
impl MyBehaviour {
@ -171,10 +171,10 @@ impl MyBehaviour {
"/test/1.0.0".to_owned(),
k.public(),
)),
kad: Kademlia::with_config(
kad: Behaviour::with_config(
local_peer_id,
MemoryStore::new(local_peer_id),
KademliaConfig::default(),
Config::default(),
),
}
}

View File

@ -98,7 +98,7 @@ fn three_fields() {
struct Foo {
ping: ping::Behaviour,
identify: identify::Behaviour,
kad: libp2p_kad::Kademlia<libp2p_kad::record::store::MemoryStore>,
kad: libp2p_kad::Behaviour<libp2p_kad::record::store::MemoryStore>,
}
#[allow(
@ -115,7 +115,7 @@ fn three_fields() {
let _: identify::Event = event;
}
FooEvent::Kad(event) => {
let _: libp2p_kad::KademliaEvent = event;
let _: libp2p_kad::Event = event;
}
}
}
@ -327,7 +327,7 @@ fn with_either() {
#[derive(NetworkBehaviour)]
#[behaviour(prelude = "libp2p_swarm::derive_prelude")]
struct Foo {
kad: libp2p_kad::Kademlia<libp2p_kad::record::store::MemoryStore>,
kad: libp2p_kad::Behaviour<libp2p_kad::record::store::MemoryStore>,
ping_or_identify: Either<ping::Behaviour, identify::Behaviour>,
}
@ -351,7 +351,7 @@ fn with_generics() {
fn foo() {
require_net_behaviour::<
Foo<
libp2p_kad::Kademlia<libp2p_kad::record::store::MemoryStore>,
libp2p_kad::Behaviour<libp2p_kad::record::store::MemoryStore>,
libp2p_ping::Behaviour,
>,
>();
@ -370,7 +370,7 @@ fn with_generics_mixed() {
#[allow(dead_code)]
fn foo() {
require_net_behaviour::<Foo<libp2p_kad::Kademlia<libp2p_kad::record::store::MemoryStore>>>(
require_net_behaviour::<Foo<libp2p_kad::Behaviour<libp2p_kad::record::store::MemoryStore>>>(
);
}
}
@ -381,12 +381,12 @@ fn custom_event_with_either() {
#[allow(clippy::large_enum_variant)]
enum BehaviourOutEvent {
Kad(libp2p_kad::KademliaEvent),
Kad(libp2p_kad::Event),
PingOrIdentify(Either<ping::Event, identify::Event>),
}
impl From<libp2p_kad::KademliaEvent> for BehaviourOutEvent {
fn from(event: libp2p_kad::KademliaEvent) -> Self {
impl From<libp2p_kad::Event> for BehaviourOutEvent {
fn from(event: libp2p_kad::Event) -> Self {
BehaviourOutEvent::Kad(event)
}
}
@ -404,7 +404,7 @@ fn custom_event_with_either() {
prelude = "libp2p_swarm::derive_prelude"
)]
struct Foo {
kad: libp2p_kad::Kademlia<libp2p_kad::record::store::MemoryStore>,
kad: libp2p_kad::Behaviour<libp2p_kad::record::store::MemoryStore>,
ping_or_identify: Either<ping::Behaviour, identify::Behaviour>,
}