swarm: Extend NetworkBehaviour callbacks. (#2011)

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
David Craven
2021-03-24 17:21:53 +01:00
committed by GitHub
parent be2fb4ea8a
commit 7779b8e2c1
9 changed files with 155 additions and 49 deletions

View File

@ -187,7 +187,7 @@ where
/// ///
/// The translation is transport-specific. See [`Transport::address_translation`]. /// The translation is transport-specific. See [`Transport::address_translation`].
pub fn address_translation<'a>(&'a self, observed_addr: &'a Multiaddr) pub fn address_translation<'a>(&'a self, observed_addr: &'a Multiaddr)
-> impl Iterator<Item = Multiaddr> + 'a -> Vec<Multiaddr>
where where
TMuxer: 'a, TMuxer: 'a,
THandler: 'a, THandler: 'a,
@ -201,7 +201,7 @@ where
addrs.sort_unstable(); addrs.sort_unstable();
addrs.dedup(); addrs.dedup();
addrs.into_iter() addrs
} }
/// Returns the peer id of the local node. /// Returns the peer id of the local node.

View File

@ -26,7 +26,7 @@ use libp2p_core::{
Multiaddr, Multiaddr,
PeerId, PeerId,
PublicKey, PublicKey,
connection::ConnectionId, connection::{ConnectionId, ListenerId},
upgrade::UpgradeError upgrade::UpgradeError
}; };
use libp2p_swarm::{ use libp2p_swarm::{
@ -233,13 +233,13 @@ impl NetworkBehaviour for Identify {
self.pending_push.remove(peer_id); self.pending_push.remove(peer_id);
} }
fn inject_new_listen_addr(&mut self, _addr: &Multiaddr) { fn inject_new_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
if self.config.push_listen_addr_updates { if self.config.push_listen_addr_updates {
self.pending_push.extend(self.connected.keys()); self.pending_push.extend(self.connected.keys());
} }
} }
fn inject_expired_listen_addr(&mut self, _addr: &Multiaddr) { fn inject_expired_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
if self.config.push_listen_addr_updates { if self.config.push_listen_addr_updates {
self.pending_push.extend(self.connected.keys()); self.pending_push.extend(self.connected.keys());
} }

View File

@ -37,7 +37,7 @@ use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState}; use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord}; use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
use fnv::{FnvHashMap, FnvHashSet}; use fnv::{FnvHashMap, FnvHashSet};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId}; use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::{ConnectionId, ListenerId}};
use libp2p_swarm::{ use libp2p_swarm::{
DialPeerCondition, DialPeerCondition,
NetworkBehaviour, NetworkBehaviour,
@ -1888,11 +1888,11 @@ where
}; };
} }
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { fn inject_new_listen_addr(&mut self, _id: ListenerId, addr: &Multiaddr) {
self.local_addrs.insert(addr.clone()); self.local_addrs.insert(addr.clone());
} }
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { fn inject_expired_listen_addr(&mut self, _id: ListenerId, addr: &Multiaddr) {
self.local_addrs.remove(addr); self.local_addrs.remove(addr);
} }

View File

@ -244,6 +244,20 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
}) })
}; };
// Build the list of statements to put in the body of `inject_new_listener()`.
let inject_new_listener_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
if is_ignored(&field) {
return None;
}
Some(match field.ident {
Some(ref i) => quote!{ self.#i.inject_new_listener(id); },
None => quote!{ self.#field_n.inject_new_listener(id); },
})
})
};
// Build the list of statements to put in the body of `inject_new_listen_addr()`. // Build the list of statements to put in the body of `inject_new_listen_addr()`.
let inject_new_listen_addr_stmts = { let inject_new_listen_addr_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
@ -252,8 +266,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
} }
Some(match field.ident { Some(match field.ident {
Some(ref i) => quote!{ self.#i.inject_new_listen_addr(addr); }, Some(ref i) => quote!{ self.#i.inject_new_listen_addr(id, addr); },
None => quote!{ self.#field_n.inject_new_listen_addr(addr); }, None => quote!{ self.#field_n.inject_new_listen_addr(id, addr); },
}) })
}) })
}; };
@ -266,8 +280,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
} }
Some(match field.ident { Some(match field.ident {
Some(ref i) => quote!{ self.#i.inject_expired_listen_addr(addr); }, Some(ref i) => quote!{ self.#i.inject_expired_listen_addr(id, addr); },
None => quote!{ self.#field_n.inject_expired_listen_addr(addr); }, None => quote!{ self.#field_n.inject_expired_listen_addr(id, addr); },
}) })
}) })
}; };
@ -286,6 +300,20 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
}) })
}; };
// Build the list of statements to put in the body of `inject_expired_external_addr()`.
let inject_expired_external_addr_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
if is_ignored(&field) {
return None;
}
Some(match field.ident {
Some(ref i) => quote!{ self.#i.inject_expired_external_addr(addr); },
None => quote!{ self.#field_n.inject_expired_external_addr(addr); },
})
})
};
// Build the list of statements to put in the body of `inject_listener_error()`. // Build the list of statements to put in the body of `inject_listener_error()`.
let inject_listener_error_stmts = { let inject_listener_error_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
@ -504,11 +532,15 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
#(#inject_dial_failure_stmts);* #(#inject_dial_failure_stmts);*
} }
fn inject_new_listen_addr(&mut self, addr: &#multiaddr) { fn inject_new_listener(&mut self, id: #listener_id) {
#(#inject_new_listener_stmts);*
}
fn inject_new_listen_addr(&mut self, id: #listener_id, addr: &#multiaddr) {
#(#inject_new_listen_addr_stmts);* #(#inject_new_listen_addr_stmts);*
} }
fn inject_expired_listen_addr(&mut self, addr: &#multiaddr) { fn inject_expired_listen_addr(&mut self, id: #listener_id, addr: &#multiaddr) {
#(#inject_expired_listen_addr_stmts);* #(#inject_expired_listen_addr_stmts);*
} }
@ -516,6 +548,10 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
#(#inject_new_external_addr_stmts);* #(#inject_new_external_addr_stmts);*
} }
fn inject_expired_external_addr(&mut self, addr: &#multiaddr) {
#(#inject_expired_external_addr_stmts);*
}
fn inject_listener_error(&mut self, id: #listener_id, err: &(dyn std::error::Error + 'static)) { fn inject_listener_error(&mut self, id: #listener_id, err: &(dyn std::error::Error + 'static)) {
#(#inject_listener_error_stmts);* #(#inject_listener_error_stmts);*
} }

View File

@ -147,17 +147,17 @@ pub trait NetworkBehaviour: Send + 'static {
fn inject_dial_failure(&mut self, _peer_id: &PeerId) { fn inject_dial_failure(&mut self, _peer_id: &PeerId) {
} }
/// Indicates to the behaviour that a new listener was created.
fn inject_new_listener(&mut self, _id: ListenerId) {
}
/// Indicates to the behaviour that we have started listening on a new multiaddr. /// Indicates to the behaviour that we have started listening on a new multiaddr.
fn inject_new_listen_addr(&mut self, _addr: &Multiaddr) { fn inject_new_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
} }
/// Indicates to the behaviour that a new multiaddr we were listening on has expired, /// Indicates to the behaviour that a multiaddr we were listening on has expired,
/// which means that we are no longer listening in it. /// which means that we are no longer listening in it.
fn inject_expired_listen_addr(&mut self, _addr: &Multiaddr) { fn inject_expired_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
}
/// Indicates to the behaviour that we have discovered a new external address for us.
fn inject_new_external_addr(&mut self, _addr: &Multiaddr) {
} }
/// A listener experienced an error. /// A listener experienced an error.
@ -168,6 +168,14 @@ pub trait NetworkBehaviour: Send + 'static {
fn inject_listener_closed(&mut self, _id: ListenerId, _reason: Result<(), &std::io::Error>) { fn inject_listener_closed(&mut self, _id: ListenerId, _reason: Result<(), &std::io::Error>) {
} }
/// Indicates to the behaviour that we have discovered a new external address for us.
fn inject_new_external_addr(&mut self, _addr: &Multiaddr) {
}
/// Indicates to the behaviour that an external address was removed.
fn inject_expired_external_addr(&mut self, _addr: &Multiaddr) {
}
/// Polls for things that swarm should do. /// Polls for things that swarm should do.
/// ///
/// This API mimics the API of the `Stream` trait. The method may register the current task in /// This API mimics the API of the `Stream` trait. The method may register the current task in

View File

@ -327,7 +327,9 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
/// ///
/// Returns an error if the address is not supported. /// Returns an error if the address is not supported.
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> { pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
self.network.listen_on(addr) let id = self.network.listen_on(addr)?;
self.behaviour.inject_new_listener(id);
Ok(id)
} }
/// Remove some listener. /// Remove some listener.
@ -412,7 +414,18 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
/// [`NetworkBehaviourAction::ReportObservedAddr`] or explicitly /// [`NetworkBehaviourAction::ReportObservedAddr`] or explicitly
/// through this method. /// through this method.
pub fn add_external_address(&mut self, a: Multiaddr, s: AddressScore) -> AddAddressResult { pub fn add_external_address(&mut self, a: Multiaddr, s: AddressScore) -> AddAddressResult {
self.external_addrs.add(a, s) let result = self.external_addrs.add(a.clone(), s);
let expired = match &result {
AddAddressResult::Inserted { expired } => {
self.behaviour.inject_new_external_addr(&a);
expired
}
AddAddressResult::Updated { expired } => expired,
};
for a in expired {
self.behaviour.inject_expired_external_addr(&a.addr);
}
result
} }
/// Removes an external address of the local node, regardless of /// Removes an external address of the local node, regardless of
@ -422,7 +435,12 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
/// Returns `true` if the address existed and was removed, `false` /// Returns `true` if the address existed and was removed, `false`
/// otherwise. /// otherwise.
pub fn remove_external_address(&mut self, addr: &Multiaddr) -> bool { pub fn remove_external_address(&mut self, addr: &Multiaddr) -> bool {
self.external_addrs.remove(addr) if self.external_addrs.remove(addr) {
self.behaviour.inject_expired_external_addr(addr);
true
} else {
false
}
} }
/// Bans a peer by its peer ID. /// Bans a peer by its peer ID.
@ -565,19 +583,19 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
if !this.listened_addrs.contains(&listen_addr) { if !this.listened_addrs.contains(&listen_addr) {
this.listened_addrs.push(listen_addr.clone()) this.listened_addrs.push(listen_addr.clone())
} }
this.behaviour.inject_new_listen_addr(&listen_addr); this.behaviour.inject_new_listen_addr(listener_id, &listen_addr);
return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr)); return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr));
} }
Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) => { Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) => {
log::debug!("Listener {:?}; Expired address {:?}.", listener_id, listen_addr); log::debug!("Listener {:?}; Expired address {:?}.", listener_id, listen_addr);
this.listened_addrs.retain(|a| a != &listen_addr); this.listened_addrs.retain(|a| a != &listen_addr);
this.behaviour.inject_expired_listen_addr(&listen_addr); this.behaviour.inject_expired_listen_addr(listener_id, &listen_addr);
return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr)); return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr));
} }
Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => { Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => {
log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason); log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
for addr in addresses.iter() { for addr in addresses.iter() {
this.behaviour.inject_expired_listen_addr(addr); this.behaviour.inject_expired_listen_addr(listener_id, addr);
} }
this.behaviour.inject_listener_closed(listener_id, match &reason { this.behaviour.inject_listener_closed(listener_id, match &reason {
Ok(()) => Ok(()), Ok(()) => Ok(()),
@ -732,10 +750,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
}, },
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => { Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => {
for addr in this.network.address_translation(&address) { for addr in this.network.address_translation(&address) {
if this.external_addrs.iter().all(|a| a.addr != addr) { this.add_external_address(addr, score);
this.behaviour.inject_new_external_addr(&addr);
}
this.external_addrs.add(addr, score);
} }
}, },
} }

View File

@ -168,8 +168,8 @@ impl Default for Addresses {
/// The result of adding an address to an ordered list of /// The result of adding an address to an ordered list of
/// addresses with associated scores. /// addresses with associated scores.
pub enum AddAddressResult { pub enum AddAddressResult {
Inserted, Inserted { expired: SmallVec<[AddressRecord; 8]> },
Updated, Updated { expired: SmallVec<[AddressRecord; 8]> },
} }
impl Addresses { impl Addresses {
@ -206,8 +206,11 @@ impl Addresses {
} }
// Remove addresses that have a score of 0. // Remove addresses that have a score of 0.
let mut expired = SmallVec::new();
while self.registry.last().map(|e| e.score.is_zero()).unwrap_or(false) { while self.registry.last().map(|e| e.score.is_zero()).unwrap_or(false) {
self.registry.pop(); if let Some(addr) = self.registry.pop() {
expired.push(addr);
}
} }
// If the address score is finite, remember this report. // If the address score is finite, remember this report.
@ -220,13 +223,13 @@ impl Addresses {
if r.addr == addr { if r.addr == addr {
r.score = r.score + score; r.score = r.score + score;
isort(&mut self.registry); isort(&mut self.registry);
return AddAddressResult::Updated return AddAddressResult::Updated { expired }
} }
} }
// It is a new record. // It is a new record.
self.registry.push(AddressRecord::new(addr, score)); self.registry.push(AddressRecord::new(addr, score));
AddAddressResult::Inserted AddAddressResult::Inserted { expired }
} }
/// Explicitly remove an address from the collection. /// Explicitly remove an address from the collection.

View File

@ -115,9 +115,11 @@ where
pub inject_event: Vec<(PeerId, ConnectionId, <<TInner::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent)>, pub inject_event: Vec<(PeerId, ConnectionId, <<TInner::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent)>,
pub inject_addr_reach_failure: Vec<(Option<PeerId>, Multiaddr)>, pub inject_addr_reach_failure: Vec<(Option<PeerId>, Multiaddr)>,
pub inject_dial_failure: Vec<PeerId>, pub inject_dial_failure: Vec<PeerId>,
pub inject_new_listen_addr: Vec<Multiaddr>, pub inject_new_listener: Vec<ListenerId>,
pub inject_new_listen_addr: Vec<(ListenerId, Multiaddr)>,
pub inject_new_external_addr: Vec<Multiaddr>, pub inject_new_external_addr: Vec<Multiaddr>,
pub inject_expired_listen_addr: Vec<Multiaddr>, pub inject_expired_listen_addr: Vec<(ListenerId, Multiaddr)>,
pub inject_expired_external_addr: Vec<Multiaddr>,
pub inject_listener_error: Vec<ListenerId>, pub inject_listener_error: Vec<ListenerId>,
pub inject_listener_closed: Vec<(ListenerId, bool)>, pub inject_listener_closed: Vec<(ListenerId, bool)>,
pub poll: usize, pub poll: usize,
@ -138,9 +140,11 @@ where
inject_event: Vec::new(), inject_event: Vec::new(),
inject_addr_reach_failure: Vec::new(), inject_addr_reach_failure: Vec::new(),
inject_dial_failure: Vec::new(), inject_dial_failure: Vec::new(),
inject_new_listener: Vec::new(),
inject_new_listen_addr: Vec::new(), inject_new_listen_addr: Vec::new(),
inject_new_external_addr: Vec::new(), inject_new_external_addr: Vec::new(),
inject_expired_listen_addr: Vec::new(), inject_expired_listen_addr: Vec::new(),
inject_expired_external_addr: Vec::new(),
inject_listener_error: Vec::new(), inject_listener_error: Vec::new(),
inject_listener_closed: Vec::new(), inject_listener_closed: Vec::new(),
poll: 0, poll: 0,
@ -217,14 +221,19 @@ where
self.inner.inject_dial_failure(p); self.inner.inject_dial_failure(p);
} }
fn inject_new_listen_addr(&mut self, a: &Multiaddr) { fn inject_new_listener(&mut self, id: ListenerId) {
self.inject_new_listen_addr.push(a.clone()); self.inject_new_listener.push(id);
self.inner.inject_new_listen_addr(a); self.inner.inject_new_listener(id);
} }
fn inject_expired_listen_addr(&mut self, a: &Multiaddr) { fn inject_new_listen_addr(&mut self, id: ListenerId, a: &Multiaddr) {
self.inject_expired_listen_addr.push(a.clone()); self.inject_new_listen_addr.push((id, a.clone()));
self.inner.inject_expired_listen_addr(a); self.inner.inject_new_listen_addr(id, a);
}
fn inject_expired_listen_addr(&mut self, id: ListenerId, a: &Multiaddr) {
self.inject_expired_listen_addr.push((id, a.clone()));
self.inner.inject_expired_listen_addr(id, a);
} }
fn inject_new_external_addr(&mut self, a: &Multiaddr) { fn inject_new_external_addr(&mut self, a: &Multiaddr) {
@ -232,6 +241,11 @@ where
self.inner.inject_new_external_addr(a); self.inner.inject_new_external_addr(a);
} }
fn inject_expired_external_addr(&mut self, a: &Multiaddr) {
self.inject_expired_external_addr.push(a.clone());
self.inner.inject_expired_external_addr(a);
}
fn inject_listener_error(&mut self, l: ListenerId, e: &(dyn std::error::Error + 'static)) { fn inject_listener_error(&mut self, l: ListenerId, e: &(dyn std::error::Error + 'static)) {
self.inject_listener_error.push(l.clone()); self.inject_listener_error.push(l.clone());
self.inner.inject_listener_error(l, e); self.inner.inject_listener_error(l, e);

View File

@ -33,7 +33,7 @@ use libp2p_core::{
ConnectedPoint, ConnectedPoint,
PeerId, PeerId,
Multiaddr, Multiaddr,
connection::ConnectionId, connection::{ConnectionId, ListenerId},
either::{EitherError, EitherOutput}, either::{EitherError, EitherOutput},
upgrade::{DeniedUpgrade, EitherUpgrade} upgrade::{DeniedUpgrade, EitherUpgrade}
}; };
@ -110,6 +110,12 @@ where
} }
} }
fn inject_address_change(&mut self, peer_id: &PeerId, connection: &ConnectionId, old: &ConnectedPoint, new: &ConnectedPoint) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_address_change(peer_id, connection, old, new)
}
}
fn inject_event( fn inject_event(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
@ -133,15 +139,21 @@ where
} }
} }
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { fn inject_new_listener(&mut self, id: ListenerId) {
if let Some(inner) = self.inner.as_mut() { if let Some(inner) = self.inner.as_mut() {
inner.inject_new_listen_addr(addr) inner.inject_new_listener(id)
} }
} }
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
if let Some(inner) = self.inner.as_mut() { if let Some(inner) = self.inner.as_mut() {
inner.inject_expired_listen_addr(addr) inner.inject_new_listen_addr(id, addr)
}
}
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_expired_listen_addr(id, addr)
} }
} }
@ -151,6 +163,24 @@ where
} }
} }
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_expired_external_addr(addr)
}
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_listener_error(id, err)
}
}
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_listener_closed(id, reason)
}
}
fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters) fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters)
-> Poll<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>> -> Poll<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>
{ {