mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-30 18:21:33 +00:00
protocols/kad: Implement NetworkBehaviour::inject_address_change (#1649)
With 826f513
a `StreamMuxer` can notify that the address of a remote
peer changed. This is needed to support the transport protocol QUIC as
remotes can change their IP addresses within the lifetime of a single
connection.
This commit implements the `NetworkBehaviour::inject_address_change`
handler to update the Kademlia routing table accordingly.
This commit is contained in:
@ -11,6 +11,9 @@
|
|||||||
a peer takes effect immediately without an orderly connection shutdown.
|
a peer takes effect immediately without an orderly connection shutdown.
|
||||||
See [PR 1619](https://github.com/libp2p/rust-libp2p/pull/1619) for further details.
|
See [PR 1619](https://github.com/libp2p/rust-libp2p/pull/1619) for further details.
|
||||||
|
|
||||||
|
- Add `ConnectedPoint::get_remote_address`
|
||||||
|
([PR 1649](https://github.com/libp2p/rust-libp2p/pull/1649)).
|
||||||
|
|
||||||
# 0.20.1 [2020-07-17]
|
# 0.20.1 [2020-07-17]
|
||||||
|
|
||||||
- Update ed25519-dalek dependency.
|
- Update ed25519-dalek dependency.
|
||||||
|
@ -133,6 +133,19 @@ impl ConnectedPoint {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the address of the remote stored in this struct.
|
||||||
|
///
|
||||||
|
/// For `Dialer`, this returns `address`. For `Listener`, this returns `send_back_addr`.
|
||||||
|
///
|
||||||
|
/// Note that the remote node might not be listening on this address and hence the address might
|
||||||
|
/// not be usable to establish new connections.
|
||||||
|
pub fn get_remote_address(&self) -> &Multiaddr {
|
||||||
|
match self {
|
||||||
|
ConnectedPoint::Dialer { address } => address,
|
||||||
|
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Modifies the address of the remote stored in this struct.
|
/// Modifies the address of the remote stored in this struct.
|
||||||
///
|
///
|
||||||
/// For `Dialer`, this modifies `address`. For `Listener`, this modifies `send_back_addr`.
|
/// For `Dialer`, this modifies `address`. For `Listener`, this modifies `send_back_addr`.
|
||||||
|
@ -6,6 +6,9 @@
|
|||||||
`Distance` for the bucket
|
`Distance` for the bucket
|
||||||
([PR 1680](https://github.com/libp2p/rust-libp2p/pull/1680)).
|
([PR 1680](https://github.com/libp2p/rust-libp2p/pull/1680)).
|
||||||
|
|
||||||
|
- Add `NetworkBehaviour::inject_address_change` implementation
|
||||||
|
([PR 1649](https://github.com/libp2p/rust-libp2p/pull/1649)).
|
||||||
|
|
||||||
# 0.21.0 [2020-07-01]
|
# 0.21.0 [2020-07-01]
|
||||||
|
|
||||||
- Remove `KademliaEvent::Discovered`
|
- Remove `KademliaEvent::Discovered`
|
||||||
|
@ -91,6 +91,19 @@ impl Addresses {
|
|||||||
false
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Replaces an old address with a new address.
|
||||||
|
///
|
||||||
|
/// Returns true if the previous address was found and replaced with a clone
|
||||||
|
/// of the new address, returns false otherwise.
|
||||||
|
pub fn replace(&mut self, old: &Multiaddr, new: &Multiaddr) -> bool {
|
||||||
|
if let Some(a) = self.addrs.iter_mut().find(|a| *a == old) {
|
||||||
|
*a = new.clone();
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for Addresses {
|
impl fmt::Debug for Addresses {
|
||||||
|
@ -1449,6 +1449,59 @@ where
|
|||||||
self.connected_peers.insert(peer.clone());
|
self.connected_peers.insert(peer.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn inject_address_change(
|
||||||
|
&mut self,
|
||||||
|
peer: &PeerId,
|
||||||
|
_: &ConnectionId,
|
||||||
|
old: &ConnectedPoint,
|
||||||
|
new: &ConnectedPoint
|
||||||
|
) {
|
||||||
|
let (old, new) = (old.get_remote_address(), new.get_remote_address());
|
||||||
|
|
||||||
|
// Update routing table.
|
||||||
|
if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::new(peer.clone())).value() {
|
||||||
|
if addrs.replace(old, new) {
|
||||||
|
debug!("Address '{}' replaced with '{}' for peer '{}'.", old, new, peer);
|
||||||
|
} else {
|
||||||
|
debug!(
|
||||||
|
"Address '{}' not replaced with '{}' for peer '{}' as old address wasn't \
|
||||||
|
present.",
|
||||||
|
old, new, peer,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug!(
|
||||||
|
"Address '{}' not replaced with '{}' for peer '{}' as peer is not present in the \
|
||||||
|
routing table.",
|
||||||
|
old, new, peer,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update query address cache.
|
||||||
|
//
|
||||||
|
// Given two connected nodes: local node A and remote node B. Say node B
|
||||||
|
// is not in node A's routing table. Additionally node B is part of the
|
||||||
|
// `QueryInner::addresses` list of an ongoing query on node A. Say Node
|
||||||
|
// B triggers an address change and then disconnects. Later on the
|
||||||
|
// earlier mentioned query on node A would like to connect to node B.
|
||||||
|
// Without replacing the address in the `QueryInner::addresses` set node
|
||||||
|
// A would attempt to dial the old and not the new address.
|
||||||
|
//
|
||||||
|
// While upholding correctness, iterating through all discovered
|
||||||
|
// addresses of a peer in all currently ongoing queries might have a
|
||||||
|
// large performance impact. If so, the code below might be worth
|
||||||
|
// revisiting.
|
||||||
|
for query in self.queries.iter_mut() {
|
||||||
|
if let Some(addrs) = query.inner.addresses.get_mut(peer) {
|
||||||
|
for addr in addrs.iter_mut() {
|
||||||
|
if addr == old {
|
||||||
|
*addr = new.clone();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn inject_addr_reach_failure(
|
fn inject_addr_reach_failure(
|
||||||
&mut self,
|
&mut self,
|
||||||
peer_id: Option<&PeerId>,
|
peer_id: Option<&PeerId>,
|
||||||
|
@ -32,6 +32,7 @@ use futures::{
|
|||||||
};
|
};
|
||||||
use futures_timer::Delay;
|
use futures_timer::Delay;
|
||||||
use libp2p_core::{
|
use libp2p_core::{
|
||||||
|
connection::{ConnectedPoint, ConnectionId},
|
||||||
PeerId,
|
PeerId,
|
||||||
Transport,
|
Transport,
|
||||||
identity,
|
identity,
|
||||||
@ -1099,3 +1100,41 @@ fn manual_bucket_inserts() {
|
|||||||
Poll::Pending
|
Poll::Pending
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn network_behaviour_inject_address_change() {
|
||||||
|
let local_peer_id = PeerId::random();
|
||||||
|
|
||||||
|
let remote_peer_id = PeerId::random();
|
||||||
|
let connection_id = ConnectionId::new(1);
|
||||||
|
let old_address: Multiaddr = Protocol::Memory(1).into();
|
||||||
|
let new_address: Multiaddr = Protocol::Memory(2).into();
|
||||||
|
|
||||||
|
let mut kademlia = Kademlia::new(
|
||||||
|
local_peer_id.clone(),
|
||||||
|
MemoryStore::new(local_peer_id),
|
||||||
|
);
|
||||||
|
|
||||||
|
kademlia.inject_connection_established(
|
||||||
|
&remote_peer_id,
|
||||||
|
&connection_id,
|
||||||
|
&ConnectedPoint::Dialer { address: old_address.clone() },
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
vec![old_address.clone()],
|
||||||
|
kademlia.addresses_of_peer(&remote_peer_id),
|
||||||
|
);
|
||||||
|
|
||||||
|
kademlia.inject_address_change(
|
||||||
|
&remote_peer_id,
|
||||||
|
&connection_id,
|
||||||
|
&ConnectedPoint::Dialer { address: old_address.clone() },
|
||||||
|
&ConnectedPoint::Dialer { address: new_address.clone() },
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
vec![new_address.clone()],
|
||||||
|
kademlia.addresses_of_peer(&remote_peer_id),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user