Simplify the Addresses (#1012)

* Simplify the Addresses

* Remove println
This commit is contained in:
Pierre Krieger
2019-03-20 17:30:00 +01:00
committed by GitHub
parent 7be97b4f93
commit 7b2980133d
3 changed files with 48 additions and 226 deletions

View File

@ -18,114 +18,56 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use arrayvec::ArrayVec;
use libp2p_core::Multiaddr;
use std::{fmt, time::Duration, time::Instant};
use smallvec::SmallVec;
use std::fmt;
/// List of addresses of a peer.
#[derive(Clone)]
pub struct Addresses {
/// Contains an `Instant` when the address expires. If `None`, we are connected to this
/// address.
addrs: ArrayVec<[(Multiaddr, Option<Instant>); 6]>,
/// Time-to-live for addresses we're not connected to.
expiration: Duration,
addrs: SmallVec<[Multiaddr; 6]>,
}
impl Addresses {
/// Creates a new list of addresses.
pub fn new() -> Addresses {
Self::with_time_to_live(Duration::from_secs(60 * 60))
}
/// Creates a new list of addresses. The addresses we're not connected to will use the given
/// time-to-live before they expire.
pub fn with_time_to_live(ttl: Duration) -> Addresses {
Addresses {
addrs: ArrayVec::new(),
expiration: ttl,
addrs: SmallVec::new(),
}
}
/// Returns the list of addresses.
pub fn iter(&self) -> impl Iterator<Item = &Multiaddr> {
let now = Instant::now();
self.addrs.iter().filter_map(move |(addr, exp)| {
if let Some(exp) = exp {
if *exp >= now {
Some(addr)
} else {
None
}
} else {
Some(addr)
}
})
self.addrs.iter()
}
/// If true, we are connected to all the addresses returned by `iter()`.
///
/// Returns false if the list of addresses is empty.
pub fn is_connected(&self) -> bool {
// Note: we're either connected to all addresses or none. There's no in-between.
self.addrs.first().map(|(_, exp)| exp.is_none()).unwrap_or(false)
}
/// If we were connected to that addresses, indicates that we are now disconnected.
pub fn set_disconnected(&mut self, addr: &Multiaddr) {
let pos = match self.addrs.iter().position(|(a, _)| a == addr) {
Some(p) => p,
None => return,
};
// We were already disconnected.
if self.addrs[pos].1.is_some() {
return;
}
// Address is the only known address.
if self.addrs.len() == 1 {
self.addrs[pos].1 = Some(Instant::now() + self.expiration);
return;
}
// We know other connected addresses. Remove this one.
self.addrs.remove(pos);
/// Returns true if the list of addresses is empty.
pub fn is_empty(&self) -> bool {
self.addrs.is_empty()
}
/// Removes the given address from the list. Typically called if an address is determined to
/// be invalid or unreachable.
pub fn remove_addr(&mut self, addr: &Multiaddr) {
if let Some(pos) = self.addrs.iter().position(|(a, _)| a == addr) {
pub fn remove(&mut self, addr: &Multiaddr) {
if let Some(pos) = self.addrs.iter().position(|a| a == addr) {
self.addrs.remove(pos);
}
if self.addrs.len() <= self.addrs.inline_size() {
self.addrs.shrink_to_fit();
}
}
/// Inserts an address in the list. The address is an address we're not connected to, or may
/// not be connected to.
pub fn insert_not_connected(&mut self, addr: Multiaddr) {
// Don't insert if either we're already in the list, or we're connected to any address.
if self.addrs.iter().any(|(a, expires)| a == &addr || expires.is_none()) {
return;
}
// Do a cleanup pass.
let now = Instant::now();
self.addrs.retain(move |(_, exp)| {
exp.expect("We check above that all the expires are Some") > now
});
let _ = self.addrs.try_push((addr, Some(Instant::now() + self.expiration)));
}
/// Inserts an address in the list. We know that the address is reachable.
pub fn insert_connected(&mut self, addr: Multiaddr) {
if !self.is_connected() {
/// Clears the list. It is empty afterwards.
pub fn clear(&mut self) {
self.addrs.clear();
self.addrs.shrink_to_fit();
}
if self.addrs.iter().all(|(a, _)| *a != addr) {
let _ = self.addrs.try_push((addr, None));
/// Inserts an address in the list. No effect if the address was already in the list.
pub fn insert(&mut self, addr: Multiaddr) {
if self.addrs.iter().all(|a| *a != addr) {
self.addrs.push(addr);
}
}
}
@ -139,106 +81,7 @@ impl Default for Addresses {
impl fmt::Debug for Addresses {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_list()
.entries(self.addrs.iter().map(|(a, _)| a))
.entries(self.addrs.iter())
.finish()
}
}
#[cfg(test)]
mod tests {
use libp2p_core::multiaddr;
use super::Addresses;
use std::{iter, thread, time::Duration};
#[test]
fn insert_connected_after_not_connected() {
let mut addrs = Addresses::new();
addrs.insert_not_connected("/ip4/1.2.3.4/tcp/5".parse().unwrap());
addrs.insert_not_connected("/ip4/6.7.8.9/tcp/5".parse().unwrap());
addrs.insert_not_connected("/ip4/10.11.12.13/tcp/5".parse().unwrap());
assert_eq!(addrs.iter().count(), 3);
assert!(!addrs.is_connected());
addrs.insert_connected("/ip4/8.9.10.11".parse().unwrap());
assert_eq!(addrs.iter().count(), 1);
assert!(addrs.is_connected());
}
#[test]
fn not_connected_expire() {
let mut addrs = Addresses::with_time_to_live(Duration::from_secs(2));
addrs.insert_not_connected("/ip4/1.2.3.4/tcp/5".parse().unwrap());
assert_eq!(addrs.iter().count(), 1);
thread::sleep(Duration::from_secs(1));
assert_eq!(addrs.iter().count(), 1);
addrs.insert_not_connected("/ip4/6.7.8.9/tcp/5".parse().unwrap());
addrs.insert_not_connected("/ip4/10.11.12.13/tcp/5".parse().unwrap());
assert_eq!(addrs.iter().count(), 3);
thread::sleep(Duration::from_secs(1));
assert_eq!(addrs.iter().count(), 2);
thread::sleep(Duration::from_secs(1));
assert_eq!(addrs.iter().count(), 0);
}
#[test]
fn connected_dont_expire() {
let mut addrs = Addresses::with_time_to_live(Duration::from_secs(1));
addrs.insert_connected("/ip4/1.2.3.4/tcp/5".parse().unwrap());
assert_eq!(addrs.iter().count(), 1);
thread::sleep(Duration::from_secs(2));
assert_eq!(addrs.iter().count(), 1);
assert!(addrs.is_connected());
}
#[test]
fn dont_insert_disconnected_if_connected() {
let mut addrs = Addresses::new();
addrs.insert_connected("/ip4/1.2.3.4/tcp/5".parse().unwrap());
assert_eq!(addrs.iter().count(), 1);
addrs.insert_not_connected("/ip4/5.6.7.8/tcp/5".parse().unwrap());
assert_eq!(addrs.iter().count(), 1);
assert!(addrs.is_connected());
}
#[test]
fn disconnect_addr() {
let mut addrs = Addresses::new();
addrs.insert_connected("/ip4/1.2.3.4/tcp/5".parse().unwrap());
addrs.insert_connected("/ip4/6.7.8.9/tcp/5".parse().unwrap());
assert_eq!(addrs.iter().count(), 2);
addrs.set_disconnected(&"/ip4/1.2.3.4/tcp/5".parse().unwrap());
assert_eq!(addrs.iter().count(), 1);
assert!(addrs.is_connected());
addrs.set_disconnected(&"/ip4/6.7.8.9/tcp/5".parse().unwrap());
assert_eq!(addrs.iter().count(), 1);
assert!(!addrs.is_connected());
}
#[test]
fn max_addrs() {
// Check that the number of addresses stops increasing even if we continue inserting.
let mut addrs = Addresses::new();
let mut previous_loop_count = None;
for n in 0.. {
let addr: multiaddr::Multiaddr = iter::once(multiaddr::Protocol::Tcp(n)).collect();
addrs.insert_not_connected(addr);
let num = addrs.iter().count();
if previous_loop_count == Some(num) {
return; // Test success
}
previous_loop_count = Some(num);
}
}
}

View File

@ -214,21 +214,13 @@ impl<TSubstream> Kademlia<TSubstream> {
/// Underlying implementation for `add_connected_address` and `add_not_connected_address`.
fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, connected: bool) {
match self.kbuckets.entry(peer_id) {
kbucket::Entry::InKbucketConnected(mut entry) => if connected {
entry.value().insert_connected(address)
} else {
entry.value().insert_not_connected(address)
},
kbucket::Entry::InKbucketConnectedPending(mut entry) => if connected {
entry.value().insert_connected(address)
} else {
entry.value().insert_not_connected(address)
},
kbucket::Entry::InKbucketDisconnected(mut entry) => entry.value().insert_not_connected(address),
kbucket::Entry::InKbucketDisconnectedPending(mut entry) => entry.value().insert_not_connected(address),
kbucket::Entry::InKbucketConnected(mut entry) => entry.value().insert(address),
kbucket::Entry::InKbucketConnectedPending(mut entry) => entry.value().insert(address),
kbucket::Entry::InKbucketDisconnected(mut entry) => entry.value().insert(address),
kbucket::Entry::InKbucketDisconnectedPending(mut entry) => entry.value().insert(address),
kbucket::Entry::NotInKbucket(entry) => {
let mut addresses = Addresses::new();
addresses.insert_not_connected(address);
addresses.insert(address);
match entry.insert_disconnected(addresses) {
kbucket::InsertOutcome::Inserted => {
let event = KademliaOut::KBucketAdded {
@ -430,17 +422,15 @@ where
},
kbucket::Entry::InKbucketDisconnected(mut entry) => {
debug_assert!(!entry.value().is_connected());
if let Some(address) = address {
entry.value().insert_connected(address);
entry.value().insert(address);
}
entry.set_connected();
},
kbucket::Entry::InKbucketDisconnectedPending(mut entry) => {
debug_assert!(!entry.value().is_connected());
if let Some(address) = address {
entry.value().insert_connected(address);
entry.value().insert(address);
}
entry.set_connected();
},
@ -448,7 +438,7 @@ where
kbucket::Entry::NotInKbucket(entry) => {
let mut addresses = Addresses::new();
if let Some(address) = address {
addresses.insert_connected(address);
addresses.insert(address);
}
match entry.insert_connected(addresses) {
kbucket::InsertOutcome::Inserted => {
@ -480,7 +470,7 @@ where
if let Some(list) = self.kbuckets.entry(peer_id).value() {
// TODO: don't remove the address if the error is that we are already connected
// to this peer
list.remove_addr(addr);
list.remove(addr);
}
}
}
@ -495,9 +485,7 @@ where
if let ConnectedPoint::Dialer { address } = old_endpoint {
match self.kbuckets.entry(id) {
kbucket::Entry::InKbucketConnected(mut entry) => {
debug_assert!(entry.value().is_connected());
entry.value().set_disconnected(&address);
kbucket::Entry::InKbucketConnected(entry) => {
match entry.set_disconnected() {
kbucket::SetDisconnectedOutcome::Kept(_) => {},
kbucket::SetDisconnectedOutcome::Replaced { replacement, .. } => {
@ -509,9 +497,7 @@ where
},
}
},
kbucket::Entry::InKbucketConnectedPending(mut entry) => {
debug_assert!(entry.value().is_connected());
entry.value().set_disconnected(&address);
kbucket::Entry::InKbucketConnectedPending(entry) => {
entry.set_disconnected();
},
kbucket::Entry::InKbucketDisconnected(_) => {
@ -528,7 +514,7 @@ where
}
}
fn inject_replaced(&mut self, peer_id: PeerId, old_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) {
fn inject_replaced(&mut self, peer_id: PeerId, _old: ConnectedPoint, new_endpoint: ConnectedPoint) {
// We need to re-send the active queries.
for (query_id, query) in self.active_queries.iter() {
if query.is_waiting(&peer_id) {
@ -540,12 +526,8 @@ where
}
if let Some(list) = self.kbuckets.entry(&peer_id).value() {
if let ConnectedPoint::Dialer { address } = old_endpoint {
list.set_disconnected(&address);
}
if let ConnectedPoint::Dialer { address } = new_endpoint {
list.insert_connected(address);
list.insert(address);
}
}
}
@ -868,21 +850,13 @@ fn build_kad_peer(
peer_id: PeerId,
kbuckets: &mut KBucketsTable<PeerId, Addresses>
) -> KadPeer {
debug_assert_ne!(*kbuckets.my_id(), peer_id);
let (multiaddrs, connection_ty) = if let Some(addresses) = kbuckets.entry(&peer_id).value() {
let connected = if addresses.is_connected() {
KadConnectionType::Connected
} else {
// TODO: there's also pending connection
KadConnectionType::NotConnected
};
(addresses.iter().cloned().collect(), connected)
} else {
// TODO: there's also pending connection
(Vec::new(), KadConnectionType::NotConnected)
let (multiaddrs, connection_ty) = match kbuckets.entry(&peer_id) {
kbucket::Entry::NotInKbucket(_) => (Vec::new(), KadConnectionType::NotConnected), // TODO: pending connection?
kbucket::Entry::InKbucketConnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected),
kbucket::Entry::InKbucketDisconnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected),
kbucket::Entry::InKbucketConnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected),
kbucket::Entry::InKbucketDisconnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected),
kbucket::Entry::SelfEntry => panic!("build_kad_peer expects not to be called with the local ID"),
};
KadPeer {

View File

@ -418,6 +418,11 @@ where
table.first_connected_pos += 1;
// And return a EntryInKbucketDisc.
debug_assert!(table.nodes.iter()
.position(move |e| e.id == *peer_id)
.map(|p| p < table.first_connected_pos)
.unwrap_or(false));
SetDisconnectedOutcome::Kept(EntryInKbucketDisc {
parent: self.parent,
peer_id: self.peer_id,