mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-04-25 03:02:12 +00:00
[swarm] Configurable and "infinite" scores for external addresses. (#1842)
* Add "infinite" scores for external addresses. Extend address scores with an infinite cardinal, permitting addresses to be retained "forever" or until explicitly removed. Expose (external) address scores on the API. * Update swarm/src/registry.rs Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com> * Fix compilation. * Update CHANGELOG Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
parent
edb99eded6
commit
1bd013c843
@ -450,8 +450,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
|
|||||||
event: #wrapped_event,
|
event: #wrapped_event,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
std::task::Poll::Ready(#network_behaviour_action::ReportObservedAddr { address }) => {
|
std::task::Poll::Ready(#network_behaviour_action::ReportObservedAddr { address, score }) => {
|
||||||
return std::task::Poll::Ready(#network_behaviour_action::ReportObservedAddr { address });
|
return std::task::Poll::Ready(#network_behaviour_action::ReportObservedAddr { address, score });
|
||||||
}
|
}
|
||||||
std::task::Poll::Pending => break,
|
std::task::Poll::Pending => break,
|
||||||
}
|
}
|
||||||
|
@ -1412,8 +1412,8 @@ impl NetworkBehaviour for Gossipsub {
|
|||||||
NetworkBehaviourAction::DialPeer { peer_id, condition } => {
|
NetworkBehaviourAction::DialPeer { peer_id, condition } => {
|
||||||
NetworkBehaviourAction::DialPeer { peer_id, condition }
|
NetworkBehaviourAction::DialPeer { peer_id, condition }
|
||||||
}
|
}
|
||||||
NetworkBehaviourAction::ReportObservedAddr { address } => {
|
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
|
||||||
NetworkBehaviourAction::ReportObservedAddr { address }
|
NetworkBehaviourAction::ReportObservedAddr { address, score }
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,7 @@ use libp2p_core::{
|
|||||||
upgrade::{ReadOneError, UpgradeError}
|
upgrade::{ReadOneError, UpgradeError}
|
||||||
};
|
};
|
||||||
use libp2p_swarm::{
|
use libp2p_swarm::{
|
||||||
|
AddressScore,
|
||||||
NegotiatedSubstream,
|
NegotiatedSubstream,
|
||||||
NetworkBehaviour,
|
NetworkBehaviour,
|
||||||
NetworkBehaviourAction,
|
NetworkBehaviourAction,
|
||||||
@ -47,6 +48,10 @@ use std::{
|
|||||||
|
|
||||||
/// Network behaviour that automatically identifies nodes periodically, returns information
|
/// Network behaviour that automatically identifies nodes periodically, returns information
|
||||||
/// about them, and answers identify queries from other nodes.
|
/// about them, and answers identify queries from other nodes.
|
||||||
|
///
|
||||||
|
/// All external addresses of the local node supposedly observed by remotes
|
||||||
|
/// are reported via [`NetworkBehaviourAction::ReportObservedAddr`] with a
|
||||||
|
/// [score](AddressScore) of `1`.
|
||||||
pub struct Identify {
|
pub struct Identify {
|
||||||
/// Protocol version to send back to remotes.
|
/// Protocol version to send back to remotes.
|
||||||
protocol_version: String,
|
protocol_version: String,
|
||||||
@ -143,6 +148,7 @@ impl NetworkBehaviour for Identify {
|
|||||||
self.events.push_back(
|
self.events.push_back(
|
||||||
NetworkBehaviourAction::ReportObservedAddr {
|
NetworkBehaviourAction::ReportObservedAddr {
|
||||||
address: remote.observed_addr,
|
address: remote.observed_addr,
|
||||||
|
score: AddressScore::Finite(1),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
IdentifyHandlerEvent::Identify(sender) => {
|
IdentifyHandlerEvent::Identify(sender) => {
|
||||||
@ -187,7 +193,7 @@ impl NetworkBehaviour for Identify {
|
|||||||
.map(|p| String::from_utf8_lossy(&p).to_string())
|
.map(|p| String::from_utf8_lossy(&p).to_string())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let mut listen_addrs: Vec<_> = params.external_addresses().collect();
|
let mut listen_addrs: Vec<_> = params.external_addresses().map(|r| r.addr).collect();
|
||||||
listen_addrs.extend(params.listened_addresses());
|
listen_addrs.extend(params.listened_addresses());
|
||||||
|
|
||||||
let mut sending = 0;
|
let mut sending = 0;
|
||||||
|
@ -1046,7 +1046,7 @@ where
|
|||||||
phase: AddProviderPhase::GetClosestPeers
|
phase: AddProviderPhase::GetClosestPeers
|
||||||
} => {
|
} => {
|
||||||
let provider_id = params.local_peer_id().clone();
|
let provider_id = params.local_peer_id().clone();
|
||||||
let external_addresses = params.external_addresses().collect();
|
let external_addresses = params.external_addresses().map(|r| r.addr).collect();
|
||||||
let inner = QueryInner::new(QueryInfo::AddProvider {
|
let inner = QueryInner::new(QueryInfo::AddProvider {
|
||||||
context,
|
context,
|
||||||
key,
|
key,
|
||||||
|
@ -553,8 +553,8 @@ where
|
|||||||
NetworkBehaviourAction::DialPeer { peer_id, condition },
|
NetworkBehaviourAction::DialPeer { peer_id, condition },
|
||||||
| NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
|
| NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
|
||||||
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event },
|
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event },
|
||||||
| NetworkBehaviourAction::ReportObservedAddr { address } =>
|
| NetworkBehaviourAction::ReportObservedAddr { address, score } =>
|
||||||
NetworkBehaviourAction::ReportObservedAddr { address }
|
NetworkBehaviourAction::ReportObservedAddr { address, score }
|
||||||
};
|
};
|
||||||
|
|
||||||
return Poll::Ready(event)
|
return Poll::Ready(event)
|
||||||
|
@ -2,6 +2,11 @@
|
|||||||
|
|
||||||
- Update `libp2p-core`.
|
- Update `libp2p-core`.
|
||||||
|
|
||||||
|
- Expose configurable scores for external addresses, as well as
|
||||||
|
the ability to remove them and to add addresses that are
|
||||||
|
retained "forever" (or until explicitly removed).
|
||||||
|
[PR 1842](https://github.com/libp2p/rust-libp2p/pull/1842).
|
||||||
|
|
||||||
# 0.24.0 [2020-11-09]
|
# 0.24.0 [2020-11-09]
|
||||||
|
|
||||||
- Update dependencies.
|
- Update dependencies.
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
// DEALINGS IN THE SOFTWARE.
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
use crate::{AddressScore, AddressRecord};
|
||||||
use crate::protocols_handler::{IntoProtocolsHandler, ProtocolsHandler};
|
use crate::protocols_handler::{IntoProtocolsHandler, ProtocolsHandler};
|
||||||
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::{ConnectionId, ListenerId}};
|
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::{ConnectionId, ListenerId}};
|
||||||
use std::{error, task::Context, task::Poll};
|
use std::{error, task::Context, task::Poll};
|
||||||
@ -182,7 +183,7 @@ pub trait PollParameters {
|
|||||||
/// Iterator returned by [`listened_addresses`](PollParameters::listened_addresses).
|
/// Iterator returned by [`listened_addresses`](PollParameters::listened_addresses).
|
||||||
type ListenedAddressesIter: ExactSizeIterator<Item = Multiaddr>;
|
type ListenedAddressesIter: ExactSizeIterator<Item = Multiaddr>;
|
||||||
/// Iterator returned by [`external_addresses`](PollParameters::external_addresses).
|
/// Iterator returned by [`external_addresses`](PollParameters::external_addresses).
|
||||||
type ExternalAddressesIter: ExactSizeIterator<Item = Multiaddr>;
|
type ExternalAddressesIter: ExactSizeIterator<Item = AddressRecord>;
|
||||||
|
|
||||||
/// Returns the list of protocol the behaviour supports when a remote negotiates a protocol on
|
/// Returns the list of protocol the behaviour supports when a remote negotiates a protocol on
|
||||||
/// an inbound substream.
|
/// an inbound substream.
|
||||||
@ -269,8 +270,9 @@ pub enum NetworkBehaviourAction<TInEvent, TOutEvent> {
|
|||||||
event: TInEvent,
|
event: TInEvent,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Informs the `Swarm` about a multi-address observed by a remote for
|
/// Informs the `Swarm` about an address observed by a remote for
|
||||||
/// the local node.
|
/// the local node by which the local node is supposedly publicly
|
||||||
|
/// reachable.
|
||||||
///
|
///
|
||||||
/// It is advisable to issue `ReportObservedAddr` actions at a fixed frequency
|
/// It is advisable to issue `ReportObservedAddr` actions at a fixed frequency
|
||||||
/// per node. This way address information will be more accurate over time
|
/// per node. This way address information will be more accurate over time
|
||||||
@ -278,6 +280,10 @@ pub enum NetworkBehaviourAction<TInEvent, TOutEvent> {
|
|||||||
ReportObservedAddr {
|
ReportObservedAddr {
|
||||||
/// The observed address of the local node.
|
/// The observed address of the local node.
|
||||||
address: Multiaddr,
|
address: Multiaddr,
|
||||||
|
/// The score to associate with this observation, i.e.
|
||||||
|
/// an indicator for the trusworthiness of this address
|
||||||
|
/// relative to other observed addresses.
|
||||||
|
score: AddressScore,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -297,8 +303,8 @@ impl<TInEvent, TOutEvent> NetworkBehaviourAction<TInEvent, TOutEvent> {
|
|||||||
handler,
|
handler,
|
||||||
event: f(event)
|
event: f(event)
|
||||||
},
|
},
|
||||||
NetworkBehaviourAction::ReportObservedAddr { address } =>
|
NetworkBehaviourAction::ReportObservedAddr { address, score } =>
|
||||||
NetworkBehaviourAction::ReportObservedAddr { address }
|
NetworkBehaviourAction::ReportObservedAddr { address, score }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -313,8 +319,8 @@ impl<TInEvent, TOutEvent> NetworkBehaviourAction<TInEvent, TOutEvent> {
|
|||||||
NetworkBehaviourAction::DialPeer { peer_id, condition },
|
NetworkBehaviourAction::DialPeer { peer_id, condition },
|
||||||
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
|
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
|
||||||
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event },
|
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event },
|
||||||
NetworkBehaviourAction::ReportObservedAddr { address } =>
|
NetworkBehaviourAction::ReportObservedAddr { address, score } =>
|
||||||
NetworkBehaviourAction::ReportObservedAddr { address }
|
NetworkBehaviourAction::ReportObservedAddr { address, score }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -82,6 +82,7 @@ pub use protocols_handler::{
|
|||||||
OneShotHandlerConfig,
|
OneShotHandlerConfig,
|
||||||
SubstreamProtocol
|
SubstreamProtocol
|
||||||
};
|
};
|
||||||
|
pub use registry::{AddressScore, AddressRecord, AddAddressResult};
|
||||||
|
|
||||||
use protocols_handler::{
|
use protocols_handler::{
|
||||||
NodeHandlerWrapperBuilder,
|
NodeHandlerWrapperBuilder,
|
||||||
@ -397,34 +398,44 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
|
|||||||
me.network.listen_addrs()
|
me.network.listen_addrs()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns an iterator that produces the list of addresses that other nodes can use to reach
|
|
||||||
/// us.
|
|
||||||
pub fn external_addresses(me: &Self) -> impl Iterator<Item = &Multiaddr> {
|
|
||||||
me.external_addrs.iter()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the peer ID of the swarm passed as parameter.
|
/// Returns the peer ID of the swarm passed as parameter.
|
||||||
pub fn local_peer_id(me: &Self) -> &PeerId {
|
pub fn local_peer_id(me: &Self) -> &PeerId {
|
||||||
&me.network.local_peer_id()
|
&me.network.local_peer_id()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Adds an external address.
|
/// Returns an iterator for [`AddressRecord`]s of external addresses
|
||||||
///
|
/// of the local node, in decreasing order of their current
|
||||||
/// An external address is an address we are listening on but that accounts for things such as
|
/// [score](AddressScore).
|
||||||
/// NAT traversal.
|
pub fn external_addresses(me: &Self) -> impl Iterator<Item = &AddressRecord> {
|
||||||
pub fn add_external_address(me: &mut Self, addr: Multiaddr) {
|
me.external_addrs.iter()
|
||||||
me.external_addrs.add(addr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the connection info for an arbitrary connection with the peer, or `None`
|
/// Adds an external address record for the local node.
|
||||||
/// if there is no connection to that peer.
|
///
|
||||||
// TODO: should take &self instead of &mut self, but the API in network requires &mut
|
/// An external address is an address of the local node known to
|
||||||
pub fn connection_info(me: &mut Self, peer_id: &PeerId) -> Option<PeerId> {
|
/// be (likely) reachable for other nodes, possibly taking into
|
||||||
if let Some(mut n) = me.network.peer(peer_id.clone()).into_connected() {
|
/// account NAT. The external addresses of the local node may be
|
||||||
Some(n.some_connection().info().clone())
|
/// shared with other nodes by the `NetworkBehaviour`.
|
||||||
} else {
|
///
|
||||||
None
|
/// The associated score determines both the position of the address
|
||||||
}
|
/// in the list of external addresses (which can determine the
|
||||||
|
/// order in which addresses are used to connect to) as well as
|
||||||
|
/// how long the address is retained in the list, depending on
|
||||||
|
/// how frequently it is reported by the `NetworkBehaviour` via
|
||||||
|
/// [`NetworkBehaviourAction::ReportObservedAddr`] or explicitly
|
||||||
|
/// through this method.
|
||||||
|
pub fn add_external_address(me: &mut Self, a: Multiaddr, s: AddressScore) -> AddAddressResult {
|
||||||
|
me.external_addrs.add(a, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Removes an external address of the local node, regardless of
|
||||||
|
/// its current score. See [`ExpandedSwarm::add_external_address`]
|
||||||
|
/// for details.
|
||||||
|
///
|
||||||
|
/// Returns `true` if the address existed and was removed, `false`
|
||||||
|
/// otherwise.
|
||||||
|
pub fn remove_external_address(me: &mut Self, addr: &Multiaddr) -> bool {
|
||||||
|
me.external_addrs.remove(addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Bans a peer by its peer ID.
|
/// Bans a peer by its peer ID.
|
||||||
@ -732,12 +743,12 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => {
|
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => {
|
||||||
for addr in this.network.address_translation(&address) {
|
for addr in this.network.address_translation(&address) {
|
||||||
if this.external_addrs.iter().all(|a| *a != addr) {
|
if this.external_addrs.iter().all(|a| &a.addr != &addr) {
|
||||||
this.behaviour.inject_new_external_addr(&addr);
|
this.behaviour.inject_new_external_addr(&addr);
|
||||||
}
|
}
|
||||||
this.external_addrs.add(addr);
|
this.external_addrs.add(addr, score);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -20,29 +20,143 @@
|
|||||||
|
|
||||||
use libp2p_core::Multiaddr;
|
use libp2p_core::Multiaddr;
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use std::{collections::VecDeque, num::NonZeroUsize};
|
use std::{collections::VecDeque, cmp::Ordering, num::NonZeroUsize};
|
||||||
|
use std::ops::{Add, Sub};
|
||||||
|
|
||||||
/// Hold a ranked collection of [`Multiaddr`] values.
|
/// A ranked collection of [`Multiaddr`] values.
|
||||||
|
///
|
||||||
|
/// Every address has an associated [score](`AddressScore`) and iterating
|
||||||
|
/// over the addresses will return them in order from highest to lowest score.
|
||||||
|
///
|
||||||
|
/// In addition to the currently held addresses and their score, the collection
|
||||||
|
/// keeps track of a limited history of the most-recently added addresses.
|
||||||
|
/// This history determines how address scores are reduced over time as old
|
||||||
|
/// scores expire in the context of new addresses being added:
|
||||||
|
///
|
||||||
|
/// * An address's score is increased by a given amount whenever it is
|
||||||
|
/// [(re-)added](Addresses::add) to the collection.
|
||||||
|
/// * An address's score is decreased by the same amount used when it
|
||||||
|
/// was added when the least-recently seen addition is (as per the
|
||||||
|
/// limited history) for this address in the context of [`Addresses::add`].
|
||||||
|
/// * If an address's score reaches 0 in the context of [`Addresses::add`],
|
||||||
|
/// it is removed from the collection.
|
||||||
///
|
///
|
||||||
/// Every address has an associated score and iterating over addresses will return them
|
|
||||||
/// in order from highest to lowest. When reaching the limit, addresses with the lowest
|
|
||||||
/// score will be dropped first.
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Addresses {
|
pub struct Addresses {
|
||||||
/// The ranked sequence of addresses.
|
/// The ranked sequence of addresses, from highest to lowest score.
|
||||||
registry: SmallVec<[Record; 8]>,
|
///
|
||||||
/// Number of historical reports. Similar to `reports.capacity()`.
|
/// By design, the number of finitely scored addresses stored here is
|
||||||
|
/// never larger (but may be smaller) than the number of historic `reports`
|
||||||
|
/// at any time.
|
||||||
|
registry: SmallVec<[AddressRecord; 8]>,
|
||||||
|
/// The configured limit of the `reports` history of added addresses,
|
||||||
|
/// and thus also of the size of the `registry` w.r.t. finitely scored
|
||||||
|
/// addresses.
|
||||||
limit: NonZeroUsize,
|
limit: NonZeroUsize,
|
||||||
/// Queue of last reports. Every new report is added to the queue. If the queue reaches its
|
/// The limited history of added addresses. If the queue reaches the `limit`,
|
||||||
/// capacity, we also pop the first element.
|
/// the first record, i.e. the least-recently added, is removed in the
|
||||||
reports: VecDeque<Multiaddr>,
|
/// context of [`Addresses::add`] and the corresponding record in the
|
||||||
|
/// `registry` has its score reduced accordingly.
|
||||||
|
reports: VecDeque<Report>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// An address record associates a score to a Multiaddr.
|
/// An record in a prioritised list of addresses.
|
||||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||||
struct Record {
|
#[non_exhaustive]
|
||||||
|
pub struct AddressRecord {
|
||||||
|
pub addr: Multiaddr,
|
||||||
|
pub score: AddressScore,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A report tracked for a finitely scored address.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct Report {
|
||||||
|
addr: Multiaddr,
|
||||||
score: u32,
|
score: u32,
|
||||||
addr: Multiaddr
|
}
|
||||||
|
|
||||||
|
impl AddressRecord {
|
||||||
|
fn new(addr: Multiaddr, score: AddressScore) -> Self {
|
||||||
|
AddressRecord {
|
||||||
|
addr, score,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The "score" of an address w.r.t. an ordered collection of addresses.
|
||||||
|
///
|
||||||
|
/// A score is a measure of the trusworthyness of a particular
|
||||||
|
/// observation of an address. The same address may be repeatedly
|
||||||
|
/// reported with the same or differing scores.
|
||||||
|
#[derive(PartialEq, Eq, Debug, Clone, Copy, Hash)]
|
||||||
|
pub enum AddressScore {
|
||||||
|
/// The score is "infinite", i.e. an address with this score is never
|
||||||
|
/// purged from the associated address records and remains sorted at
|
||||||
|
/// the beginning (possibly with other `Infinite`ly scored addresses).
|
||||||
|
Infinite,
|
||||||
|
/// The score is finite, i.e. an address with this score has
|
||||||
|
/// its score increased and decreased as per the frequency of
|
||||||
|
/// reports (i.e. additions) of the same address relative to
|
||||||
|
/// the reports of other addresses.
|
||||||
|
Finite(u32),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AddressScore {
|
||||||
|
fn is_zero(&self) -> bool {
|
||||||
|
&AddressScore::Finite(0) == self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialOrd for AddressScore {
|
||||||
|
fn partial_cmp(&self, other: &AddressScore) -> Option<Ordering> {
|
||||||
|
Some(self.cmp(other))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Ord for AddressScore {
|
||||||
|
fn cmp(&self, other: &AddressScore) -> Ordering {
|
||||||
|
// Semantics of cardinal numbers with a single infinite cardinal.
|
||||||
|
match (self, other) {
|
||||||
|
(AddressScore::Infinite, AddressScore::Infinite) =>
|
||||||
|
Ordering::Equal,
|
||||||
|
(AddressScore::Infinite, AddressScore::Finite(_)) =>
|
||||||
|
Ordering::Greater,
|
||||||
|
(AddressScore::Finite(_), AddressScore::Infinite) =>
|
||||||
|
Ordering::Less,
|
||||||
|
(AddressScore::Finite(a), AddressScore::Finite(b)) =>
|
||||||
|
a.cmp(b),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Add for AddressScore {
|
||||||
|
type Output = AddressScore;
|
||||||
|
|
||||||
|
fn add(self, rhs: AddressScore) -> Self::Output {
|
||||||
|
// Semantics of cardinal numbers with a single infinite cardinal.
|
||||||
|
match (self, rhs) {
|
||||||
|
(AddressScore::Infinite, AddressScore::Infinite) =>
|
||||||
|
AddressScore::Infinite,
|
||||||
|
(AddressScore::Infinite, AddressScore::Finite(_)) =>
|
||||||
|
AddressScore::Infinite,
|
||||||
|
(AddressScore::Finite(_), AddressScore::Infinite) =>
|
||||||
|
AddressScore::Infinite,
|
||||||
|
(AddressScore::Finite(a), AddressScore::Finite(b)) =>
|
||||||
|
AddressScore::Finite(a.saturating_add(b))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Sub<u32> for AddressScore {
|
||||||
|
type Output = AddressScore;
|
||||||
|
|
||||||
|
fn sub(self, rhs: u32) -> Self::Output {
|
||||||
|
// Semantics of cardinal numbers with a single infinite cardinal.
|
||||||
|
match self {
|
||||||
|
AddressScore::Infinite => AddressScore::Infinite,
|
||||||
|
AddressScore::Finite(score) => AddressScore::Finite(score.saturating_sub(rhs))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Addresses {
|
impl Default for Addresses {
|
||||||
@ -51,8 +165,16 @@ impl Default for Addresses {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The result of adding an address to an ordered list of
|
||||||
|
/// addresses with associated scores.
|
||||||
|
pub enum AddAddressResult {
|
||||||
|
Inserted,
|
||||||
|
Updated,
|
||||||
|
}
|
||||||
|
|
||||||
impl Addresses {
|
impl Addresses {
|
||||||
/// Create a new address collection of bounded length.
|
/// Create a new ranked address collection with the given size limit
|
||||||
|
/// for [finitely scored](AddressScore::Finite) addresses.
|
||||||
pub fn new(limit: NonZeroUsize) -> Self {
|
pub fn new(limit: NonZeroUsize) -> Self {
|
||||||
Addresses {
|
Addresses {
|
||||||
registry: SmallVec::new(),
|
registry: SmallVec::new(),
|
||||||
@ -63,40 +185,61 @@ impl Addresses {
|
|||||||
|
|
||||||
/// Add a [`Multiaddr`] to the collection.
|
/// Add a [`Multiaddr`] to the collection.
|
||||||
///
|
///
|
||||||
/// Adding an existing address is interpreted as additional
|
/// If the given address already exists in the collection,
|
||||||
/// confirmation and thus increases its score.
|
/// the given score is added to the current score of the address.
|
||||||
pub fn add(&mut self, a: Multiaddr) {
|
///
|
||||||
|
/// If the collection has already observed the configured
|
||||||
let oldest = if self.reports.len() == self.limit.get() {
|
/// number of address additions, the least-recently added address
|
||||||
self.reports.pop_front()
|
/// as per this limited history has its score reduced by the amount
|
||||||
} else {
|
/// used in this prior report, with removal from the collection
|
||||||
None
|
/// occurring when the score drops to 0.
|
||||||
};
|
pub fn add(&mut self, addr: Multiaddr, score: AddressScore) -> AddAddressResult {
|
||||||
|
// If enough reports (i.e. address additions) occurred, reduce
|
||||||
if let Some(oldest) = oldest {
|
// the score of the least-recently added address.
|
||||||
if let Some(in_registry) = self.registry.iter_mut().find(|r| r.addr == oldest) {
|
if self.reports.len() == self.limit.get() {
|
||||||
in_registry.score = in_registry.score.saturating_sub(1);
|
let old_report = self.reports.pop_front().expect("len = limit > 0");
|
||||||
|
// If the address is still in the collection, decrease its score.
|
||||||
|
if let Some(record) = self.registry.iter_mut().find(|r| r.addr == old_report.addr) {
|
||||||
|
record.score = record.score - old_report.score;
|
||||||
isort(&mut self.registry);
|
isort(&mut self.registry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove addresses that have a score of 0.
|
// Remove addresses that have a score of 0.
|
||||||
while self.registry.last().map(|e| e.score == 0).unwrap_or(false) {
|
while self.registry.last().map(|e| e.score.is_zero()).unwrap_or(false) {
|
||||||
self.registry.pop();
|
self.registry.pop();
|
||||||
}
|
}
|
||||||
|
|
||||||
self.reports.push_back(a.clone());
|
// If the address score is finite, remember this report.
|
||||||
|
if let AddressScore::Finite(score) = score {
|
||||||
|
self.reports.push_back(Report { addr: addr.clone(), score });
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the address is already in the collection, increase its score.
|
||||||
for r in &mut self.registry {
|
for r in &mut self.registry {
|
||||||
if r.addr == a {
|
if r.addr == addr {
|
||||||
r.score = r.score.saturating_add(1);
|
r.score = r.score + score;
|
||||||
isort(&mut self.registry);
|
isort(&mut self.registry);
|
||||||
return
|
return AddAddressResult::Updated
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let r = Record { score: 1, addr: a };
|
// It is a new record.
|
||||||
self.registry.push(r)
|
self.registry.push(AddressRecord::new(addr, score));
|
||||||
|
AddAddressResult::Inserted
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Explicitly remove an address from the collection.
|
||||||
|
///
|
||||||
|
/// Returns `true` if the address existed in the collection
|
||||||
|
/// and was thus removed, false otherwise.
|
||||||
|
pub fn remove(&mut self, addr: &Multiaddr) -> bool {
|
||||||
|
if let Some(pos) = self.registry.iter().position(|r| &r.addr == addr) {
|
||||||
|
self.registry.remove(pos);
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return an iterator over all [`Multiaddr`] values.
|
/// Return an iterator over all [`Multiaddr`] values.
|
||||||
@ -117,12 +260,12 @@ impl Addresses {
|
|||||||
/// An iterator over [`Multiaddr`] values.
|
/// An iterator over [`Multiaddr`] values.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct AddressIter<'a> {
|
pub struct AddressIter<'a> {
|
||||||
items: &'a [Record],
|
items: &'a [AddressRecord],
|
||||||
offset: usize
|
offset: usize
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Iterator for AddressIter<'a> {
|
impl<'a> Iterator for AddressIter<'a> {
|
||||||
type Item = &'a Multiaddr;
|
type Item = &'a AddressRecord;
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
if self.offset == self.items.len() {
|
if self.offset == self.items.len() {
|
||||||
@ -130,7 +273,7 @@ impl<'a> Iterator for AddressIter<'a> {
|
|||||||
}
|
}
|
||||||
let item = &self.items[self.offset];
|
let item = &self.items[self.offset];
|
||||||
self.offset += 1;
|
self.offset += 1;
|
||||||
Some(&item.addr)
|
Some(&item)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||||
@ -144,15 +287,15 @@ impl<'a> ExactSizeIterator for AddressIter<'a> {}
|
|||||||
/// An iterator over [`Multiaddr`] values.
|
/// An iterator over [`Multiaddr`] values.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct AddressIntoIter {
|
pub struct AddressIntoIter {
|
||||||
items: SmallVec<[Record; 8]>,
|
items: SmallVec<[AddressRecord; 8]>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Iterator for AddressIntoIter {
|
impl Iterator for AddressIntoIter {
|
||||||
type Item = Multiaddr;
|
type Item = AddressRecord;
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
if !self.items.is_empty() {
|
if !self.items.is_empty() {
|
||||||
Some(self.items.remove(0).addr)
|
Some(self.items.remove(0))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
@ -167,7 +310,7 @@ impl Iterator for AddressIntoIter {
|
|||||||
impl ExactSizeIterator for AddressIntoIter {}
|
impl ExactSizeIterator for AddressIntoIter {}
|
||||||
|
|
||||||
// Reverse insertion sort.
|
// Reverse insertion sort.
|
||||||
fn isort(xs: &mut [Record]) {
|
fn isort(xs: &mut [AddressRecord]) {
|
||||||
for i in 1 .. xs.len() {
|
for i in 1 .. xs.len() {
|
||||||
for j in (1 ..= i).rev() {
|
for j in (1 ..= i).rev() {
|
||||||
if xs[j].score <= xs[j - 1].score {
|
if xs[j].score <= xs[j - 1].score {
|
||||||
@ -181,16 +324,34 @@ fn isort(xs: &mut [Record]) {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use libp2p_core::multiaddr::{Multiaddr, Protocol};
|
use libp2p_core::multiaddr::{Multiaddr, Protocol};
|
||||||
use quickcheck::{Arbitrary, Gen, QuickCheck};
|
use quickcheck::*;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use std::num::NonZeroUsize;
|
use std::num::{NonZeroUsize, NonZeroU8};
|
||||||
use super::{isort, Addresses, Record};
|
use super::*;
|
||||||
|
|
||||||
|
impl Arbitrary for AddressScore {
|
||||||
|
fn arbitrary<G: Gen>(g: &mut G) -> AddressScore {
|
||||||
|
if g.gen_range(0, 10) == 0 { // ~10% "Infinitely" scored addresses
|
||||||
|
AddressScore::Infinite
|
||||||
|
} else {
|
||||||
|
AddressScore::Finite(g.gen())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Arbitrary for AddressRecord {
|
||||||
|
fn arbitrary<G: Gen>(g: &mut G) -> Self {
|
||||||
|
let addr = Protocol::Tcp(g.gen::<u16>() % 256).into();
|
||||||
|
let score = AddressScore::arbitrary(g);
|
||||||
|
AddressRecord::new(addr, score)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn isort_sorts() {
|
fn isort_sorts() {
|
||||||
fn property(xs: Vec<u32>) -> bool {
|
fn property(xs: Vec<AddressScore>) {
|
||||||
let mut xs = xs.into_iter()
|
let mut xs = xs.into_iter()
|
||||||
.map(|s| Record { score: s, addr: Multiaddr::empty() })
|
.map(|score| AddressRecord::new(Multiaddr::empty(), score))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
isort(&mut xs);
|
isort(&mut xs);
|
||||||
@ -198,61 +359,98 @@ mod tests {
|
|||||||
for i in 1 .. xs.len() {
|
for i in 1 .. xs.len() {
|
||||||
assert!(xs[i - 1].score >= xs[i].score)
|
assert!(xs[i - 1].score >= xs[i].score)
|
||||||
}
|
}
|
||||||
|
|
||||||
true
|
|
||||||
}
|
}
|
||||||
QuickCheck::new().quickcheck(property as fn(Vec<u32>) -> bool)
|
|
||||||
|
quickcheck(property as fn(_));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn old_reports_disappear() {
|
fn score_retention() {
|
||||||
let mut addresses = Addresses::default();
|
fn prop(first: AddressRecord, other: AddressRecord) -> TestResult {
|
||||||
|
if first.addr == other.addr {
|
||||||
|
return TestResult::discard()
|
||||||
|
}
|
||||||
|
|
||||||
// Add an address a single time.
|
let mut addresses = Addresses::default();
|
||||||
let single: Multiaddr = "/tcp/2108".parse().unwrap();
|
|
||||||
addresses.add(single.clone());
|
|
||||||
assert!(addresses.iter().find(|a| **a == single).is_some());
|
|
||||||
|
|
||||||
// Then fill `addresses` with random stuff.
|
// Add the first address.
|
||||||
let other: Multiaddr = "/tcp/120".parse().unwrap();
|
addresses.add(first.addr.clone(), first.score);
|
||||||
for _ in 0 .. 2000 {
|
assert!(addresses.iter().any(|a| &a.addr == &first.addr));
|
||||||
addresses.add(other.clone());
|
|
||||||
|
// Add another address so often that the initial report of
|
||||||
|
// the first address may be purged and, since it was the
|
||||||
|
// only report, the address removed.
|
||||||
|
for _ in 0 .. addresses.limit.get() + 1 {
|
||||||
|
addresses.add(other.addr.clone(), other.score);
|
||||||
|
}
|
||||||
|
|
||||||
|
let exists = addresses.iter().any(|a| &a.addr == &first.addr);
|
||||||
|
|
||||||
|
match (first.score, other.score) {
|
||||||
|
// Only finite scores push out other finite scores.
|
||||||
|
(AddressScore::Finite(_), AddressScore::Finite(_)) => assert!(!exists),
|
||||||
|
_ => assert!(exists),
|
||||||
|
}
|
||||||
|
|
||||||
|
TestResult::passed()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that `single` disappeared from the list.
|
quickcheck(prop as fn(_,_) -> _);
|
||||||
assert!(addresses.iter().find(|a| **a == single).is_none());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn record_score_equals_last_n_reports() {
|
fn finitely_scored_address_limit() {
|
||||||
#[derive(PartialEq, Eq, Clone, Hash, Debug)]
|
fn prop(reports: Vec<AddressRecord>, limit: NonZeroU8) {
|
||||||
struct Ma(Multiaddr);
|
let mut addresses = Addresses::new(limit.into());
|
||||||
|
|
||||||
impl Arbitrary for Ma {
|
// Add all reports.
|
||||||
fn arbitrary<G: Gen>(g: &mut G) -> Self {
|
for r in reports {
|
||||||
Ma(Protocol::Tcp(g.gen::<u16>() % 16).into())
|
addresses.add(r.addr, r.score);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Count the finitely scored addresses.
|
||||||
|
let num_finite = addresses.iter().filter(|r| match r {
|
||||||
|
AddressRecord { score: AddressScore::Finite(_), .. } => true,
|
||||||
|
_ => false,
|
||||||
|
}).count();
|
||||||
|
|
||||||
|
// Check against the limit.
|
||||||
|
assert!(num_finite <= limit.get() as usize);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn property(xs: Vec<Ma>, n: u8) -> bool {
|
quickcheck(prop as fn(_,_));
|
||||||
let n = std::cmp::max(n, 1);
|
}
|
||||||
let mut addresses = Addresses::new(NonZeroUsize::new(usize::from(n)).unwrap());
|
|
||||||
for Ma(a) in &xs {
|
#[test]
|
||||||
addresses.add(a.clone())
|
fn record_score_sum() {
|
||||||
|
fn prop(records: Vec<AddressRecord>) -> bool {
|
||||||
|
// Make sure the address collection can hold all reports.
|
||||||
|
let n = std::cmp::max(records.len(), 1);
|
||||||
|
let mut addresses = Addresses::new(NonZeroUsize::new(n).unwrap());
|
||||||
|
|
||||||
|
// Add all address reports to the collection.
|
||||||
|
for r in records.iter() {
|
||||||
|
addresses.add(r.addr.clone(), r.score.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check that each address in the registry has the expected score.
|
||||||
for r in &addresses.registry {
|
for r in &addresses.registry {
|
||||||
let count = xs.iter()
|
let expected_score = records.iter().fold(
|
||||||
.rev()
|
None::<AddressScore>, |sum, rec|
|
||||||
.take(usize::from(n))
|
if &rec.addr == &r.addr {
|
||||||
.filter(|Ma(x)| x == &r.addr)
|
sum.map_or(Some(rec.score), |s| Some(s + rec.score))
|
||||||
.count();
|
} else {
|
||||||
if r.score as usize != count {
|
sum
|
||||||
|
});
|
||||||
|
|
||||||
|
if Some(r.score) != expected_score {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
QuickCheck::new().quickcheck(property as fn(Vec<Ma>, u8) -> bool)
|
quickcheck(prop as fn(_) -> _)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user