mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-28 01:01:34 +00:00
protocols/identify: Assist in peer discovery based on reported listen addresses from other peers (#2232)
Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
@ -5,6 +5,10 @@
|
||||
|
||||
- Update dependencies.
|
||||
|
||||
- Assist in peer discovery by returning reported listen addresses
|
||||
of other peers from `addresses_of_peer`.
|
||||
[PR 2232](https://github.com/libp2p/rust-libp2p/pull/2232)
|
||||
|
||||
# 0.30.0 [2021-07-12]
|
||||
|
||||
- Update dependencies.
|
||||
|
@ -14,6 +14,7 @@ futures = "0.3.1"
|
||||
libp2p-core = { version = "0.30.0", path = "../../core", default-features = false }
|
||||
libp2p-swarm = { version = "0.31.0", path = "../../swarm" }
|
||||
log = "0.4.1"
|
||||
lru = "0.6"
|
||||
prost = "0.8"
|
||||
smallvec = "1.6.1"
|
||||
wasm-timer = "0.2"
|
||||
|
@ -31,9 +31,11 @@ use libp2p_swarm::{
|
||||
NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler,
|
||||
ProtocolsHandlerUpgrErr,
|
||||
};
|
||||
use lru::LruCache;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet, VecDeque},
|
||||
io,
|
||||
iter::FromIterator,
|
||||
pin::Pin,
|
||||
task::Context,
|
||||
task::Poll,
|
||||
@ -57,6 +59,8 @@ pub struct Identify {
|
||||
/// Peers to which an active push with current information about
|
||||
/// the local peer should be sent.
|
||||
pending_push: HashSet<PeerId>,
|
||||
/// The addresses of all peers that we have discovered.
|
||||
discovered_peers: LruCache<PeerId, HashSet<Multiaddr>>,
|
||||
}
|
||||
|
||||
/// A pending reply to an inbound identification request.
|
||||
@ -109,6 +113,10 @@ pub struct IdentifyConfig {
|
||||
///
|
||||
/// Disabled by default.
|
||||
pub push_listen_addr_updates: bool,
|
||||
|
||||
/// How many entries of discovered peers to keep before we discard
|
||||
/// the least-recently used one.
|
||||
pub cache_size: usize,
|
||||
}
|
||||
|
||||
impl IdentifyConfig {
|
||||
@ -122,6 +130,7 @@ impl IdentifyConfig {
|
||||
initial_delay: Duration::from_millis(500),
|
||||
interval: Duration::from_secs(5 * 60),
|
||||
push_listen_addr_updates: false,
|
||||
cache_size: 100,
|
||||
}
|
||||
}
|
||||
|
||||
@ -152,17 +161,29 @@ impl IdentifyConfig {
|
||||
self.push_listen_addr_updates = b;
|
||||
self
|
||||
}
|
||||
|
||||
/// Configures the size of the LRU cache, caching addresses of discovered peers.
|
||||
///
|
||||
/// The [`Swarm`](libp2p_swarm::Swarm) may extend the set of addresses of an outgoing connection attempt via
|
||||
/// [`Identify::addresses_of_peer`].
|
||||
pub fn with_cache_size(mut self, cache_size: usize) -> Self {
|
||||
self.cache_size = cache_size;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Identify {
|
||||
/// Creates a new `Identify` network behaviour.
|
||||
pub fn new(config: IdentifyConfig) -> Self {
|
||||
let discovered_peers = LruCache::new(config.cache_size);
|
||||
|
||||
Identify {
|
||||
config,
|
||||
connected: HashMap::new(),
|
||||
pending_replies: VecDeque::new(),
|
||||
events: VecDeque::new(),
|
||||
pending_push: HashSet::new(),
|
||||
discovered_peers,
|
||||
}
|
||||
}
|
||||
|
||||
@ -254,6 +275,10 @@ impl NetworkBehaviour for Identify {
|
||||
) {
|
||||
match event {
|
||||
IdentifyHandlerEvent::Identified(info) => {
|
||||
// Replace existing addresses to prevent other peer from filling up our memory.
|
||||
self.discovered_peers
|
||||
.put(peer_id, HashSet::from_iter(info.listen_addrs.clone()));
|
||||
|
||||
let observed = info.observed_addr.clone();
|
||||
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||
IdentifyEvent::Received { peer_id, info },
|
||||
@ -388,6 +413,27 @@ impl NetworkBehaviour for Identify {
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
|
||||
self.discovered_peers
|
||||
.get(peer)
|
||||
.cloned()
|
||||
.map(|addr| Vec::from_iter(addr))
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
fn inject_addr_reach_failure(
|
||||
&mut self,
|
||||
peer_id: Option<&PeerId>,
|
||||
addr: &Multiaddr,
|
||||
_: &dyn std::error::Error,
|
||||
) {
|
||||
if let Some(peer) = peer_id {
|
||||
if let Some(entry) = self.discovered_peers.get_mut(peer) {
|
||||
entry.remove(addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Event emitted by the `Identify` behaviour.
|
||||
@ -552,11 +598,7 @@ mod tests {
|
||||
|
||||
let (mut swarm1, pubkey1) = {
|
||||
let (pubkey, transport) = transport();
|
||||
let protocol = Identify::new(
|
||||
IdentifyConfig::new("a".to_string(), pubkey.clone())
|
||||
// Delay identification requests so we can test the push protocol.
|
||||
.with_initial_delay(Duration::from_secs(u32::MAX as u64)),
|
||||
);
|
||||
let protocol = Identify::new(IdentifyConfig::new("a".to_string(), pubkey.clone()));
|
||||
let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id());
|
||||
(swarm, pubkey)
|
||||
};
|
||||
@ -565,9 +607,7 @@ mod tests {
|
||||
let (pubkey, transport) = transport();
|
||||
let protocol = Identify::new(
|
||||
IdentifyConfig::new("a".to_string(), pubkey.clone())
|
||||
.with_agent_version("b".to_string())
|
||||
// Delay identification requests so we can test the push protocol.
|
||||
.with_initial_delay(Duration::from_secs(u32::MAX as u64)),
|
||||
.with_agent_version("b".to_string()),
|
||||
);
|
||||
let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id());
|
||||
(swarm, pubkey)
|
||||
@ -626,4 +666,80 @@ mod tests {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn discover_peer_after_disconnect() {
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
let mut swarm1 = {
|
||||
let (pubkey, transport) = transport();
|
||||
let protocol = Identify::new(IdentifyConfig::new("a".to_string(), pubkey.clone()));
|
||||
|
||||
Swarm::new(transport, protocol, pubkey.to_peer_id())
|
||||
};
|
||||
|
||||
let mut swarm2 = {
|
||||
let (pubkey, transport) = transport();
|
||||
let protocol = Identify::new(
|
||||
IdentifyConfig::new("a".to_string(), pubkey.clone())
|
||||
.with_agent_version("b".to_string()),
|
||||
);
|
||||
|
||||
Swarm::new(transport, protocol, pubkey.to_peer_id())
|
||||
};
|
||||
|
||||
let swarm1_peer_id = *swarm1.local_peer_id();
|
||||
|
||||
let listener = swarm1
|
||||
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
|
||||
.unwrap();
|
||||
|
||||
let listen_addr = async_std::task::block_on(async {
|
||||
loop {
|
||||
match swarm1.select_next_some().await {
|
||||
SwarmEvent::NewListenAddr {
|
||||
address,
|
||||
listener_id,
|
||||
} if listener_id == listener => return address,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
async_std::task::spawn(async move {
|
||||
loop {
|
||||
swarm1.next().await;
|
||||
}
|
||||
});
|
||||
|
||||
swarm2.dial_addr(listen_addr).unwrap();
|
||||
|
||||
// wait until we identified
|
||||
async_std::task::block_on(async {
|
||||
loop {
|
||||
if let SwarmEvent::Behaviour(IdentifyEvent::Received { .. }) =
|
||||
swarm2.select_next_some().await
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
swarm2.disconnect_peer_id(swarm1_peer_id).unwrap();
|
||||
|
||||
// we should still be able to dial now!
|
||||
swarm2.dial(&swarm1_peer_id).unwrap();
|
||||
|
||||
let connected_peer = async_std::task::block_on(async {
|
||||
loop {
|
||||
if let SwarmEvent::ConnectionEstablished { peer_id, .. } =
|
||||
swarm2.select_next_some().await
|
||||
{
|
||||
break peer_id;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
assert_eq!(connected_peer, swarm1_peer_id);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user