diff --git a/protocols/kad/src/addresses.rs b/protocols/kad/src/addresses.rs index 08843989..4efb1423 100644 --- a/protocols/kad/src/addresses.rs +++ b/protocols/kad/src/addresses.rs @@ -18,114 +18,56 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use arrayvec::ArrayVec; use libp2p_core::Multiaddr; -use std::{fmt, time::Duration, time::Instant}; +use smallvec::SmallVec; +use std::fmt; /// List of addresses of a peer. #[derive(Clone)] pub struct Addresses { - /// Contains an `Instant` when the address expires. If `None`, we are connected to this - /// address. - addrs: ArrayVec<[(Multiaddr, Option); 6]>, - /// Time-to-live for addresses we're not connected to. - expiration: Duration, + addrs: SmallVec<[Multiaddr; 6]>, } impl Addresses { /// Creates a new list of addresses. pub fn new() -> Addresses { - Self::with_time_to_live(Duration::from_secs(60 * 60)) - } - - /// Creates a new list of addresses. The addresses we're not connected to will use the given - /// time-to-live before they expire. - pub fn with_time_to_live(ttl: Duration) -> Addresses { Addresses { - addrs: ArrayVec::new(), - expiration: ttl, + addrs: SmallVec::new(), } } /// Returns the list of addresses. pub fn iter(&self) -> impl Iterator { - let now = Instant::now(); - self.addrs.iter().filter_map(move |(addr, exp)| { - if let Some(exp) = exp { - if *exp >= now { - Some(addr) - } else { - None - } - } else { - Some(addr) - } - }) + self.addrs.iter() } - /// If true, we are connected to all the addresses returned by `iter()`. - /// - /// Returns false if the list of addresses is empty. - pub fn is_connected(&self) -> bool { - // Note: we're either connected to all addresses or none. There's no in-between. - self.addrs.first().map(|(_, exp)| exp.is_none()).unwrap_or(false) - } - - /// If we were connected to that addresses, indicates that we are now disconnected. - pub fn set_disconnected(&mut self, addr: &Multiaddr) { - let pos = match self.addrs.iter().position(|(a, _)| a == addr) { - Some(p) => p, - None => return, - }; - - // We were already disconnected. - if self.addrs[pos].1.is_some() { - return; - } - - // Address is the only known address. - if self.addrs.len() == 1 { - self.addrs[pos].1 = Some(Instant::now() + self.expiration); - return; - } - - // We know other connected addresses. Remove this one. - self.addrs.remove(pos); + /// Returns true if the list of addresses is empty. + pub fn is_empty(&self) -> bool { + self.addrs.is_empty() } /// Removes the given address from the list. Typically called if an address is determined to /// be invalid or unreachable. - pub fn remove_addr(&mut self, addr: &Multiaddr) { - if let Some(pos) = self.addrs.iter().position(|(a, _)| a == addr) { + pub fn remove(&mut self, addr: &Multiaddr) { + if let Some(pos) = self.addrs.iter().position(|a| a == addr) { self.addrs.remove(pos); } + + if self.addrs.len() <= self.addrs.inline_size() { + self.addrs.shrink_to_fit(); + } } - /// Inserts an address in the list. The address is an address we're not connected to, or may - /// not be connected to. - pub fn insert_not_connected(&mut self, addr: Multiaddr) { - // Don't insert if either we're already in the list, or we're connected to any address. - if self.addrs.iter().any(|(a, expires)| a == &addr || expires.is_none()) { - return; - } - - // Do a cleanup pass. - let now = Instant::now(); - self.addrs.retain(move |(_, exp)| { - exp.expect("We check above that all the expires are Some") > now - }); - - let _ = self.addrs.try_push((addr, Some(Instant::now() + self.expiration))); + /// Clears the list. It is empty afterwards. + pub fn clear(&mut self) { + self.addrs.clear(); + self.addrs.shrink_to_fit(); } - /// Inserts an address in the list. We know that the address is reachable. - pub fn insert_connected(&mut self, addr: Multiaddr) { - if !self.is_connected() { - self.addrs.clear(); - } - - if self.addrs.iter().all(|(a, _)| *a != addr) { - let _ = self.addrs.try_push((addr, None)); + /// Inserts an address in the list. No effect if the address was already in the list. + pub fn insert(&mut self, addr: Multiaddr) { + if self.addrs.iter().all(|a| *a != addr) { + self.addrs.push(addr); } } } @@ -139,106 +81,7 @@ impl Default for Addresses { impl fmt::Debug for Addresses { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_list() - .entries(self.addrs.iter().map(|(a, _)| a)) + .entries(self.addrs.iter()) .finish() } } - -#[cfg(test)] -mod tests { - use libp2p_core::multiaddr; - use super::Addresses; - use std::{iter, thread, time::Duration}; - - #[test] - fn insert_connected_after_not_connected() { - let mut addrs = Addresses::new(); - addrs.insert_not_connected("/ip4/1.2.3.4/tcp/5".parse().unwrap()); - addrs.insert_not_connected("/ip4/6.7.8.9/tcp/5".parse().unwrap()); - addrs.insert_not_connected("/ip4/10.11.12.13/tcp/5".parse().unwrap()); - assert_eq!(addrs.iter().count(), 3); - assert!(!addrs.is_connected()); - addrs.insert_connected("/ip4/8.9.10.11".parse().unwrap()); - assert_eq!(addrs.iter().count(), 1); - assert!(addrs.is_connected()); - } - - #[test] - fn not_connected_expire() { - let mut addrs = Addresses::with_time_to_live(Duration::from_secs(2)); - - addrs.insert_not_connected("/ip4/1.2.3.4/tcp/5".parse().unwrap()); - assert_eq!(addrs.iter().count(), 1); - - thread::sleep(Duration::from_secs(1)); - assert_eq!(addrs.iter().count(), 1); - - addrs.insert_not_connected("/ip4/6.7.8.9/tcp/5".parse().unwrap()); - addrs.insert_not_connected("/ip4/10.11.12.13/tcp/5".parse().unwrap()); - assert_eq!(addrs.iter().count(), 3); - - thread::sleep(Duration::from_secs(1)); - assert_eq!(addrs.iter().count(), 2); - - thread::sleep(Duration::from_secs(1)); - assert_eq!(addrs.iter().count(), 0); - } - - #[test] - fn connected_dont_expire() { - let mut addrs = Addresses::with_time_to_live(Duration::from_secs(1)); - addrs.insert_connected("/ip4/1.2.3.4/tcp/5".parse().unwrap()); - assert_eq!(addrs.iter().count(), 1); - - thread::sleep(Duration::from_secs(2)); - assert_eq!(addrs.iter().count(), 1); - assert!(addrs.is_connected()); - } - - #[test] - fn dont_insert_disconnected_if_connected() { - let mut addrs = Addresses::new(); - addrs.insert_connected("/ip4/1.2.3.4/tcp/5".parse().unwrap()); - assert_eq!(addrs.iter().count(), 1); - - addrs.insert_not_connected("/ip4/5.6.7.8/tcp/5".parse().unwrap()); - assert_eq!(addrs.iter().count(), 1); - assert!(addrs.is_connected()); - } - - #[test] - fn disconnect_addr() { - let mut addrs = Addresses::new(); - - addrs.insert_connected("/ip4/1.2.3.4/tcp/5".parse().unwrap()); - addrs.insert_connected("/ip4/6.7.8.9/tcp/5".parse().unwrap()); - assert_eq!(addrs.iter().count(), 2); - - addrs.set_disconnected(&"/ip4/1.2.3.4/tcp/5".parse().unwrap()); - assert_eq!(addrs.iter().count(), 1); - assert!(addrs.is_connected()); - - addrs.set_disconnected(&"/ip4/6.7.8.9/tcp/5".parse().unwrap()); - assert_eq!(addrs.iter().count(), 1); - assert!(!addrs.is_connected()); - } - - #[test] - fn max_addrs() { - // Check that the number of addresses stops increasing even if we continue inserting. - let mut addrs = Addresses::new(); - - let mut previous_loop_count = None; - - for n in 0.. { - let addr: multiaddr::Multiaddr = iter::once(multiaddr::Protocol::Tcp(n)).collect(); - addrs.insert_not_connected(addr); - - let num = addrs.iter().count(); - if previous_loop_count == Some(num) { - return; // Test success - } - previous_loop_count = Some(num); - } - } -} diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index f2ce1adb..46c5c14c 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -214,21 +214,13 @@ impl Kademlia { /// Underlying implementation for `add_connected_address` and `add_not_connected_address`. fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, connected: bool) { match self.kbuckets.entry(peer_id) { - kbucket::Entry::InKbucketConnected(mut entry) => if connected { - entry.value().insert_connected(address) - } else { - entry.value().insert_not_connected(address) - }, - kbucket::Entry::InKbucketConnectedPending(mut entry) => if connected { - entry.value().insert_connected(address) - } else { - entry.value().insert_not_connected(address) - }, - kbucket::Entry::InKbucketDisconnected(mut entry) => entry.value().insert_not_connected(address), - kbucket::Entry::InKbucketDisconnectedPending(mut entry) => entry.value().insert_not_connected(address), + kbucket::Entry::InKbucketConnected(mut entry) => entry.value().insert(address), + kbucket::Entry::InKbucketConnectedPending(mut entry) => entry.value().insert(address), + kbucket::Entry::InKbucketDisconnected(mut entry) => entry.value().insert(address), + kbucket::Entry::InKbucketDisconnectedPending(mut entry) => entry.value().insert(address), kbucket::Entry::NotInKbucket(entry) => { let mut addresses = Addresses::new(); - addresses.insert_not_connected(address); + addresses.insert(address); match entry.insert_disconnected(addresses) { kbucket::InsertOutcome::Inserted => { let event = KademliaOut::KBucketAdded { @@ -430,17 +422,15 @@ where }, kbucket::Entry::InKbucketDisconnected(mut entry) => { - debug_assert!(!entry.value().is_connected()); if let Some(address) = address { - entry.value().insert_connected(address); + entry.value().insert(address); } entry.set_connected(); }, kbucket::Entry::InKbucketDisconnectedPending(mut entry) => { - debug_assert!(!entry.value().is_connected()); if let Some(address) = address { - entry.value().insert_connected(address); + entry.value().insert(address); } entry.set_connected(); }, @@ -448,7 +438,7 @@ where kbucket::Entry::NotInKbucket(entry) => { let mut addresses = Addresses::new(); if let Some(address) = address { - addresses.insert_connected(address); + addresses.insert(address); } match entry.insert_connected(addresses) { kbucket::InsertOutcome::Inserted => { @@ -480,7 +470,7 @@ where if let Some(list) = self.kbuckets.entry(peer_id).value() { // TODO: don't remove the address if the error is that we are already connected // to this peer - list.remove_addr(addr); + list.remove(addr); } } } @@ -495,9 +485,7 @@ where if let ConnectedPoint::Dialer { address } = old_endpoint { match self.kbuckets.entry(id) { - kbucket::Entry::InKbucketConnected(mut entry) => { - debug_assert!(entry.value().is_connected()); - entry.value().set_disconnected(&address); + kbucket::Entry::InKbucketConnected(entry) => { match entry.set_disconnected() { kbucket::SetDisconnectedOutcome::Kept(_) => {}, kbucket::SetDisconnectedOutcome::Replaced { replacement, .. } => { @@ -509,9 +497,7 @@ where }, } }, - kbucket::Entry::InKbucketConnectedPending(mut entry) => { - debug_assert!(entry.value().is_connected()); - entry.value().set_disconnected(&address); + kbucket::Entry::InKbucketConnectedPending(entry) => { entry.set_disconnected(); }, kbucket::Entry::InKbucketDisconnected(_) => { @@ -528,7 +514,7 @@ where } } - fn inject_replaced(&mut self, peer_id: PeerId, old_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { + fn inject_replaced(&mut self, peer_id: PeerId, _old: ConnectedPoint, new_endpoint: ConnectedPoint) { // We need to re-send the active queries. for (query_id, query) in self.active_queries.iter() { if query.is_waiting(&peer_id) { @@ -540,12 +526,8 @@ where } if let Some(list) = self.kbuckets.entry(&peer_id).value() { - if let ConnectedPoint::Dialer { address } = old_endpoint { - list.set_disconnected(&address); - } - if let ConnectedPoint::Dialer { address } = new_endpoint { - list.insert_connected(address); + list.insert(address); } } } @@ -868,21 +850,13 @@ fn build_kad_peer( peer_id: PeerId, kbuckets: &mut KBucketsTable ) -> KadPeer { - debug_assert_ne!(*kbuckets.my_id(), peer_id); - - let (multiaddrs, connection_ty) = if let Some(addresses) = kbuckets.entry(&peer_id).value() { - let connected = if addresses.is_connected() { - KadConnectionType::Connected - } else { - // TODO: there's also pending connection - KadConnectionType::NotConnected - }; - - (addresses.iter().cloned().collect(), connected) - - } else { - // TODO: there's also pending connection - (Vec::new(), KadConnectionType::NotConnected) + let (multiaddrs, connection_ty) = match kbuckets.entry(&peer_id) { + kbucket::Entry::NotInKbucket(_) => (Vec::new(), KadConnectionType::NotConnected), // TODO: pending connection? + kbucket::Entry::InKbucketConnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected), + kbucket::Entry::InKbucketDisconnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected), + kbucket::Entry::InKbucketConnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected), + kbucket::Entry::InKbucketDisconnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected), + kbucket::Entry::SelfEntry => panic!("build_kad_peer expects not to be called with the local ID"), }; KadPeer { diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs index 49b33fad..e3e19e0b 100644 --- a/protocols/kad/src/kbucket.rs +++ b/protocols/kad/src/kbucket.rs @@ -418,6 +418,11 @@ where table.first_connected_pos += 1; // And return a EntryInKbucketDisc. + debug_assert!(table.nodes.iter() + .position(move |e| e.id == *peer_id) + .map(|p| p < table.first_connected_pos) + .unwrap_or(false)); + SetDisconnectedOutcome::Kept(EntryInKbucketDisc { parent: self.parent, peer_id: self.peer_id,