feat(core)!: make ConnectionIds globally unique (#3327)

Instead of offering a public constructor, users are now no longer able to construct `ConnectionId`s at all. They only public API exposed are the derived traits. Internally, `ConnectionId`s are monotonically incremented using a static atomic counter, thus no two connections will ever get assigned the same ID.
This commit is contained in:
Thomas Eizinger
2023-01-24 03:29:41 +11:00
committed by GitHub
parent b0b035b2ff
commit 778f7a2d1a
5 changed files with 44 additions and 38 deletions

View File

@ -223,15 +223,17 @@ where
}
};
#[allow(deprecated)]
gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id: peer,
connection_id: ConnectionId::new(0),
connection_id: ConnectionId::DUMMY,
endpoint: &endpoint,
failed_addresses: &[],
other_established: 0, // first connection
}));
if let Some(kind) = kind {
gs.on_connection_handler_event(peer, ConnectionId::new(1), HandlerEvent::PeerKind(kind));
#[allow(deprecated)]
gs.on_connection_handler_event(peer, ConnectionId::DUMMY, HandlerEvent::PeerKind(kind));
}
if explicit {
gs.add_explicit_peer(&peer);
@ -577,9 +579,10 @@ fn test_join() {
for _ in 0..3 {
let random_peer = PeerId::random();
// inform the behaviour of a new peer
#[allow(deprecated)]
gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id: random_peer,
connection_id: ConnectionId::new(1),
connection_id: ConnectionId::DUMMY,
endpoint: &ConnectedPoint::Dialer {
address: "/ip4/127.0.0.1".parse::<Multiaddr>().unwrap(),
role_override: Endpoint::Dialer,
@ -959,7 +962,10 @@ fn test_get_random_peers() {
*p,
PeerConnections {
kind: PeerKind::Gossipsubv1_1,
connections: vec![ConnectionId::new(1)],
connections: vec![
#[allow(deprecated)]
ConnectionId::DUMMY,
],
},
)
})
@ -3009,7 +3015,8 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {
//receive from p1
gs.on_connection_handler_event(
p1,
ConnectionId::new(0),
#[allow(deprecated)]
ConnectionId::DUMMY,
HandlerEvent::Message {
rpc: GossipsubRpc {
messages: vec![raw_message1],
@ -3035,7 +3042,8 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {
//receive from p2
gs.on_connection_handler_event(
p2,
ConnectionId::new(0),
#[allow(deprecated)]
ConnectionId::DUMMY,
HandlerEvent::Message {
rpc: GossipsubRpc {
messages: vec![raw_message3],
@ -3647,7 +3655,8 @@ fn test_scoring_p4_invalid_signature() {
gs.on_connection_handler_event(
peers[0],
ConnectionId::new(0),
#[allow(deprecated)]
ConnectionId::DUMMY,
HandlerEvent::Message {
rpc: GossipsubRpc {
messages: vec![],
@ -4128,10 +4137,11 @@ fn test_scoring_p6() {
}
//add additional connection for 3 others with addr
#[allow(deprecated)]
for id in others.iter().take(3) {
gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id: *id,
connection_id: ConnectionId::new(0),
connection_id: ConnectionId::DUMMY,
endpoint: &ConnectedPoint::Dialer {
address: addr.clone(),
role_override: Endpoint::Dialer,
@ -4150,9 +4160,10 @@ fn test_scoring_p6() {
//add additional connection for 3 of the peers to addr2
for peer in peers.iter().take(3) {
#[allow(deprecated)]
gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id: *peer,
connection_id: ConnectionId::new(0),
connection_id: ConnectionId::DUMMY,
endpoint: &ConnectedPoint::Dialer {
address: addr2.clone(),
role_override: Endpoint::Dialer,
@ -4180,9 +4191,10 @@ fn test_scoring_p6() {
);
//two times same ip doesn't count twice
#[allow(deprecated)]
gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id: peers[0],
connection_id: ConnectionId::new(0),
connection_id: ConnectionId::DUMMY,
endpoint: &ConnectedPoint::Dialer {
address: addr,
role_override: Endpoint::Dialer,
@ -5194,7 +5206,8 @@ fn test_subscribe_and_graft_with_negative_score() {
let (mut gs2, _, _) = inject_nodes1().create_network();
let connection_id = ConnectionId::new(0);
#[allow(deprecated)]
let connection_id = ConnectionId::DUMMY;
let topic = Topic::new("test");

View File

@ -1294,7 +1294,8 @@ fn network_behaviour_on_address_change() {
let local_peer_id = PeerId::random();
let remote_peer_id = PeerId::random();
let connection_id = ConnectionId::new(1);
#[allow(deprecated)]
let connection_id = ConnectionId::DUMMY;
let old_address: Multiaddr = Protocol::Memory(1).into();
let new_address: Multiaddr = Protocol::Memory(2).into();

View File

@ -22,11 +22,14 @@
If you have previously set `connection_event_buffer_size` you should re-evaluate what a good size for a _per connection_ buffer is.
See [PR 3188].
- Remove `ConnectionId::new`. Manually creating `ConnectionId`s is now unsupported. See [PR 3327].
[PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170
[PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134
[PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153
[PR 3264]: https://github.com/libp2p/rust-libp2p/pull/3264
[PR 3272]: https://github.com/libp2p/rust-libp2p/pull/3272
[PR 3327]: https://github.com/libp2p/rust-libp2p/pull/3327
[PR 3188]: https://github.com/libp2p/rust-libp2p/pull/3188
# 0.41.1

View File

@ -45,30 +45,30 @@ use libp2p_core::upgrade::{InboundUpgradeApply, OutboundUpgradeApply};
use libp2p_core::{upgrade, UpgradeError};
use libp2p_core::{Endpoint, PeerId};
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Waker;
use std::time::Duration;
use std::{fmt, io, mem, pin::Pin, task::Context, task::Poll};
static NEXT_CONNECTION_ID: AtomicUsize = AtomicUsize::new(1);
/// Connection identifier.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ConnectionId(usize);
impl ConnectionId {
/// Creates a `ConnectionId` from a non-negative integer.
/// A "dummy" [`ConnectionId`].
///
/// This is primarily useful for creating connection IDs
/// in test environments. There is in general no guarantee
/// that all connection IDs are based on non-negative integers.
pub fn new(id: usize) -> Self {
Self(id)
}
}
/// Really, you should not use this, not even for testing but it is here if you need it.
#[deprecated(
since = "0.42.0",
note = "Don't use this, it will be removed at a later stage again."
)]
pub const DUMMY: ConnectionId = ConnectionId(0);
impl std::ops::Add<usize> for ConnectionId {
type Output = Self;
fn add(self, other: usize) -> Self {
Self(self.0 + other)
/// Returns the next available [`ConnectionId`].
pub(crate) fn next() -> Self {
Self(NEXT_CONNECTION_ID.fetch_add(1, Ordering::SeqCst))
}
}

View File

@ -102,9 +102,6 @@ where
/// The pending connections that are currently being negotiated.
pending: HashMap<ConnectionId, PendingConnection<THandler>>,
/// Next available identifier for a new connection / task.
next_connection_id: ConnectionId,
/// Size of the task command buffer (per task).
task_command_buffer_size: usize,
@ -327,7 +324,6 @@ where
counters: ConnectionCounters::new(limits),
established: Default::default(),
pending: Default::default(),
next_connection_id: ConnectionId::new(0),
task_command_buffer_size: config.task_command_buffer_size,
dial_concurrency_factor: config.dial_concurrency_factor,
substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
@ -414,13 +410,6 @@ where
self.established.keys()
}
fn next_connection_id(&mut self) -> ConnectionId {
let connection_id = self.next_connection_id;
self.next_connection_id = self.next_connection_id + 1;
connection_id
}
fn spawn(&mut self, task: BoxFuture<'static, ()>) {
self.executor.spawn(task)
}
@ -460,7 +449,7 @@ where
dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor),
);
let connection_id = self.next_connection_id();
let connection_id = ConnectionId::next();
let (abort_notifier, abort_receiver) = oneshot::channel();
@ -510,7 +499,7 @@ where
return Err((limit, handler));
}
let connection_id = self.next_connection_id();
let connection_id = ConnectionId::next();
let (abort_notifier, abort_receiver) = oneshot::channel();