feat: report changes in supported protocols to ConnectionHandler

With this patch, implementations of `ConnectionHandler` (which are typically composed in a tree) can exchange information about the supported protocols of a remote with each other via `ConnectionHandlerEvent::ReportRemoteProtocols`. The provided `ProtocolSupport` enum can describe either additions or removals of the remote peer's protocols.

This information is aggregated in the connection and passed down to the `ConnectionHandler` via `ConnectionEvent::RemoteProtocolsChange`.

Similarly, if the listen protocols of a connection change, all `ConnectionHandler`s on the connection will be notified via `ConnectionEvent::LocalProtocolsChange`. This will allow us to eventually remove `PollParameters` from `NetworkBehaviour`.

This pattern allows protocols on a connection to communicate with each other. For example, protocols like identify can share the list of (supposedly) supported protocols by the remote with all other handlers. A protocol like kademlia can accurately add and remove a remote from its routing table as a result.

Resolves: #2680.
Related: #3124.

Pull-Request: #3651.
This commit is contained in:
Thomas Eizinger
2023-05-08 16:36:30 +02:00
committed by GitHub
parent b8a7684153
commit b035fc80a0
30 changed files with 844 additions and 243 deletions

View File

@ -21,18 +21,23 @@
mod error;
pub(crate) mod pool;
mod supported_protocols;
pub use error::ConnectionError;
pub(crate) use error::{
PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError,
};
pub use supported_protocols::SupportedProtocols;
use crate::handler::{
AddressChange, ConnectionEvent, ConnectionHandler, DialUpgradeError, FullyNegotiatedInbound,
FullyNegotiatedOutbound, ListenUpgradeError,
FullyNegotiatedOutbound, ListenUpgradeError, ProtocolSupport, ProtocolsAdded, ProtocolsChange,
UpgradeInfoSend,
};
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper};
use crate::{ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol};
use crate::{
ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError, SubstreamProtocol,
};
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
@ -47,6 +52,7 @@ use libp2p_core::upgrade::{
};
use libp2p_core::Endpoint;
use libp2p_identity::PeerId;
use std::collections::HashSet;
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Waker;
@ -147,6 +153,9 @@ where
requested_substreams: FuturesUnordered<
SubstreamRequested<THandler::OutboundOpenInfo, THandler::OutboundProtocol>,
>,
local_supported_protocols: HashSet<StreamProtocol>,
remote_supported_protocols: HashSet<StreamProtocol>,
}
impl<THandler> fmt::Debug for Connection<THandler>
@ -171,10 +180,18 @@ where
/// and connection handler.
pub(crate) fn new(
muxer: StreamMuxerBox,
handler: THandler,
mut handler: THandler,
substream_upgrade_protocol_override: Option<upgrade::Version>,
max_negotiating_inbound_streams: usize,
) -> Self {
let initial_protocols = gather_supported_protocols(&handler);
if !initial_protocols.is_empty() {
handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(
ProtocolsChange::Added(ProtocolsAdded::from_set(&initial_protocols)),
));
}
Connection {
muxing: muxer,
handler,
@ -184,6 +201,8 @@ where
substream_upgrade_protocol_override,
max_negotiating_inbound_streams,
requested_substreams: Default::default(),
local_supported_protocols: initial_protocols,
remote_supported_protocols: Default::default(),
}
}
@ -213,6 +232,8 @@ where
shutdown,
max_negotiating_inbound_streams,
substream_upgrade_protocol_override,
local_supported_protocols: supported_protocols,
remote_supported_protocols,
} = self.get_mut();
loop {
@ -246,6 +267,31 @@ where
Poll::Ready(ConnectionHandlerEvent::Close(err)) => {
return Poll::Ready(Err(ConnectionError::Handler(err)));
}
Poll::Ready(ConnectionHandlerEvent::ReportRemoteProtocols(
ProtocolSupport::Added(protocols),
)) => {
if let Some(added) =
ProtocolsChange::add(remote_supported_protocols, &protocols)
{
handler.on_connection_event(ConnectionEvent::RemoteProtocolsChange(added));
remote_supported_protocols.extend(protocols);
}
continue;
}
Poll::Ready(ConnectionHandlerEvent::ReportRemoteProtocols(
ProtocolSupport::Removed(protocols),
)) => {
if let Some(removed) =
ProtocolsChange::remove(remote_supported_protocols, &protocols)
{
handler
.on_connection_event(ConnectionEvent::RemoteProtocolsChange(removed));
remote_supported_protocols.retain(|p| !protocols.contains(p));
}
continue;
}
}
// In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams.
@ -376,9 +422,33 @@ where
}
}
let new_protocols = gather_supported_protocols(handler);
for change in ProtocolsChange::from_full_sets(supported_protocols, &new_protocols) {
handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
}
*supported_protocols = new_protocols;
return Poll::Pending; // Nothing can make progress, return `Pending`.
}
}
#[cfg(test)]
fn poll_noop_waker(
&mut self,
) -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>> {
Pin::new(self).poll(&mut Context::from_waker(futures::task::noop_waker_ref()))
}
}
fn gather_supported_protocols(handler: &impl ConnectionHandler) -> HashSet<StreamProtocol> {
handler
.listen_protocol()
.upgrade()
.protocol_info()
.filter_map(|i| StreamProtocol::try_from_owned(i.as_ref().to_owned()).ok())
.collect()
}
/// Borrowed information about an incoming connection currently being negotiated.
@ -605,9 +675,10 @@ enum Shutdown {
mod tests {
use super::*;
use crate::keep_alive;
use futures::future;
use futures::AsyncRead;
use futures::AsyncWrite;
use libp2p_core::upgrade::DeniedUpgrade;
use libp2p_core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p_core::StreamMuxer;
use quickcheck::*;
use std::sync::{Arc, Weak};
@ -629,8 +700,7 @@ mod tests {
max_negotiating_inbound_streams,
);
let result = Pin::new(&mut connection)
.poll(&mut Context::from_waker(futures::task::noop_waker_ref()));
let result = connection.poll_noop_waker();
assert!(result.is_pending());
assert_eq!(
@ -654,13 +724,11 @@ mod tests {
);
connection.handler.open_new_outbound();
let _ = Pin::new(&mut connection)
.poll(&mut Context::from_waker(futures::task::noop_waker_ref()));
let _ = connection.poll_noop_waker();
std::thread::sleep(upgrade_timeout + Duration::from_secs(1));
let _ = Pin::new(&mut connection)
.poll(&mut Context::from_waker(futures::task::noop_waker_ref()));
let _ = connection.poll_noop_waker();
assert!(matches!(
connection.handler.error.unwrap(),
@ -668,6 +736,94 @@ mod tests {
))
}
#[test]
fn propagates_changes_to_supported_inbound_protocols() {
let mut connection = Connection::new(
StreamMuxerBox::new(PendingStreamMuxer),
ConfigurableProtocolConnectionHandler::default(),
None,
0,
);
// First, start listening on a single protocol.
connection.handler.listen_on(&["/foo"]);
let _ = connection.poll_noop_waker();
assert_eq!(connection.handler.local_added, vec![vec!["/foo"]]);
assert!(connection.handler.local_removed.is_empty());
// Second, listen on two protocols.
connection.handler.listen_on(&["/foo", "/bar"]);
let _ = connection.poll_noop_waker();
assert_eq!(
connection.handler.local_added,
vec![vec!["/foo"], vec!["/bar"]],
"expect to only receive an event for the newly added protocols"
);
assert!(connection.handler.local_removed.is_empty());
// Third, stop listening on the first protocol.
connection.handler.listen_on(&["/bar"]);
let _ = connection.poll_noop_waker();
assert_eq!(
connection.handler.local_added,
vec![vec!["/foo"], vec!["/bar"]]
);
assert_eq!(connection.handler.local_removed, vec![vec!["/foo"]]);
}
#[test]
fn only_propagtes_actual_changes_to_remote_protocols_to_handler() {
let mut connection = Connection::new(
StreamMuxerBox::new(PendingStreamMuxer),
ConfigurableProtocolConnectionHandler::default(),
None,
0,
);
// First, remote supports a single protocol.
connection.handler.remote_adds_support_for(&["/foo"]);
let _ = connection.poll_noop_waker();
assert_eq!(connection.handler.remote_added, vec![vec!["/foo"]]);
assert!(connection.handler.remote_removed.is_empty());
// Second, it adds a protocol but also still includes the first one.
connection
.handler
.remote_adds_support_for(&["/foo", "/bar"]);
let _ = connection.poll_noop_waker();
assert_eq!(
connection.handler.remote_added,
vec![vec!["/foo"], vec!["/bar"]],
"expect to only receive an event for the newly added protocol"
);
assert!(connection.handler.remote_removed.is_empty());
// Third, stop listening on a protocol it never advertised (we can't control what handlers do so this needs to be handled gracefully).
connection.handler.remote_removes_support_for(&["/baz"]);
let _ = connection.poll_noop_waker();
assert_eq!(
connection.handler.remote_added,
vec![vec!["/foo"], vec!["/bar"]]
);
assert!(&connection.handler.remote_removed.is_empty());
// Fourth, stop listening on a protocol that was previously supported
connection.handler.remote_removes_support_for(&["/bar"]);
let _ = connection.poll_noop_waker();
assert_eq!(
connection.handler.remote_added,
vec![vec!["/foo"], vec!["/bar"]]
);
assert_eq!(connection.handler.remote_removed, vec![vec!["/bar"]]);
}
struct DummyStreamMuxer {
counter: Arc<()>,
}
@ -785,6 +941,40 @@ mod tests {
}
}
#[derive(Default)]
struct ConfigurableProtocolConnectionHandler {
events: Vec<ConnectionHandlerEvent<DeniedUpgrade, (), Void, Void>>,
active_protocols: HashSet<StreamProtocol>,
local_added: Vec<Vec<StreamProtocol>>,
local_removed: Vec<Vec<StreamProtocol>>,
remote_added: Vec<Vec<StreamProtocol>>,
remote_removed: Vec<Vec<StreamProtocol>>,
}
impl ConfigurableProtocolConnectionHandler {
fn listen_on(&mut self, protocols: &[&'static str]) {
self.active_protocols = protocols.iter().copied().map(StreamProtocol::new).collect();
}
fn remote_adds_support_for(&mut self, protocols: &[&'static str]) {
self.events
.push(ConnectionHandlerEvent::ReportRemoteProtocols(
ProtocolSupport::Added(
protocols.iter().copied().map(StreamProtocol::new).collect(),
),
));
}
fn remote_removes_support_for(&mut self, protocols: &[&'static str]) {
self.events
.push(ConnectionHandlerEvent::ReportRemoteProtocols(
ProtocolSupport::Removed(
protocols.iter().copied().map(StreamProtocol::new).collect(),
),
));
}
}
impl ConnectionHandler for MockConnectionHandler {
type InEvent = Void;
type OutEvent = Void;
@ -821,7 +1011,10 @@ mod tests {
ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
self.error = Some(error)
}
ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {}
ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => {}
}
}
@ -855,6 +1048,112 @@ mod tests {
Poll::Pending
}
}
impl ConnectionHandler for ConfigurableProtocolConnectionHandler {
type InEvent = Void;
type OutEvent = Void;
type Error = Void;
type InboundProtocol = ManyProtocolsUpgrade;
type OutboundProtocol = DeniedUpgrade;
type InboundOpenInfo = ();
type OutboundOpenInfo = ();
fn listen_protocol(
&self,
) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(
ManyProtocolsUpgrade {
protocols: Vec::from_iter(self.active_protocols.clone()),
},
(),
)
}
fn on_connection_event(
&mut self,
event: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
ConnectionEvent::LocalProtocolsChange(ProtocolsChange::Added(added)) => {
self.local_added.push(added.cloned().collect())
}
ConnectionEvent::LocalProtocolsChange(ProtocolsChange::Removed(removed)) => {
self.local_removed.push(removed.cloned().collect())
}
ConnectionEvent::RemoteProtocolsChange(ProtocolsChange::Added(added)) => {
self.remote_added.push(added.cloned().collect())
}
ConnectionEvent::RemoteProtocolsChange(ProtocolsChange::Removed(removed)) => {
self.remote_removed.push(removed.cloned().collect())
}
_ => {}
}
}
fn on_behaviour_event(&mut self, event: Self::InEvent) {
void::unreachable(event)
}
fn connection_keep_alive(&self) -> KeepAlive {
KeepAlive::Yes
}
fn poll(
&mut self,
_: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
if let Some(event) = self.events.pop() {
return Poll::Ready(event);
}
Poll::Pending
}
}
struct ManyProtocolsUpgrade {
protocols: Vec<StreamProtocol>,
}
impl UpgradeInfo for ManyProtocolsUpgrade {
type Info = StreamProtocol;
type InfoIter = std::vec::IntoIter<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
self.protocols.clone().into_iter()
}
}
impl<C> InboundUpgrade<C> for ManyProtocolsUpgrade {
type Output = C;
type Error = Void;
type Future = future::Ready<Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, stream: C, _: Self::Info) -> Self::Future {
future::ready(Ok(stream))
}
}
impl<C> OutboundUpgrade<C> for ManyProtocolsUpgrade {
type Output = C;
type Error = Void;
type Future = future::Ready<Result<Self::Output, Self::Error>>;
fn upgrade_outbound(self, stream: C, _: Self::Info) -> Self::Future {
future::ready(Ok(stream))
}
}
}
/// The endpoint roles associated with a pending peer-to-peer connection.