refactor(dcutr): remove ActionBuilder. (#3304)

addresses #3299
This commit is contained in:
João Oliveira 2023-01-10 03:43:59 +00:00 committed by GitHub
parent 8f70fedbbf
commit 20ce07c7d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -21,7 +21,6 @@
//! [`NetworkBehaviour`] to act as a direct connection upgrade through relay node. //! [`NetworkBehaviour`] to act as a direct connection upgrade through relay node.
use crate::handler; use crate::handler;
use crate::protocol;
use either::Either; use either::Either;
use libp2p_core::connection::{ConnectedPoint, ConnectionId}; use libp2p_core::connection::{ConnectedPoint, ConnectionId};
use libp2p_core::multiaddr::Protocol; use libp2p_core::multiaddr::Protocol;
@ -68,7 +67,7 @@ pub enum Error {
pub struct Behaviour { pub struct Behaviour {
/// Queue of actions to return when polled. /// Queue of actions to return when polled.
queued_actions: VecDeque<ActionBuilder>, queued_events: VecDeque<NetworkBehaviourAction<Event, handler::Prototype>>,
/// All direct (non-relayed) connections. /// All direct (non-relayed) connections.
direct_connections: HashMap<PeerId, HashSet<ConnectionId>>, direct_connections: HashMap<PeerId, HashSet<ConnectionId>>,
@ -81,13 +80,22 @@ pub struct Behaviour {
impl Behaviour { impl Behaviour {
pub fn new(local_peer_id: PeerId) -> Self { pub fn new(local_peer_id: PeerId) -> Self {
Behaviour { Behaviour {
queued_actions: Default::default(), queued_events: Default::default(),
direct_connections: Default::default(), direct_connections: Default::default(),
external_addresses: Default::default(), external_addresses: Default::default(),
local_peer_id, local_peer_id,
} }
} }
fn observed_addreses(&self) -> Vec<Multiaddr> {
self.external_addresses
.iter()
.cloned()
.filter(|a| !a.iter().any(|p| p == Protocol::P2pCircuit))
.map(|a| a.with(Protocol::P2p(self.local_peer_id.into())))
.collect()
}
fn on_connection_established( fn on_connection_established(
&mut self, &mut self,
ConnectionEstablished { ConnectionEstablished {
@ -108,11 +116,14 @@ impl Behaviour {
// connection upgrade by initiating a direct connection to A. // connection upgrade by initiating a direct connection to A.
// //
// https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol // https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol
self.queued_actions.extend([ self.queued_events.extend([
ActionBuilder::Connect { NetworkBehaviourAction::NotifyHandler {
peer_id, peer_id,
attempt: 1,
handler: NotifyHandler::One(connection_id), handler: NotifyHandler::One(connection_id),
event: Either::Left(handler::relayed::Command::Connect {
obs_addrs: self.observed_addreses(),
attempt: 1,
}),
}, },
NetworkBehaviourAction::GenerateEvent( NetworkBehaviourAction::GenerateEvent(
Event::InitiatedDirectConnectionUpgrade { Event::InitiatedDirectConnectionUpgrade {
@ -122,8 +133,7 @@ impl Behaviour {
ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."), ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."),
}, },
}, },
) ),
.into(),
]); ]);
} }
} else { } else {
@ -147,26 +157,28 @@ impl Behaviour {
{ {
let peer_id = peer_id.expect("Peer of `Prototype::DirectConnection` is always known."); let peer_id = peer_id.expect("Peer of `Prototype::DirectConnection` is always known.");
if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS {
self.queued_actions.push_back(ActionBuilder::Connect { self.queued_events
peer_id, .push_back(NetworkBehaviourAction::NotifyHandler {
handler: NotifyHandler::One(relayed_connection_id), handler: NotifyHandler::One(relayed_connection_id),
attempt: attempt + 1, peer_id,
}); event: Either::Left(handler::relayed::Command::Connect {
attempt: attempt + 1,
obs_addrs: self.observed_addreses(),
}),
})
} else { } else {
self.queued_actions.extend([ self.queued_events.extend([
NetworkBehaviourAction::NotifyHandler { NetworkBehaviourAction::NotifyHandler {
peer_id, peer_id,
handler: NotifyHandler::One(relayed_connection_id), handler: NotifyHandler::One(relayed_connection_id),
event: Either::Left( event: Either::Left(
handler::relayed::Command::UpgradeFinishedDontKeepAlive, handler::relayed::Command::UpgradeFinishedDontKeepAlive,
), ),
} },
.into(),
NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed {
remote_peer_id: peer_id, remote_peer_id: peer_id,
error: Error::Dial, error: Error::Dial,
}) }),
.into(),
]); ]);
} }
} }
@ -217,93 +229,87 @@ impl NetworkBehaviour for Behaviour {
inbound_connect, inbound_connect,
remote_addr, remote_addr,
}) => { }) => {
self.queued_actions.extend([ self.queued_events.extend([
ActionBuilder::AcceptInboundConnect { NetworkBehaviourAction::NotifyHandler {
peer_id: event_source,
handler: NotifyHandler::One(connection), handler: NotifyHandler::One(connection),
inbound_connect, peer_id: event_source,
event: Either::Left(handler::relayed::Command::AcceptInboundConnect {
inbound_connect,
obs_addrs: self.observed_addreses(),
}),
}, },
NetworkBehaviourAction::GenerateEvent( NetworkBehaviourAction::GenerateEvent(
Event::RemoteInitiatedDirectConnectionUpgrade { Event::RemoteInitiatedDirectConnectionUpgrade {
remote_peer_id: event_source, remote_peer_id: event_source,
remote_relayed_addr: remote_addr, remote_relayed_addr: remote_addr,
}, },
) ),
.into(),
]); ]);
} }
Either::Left(handler::relayed::Event::InboundNegotiationFailed { error }) => { Either::Left(handler::relayed::Event::InboundNegotiationFailed { error }) => {
self.queued_actions.push_back( self.queued_events
NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { .push_back(NetworkBehaviourAction::GenerateEvent(
remote_peer_id: event_source, Event::DirectConnectionUpgradeFailed {
error: Error::Handler(error), remote_peer_id: event_source,
}) error: Error::Handler(error),
.into(), },
); ));
} }
Either::Left(handler::relayed::Event::InboundConnectNegotiated(remote_addrs)) => { Either::Left(handler::relayed::Event::InboundConnectNegotiated(remote_addrs)) => {
self.queued_actions.push_back( self.queued_events.push_back(NetworkBehaviourAction::Dial {
NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(event_source)
opts: DialOpts::peer_id(event_source) .addresses(remote_addrs)
.addresses(remote_addrs) .condition(dial_opts::PeerCondition::Always)
.condition(dial_opts::PeerCondition::Always) .build(),
.build(), handler: handler::Prototype::DirectConnection {
handler: handler::Prototype::DirectConnection { relayed_connection_id: connection,
relayed_connection_id: connection, role: handler::Role::Listener,
role: handler::Role::Listener, },
}, });
}
.into(),
);
} }
Either::Left(handler::relayed::Event::OutboundNegotiationFailed { error }) => { Either::Left(handler::relayed::Event::OutboundNegotiationFailed { error }) => {
self.queued_actions.push_back( self.queued_events
NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { .push_back(NetworkBehaviourAction::GenerateEvent(
remote_peer_id: event_source, Event::DirectConnectionUpgradeFailed {
error: Error::Handler(error), remote_peer_id: event_source,
}) error: Error::Handler(error),
.into(), },
); ));
} }
Either::Left(handler::relayed::Event::OutboundConnectNegotiated { Either::Left(handler::relayed::Event::OutboundConnectNegotiated {
remote_addrs, remote_addrs,
attempt, attempt,
}) => { }) => {
self.queued_actions.push_back( self.queued_events.push_back(NetworkBehaviourAction::Dial {
NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(event_source)
opts: DialOpts::peer_id(event_source) .condition(dial_opts::PeerCondition::Always)
.condition(dial_opts::PeerCondition::Always) .addresses(remote_addrs)
.addresses(remote_addrs) .override_role()
.override_role() .build(),
.build(), handler: handler::Prototype::DirectConnection {
handler: handler::Prototype::DirectConnection { relayed_connection_id: connection,
relayed_connection_id: connection, role: handler::Role::Initiator { attempt },
role: handler::Role::Initiator { attempt }, },
}, });
}
.into(),
);
} }
Either::Right(Either::Left( Either::Right(Either::Left(
handler::direct::Event::DirectConnectionUpgradeSucceeded { handler::direct::Event::DirectConnectionUpgradeSucceeded {
relayed_connection_id, relayed_connection_id,
}, },
)) => { )) => {
self.queued_actions.extend([ self.queued_events.extend([
NetworkBehaviourAction::NotifyHandler { NetworkBehaviourAction::NotifyHandler {
peer_id: event_source, peer_id: event_source,
handler: NotifyHandler::One(relayed_connection_id), handler: NotifyHandler::One(relayed_connection_id),
event: Either::Left( event: Either::Left(
handler::relayed::Command::UpgradeFinishedDontKeepAlive, handler::relayed::Command::UpgradeFinishedDontKeepAlive,
), ),
} },
.into(),
NetworkBehaviourAction::GenerateEvent( NetworkBehaviourAction::GenerateEvent(
Event::DirectConnectionUpgradeSucceeded { Event::DirectConnectionUpgradeSucceeded {
remote_peer_id: event_source, remote_peer_id: event_source,
}, },
) ),
.into(),
]); ]);
} }
Either::Right(Either::Right(event)) => void::unreachable(event), Either::Right(Either::Right(event)) => void::unreachable(event),
@ -315,8 +321,8 @@ impl NetworkBehaviour for Behaviour {
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> { ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(action) = self.queued_actions.pop_front() { if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(action.build(self.local_peer_id, &self.external_addresses)); return Poll::Ready(event);
} }
Poll::Pending Poll::Pending
@ -345,70 +351,3 @@ impl NetworkBehaviour for Behaviour {
} }
} }
} }
/// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`]
/// before being returned in [`Behaviour::poll`].
enum ActionBuilder {
Done(NetworkBehaviourAction<Event, handler::Prototype>),
Connect {
attempt: u8,
handler: NotifyHandler,
peer_id: PeerId,
},
AcceptInboundConnect {
inbound_connect: Box<protocol::inbound::PendingConnect>,
handler: NotifyHandler,
peer_id: PeerId,
},
}
impl From<NetworkBehaviourAction<Event, handler::Prototype>> for ActionBuilder {
fn from(action: NetworkBehaviourAction<Event, handler::Prototype>) -> Self {
Self::Done(action)
}
}
impl ActionBuilder {
fn build(
self,
local_peer_id: PeerId,
external_addresses: &ExternalAddresses,
) -> NetworkBehaviourAction<Event, handler::Prototype> {
let obs_addrs = || {
external_addresses
.iter()
.cloned()
.filter(|a| !a.iter().any(|p| p == Protocol::P2pCircuit))
.map(|a| a.with(Protocol::P2p(local_peer_id.into())))
.collect()
};
match self {
ActionBuilder::Done(action) => action,
ActionBuilder::AcceptInboundConnect {
inbound_connect,
handler,
peer_id,
} => NetworkBehaviourAction::NotifyHandler {
handler,
peer_id,
event: Either::Left(handler::relayed::Command::AcceptInboundConnect {
inbound_connect,
obs_addrs: obs_addrs(),
}),
},
ActionBuilder::Connect {
attempt,
handler,
peer_id,
} => NetworkBehaviourAction::NotifyHandler {
handler,
peer_id,
event: Either::Left(handler::relayed::Command::Connect {
attempt,
obs_addrs: obs_addrs(),
}),
},
}
}
}