mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-22 14:21:33 +00:00
protocols/kad: Extend emitted events with basic information (#2087)
* Expose kbucket range on RoutingUpdated. * Expose inbound request information. * Expose whether routing update is new peer.
This commit is contained in:
@ -98,7 +98,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||||||
// Called when `kademlia` produces an event.
|
// Called when `kademlia` produces an event.
|
||||||
fn inject_event(&mut self, message: KademliaEvent) {
|
fn inject_event(&mut self, message: KademliaEvent) {
|
||||||
match message {
|
match message {
|
||||||
KademliaEvent::QueryResult { result, .. } => match result {
|
KademliaEvent::OutboundQueryCompleted { result, .. } => match result {
|
||||||
QueryResult::GetProviders(Ok(ok)) => {
|
QueryResult::GetProviders(Ok(ok)) => {
|
||||||
for peer in ok.providers {
|
for peer in ok.providers {
|
||||||
println!(
|
println!(
|
||||||
|
@ -93,7 +93,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||||||
task::block_on(async move {
|
task::block_on(async move {
|
||||||
loop {
|
loop {
|
||||||
let event = swarm.select_next_some().await;
|
let event = swarm.select_next_some().await;
|
||||||
if let SwarmEvent::Behaviour(KademliaEvent::QueryResult {
|
if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
|
||||||
result: QueryResult::GetClosestPeers(result),
|
result: QueryResult::GetClosestPeers(result),
|
||||||
..
|
..
|
||||||
}) = event {
|
}) = event {
|
||||||
|
@ -2,6 +2,17 @@
|
|||||||
|
|
||||||
- Update dependencies.
|
- Update dependencies.
|
||||||
|
|
||||||
|
- Expose inbound request information (see [PR 2087]). Note:
|
||||||
|
`KademliaEvent::QueryResult` is renamed to
|
||||||
|
`KademliaEvent::OutboundQueryCompleted`.
|
||||||
|
|
||||||
|
- Expose whether `KademliaEvent::RoutingUpdated` is triggered with new peer (see
|
||||||
|
[PR 2087]).
|
||||||
|
|
||||||
|
- Expose kbucket range on `KademliaEvent::RoutingUpdated` (see [PR 2087]).
|
||||||
|
|
||||||
|
[PR 2087]: https://github.com/libp2p/rust-libp2p/pull/2087
|
||||||
|
|
||||||
# 0.30.0 [2021-04-13]
|
# 0.30.0 [2021-04-13]
|
||||||
|
|
||||||
- Update `libp2p-swarm`.
|
- Update `libp2p-swarm`.
|
||||||
|
@ -32,7 +32,7 @@ use crate::handler::{
|
|||||||
KademliaHandlerIn
|
KademliaHandlerIn
|
||||||
};
|
};
|
||||||
use crate::jobs::*;
|
use crate::jobs::*;
|
||||||
use crate::kbucket::{self, KBucketsTable, NodeStatus};
|
use crate::kbucket::{self, Distance, KBucketsTable, NodeStatus};
|
||||||
use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
|
use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
|
||||||
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
|
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
|
||||||
use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
|
use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
|
||||||
@ -469,8 +469,13 @@ where
|
|||||||
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
KademliaEvent::RoutingUpdated {
|
KademliaEvent::RoutingUpdated {
|
||||||
peer: *peer,
|
peer: *peer,
|
||||||
|
is_new_peer: false,
|
||||||
addresses: entry.value().clone(),
|
addresses: entry.value().clone(),
|
||||||
old_peer: None,
|
old_peer: None,
|
||||||
|
bucket_range: self.kbuckets
|
||||||
|
.bucket(&key)
|
||||||
|
.map(|b| b.range())
|
||||||
|
.expect("Not kbucket::Entry::SelfEntry."),
|
||||||
}
|
}
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
@ -493,8 +498,13 @@ where
|
|||||||
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
KademliaEvent::RoutingUpdated {
|
KademliaEvent::RoutingUpdated {
|
||||||
peer: *peer,
|
peer: *peer,
|
||||||
|
is_new_peer: true,
|
||||||
addresses,
|
addresses,
|
||||||
old_peer: None,
|
old_peer: None,
|
||||||
|
bucket_range: self.kbuckets
|
||||||
|
.bucket(&key)
|
||||||
|
.map(|b| b.range())
|
||||||
|
.expect("Not kbucket::Entry::SelfEntry."),
|
||||||
}
|
}
|
||||||
));
|
));
|
||||||
RoutingUpdate::Success
|
RoutingUpdate::Success
|
||||||
@ -593,7 +603,7 @@ where
|
|||||||
/// Initiates an iterative query for the closest peers to the given key.
|
/// Initiates an iterative query for the closest peers to the given key.
|
||||||
///
|
///
|
||||||
/// The result of the query is delivered in a
|
/// The result of the query is delivered in a
|
||||||
/// [`KademliaEvent::QueryResult{QueryResult::GetClosestPeers}`].
|
/// [`KademliaEvent::OutboundQueryCompleted{QueryResult::GetClosestPeers}`].
|
||||||
pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
|
pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
|
||||||
where
|
where
|
||||||
K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone
|
K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone
|
||||||
@ -608,7 +618,7 @@ where
|
|||||||
/// Performs a lookup for a record in the DHT.
|
/// Performs a lookup for a record in the DHT.
|
||||||
///
|
///
|
||||||
/// The result of this operation is delivered in a
|
/// The result of this operation is delivered in a
|
||||||
/// [`KademliaEvent::QueryResult{QueryResult::GetRecord}`].
|
/// [`KademliaEvent::OutboundQueryCompleted{QueryResult::GetRecord}`].
|
||||||
pub fn get_record(&mut self, key: &record::Key, quorum: Quorum) -> QueryId {
|
pub fn get_record(&mut self, key: &record::Key, quorum: Quorum) -> QueryId {
|
||||||
let quorum = quorum.eval(self.queries.config().replication_factor);
|
let quorum = quorum.eval(self.queries.config().replication_factor);
|
||||||
let mut records = Vec::with_capacity(quorum.get());
|
let mut records = Vec::with_capacity(quorum.get());
|
||||||
@ -647,7 +657,7 @@ where
|
|||||||
/// Returns `Ok` if a record has been stored locally, providing the
|
/// Returns `Ok` if a record has been stored locally, providing the
|
||||||
/// `QueryId` of the initial query that replicates the record in the DHT.
|
/// `QueryId` of the initial query that replicates the record in the DHT.
|
||||||
/// The result of the query is eventually reported as a
|
/// The result of the query is eventually reported as a
|
||||||
/// [`KademliaEvent::QueryResult{QueryResult::PutRecord}`].
|
/// [`KademliaEvent::OutboundQueryCompleted{QueryResult::PutRecord}`].
|
||||||
///
|
///
|
||||||
/// The record is always stored locally with the given expiration. If the record's
|
/// The record is always stored locally with the given expiration. If the record's
|
||||||
/// expiration is `None`, the common case, it does not expire in local storage
|
/// expiration is `None`, the common case, it does not expire in local storage
|
||||||
@ -760,7 +770,7 @@ where
|
|||||||
///
|
///
|
||||||
/// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
|
/// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
|
||||||
/// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
|
/// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
|
||||||
/// reported via [`KademliaEvent::QueryResult{QueryResult::Bootstrap}`] events,
|
/// reported via [`KademliaEvent::OutboundQueryCompleted{QueryResult::Bootstrap}`] events,
|
||||||
/// with one such event per bootstrapping query.
|
/// with one such event per bootstrapping query.
|
||||||
///
|
///
|
||||||
/// Returns `Err` if bootstrapping is impossible due an empty routing table.
|
/// Returns `Err` if bootstrapping is impossible due an empty routing table.
|
||||||
@ -803,7 +813,7 @@ where
|
|||||||
/// of the libp2p Kademlia provider API.
|
/// of the libp2p Kademlia provider API.
|
||||||
///
|
///
|
||||||
/// The results of the (repeated) provider announcements sent by this node are
|
/// The results of the (repeated) provider announcements sent by this node are
|
||||||
/// reported via [`KademliaEvent::QueryResult{QueryResult::StartProviding}`].
|
/// reported via [`KademliaEvent::OutboundQueryCompleted{QueryResult::StartProviding}`].
|
||||||
pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
|
pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
|
||||||
// Note: We store our own provider records locally without local addresses
|
// Note: We store our own provider records locally without local addresses
|
||||||
// to avoid redundant storage and outdated addresses. Instead these are
|
// to avoid redundant storage and outdated addresses. Instead these are
|
||||||
@ -838,7 +848,7 @@ where
|
|||||||
/// Performs a lookup for providers of a value to the given key.
|
/// Performs a lookup for providers of a value to the given key.
|
||||||
///
|
///
|
||||||
/// The result of this operation is delivered in a
|
/// The result of this operation is delivered in a
|
||||||
/// reported via [`KademliaEvent::QueryResult{QueryResult::GetProviders}`].
|
/// reported via [`KademliaEvent::OutboundQueryCompleted{QueryResult::GetProviders}`].
|
||||||
pub fn get_providers(&mut self, key: record::Key) -> QueryId {
|
pub fn get_providers(&mut self, key: record::Key) -> QueryId {
|
||||||
let info = QueryInfo::GetProviders {
|
let info = QueryInfo::GetProviders {
|
||||||
key: key.clone(),
|
key: key.clone(),
|
||||||
@ -961,20 +971,25 @@ where
|
|||||||
let key = kbucket::Key::from(peer);
|
let key = kbucket::Key::from(peer);
|
||||||
match self.kbuckets.entry(&key) {
|
match self.kbuckets.entry(&key) {
|
||||||
kbucket::Entry::Present(mut entry, old_status) => {
|
kbucket::Entry::Present(mut entry, old_status) => {
|
||||||
|
if old_status != new_status {
|
||||||
|
entry.update(new_status)
|
||||||
|
}
|
||||||
if let Some(address) = address {
|
if let Some(address) = address {
|
||||||
if entry.value().insert(address) {
|
if entry.value().insert(address) {
|
||||||
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
KademliaEvent::RoutingUpdated {
|
KademliaEvent::RoutingUpdated {
|
||||||
peer,
|
peer,
|
||||||
|
is_new_peer: false,
|
||||||
addresses: entry.value().clone(),
|
addresses: entry.value().clone(),
|
||||||
old_peer: None,
|
old_peer: None,
|
||||||
|
bucket_range: self.kbuckets
|
||||||
|
.bucket(&key)
|
||||||
|
.map(|b| b.range())
|
||||||
|
.expect("Not kbucket::Entry::SelfEntry."),
|
||||||
}
|
}
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if old_status != new_status {
|
|
||||||
entry.update(new_status);
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
|
|
||||||
kbucket::Entry::Pending(mut entry, old_status) => {
|
kbucket::Entry::Pending(mut entry, old_status) => {
|
||||||
@ -1008,8 +1023,13 @@ where
|
|||||||
kbucket::InsertResult::Inserted => {
|
kbucket::InsertResult::Inserted => {
|
||||||
let event = KademliaEvent::RoutingUpdated {
|
let event = KademliaEvent::RoutingUpdated {
|
||||||
peer,
|
peer,
|
||||||
|
is_new_peer: true,
|
||||||
addresses,
|
addresses,
|
||||||
old_peer: None,
|
old_peer: None,
|
||||||
|
bucket_range: self.kbuckets
|
||||||
|
.bucket(&key)
|
||||||
|
.map(|b| b.range())
|
||||||
|
.expect("Not kbucket::Entry::SelfEntry."),
|
||||||
};
|
};
|
||||||
self.queued_events.push_back(
|
self.queued_events.push_back(
|
||||||
NetworkBehaviourAction::GenerateEvent(event));
|
NetworkBehaviourAction::GenerateEvent(event));
|
||||||
@ -1097,7 +1117,7 @@ where
|
|||||||
self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
|
self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
|
||||||
}
|
}
|
||||||
|
|
||||||
Some(KademliaEvent::QueryResult {
|
Some(KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: result.stats,
|
stats: result.stats,
|
||||||
result: QueryResult::Bootstrap(Ok(BootstrapOk { peer, num_remaining }))
|
result: QueryResult::Bootstrap(Ok(BootstrapOk { peer, num_remaining }))
|
||||||
@ -1105,7 +1125,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
QueryInfo::GetClosestPeers { key, .. } => {
|
QueryInfo::GetClosestPeers { key, .. } => {
|
||||||
Some(KademliaEvent::QueryResult {
|
Some(KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: result.stats,
|
stats: result.stats,
|
||||||
result: QueryResult::GetClosestPeers(Ok(
|
result: QueryResult::GetClosestPeers(Ok(
|
||||||
@ -1115,7 +1135,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
QueryInfo::GetProviders { key, providers } => {
|
QueryInfo::GetProviders { key, providers } => {
|
||||||
Some(KademliaEvent::QueryResult {
|
Some(KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: result.stats,
|
stats: result.stats,
|
||||||
result: QueryResult::GetProviders(Ok(
|
result: QueryResult::GetProviders(Ok(
|
||||||
@ -1155,14 +1175,14 @@ where
|
|||||||
} => {
|
} => {
|
||||||
match context {
|
match context {
|
||||||
AddProviderContext::Publish => {
|
AddProviderContext::Publish => {
|
||||||
Some(KademliaEvent::QueryResult {
|
Some(KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: get_closest_peers_stats.merge(result.stats),
|
stats: get_closest_peers_stats.merge(result.stats),
|
||||||
result: QueryResult::StartProviding(Ok(AddProviderOk { key }))
|
result: QueryResult::StartProviding(Ok(AddProviderOk { key }))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
AddProviderContext::Republish => {
|
AddProviderContext::Republish => {
|
||||||
Some(KademliaEvent::QueryResult {
|
Some(KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: get_closest_peers_stats.merge(result.stats),
|
stats: get_closest_peers_stats.merge(result.stats),
|
||||||
result: QueryResult::RepublishProvider(Ok(AddProviderOk { key }))
|
result: QueryResult::RepublishProvider(Ok(AddProviderOk { key }))
|
||||||
@ -1200,7 +1220,7 @@ where
|
|||||||
} else {
|
} else {
|
||||||
Err(GetRecordError::QuorumFailed { key, records, quorum })
|
Err(GetRecordError::QuorumFailed { key, records, quorum })
|
||||||
};
|
};
|
||||||
Some(KademliaEvent::QueryResult {
|
Some(KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: result.stats,
|
stats: result.stats,
|
||||||
result: QueryResult::GetRecord(results)
|
result: QueryResult::GetRecord(results)
|
||||||
@ -1242,13 +1262,13 @@ where
|
|||||||
};
|
};
|
||||||
match context {
|
match context {
|
||||||
PutRecordContext::Publish | PutRecordContext::Custom =>
|
PutRecordContext::Publish | PutRecordContext::Custom =>
|
||||||
Some(KademliaEvent::QueryResult {
|
Some(KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: get_closest_peers_stats.merge(result.stats),
|
stats: get_closest_peers_stats.merge(result.stats),
|
||||||
result: QueryResult::PutRecord(mk_result(record.key))
|
result: QueryResult::PutRecord(mk_result(record.key))
|
||||||
}),
|
}),
|
||||||
PutRecordContext::Republish =>
|
PutRecordContext::Republish =>
|
||||||
Some(KademliaEvent::QueryResult {
|
Some(KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: get_closest_peers_stats.merge(result.stats),
|
stats: get_closest_peers_stats.merge(result.stats),
|
||||||
result: QueryResult::RepublishRecord(mk_result(record.key))
|
result: QueryResult::RepublishRecord(mk_result(record.key))
|
||||||
@ -1288,7 +1308,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Some(KademliaEvent::QueryResult {
|
Some(KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: result.stats,
|
stats: result.stats,
|
||||||
result: QueryResult::Bootstrap(Err(
|
result: QueryResult::Bootstrap(Err(
|
||||||
@ -1300,7 +1320,7 @@ where
|
|||||||
QueryInfo::AddProvider { context, key, .. } =>
|
QueryInfo::AddProvider { context, key, .. } =>
|
||||||
Some(match context {
|
Some(match context {
|
||||||
AddProviderContext::Publish =>
|
AddProviderContext::Publish =>
|
||||||
KademliaEvent::QueryResult {
|
KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: result.stats,
|
stats: result.stats,
|
||||||
result: QueryResult::StartProviding(Err(
|
result: QueryResult::StartProviding(Err(
|
||||||
@ -1308,7 +1328,7 @@ where
|
|||||||
))
|
))
|
||||||
},
|
},
|
||||||
AddProviderContext::Republish =>
|
AddProviderContext::Republish =>
|
||||||
KademliaEvent::QueryResult {
|
KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: result.stats,
|
stats: result.stats,
|
||||||
result: QueryResult::RepublishProvider(Err(
|
result: QueryResult::RepublishProvider(Err(
|
||||||
@ -1318,7 +1338,7 @@ where
|
|||||||
}),
|
}),
|
||||||
|
|
||||||
QueryInfo::GetClosestPeers { key } => {
|
QueryInfo::GetClosestPeers { key } => {
|
||||||
Some(KademliaEvent::QueryResult {
|
Some(KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: result.stats,
|
stats: result.stats,
|
||||||
result: QueryResult::GetClosestPeers(Err(
|
result: QueryResult::GetClosestPeers(Err(
|
||||||
@ -1341,13 +1361,13 @@ where
|
|||||||
});
|
});
|
||||||
match context {
|
match context {
|
||||||
PutRecordContext::Publish | PutRecordContext::Custom =>
|
PutRecordContext::Publish | PutRecordContext::Custom =>
|
||||||
Some(KademliaEvent::QueryResult {
|
Some(KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: result.stats,
|
stats: result.stats,
|
||||||
result: QueryResult::PutRecord(err)
|
result: QueryResult::PutRecord(err)
|
||||||
}),
|
}),
|
||||||
PutRecordContext::Republish =>
|
PutRecordContext::Republish =>
|
||||||
Some(KademliaEvent::QueryResult {
|
Some(KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: result.stats,
|
stats: result.stats,
|
||||||
result: QueryResult::RepublishRecord(err)
|
result: QueryResult::RepublishRecord(err)
|
||||||
@ -1378,7 +1398,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
QueryInfo::GetRecord { key, records, quorum, .. } =>
|
QueryInfo::GetRecord { key, records, quorum, .. } =>
|
||||||
Some(KademliaEvent::QueryResult {
|
Some(KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: result.stats,
|
stats: result.stats,
|
||||||
result: QueryResult::GetRecord(Err(
|
result: QueryResult::GetRecord(Err(
|
||||||
@ -1387,7 +1407,7 @@ where
|
|||||||
}),
|
}),
|
||||||
|
|
||||||
QueryInfo::GetProviders { key, providers } =>
|
QueryInfo::GetProviders { key, providers } =>
|
||||||
Some(KademliaEvent::QueryResult {
|
Some(KademliaEvent::OutboundQueryCompleted {
|
||||||
id: query_id,
|
id: query_id,
|
||||||
stats: result.stats,
|
stats: result.stats,
|
||||||
result: QueryResult::GetProviders(Err(
|
result: QueryResult::GetProviders(Err(
|
||||||
@ -1708,6 +1728,13 @@ where
|
|||||||
|
|
||||||
KademliaHandlerEvent::FindNodeReq { key, request_id } => {
|
KademliaHandlerEvent::FindNodeReq { key, request_id } => {
|
||||||
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
|
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
|
||||||
|
|
||||||
|
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
|
KademliaEvent::InboundRequestServed{ request: InboundRequest::FindNode {
|
||||||
|
num_closer_peers: closer_peers.len(),
|
||||||
|
}}
|
||||||
|
));
|
||||||
|
|
||||||
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
|
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||||
peer_id: source,
|
peer_id: source,
|
||||||
handler: NotifyHandler::One(connection),
|
handler: NotifyHandler::One(connection),
|
||||||
@ -1728,6 +1755,14 @@ where
|
|||||||
KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
|
KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
|
||||||
let provider_peers = self.provider_peers(&key, &source);
|
let provider_peers = self.provider_peers(&key, &source);
|
||||||
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
|
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
|
||||||
|
|
||||||
|
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
|
KademliaEvent::InboundRequestServed{ request: InboundRequest::GetProvider {
|
||||||
|
num_closer_peers: closer_peers.len(),
|
||||||
|
num_provider_peers: provider_peers.len(),
|
||||||
|
}}
|
||||||
|
));
|
||||||
|
|
||||||
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
|
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||||
peer_id: source,
|
peer_id: source,
|
||||||
handler: NotifyHandler::One(connection),
|
handler: NotifyHandler::One(connection),
|
||||||
@ -1773,7 +1808,11 @@ where
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
self.provider_received(key, provider)
|
self.provider_received(key, provider);
|
||||||
|
|
||||||
|
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
|
KademliaEvent::InboundRequestServed{ request: InboundRequest::AddProvider {} }
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
KademliaHandlerEvent::GetRecord { key, request_id } => {
|
KademliaHandlerEvent::GetRecord { key, request_id } => {
|
||||||
@ -1792,6 +1831,13 @@ where
|
|||||||
|
|
||||||
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
|
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
|
||||||
|
|
||||||
|
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
|
KademliaEvent::InboundRequestServed{ request: InboundRequest::GetRecord {
|
||||||
|
num_closer_peers: closer_peers.len(),
|
||||||
|
present_locally: record.is_some(),
|
||||||
|
}}
|
||||||
|
));
|
||||||
|
|
||||||
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
|
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||||
peer_id: source,
|
peer_id: source,
|
||||||
handler: NotifyHandler::One(connection),
|
handler: NotifyHandler::One(connection),
|
||||||
@ -1858,6 +1904,10 @@ where
|
|||||||
request_id
|
request_id
|
||||||
} => {
|
} => {
|
||||||
self.record_received(source, connection, request_id, record);
|
self.record_received(source, connection, request_id, record);
|
||||||
|
|
||||||
|
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
|
KademliaEvent::InboundRequestServed{ request: InboundRequest::PutRecord {} }
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
KademliaHandlerEvent::PutRecordRes {
|
KademliaHandlerEvent::PutRecordRes {
|
||||||
@ -1955,9 +2005,14 @@ where
|
|||||||
if let Some(entry) = self.kbuckets.take_applied_pending() {
|
if let Some(entry) = self.kbuckets.take_applied_pending() {
|
||||||
let kbucket::Node { key, value } = entry.inserted;
|
let kbucket::Node { key, value } = entry.inserted;
|
||||||
let event = KademliaEvent::RoutingUpdated {
|
let event = KademliaEvent::RoutingUpdated {
|
||||||
|
bucket_range: self.kbuckets
|
||||||
|
.bucket(&key)
|
||||||
|
.map(|b| b.range())
|
||||||
|
.expect("Self to never be applied from pending."),
|
||||||
peer: key.into_preimage(),
|
peer: key.into_preimage(),
|
||||||
|
is_new_peer: true,
|
||||||
addresses: value,
|
addresses: value,
|
||||||
old_peer: entry.evicted.map(|n| n.key.into_preimage())
|
old_peer: entry.evicted.map(|n| n.key.into_preimage()),
|
||||||
};
|
};
|
||||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
|
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
|
||||||
}
|
}
|
||||||
@ -2054,8 +2109,17 @@ pub struct PeerRecord {
|
|||||||
/// See [`NetworkBehaviour::poll`].
|
/// See [`NetworkBehaviour::poll`].
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum KademliaEvent {
|
pub enum KademliaEvent {
|
||||||
/// A query has produced a result.
|
/// An inbound request has been received and handled.
|
||||||
QueryResult {
|
//
|
||||||
|
// Note on the difference between 'request' and 'query': A request is a
|
||||||
|
// single request-response style exchange with a single remote peer. A query
|
||||||
|
// is made of multiple requests across multiple remote peers.
|
||||||
|
InboundRequestServed {
|
||||||
|
request: InboundRequest,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// An outbound query has produced a result.
|
||||||
|
OutboundQueryCompleted {
|
||||||
/// The ID of the query that finished.
|
/// The ID of the query that finished.
|
||||||
id: QueryId,
|
id: QueryId,
|
||||||
/// The result of the query.
|
/// The result of the query.
|
||||||
@ -2069,8 +2133,14 @@ pub enum KademliaEvent {
|
|||||||
RoutingUpdated {
|
RoutingUpdated {
|
||||||
/// The ID of the peer that was added or updated.
|
/// The ID of the peer that was added or updated.
|
||||||
peer: PeerId,
|
peer: PeerId,
|
||||||
|
/// Whether this is a new peer and was thus just added to the routing
|
||||||
|
/// table, or whether it is an existing peer who's addresses changed.
|
||||||
|
is_new_peer: bool,
|
||||||
/// The full list of known addresses of `peer`.
|
/// The full list of known addresses of `peer`.
|
||||||
addresses: Addresses,
|
addresses: Addresses,
|
||||||
|
/// Returns the minimum inclusive and maximum inclusive [`Distance`] for
|
||||||
|
/// the bucket of the peer.
|
||||||
|
bucket_range: (Distance, Distance),
|
||||||
/// The ID of the peer that was evicted from the routing table to make
|
/// The ID of the peer that was evicted from the routing table to make
|
||||||
/// room for the new peer, if any.
|
/// room for the new peer, if any.
|
||||||
old_peer: Option<PeerId>,
|
old_peer: Option<PeerId>,
|
||||||
@ -2117,6 +2187,36 @@ pub enum KademliaEvent {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Information about a received and handled inbound request.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum InboundRequest {
|
||||||
|
/// Request for the list of nodes whose IDs are the closest to `key`.
|
||||||
|
FindNode {
|
||||||
|
num_closer_peers: usize,
|
||||||
|
},
|
||||||
|
/// Same as `FindNode`, but should also return the entries of the local
|
||||||
|
/// providers list for this key.
|
||||||
|
GetProvider {
|
||||||
|
num_closer_peers: usize,
|
||||||
|
num_provider_peers: usize,
|
||||||
|
},
|
||||||
|
/// Request to store a peer as a provider.
|
||||||
|
//
|
||||||
|
// TODO: In the future one might want to use this event, not only to report
|
||||||
|
// a new provider, but to allow the upper layer to validate the incoming
|
||||||
|
// provider record, discarding it or passing it back down to be stored. This
|
||||||
|
// would follow a similar style to the `KademliaBucketInserts` strategy.
|
||||||
|
// Same would be applicable to `PutRecord`.
|
||||||
|
AddProvider {},
|
||||||
|
/// Request to retrieve a record.
|
||||||
|
GetRecord {
|
||||||
|
num_closer_peers: usize,
|
||||||
|
present_locally: bool,
|
||||||
|
},
|
||||||
|
/// Request to store a record.
|
||||||
|
PutRecord {},
|
||||||
|
}
|
||||||
|
|
||||||
/// The results of Kademlia queries.
|
/// The results of Kademlia queries.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum QueryResult {
|
pub enum QueryResult {
|
||||||
|
@ -185,7 +185,7 @@ fn bootstrap() {
|
|||||||
for (i, swarm) in swarms.iter_mut().enumerate() {
|
for (i, swarm) in swarms.iter_mut().enumerate() {
|
||||||
loop {
|
loop {
|
||||||
match swarm.poll_next_unpin(ctx) {
|
match swarm.poll_next_unpin(ctx) {
|
||||||
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
|
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
|
||||||
id, result: QueryResult::Bootstrap(Ok(ok)), ..
|
id, result: QueryResult::Bootstrap(Ok(ok)), ..
|
||||||
}))) => {
|
}))) => {
|
||||||
assert_eq!(id, qid);
|
assert_eq!(id, qid);
|
||||||
@ -265,7 +265,7 @@ fn query_iter() {
|
|||||||
for (i, swarm) in swarms.iter_mut().enumerate() {
|
for (i, swarm) in swarms.iter_mut().enumerate() {
|
||||||
loop {
|
loop {
|
||||||
match swarm.poll_next_unpin(ctx) {
|
match swarm.poll_next_unpin(ctx) {
|
||||||
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
|
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
|
||||||
id, result: QueryResult::GetClosestPeers(Ok(ok)), ..
|
id, result: QueryResult::GetClosestPeers(Ok(ok)), ..
|
||||||
}))) => {
|
}))) => {
|
||||||
assert_eq!(id, qid);
|
assert_eq!(id, qid);
|
||||||
@ -318,7 +318,7 @@ fn unresponsive_not_returned_direct() {
|
|||||||
for swarm in &mut swarms {
|
for swarm in &mut swarms {
|
||||||
loop {
|
loop {
|
||||||
match swarm.poll_next_unpin(ctx) {
|
match swarm.poll_next_unpin(ctx) {
|
||||||
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
|
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
|
||||||
result: QueryResult::GetClosestPeers(Ok(ok)), ..
|
result: QueryResult::GetClosestPeers(Ok(ok)), ..
|
||||||
}))) => {
|
}))) => {
|
||||||
assert_eq!(&ok.key[..], search_target.to_bytes().as_slice());
|
assert_eq!(&ok.key[..], search_target.to_bytes().as_slice());
|
||||||
@ -368,7 +368,7 @@ fn unresponsive_not_returned_indirect() {
|
|||||||
for swarm in &mut swarms {
|
for swarm in &mut swarms {
|
||||||
loop {
|
loop {
|
||||||
match swarm.poll_next_unpin(ctx) {
|
match swarm.poll_next_unpin(ctx) {
|
||||||
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
|
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
|
||||||
result: QueryResult::GetClosestPeers(Ok(ok)), ..
|
result: QueryResult::GetClosestPeers(Ok(ok)), ..
|
||||||
}))) => {
|
}))) => {
|
||||||
assert_eq!(&ok.key[..], search_target.to_bytes().as_slice());
|
assert_eq!(&ok.key[..], search_target.to_bytes().as_slice());
|
||||||
@ -412,7 +412,7 @@ fn get_record_not_found() {
|
|||||||
for swarm in &mut swarms {
|
for swarm in &mut swarms {
|
||||||
loop {
|
loop {
|
||||||
match swarm.poll_next_unpin(ctx) {
|
match swarm.poll_next_unpin(ctx) {
|
||||||
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
|
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
|
||||||
id, result: QueryResult::GetRecord(Err(e)), ..
|
id, result: QueryResult::GetRecord(Err(e)), ..
|
||||||
}))) => {
|
}))) => {
|
||||||
assert_eq!(id, qid);
|
assert_eq!(id, qid);
|
||||||
@ -519,10 +519,10 @@ fn put_record() {
|
|||||||
for swarm in &mut swarms {
|
for swarm in &mut swarms {
|
||||||
loop {
|
loop {
|
||||||
match swarm.poll_next_unpin(ctx) {
|
match swarm.poll_next_unpin(ctx) {
|
||||||
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
|
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
|
||||||
id, result: QueryResult::PutRecord(res), stats
|
id, result: QueryResult::PutRecord(res), stats
|
||||||
}))) |
|
}))) |
|
||||||
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
|
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
|
||||||
id, result: QueryResult::RepublishRecord(res), stats
|
id, result: QueryResult::RepublishRecord(res), stats
|
||||||
}))) => {
|
}))) => {
|
||||||
assert!(qids.is_empty() || qids.remove(&id));
|
assert!(qids.is_empty() || qids.remove(&id));
|
||||||
@ -652,7 +652,7 @@ fn get_record() {
|
|||||||
for swarm in &mut swarms {
|
for swarm in &mut swarms {
|
||||||
loop {
|
loop {
|
||||||
match swarm.poll_next_unpin(ctx) {
|
match swarm.poll_next_unpin(ctx) {
|
||||||
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
|
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
|
||||||
id,
|
id,
|
||||||
result: QueryResult::GetRecord(Ok(GetRecordOk {
|
result: QueryResult::GetRecord(Ok(GetRecordOk {
|
||||||
records, cache_candidates
|
records, cache_candidates
|
||||||
@ -702,7 +702,7 @@ fn get_record_many() {
|
|||||||
for swarm in &mut swarms {
|
for swarm in &mut swarms {
|
||||||
loop {
|
loop {
|
||||||
match swarm.poll_next_unpin(ctx) {
|
match swarm.poll_next_unpin(ctx) {
|
||||||
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
|
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
|
||||||
id,
|
id,
|
||||||
result: QueryResult::GetRecord(Ok(GetRecordOk { records, .. })),
|
result: QueryResult::GetRecord(Ok(GetRecordOk { records, .. })),
|
||||||
..
|
..
|
||||||
@ -784,10 +784,10 @@ fn add_provider() {
|
|||||||
for swarm in &mut swarms {
|
for swarm in &mut swarms {
|
||||||
loop {
|
loop {
|
||||||
match swarm.poll_next_unpin(ctx) {
|
match swarm.poll_next_unpin(ctx) {
|
||||||
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
|
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
|
||||||
id, result: QueryResult::StartProviding(res), ..
|
id, result: QueryResult::StartProviding(res), ..
|
||||||
}))) |
|
}))) |
|
||||||
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
|
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
|
||||||
id, result: QueryResult::RepublishProvider(res), ..
|
id, result: QueryResult::RepublishProvider(res), ..
|
||||||
}))) => {
|
}))) => {
|
||||||
assert!(qids.is_empty() || qids.remove(&id));
|
assert!(qids.is_empty() || qids.remove(&id));
|
||||||
@ -903,7 +903,7 @@ fn exceed_jobs_max_queries() {
|
|||||||
loop {
|
loop {
|
||||||
if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) {
|
if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) {
|
||||||
match e {
|
match e {
|
||||||
SwarmEvent::Behaviour(KademliaEvent::QueryResult {
|
SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
|
||||||
result: QueryResult::GetClosestPeers(Ok(r)), ..
|
result: QueryResult::GetClosestPeers(Ok(r)), ..
|
||||||
}) => break assert!(r.peers.is_empty()),
|
}) => break assert!(r.peers.is_empty()),
|
||||||
SwarmEvent::Behaviour(e) => panic!("Unexpected event: {:?}", e),
|
SwarmEvent::Behaviour(e) => panic!("Unexpected event: {:?}", e),
|
||||||
@ -972,7 +972,7 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
|
|||||||
for (i, swarm) in [&mut alice, &mut trudy].iter_mut().enumerate() {
|
for (i, swarm) in [&mut alice, &mut trudy].iter_mut().enumerate() {
|
||||||
loop {
|
loop {
|
||||||
match swarm.poll_next_unpin(ctx) {
|
match swarm.poll_next_unpin(ctx) {
|
||||||
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult{
|
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted{
|
||||||
result: QueryResult::GetRecord(result),
|
result: QueryResult::GetRecord(result),
|
||||||
..
|
..
|
||||||
}))) => {
|
}))) => {
|
||||||
@ -1025,7 +1025,7 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
|
|||||||
for (i, swarm) in [&mut alice, &mut bob].iter_mut().enumerate() {
|
for (i, swarm) in [&mut alice, &mut bob].iter_mut().enumerate() {
|
||||||
loop {
|
loop {
|
||||||
match swarm.poll_next_unpin(ctx) {
|
match swarm.poll_next_unpin(ctx) {
|
||||||
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult{
|
Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted{
|
||||||
result: QueryResult::GetRecord(result),
|
result: QueryResult::GetRecord(result),
|
||||||
..
|
..
|
||||||
}))) => {
|
}))) => {
|
||||||
|
@ -189,10 +189,9 @@ where
|
|||||||
.value
|
.value
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sets the status of the entry to `NodeStatus::Disconnected`.
|
/// Sets the status of the entry to the provided [`NodeStatus`].
|
||||||
pub fn update(self, status: NodeStatus) -> Self {
|
pub fn update(&mut self, status: NodeStatus) {
|
||||||
self.0.bucket.update(self.0.key, status);
|
self.0.bucket.update(self.0.key, status);
|
||||||
Self::new(self.0.bucket, self.0.key)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Removes the entry from the bucket.
|
/// Removes the entry from the bucket.
|
||||||
@ -274,4 +273,3 @@ where
|
|||||||
}, status)
|
}, status)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,6 +48,8 @@ pub use behaviour::{
|
|||||||
Quorum
|
Quorum
|
||||||
};
|
};
|
||||||
pub use behaviour::{
|
pub use behaviour::{
|
||||||
|
InboundRequest,
|
||||||
|
|
||||||
QueryRef,
|
QueryRef,
|
||||||
QueryMut,
|
QueryMut,
|
||||||
|
|
||||||
|
@ -569,7 +569,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
|
|||||||
let query_id = dst_swarm.behaviour_mut().kad.bootstrap().unwrap();
|
let query_id = dst_swarm.behaviour_mut().kad.bootstrap().unwrap();
|
||||||
loop {
|
loop {
|
||||||
match dst_swarm.select_next_some().await {
|
match dst_swarm.select_next_some().await {
|
||||||
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::QueryResult {
|
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::OutboundQueryCompleted {
|
||||||
id,
|
id,
|
||||||
result: QueryResult::Bootstrap(Ok(_)),
|
result: QueryResult::Bootstrap(Ok(_)),
|
||||||
..
|
..
|
||||||
@ -647,7 +647,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
|
|||||||
SwarmEvent::Dialing(peer_id)
|
SwarmEvent::Dialing(peer_id)
|
||||||
if peer_id == relay_peer_id || peer_id == dst_peer_id => {}
|
if peer_id == relay_peer_id || peer_id == dst_peer_id => {}
|
||||||
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
||||||
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::QueryResult {
|
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::OutboundQueryCompleted {
|
||||||
id,
|
id,
|
||||||
result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk { .. })),
|
result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk { .. })),
|
||||||
..
|
..
|
||||||
|
Reference in New Issue
Block a user