2022-02-13 21:57:38 +01:00
// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
mod error ;
pub ( crate ) mod pool ;
2023-05-08 16:36:30 +02:00
mod supported_protocols ;
2022-02-13 21:57:38 +01:00
2023-04-26 09:31:56 +02:00
pub use error ::ConnectionError ;
pub ( crate ) use error ::{
PendingConnectionError , PendingInboundConnectionError , PendingOutboundConnectionError ,
2022-02-13 21:57:38 +01:00
} ;
2023-05-08 16:36:30 +02:00
pub use supported_protocols ::SupportedProtocols ;
2022-02-13 21:57:38 +01:00
2023-01-12 11:21:02 +00:00
use crate ::handler ::{
AddressChange , ConnectionEvent , ConnectionHandler , DialUpgradeError , FullyNegotiatedInbound ,
2023-05-08 16:36:30 +02:00
FullyNegotiatedOutbound , ListenUpgradeError , ProtocolSupport , ProtocolsAdded , ProtocolsChange ,
UpgradeInfoSend ,
2023-01-12 11:21:02 +00:00
} ;
2023-05-12 08:19:23 +02:00
use crate ::upgrade ::{ InboundUpgradeSend , OutboundUpgradeSend } ;
2023-05-08 16:36:30 +02:00
use crate ::{
2023-05-12 08:19:23 +02:00
ConnectionHandlerEvent , KeepAlive , Stream , StreamProtocol , StreamUpgradeError ,
SubstreamProtocol ,
2023-05-08 16:36:30 +02:00
} ;
2023-05-12 08:19:23 +02:00
use futures ::future ::BoxFuture ;
2022-09-21 23:02:21 +10:00
use futures ::stream ::FuturesUnordered ;
use futures ::FutureExt ;
use futures ::StreamExt ;
use futures_timer ::Delay ;
use instant ::Instant ;
2022-02-13 21:57:38 +01:00
use libp2p_core ::connection ::ConnectedPoint ;
use libp2p_core ::multiaddr ::Multiaddr ;
2022-09-21 23:02:21 +10:00
use libp2p_core ::muxing ::{ StreamMuxerBox , StreamMuxerEvent , StreamMuxerExt , SubstreamBox } ;
2023-05-08 10:55:17 +02:00
use libp2p_core ::upgrade ;
2023-05-12 08:19:23 +02:00
use libp2p_core ::upgrade ::{ NegotiationError , ProtocolError } ;
2023-03-13 01:46:58 +11:00
use libp2p_core ::Endpoint ;
use libp2p_identity ::PeerId ;
2023-09-20 04:02:29 +05:30
use std ::cmp ::max ;
2023-05-08 16:36:30 +02:00
use std ::collections ::HashSet ;
2023-08-07 16:31:15 +07:00
use std ::fmt ::{ Display , Formatter } ;
2022-07-18 04:20:11 +01:00
use std ::future ::Future ;
2023-01-24 03:29:41 +11:00
use std ::sync ::atomic ::{ AtomicUsize , Ordering } ;
2022-09-21 23:02:21 +10:00
use std ::task ::Waker ;
use std ::time ::Duration ;
use std ::{ fmt , io , mem , pin ::Pin , task ::Context , task ::Poll } ;
2022-02-13 21:57:38 +01:00
2023-01-24 03:29:41 +11:00
/// Process-wide counter backing [`ConnectionId::next`]; it starts at `1`.
static NEXT_CONNECTION_ID: AtomicUsize = AtomicUsize::new(1);

/// Connection identifier.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ConnectionId(usize);

impl ConnectionId {
    /// Creates an _unchecked_ [`ConnectionId`] wrapping the given raw value.
    ///
    /// [`Swarm`](crate::Swarm) enforces that [`ConnectionId`]s are unique and not reused.
    /// This constructor performs no such bookkeeping, hence the _unchecked_.
    ///
    /// It is primarily meant for allowing manual tests of
    /// [`NetworkBehaviour`](crate::NetworkBehaviour)s.
    pub fn new_unchecked(id: usize) -> Self {
        ConnectionId(id)
    }

    /// Returns the next available [`ConnectionId`].
    ///
    /// Atomically reads the global counter and increments it by one, so two calls
    /// never observe the same counter value (short of `usize` wrap-around).
    pub(crate) fn next() -> Self {
        let raw = NEXT_CONNECTION_ID.fetch_add(1, Ordering::SeqCst);
        ConnectionId(raw)
    }
}

impl Display for ConnectionId {
    /// Writes the wrapped numeric identifier.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let ConnectionId(raw) = self;
        write!(f, " {} ", raw)
    }
}
2022-02-13 21:57:38 +01:00
/// Information about a successfully established connection.
#[ derive(Debug, Clone, PartialEq, Eq) ]
2023-04-26 09:31:56 +02:00
pub ( crate ) struct Connected {
2022-02-13 21:57:38 +01:00
/// The connected endpoint, including network address information.
2023-04-26 09:31:56 +02:00
pub ( crate ) endpoint : ConnectedPoint ,
2022-02-13 21:57:38 +01:00
/// Information obtained from the transport.
2023-04-26 09:31:56 +02:00
pub ( crate ) peer_id : PeerId ,
2022-02-13 21:57:38 +01:00
}
/// Event generated by a [`Connection`].
#[ derive(Debug, Clone) ]
2023-04-26 09:31:56 +02:00
pub ( crate ) enum Event < T > {
2022-02-21 13:32:24 +01:00
/// Event generated by the [`ConnectionHandler`].
2022-02-13 21:57:38 +01:00
Handler ( T ) ,
/// Address of the remote has changed.
AddressChange ( Multiaddr ) ,
}
2022-02-21 13:32:24 +01:00
/// A multiplexed connection to a peer with an associated [`ConnectionHandler`].
2023-04-26 09:31:56 +02:00
pub ( crate ) struct Connection < THandler >
2022-02-13 21:57:38 +01:00
where
2022-02-21 13:32:24 +01:00
THandler : ConnectionHandler ,
2022-02-13 21:57:38 +01:00
{
/// Node that handles the muxing.
2022-07-18 04:20:11 +01:00
muxing : StreamMuxerBox ,
2022-09-21 23:02:21 +10:00
/// The underlying handler.
handler : THandler ,
/// Futures that upgrade incoming substreams.
negotiating_in : FuturesUnordered <
2023-05-12 08:19:23 +02:00
StreamUpgrade <
2022-09-21 23:02:21 +10:00
THandler ::InboundOpenInfo ,
2023-05-12 08:19:23 +02:00
< THandler ::InboundProtocol as InboundUpgradeSend > ::Output ,
< THandler ::InboundProtocol as InboundUpgradeSend > ::Error ,
2022-09-21 23:02:21 +10:00
> ,
> ,
/// Futures that upgrade outgoing substreams.
negotiating_out : FuturesUnordered <
2023-05-12 08:19:23 +02:00
StreamUpgrade <
2022-09-21 23:02:21 +10:00
THandler ::OutboundOpenInfo ,
2023-05-12 08:19:23 +02:00
< THandler ::OutboundProtocol as OutboundUpgradeSend > ::Output ,
< THandler ::OutboundProtocol as OutboundUpgradeSend > ::Error ,
2022-09-21 23:02:21 +10:00
> ,
> ,
/// The currently planned connection & handler shutdown.
shutdown : Shutdown ,
/// The substream upgrade protocol override, if any.
substream_upgrade_protocol_override : Option < upgrade ::Version > ,
/// The maximum number of inbound streams concurrently negotiating on a
/// connection. New inbound streams exceeding the limit are dropped and thus
/// reset.
///
/// Note: This only enforces a limit on the number of concurrently
/// negotiating inbound streams. The total number of inbound streams on a
/// connection is the sum of negotiating and negotiated streams. A limit on
2023-10-09 11:58:32 +11:00
/// the total number of streams can be enforced at the [`StreamMuxerBox`] level.
2022-09-21 23:02:21 +10:00
max_negotiating_inbound_streams : usize ,
/// Contains all upgrades that are waiting for a new outbound substream.
///
/// The upgrade timeout is already ticking here so this may fail in case the remote is not quick
/// enough in providing us with a new stream.
requested_substreams : FuturesUnordered <
SubstreamRequested < THandler ::OutboundOpenInfo , THandler ::OutboundProtocol > ,
> ,
2023-05-08 16:36:30 +02:00
local_supported_protocols : HashSet < StreamProtocol > ,
remote_supported_protocols : HashSet < StreamProtocol > ,
2023-09-20 04:02:29 +05:30
idle_timeout : Duration ,
2022-02-13 21:57:38 +01:00
}
2022-02-16 13:15:52 +01:00
impl < THandler > fmt ::Debug for Connection < THandler >
2022-02-13 21:57:38 +01:00
where
2022-02-21 13:32:24 +01:00
THandler : ConnectionHandler + fmt ::Debug ,
2022-07-18 04:20:11 +01:00
THandler ::OutboundOpenInfo : fmt ::Debug ,
2022-02-13 21:57:38 +01:00
{
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
f . debug_struct ( " Connection " )
. field ( " handler " , & self . handler )
. finish ( )
}
}
2022-02-21 13:32:24 +01:00
/// `Connection` is declared `Unpin` for every handler type, so `poll` can obtain
/// a plain `&mut Self` via `Pin::get_mut`.
impl<THandler: ConnectionHandler> Unpin for Connection<THandler> {}
2022-02-13 21:57:38 +01:00
2022-02-16 13:15:52 +01:00
impl < THandler > Connection < THandler >
2022-02-13 21:57:38 +01:00
where
2022-02-21 13:32:24 +01:00
THandler : ConnectionHandler ,
2022-02-13 21:57:38 +01:00
{
/// Builds a new `Connection` from the given substream multiplexer
/// and connection handler.
2023-04-26 09:31:56 +02:00
pub ( crate ) fn new (
2022-02-18 11:32:58 +01:00
muxer : StreamMuxerBox ,
2023-05-08 16:36:30 +02:00
mut handler : THandler ,
2022-02-18 11:32:58 +01:00
substream_upgrade_protocol_override : Option < upgrade ::Version > ,
2022-06-08 11:48:46 +02:00
max_negotiating_inbound_streams : usize ,
2023-09-20 04:02:29 +05:30
idle_timeout : Duration ,
2022-02-18 11:32:58 +01:00
) -> Self {
2023-05-08 16:36:30 +02:00
let initial_protocols = gather_supported_protocols ( & handler ) ;
if ! initial_protocols . is_empty ( ) {
handler . on_connection_event ( ConnectionEvent ::LocalProtocolsChange (
ProtocolsChange ::Added ( ProtocolsAdded ::from_set ( & initial_protocols ) ) ,
) ) ;
}
2022-09-21 23:02:21 +10:00
Connection {
muxing : muxer ,
2022-06-08 11:48:46 +02:00
handler ,
2022-09-21 23:02:21 +10:00
negotiating_in : Default ::default ( ) ,
negotiating_out : Default ::default ( ) ,
shutdown : Shutdown ::None ,
2022-06-08 11:48:46 +02:00
substream_upgrade_protocol_override ,
max_negotiating_inbound_streams ,
2022-09-21 23:02:21 +10:00
requested_substreams : Default ::default ( ) ,
2023-05-08 16:36:30 +02:00
local_supported_protocols : initial_protocols ,
remote_supported_protocols : Default ::default ( ) ,
2023-09-20 04:02:29 +05:30
idle_timeout ,
2022-02-13 21:57:38 +01:00
}
}
/// Notifies the connection handler of an event.
2023-05-14 12:58:08 +02:00
pub ( crate ) fn on_behaviour_event ( & mut self , event : THandler ::FromBehaviour ) {
2023-01-12 11:21:02 +00:00
self . handler . on_behaviour_event ( event ) ;
2022-02-13 21:57:38 +01:00
}
/// Begins an orderly shutdown of the connection, returning the connection
/// handler and a `Future` that resolves when connection shutdown is complete.
2023-04-26 09:31:56 +02:00
pub ( crate ) fn close ( self ) -> ( THandler , impl Future < Output = io ::Result < ( ) > > ) {
2022-09-21 23:02:21 +10:00
( self . handler , self . muxing . close ( ) )
2022-02-13 21:57:38 +01:00
}
2022-05-03 22:26:28 +02:00
/// Polls the handler and the substream, forwarding events from the former to the latter and
/// vice versa.
2023-04-26 09:31:56 +02:00
pub ( crate ) fn poll (
2022-09-21 23:02:21 +10:00
self : Pin < & mut Self > ,
2022-02-13 21:57:38 +01:00
cx : & mut Context < '_ > ,
2023-05-14 12:58:08 +02:00
) -> Poll < Result < Event < THandler ::ToBehaviour > , ConnectionError < THandler ::Error > > > {
2022-09-21 23:02:21 +10:00
let Self {
requested_substreams ,
muxing ,
handler ,
negotiating_out ,
negotiating_in ,
shutdown ,
max_negotiating_inbound_streams ,
substream_upgrade_protocol_override ,
2023-05-08 16:36:30 +02:00
local_supported_protocols : supported_protocols ,
remote_supported_protocols ,
2023-09-20 04:02:29 +05:30
idle_timeout ,
2022-09-21 23:02:21 +10:00
} = self . get_mut ( ) ;
2022-02-13 21:57:38 +01:00
loop {
2022-09-21 23:02:21 +10:00
match requested_substreams . poll_next_unpin ( cx ) {
Poll ::Ready ( Some ( Ok ( ( ) ) ) ) = > continue ,
2023-01-12 11:21:02 +00:00
Poll ::Ready ( Some ( Err ( info ) ) ) = > {
handler . on_connection_event ( ConnectionEvent ::DialUpgradeError (
DialUpgradeError {
info ,
2023-05-08 10:55:17 +02:00
error : StreamUpgradeError ::Timeout ,
2023-01-12 11:21:02 +00:00
} ,
) ) ;
2022-09-21 23:02:21 +10:00
continue ;
}
Poll ::Ready ( None ) | Poll ::Pending = > { }
}
// Poll the [`ConnectionHandler`].
match handler . poll ( cx ) {
2022-05-03 22:26:28 +02:00
Poll ::Pending = > { }
2022-09-21 23:02:21 +10:00
Poll ::Ready ( ConnectionHandlerEvent ::OutboundSubstreamRequest { protocol } ) = > {
let timeout = * protocol . timeout ( ) ;
let ( upgrade , user_data ) = protocol . into_upgrade ( ) ;
requested_substreams . push ( SubstreamRequested ::new ( user_data , timeout , upgrade ) ) ;
2022-07-18 04:20:11 +01:00
continue ; // Poll handler until exhausted.
2022-05-03 22:26:28 +02:00
}
2023-05-16 21:20:00 +02:00
Poll ::Ready ( ConnectionHandlerEvent ::NotifyBehaviour ( event ) ) = > {
2022-05-03 22:26:28 +02:00
return Poll ::Ready ( Ok ( Event ::Handler ( event ) ) ) ;
}
2022-09-21 23:02:21 +10:00
Poll ::Ready ( ConnectionHandlerEvent ::Close ( err ) ) = > {
return Poll ::Ready ( Err ( ConnectionError ::Handler ( err ) ) ) ;
}
2023-05-08 16:36:30 +02:00
Poll ::Ready ( ConnectionHandlerEvent ::ReportRemoteProtocols (
ProtocolSupport ::Added ( protocols ) ,
) ) = > {
if let Some ( added ) =
ProtocolsChange ::add ( remote_supported_protocols , & protocols )
{
handler . on_connection_event ( ConnectionEvent ::RemoteProtocolsChange ( added ) ) ;
remote_supported_protocols . extend ( protocols ) ;
}
continue ;
}
Poll ::Ready ( ConnectionHandlerEvent ::ReportRemoteProtocols (
ProtocolSupport ::Removed ( protocols ) ,
) ) = > {
if let Some ( removed ) =
ProtocolsChange ::remove ( remote_supported_protocols , & protocols )
{
handler
. on_connection_event ( ConnectionEvent ::RemoteProtocolsChange ( removed ) ) ;
remote_supported_protocols . retain ( | p | ! protocols . contains ( p ) ) ;
}
continue ;
}
2022-05-03 22:26:28 +02:00
}
2022-02-13 21:57:38 +01:00
2022-09-21 23:02:21 +10:00
// In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams.
match negotiating_out . poll_next_unpin ( cx ) {
Poll ::Pending | Poll ::Ready ( None ) = > { }
2023-01-12 11:21:02 +00:00
Poll ::Ready ( Some ( ( info , Ok ( protocol ) ) ) ) = > {
handler . on_connection_event ( ConnectionEvent ::FullyNegotiatedOutbound (
FullyNegotiatedOutbound { protocol , info } ,
) ) ;
2022-09-21 23:02:21 +10:00
continue ;
}
2023-01-12 11:21:02 +00:00
Poll ::Ready ( Some ( ( info , Err ( error ) ) ) ) = > {
handler . on_connection_event ( ConnectionEvent ::DialUpgradeError (
DialUpgradeError { info , error } ,
) ) ;
2022-09-21 23:02:21 +10:00
continue ;
}
}
// In case both the [`ConnectionHandler`] and the negotiating outbound streams can not
// make any more progress, poll the negotiating inbound streams.
match negotiating_in . poll_next_unpin ( cx ) {
Poll ::Pending | Poll ::Ready ( None ) = > { }
2023-01-12 11:21:02 +00:00
Poll ::Ready ( Some ( ( info , Ok ( protocol ) ) ) ) = > {
handler . on_connection_event ( ConnectionEvent ::FullyNegotiatedInbound (
FullyNegotiatedInbound { protocol , info } ,
) ) ;
2022-09-21 23:02:21 +10:00
continue ;
}
2023-05-08 10:55:17 +02:00
Poll ::Ready ( Some ( ( info , Err ( StreamUpgradeError ::Apply ( error ) ) ) ) ) = > {
2023-01-12 11:21:02 +00:00
handler . on_connection_event ( ConnectionEvent ::ListenUpgradeError (
ListenUpgradeError { info , error } ,
) ) ;
2022-09-21 23:02:21 +10:00
continue ;
}
2023-05-08 10:55:17 +02:00
Poll ::Ready ( Some ( ( _ , Err ( StreamUpgradeError ::Io ( e ) ) ) ) ) = > {
2023-05-08 06:54:50 +02:00
log ::debug! ( " failed to upgrade inbound stream: {e} " ) ;
continue ;
}
2023-05-08 10:55:17 +02:00
Poll ::Ready ( Some ( ( _ , Err ( StreamUpgradeError ::NegotiationFailed ) ) ) ) = > {
log ::debug! ( " no protocol could be agreed upon for inbound stream " ) ;
continue ;
}
Poll ::Ready ( Some ( ( _ , Err ( StreamUpgradeError ::Timeout ) ) ) ) = > {
2023-05-08 06:54:50 +02:00
log ::debug! ( " inbound stream upgrade timed out " ) ;
continue ;
}
2022-09-21 23:02:21 +10:00
}
// Ask the handler whether it wants the connection (and the handler itself)
// to be kept alive, which determines the planned shutdown, if any.
let keep_alive = handler . connection_keep_alive ( ) ;
match ( & mut * shutdown , keep_alive ) {
( Shutdown ::Later ( timer , deadline ) , KeepAlive ::Until ( t ) ) = > {
if * deadline ! = t {
* deadline = t ;
2023-09-20 04:02:29 +05:30
if let Some ( new_duration ) = deadline . checked_duration_since ( Instant ::now ( ) )
{
let effective_keep_alive = max ( new_duration , * idle_timeout ) ;
timer . reset ( effective_keep_alive )
2022-09-21 23:02:21 +10:00
}
}
}
2023-09-20 04:02:29 +05:30
( _ , KeepAlive ::Until ( earliest_shutdown ) ) = > {
2023-09-27 17:38:03 +10:00
let now = Instant ::now ( ) ;
if let Some ( requested ) = earliest_shutdown . checked_duration_since ( now ) {
let effective_keep_alive = max ( requested , * idle_timeout ) ;
let safe_keep_alive = checked_add_fraction ( now , effective_keep_alive ) ;
2023-09-20 04:02:29 +05:30
// Important: We store the _original_ `Instant` given by the `ConnectionHandler` in the `Later` instance to ensure we can compare it in the above branch.
// This is quite subtle but will hopefully become simpler soon once `KeepAlive::Until` is fully deprecated. See <https://github.com/libp2p/rust-libp2p/issues/3844>/
2023-09-27 17:38:03 +10:00
* shutdown = Shutdown ::Later ( Delay ::new ( safe_keep_alive ) , earliest_shutdown )
2022-09-21 23:02:21 +10:00
}
}
2023-09-20 04:02:29 +05:30
( _ , KeepAlive ::No ) if idle_timeout = = & Duration ::ZERO = > {
* shutdown = Shutdown ::Asap ;
}
( Shutdown ::Later ( _ , _ ) , KeepAlive ::No ) = > {
// Do nothing, i.e. let the shutdown timer continue to tick.
}
( _ , KeepAlive ::No ) = > {
2023-09-27 17:38:03 +10:00
let now = Instant ::now ( ) ;
let safe_keep_alive = checked_add_fraction ( now , * idle_timeout ) ;
* shutdown = Shutdown ::Later ( Delay ::new ( safe_keep_alive ) , now + safe_keep_alive ) ;
2023-09-20 04:02:29 +05:30
}
2022-09-21 23:02:21 +10:00
( _ , KeepAlive ::Yes ) = > * shutdown = Shutdown ::None ,
} ;
// Check if the connection (and handler) should be shut down.
// As long as we're still negotiating substreams, shutdown is always postponed.
if negotiating_in . is_empty ( )
& & negotiating_out . is_empty ( )
& & requested_substreams . is_empty ( )
{
match shutdown {
Shutdown ::None = > { }
Shutdown ::Asap = > return Poll ::Ready ( Err ( ConnectionError ::KeepAliveTimeout ) ) ,
Shutdown ::Later ( delay , _ ) = > match Future ::poll ( Pin ::new ( delay ) , cx ) {
Poll ::Ready ( _ ) = > {
return Poll ::Ready ( Err ( ConnectionError ::KeepAliveTimeout ) )
}
Poll ::Pending = > { }
} ,
}
}
match muxing . poll_unpin ( cx ) ? {
2022-08-16 04:50:17 +02:00
Poll ::Pending = > { }
Poll ::Ready ( StreamMuxerEvent ::AddressChange ( address ) ) = > {
2023-01-12 11:21:02 +00:00
handler . on_connection_event ( ConnectionEvent ::AddressChange ( AddressChange {
new_address : & address ,
} ) ) ;
2022-08-16 04:50:17 +02:00
return Poll ::Ready ( Ok ( Event ::AddressChange ( address ) ) ) ;
2022-02-13 21:57:38 +01:00
}
}
2022-09-21 23:02:21 +10:00
if let Some ( requested_substream ) = requested_substreams . iter_mut ( ) . next ( ) {
match muxing . poll_outbound_unpin ( cx ) ? {
2022-08-16 04:50:17 +02:00
Poll ::Pending = > { }
Poll ::Ready ( substream ) = > {
2022-09-21 23:02:21 +10:00
let ( user_data , timeout , upgrade ) = requested_substream . extract ( ) ;
2023-05-12 08:19:23 +02:00
negotiating_out . push ( StreamUpgrade ::new_outbound (
2022-09-21 23:02:21 +10:00
substream ,
user_data ,
timeout ,
upgrade ,
* substream_upgrade_protocol_override ,
) ) ;
2022-08-16 04:50:17 +02:00
continue ; // Go back to the top, handler can potentially make progress again.
}
}
2022-07-18 04:20:11 +01:00
}
2022-09-21 23:02:21 +10:00
if negotiating_in . len ( ) < * max_negotiating_inbound_streams {
match muxing . poll_inbound_unpin ( cx ) ? {
Poll ::Pending = > { }
Poll ::Ready ( substream ) = > {
let protocol = handler . listen_protocol ( ) ;
2023-05-12 08:19:23 +02:00
negotiating_in . push ( StreamUpgrade ::new_inbound ( substream , protocol ) ) ;
2022-09-21 23:02:21 +10:00
continue ; // Go back to the top, handler can potentially make progress again.
}
2022-08-16 04:50:17 +02:00
}
2022-07-18 04:20:11 +01:00
}
2023-05-08 16:36:30 +02:00
let new_protocols = gather_supported_protocols ( handler ) ;
2023-05-24 08:25:05 +02:00
let changes = ProtocolsChange ::from_full_sets ( supported_protocols , & new_protocols ) ;
2023-05-08 16:36:30 +02:00
2023-05-24 08:25:05 +02:00
if ! changes . is_empty ( ) {
for change in changes {
handler . on_connection_event ( ConnectionEvent ::LocalProtocolsChange ( change ) ) ;
}
2023-05-08 16:36:30 +02:00
2023-05-24 08:25:05 +02:00
* supported_protocols = new_protocols ;
continue ; // Go back to the top, handler can potentially make progress again.
}
2023-05-08 16:36:30 +02:00
2022-07-18 04:20:11 +01:00
return Poll ::Pending ; // Nothing can make progress, return `Pending`.
2022-02-13 21:57:38 +01:00
}
}
2023-05-08 16:36:30 +02:00
/// Test helper: polls the connection once using a waker that does nothing.
#[cfg(test)]
fn poll_noop_waker(
    &mut self,
) -> Poll<Result<Event<THandler::ToBehaviour>, ConnectionError<THandler::Error>>> {
    let waker = futures::task::noop_waker_ref();
    let mut cx = Context::from_waker(waker);

    Pin::new(self).poll(&mut cx)
}
}
fn gather_supported_protocols ( handler : & impl ConnectionHandler ) -> HashSet < StreamProtocol > {
handler
. listen_protocol ( )
. upgrade ( )
. protocol_info ( )
. filter_map ( | i | StreamProtocol ::try_from_owned ( i . as_ref ( ) . to_owned ( ) ) . ok ( ) )
. collect ( )
2022-02-13 21:57:38 +01:00
}
2023-09-27 17:38:03 +10:00
/// Repeatedly halves and adds the [`Duration`] to the [`Instant`] until [`Instant::checked_add`] succeeds.
///
/// [`Instant`] depends on the underlying platform and has a limit of which points in time it can represent.
/// The [`Duration`] computed by the this function may not be the longest possible that we can add to `now` but it will work.
fn checked_add_fraction ( start : Instant , mut duration : Duration ) -> Duration {
while start . checked_add ( duration ) . is_none ( ) {
log ::debug! ( " {start:?} + {duration:?} cannot be presented, halving duration " ) ;
duration / = 2 ;
}
duration
}
2022-02-13 21:57:38 +01:00
/// Borrowed information about an incoming connection currently being negotiated.
#[ derive(Debug, Copy, Clone) ]
2023-04-26 09:31:56 +02:00
pub ( crate ) struct IncomingInfo < ' a > {
2022-02-13 21:57:38 +01:00
/// Local connection address.
2023-04-26 09:31:56 +02:00
pub ( crate ) local_addr : & ' a Multiaddr ,
2022-02-13 21:57:38 +01:00
/// Address used to send back data to the remote.
2023-04-26 09:31:56 +02:00
pub ( crate ) send_back_addr : & ' a Multiaddr ,
2022-02-13 21:57:38 +01:00
}
impl < ' a > IncomingInfo < ' a > {
/// Builds the [`ConnectedPoint`] corresponding to the incoming connection.
2023-04-26 09:31:56 +02:00
pub ( crate ) fn create_connected_point ( & self ) -> ConnectedPoint {
2022-02-13 21:57:38 +01:00
ConnectedPoint ::Listener {
local_addr : self . local_addr . clone ( ) ,
send_back_addr : self . send_back_addr . clone ( ) ,
}
}
}
2023-05-12 08:19:23 +02:00
struct StreamUpgrade < UserData , TOk , TErr > {
2022-09-21 23:02:21 +10:00
user_data : Option < UserData > ,
timeout : Delay ,
2023-05-12 08:19:23 +02:00
upgrade : BoxFuture < 'static , Result < TOk , StreamUpgradeError < TErr > > > ,
2022-09-21 23:02:21 +10:00
}
2023-05-12 08:19:23 +02:00
impl < UserData , TOk , TErr > StreamUpgrade < UserData , TOk , TErr > {
fn new_outbound < Upgrade > (
2022-09-21 23:02:21 +10:00
substream : SubstreamBox ,
user_data : UserData ,
timeout : Delay ,
upgrade : Upgrade ,
version_override : Option < upgrade ::Version > ,
2023-05-12 08:19:23 +02:00
) -> Self
where
Upgrade : OutboundUpgradeSend < Output = TOk , Error = TErr > ,
{
2022-09-21 23:02:21 +10:00
let effective_version = match version_override {
Some ( version_override ) if version_override ! = upgrade ::Version ::default ( ) = > {
log ::debug! (
" Substream upgrade protocol override: {:?} -> {:?} " ,
upgrade ::Version ::default ( ) ,
version_override
) ;
version_override
}
_ = > upgrade ::Version ::default ( ) ,
} ;
2023-05-12 08:19:23 +02:00
let protocols = upgrade . protocol_info ( ) ;
2022-09-21 23:02:21 +10:00
Self {
user_data : Some ( user_data ) ,
timeout ,
2023-05-12 08:19:23 +02:00
upgrade : Box ::pin ( async move {
let ( info , stream ) = multistream_select ::dialer_select_proto (
substream ,
protocols ,
effective_version ,
)
. await
. map_err ( to_stream_upgrade_error ) ? ;
let output = upgrade
. upgrade_outbound ( Stream ::new ( stream ) , info )
. await
. map_err ( StreamUpgradeError ::Apply ) ? ;
Ok ( output )
} ) ,
2022-09-21 23:02:21 +10:00
}
}
}
2023-05-12 08:19:23 +02:00
impl < UserData , TOk , TErr > StreamUpgrade < UserData , TOk , TErr > {
fn new_inbound < Upgrade > (
2022-09-21 23:02:21 +10:00
substream : SubstreamBox ,
protocol : SubstreamProtocol < Upgrade , UserData > ,
2023-05-12 08:19:23 +02:00
) -> Self
where
Upgrade : InboundUpgradeSend < Output = TOk , Error = TErr > ,
{
2022-09-21 23:02:21 +10:00
let timeout = * protocol . timeout ( ) ;
let ( upgrade , open_info ) = protocol . into_upgrade ( ) ;
2023-05-12 08:19:23 +02:00
let protocols = upgrade . protocol_info ( ) ;
2022-09-21 23:02:21 +10:00
Self {
user_data : Some ( open_info ) ,
timeout : Delay ::new ( timeout ) ,
2023-05-12 08:19:23 +02:00
upgrade : Box ::pin ( async move {
let ( info , stream ) =
multistream_select ::listener_select_proto ( substream , protocols )
. await
. map_err ( to_stream_upgrade_error ) ? ;
let output = upgrade
. upgrade_inbound ( Stream ::new ( stream ) , info )
. await
. map_err ( StreamUpgradeError ::Apply ) ? ;
Ok ( output )
} ) ,
2022-09-21 23:02:21 +10:00
}
}
}
2023-05-12 08:19:23 +02:00
fn to_stream_upgrade_error < T > ( e : NegotiationError ) -> StreamUpgradeError < T > {
match e {
NegotiationError ::Failed = > StreamUpgradeError ::NegotiationFailed ,
NegotiationError ::ProtocolError ( ProtocolError ::IoError ( e ) ) = > StreamUpgradeError ::Io ( e ) ,
NegotiationError ::ProtocolError ( other ) = > {
StreamUpgradeError ::Io ( io ::Error ::new ( io ::ErrorKind ::Other , other ) )
}
}
}
2022-09-21 23:02:21 +10:00
2023-05-12 08:19:23 +02:00
impl < UserData , TOk , TErr > Unpin for StreamUpgrade < UserData , TOk , TErr > { }
impl < UserData , TOk , TErr > Future for StreamUpgrade < UserData , TOk , TErr > {
type Output = ( UserData , Result < TOk , StreamUpgradeError < TErr > > ) ;
2022-09-21 23:02:21 +10:00
fn poll ( mut self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Self ::Output > {
match self . timeout . poll_unpin ( cx ) {
Poll ::Ready ( ( ) ) = > {
return Poll ::Ready ( (
self . user_data
. take ( )
. expect ( " Future not to be polled again once ready. " ) ,
2023-05-08 10:55:17 +02:00
Err ( StreamUpgradeError ::Timeout ) ,
2022-09-21 23:02:21 +10:00
) )
}
Poll ::Pending = > { }
}
2023-05-08 10:55:17 +02:00
let result = futures ::ready! ( self . upgrade . poll_unpin ( cx ) ) ;
let user_data = self
. user_data
. take ( )
. expect ( " Future not to be polled again once ready. " ) ;
2023-05-12 08:19:23 +02:00
Poll ::Ready ( ( user_data , result ) )
2022-09-21 23:02:21 +10:00
}
}
/// An outbound substream request that is waiting for the muxer to provide a stream.
enum SubstreamRequested<UserData, Upgrade> {
    /// The request is still waiting; its timeout is already running.
    Waiting {
        /// Data handed back to the handler, either with the upgrade or on timeout.
        user_data: UserData,
        /// Timer bounding how long the request may wait.
        timeout: Delay,
        /// The upgrade to apply once a stream is available.
        upgrade: Upgrade,
        /// A waker to notify our [`FuturesUnordered`] that we have extracted the data.
        ///
        /// This will ensure that we will get polled again in the next iteration which allows us to
        /// resolve with `Ok(())` and be removed from the [`FuturesUnordered`].
        extracted_waker: Option<Waker>,
    },
    /// The data has been extracted (or the request has resolved).
    Done,
}
impl < UserData , Upgrade > SubstreamRequested < UserData , Upgrade > {
fn new ( user_data : UserData , timeout : Duration , upgrade : Upgrade ) -> Self {
Self ::Waiting {
user_data ,
timeout : Delay ::new ( timeout ) ,
upgrade ,
extracted_waker : None ,
}
}
fn extract ( & mut self ) -> ( UserData , Delay , Upgrade ) {
match mem ::replace ( self , Self ::Done ) {
SubstreamRequested ::Waiting {
user_data ,
timeout ,
upgrade ,
extracted_waker : waker ,
} = > {
if let Some ( waker ) = waker {
waker . wake ( ) ;
}
( user_data , timeout , upgrade )
}
SubstreamRequested ::Done = > panic! ( " cannot extract twice " ) ,
}
}
}
impl < UserData , Upgrade > Unpin for SubstreamRequested < UserData , Upgrade > { }
impl < UserData , Upgrade > Future for SubstreamRequested < UserData , Upgrade > {
type Output = Result < ( ) , UserData > ;
fn poll ( self : Pin < & mut Self > , cx : & mut Context < '_ > ) -> Poll < Self ::Output > {
let this = self . get_mut ( ) ;
match mem ::replace ( this , Self ::Done ) {
SubstreamRequested ::Waiting {
user_data ,
upgrade ,
mut timeout ,
..
} = > match timeout . poll_unpin ( cx ) {
Poll ::Ready ( ( ) ) = > Poll ::Ready ( Err ( user_data ) ) ,
Poll ::Pending = > {
* this = Self ::Waiting {
user_data ,
upgrade ,
timeout ,
extracted_waker : Some ( cx . waker ( ) . clone ( ) ) ,
} ;
Poll ::Pending
}
} ,
SubstreamRequested ::Done = > Poll ::Ready ( Ok ( ( ) ) ) ,
}
}
}
/// The options for a planned connection & handler shutdown.
///
/// A shutdown is planned anew based on the return value of
/// [`ConnectionHandler::connection_keep_alive`] of the underlying handler
/// after every invocation of [`ConnectionHandler::poll`].
///
/// A planned shutdown is always postponed for as long as there are ingoing
/// or outgoing substreams being negotiated, i.e. it is a graceful, "idle"
/// shutdown.
#[derive(Debug)]
enum Shutdown {
    /// No shutdown is planned.
    None,
    /// A shut down is planned as soon as possible.
    Asap,
    /// A shut down is planned for when a `Delay` has elapsed.
    ///
    /// The `Instant` is the deadline the timer was planned for; it is compared against
    /// the handler's latest `KeepAlive::Until` value to detect when the timer needs resetting.
    Later(Delay, Instant),
}
#[ cfg(test) ]
mod tests {
use super ::* ;
2023-09-20 04:02:29 +05:30
use crate ::dummy ;
2023-05-08 16:36:30 +02:00
use futures ::future ;
2022-09-21 23:02:21 +10:00
use futures ::AsyncRead ;
use futures ::AsyncWrite ;
2023-05-08 16:36:30 +02:00
use libp2p_core ::upgrade ::{ DeniedUpgrade , InboundUpgrade , OutboundUpgrade , UpgradeInfo } ;
2022-09-21 23:02:21 +10:00
use libp2p_core ::StreamMuxer ;
use quickcheck ::* ;
use std ::sync ::{ Arc , Weak } ;
2023-09-20 04:02:29 +05:30
use std ::time ::Instant ;
2022-09-21 23:02:21 +10:00
use void ::Void ;
#[ test ]
fn max_negotiating_inbound_streams ( ) {
2023-10-10 08:55:14 +02:00
let _ = env_logger ::try_init ( ) ;
2022-09-21 23:02:21 +10:00
fn prop ( max_negotiating_inbound_streams : u8 ) {
let max_negotiating_inbound_streams : usize = max_negotiating_inbound_streams . into ( ) ;
let alive_substream_counter = Arc ::new ( ( ) ) ;
let mut connection = Connection ::new (
StreamMuxerBox ::new ( DummyStreamMuxer {
counter : alive_substream_counter . clone ( ) ,
} ) ,
2023-10-10 08:55:14 +02:00
MockConnectionHandler ::new ( Duration ::from_secs ( 10 ) ) ,
2022-09-21 23:02:21 +10:00
None ,
max_negotiating_inbound_streams ,
2023-09-20 04:02:29 +05:30
Duration ::ZERO ,
2022-09-21 23:02:21 +10:00
) ;
2023-05-08 16:36:30 +02:00
let result = connection . poll_noop_waker ( ) ;
2022-09-21 23:02:21 +10:00
assert! ( result . is_pending ( ) ) ;
assert_eq! (
Arc ::weak_count ( & alive_substream_counter ) ,
max_negotiating_inbound_streams ,
" Expect no more than the maximum number of allowed streams "
) ;
}
QuickCheck ::new ( ) . quickcheck ( prop as fn ( _ ) ) ;
}
/// The upgrade timeout of an outbound stream starts when the handler requests
/// the stream, not when the muxer delivers it.
///
/// `PendingStreamMuxer` never yields an outbound stream, so the only way the
/// handler can receive a `DialUpgradeError` here is through the timeout.
#[test]
fn outbound_stream_timeout_starts_on_request() {
    let upgrade_timeout = Duration::from_secs(1);
    let mut connection = Connection::new(
        StreamMuxerBox::new(PendingStreamMuxer),
        MockConnectionHandler::new(upgrade_timeout),
        None,
        2,
        Duration::ZERO,
    );

    // Ask for an outbound stream and let the connection pick up the request.
    connection.handler.open_new_outbound();
    let _ = connection.poll_noop_waker();

    // Wait past the upgrade timeout, then poll again so the expiry is processed.
    std::thread::sleep(upgrade_timeout + Duration::from_secs(1));
    let _ = connection.poll_noop_waker();

    let reported = connection.handler.error.unwrap();
    assert!(matches!(reported, StreamUpgradeError::Timeout));
}
2023-05-08 16:36:30 +02:00
/// Changes to the handler's inbound protocol set are reported back to the
/// handler as incremental `LocalProtocolsChange` events.
///
/// Protocol names must start with `/` (required by `StreamProtocol::new`), so
/// the literals carry no surrounding whitespace.
#[test]
fn propagates_changes_to_supported_inbound_protocols() {
    let mut connection = Connection::new(
        StreamMuxerBox::new(PendingStreamMuxer),
        ConfigurableProtocolConnectionHandler::default(),
        None,
        0,
        Duration::ZERO,
    );

    // First, start listening on a single protocol.
    connection.handler.listen_on(&["/foo"]);
    let _ = connection.poll_noop_waker();

    assert_eq!(connection.handler.local_added, vec![vec!["/foo"]]);
    assert!(connection.handler.local_removed.is_empty());

    // Second, listen on two protocols.
    connection.handler.listen_on(&["/foo", "/bar"]);
    let _ = connection.poll_noop_waker();

    assert_eq!(
        connection.handler.local_added,
        vec![vec!["/foo"], vec!["/bar"]],
        " expect to only receive an event for the newly added protocols "
    );
    assert!(connection.handler.local_removed.is_empty());

    // Third, stop listening on the first protocol.
    connection.handler.listen_on(&["/bar"]);
    let _ = connection.poll_noop_waker();

    // No new "added" event is expected; only the removal of `/foo`.
    assert_eq!(
        connection.handler.local_added,
        vec![vec!["/foo"], vec!["/bar"]]
    );
    assert_eq!(connection.handler.local_removed, vec![vec!["/foo"]]);
}
/// Remote protocol reports emitted by the handler are only echoed back as
/// `RemoteProtocolsChange` events when they actually change the known set.
///
/// Protocol names must start with `/` (required by `StreamProtocol::new`), so
/// the literals carry no surrounding whitespace.
#[test]
fn only_propagtes_actual_changes_to_remote_protocols_to_handler() {
    let mut connection = Connection::new(
        StreamMuxerBox::new(PendingStreamMuxer),
        ConfigurableProtocolConnectionHandler::default(),
        None,
        0,
        Duration::ZERO,
    );

    // First, remote supports a single protocol.
    connection.handler.remote_adds_support_for(&["/foo"]);
    let _ = connection.poll_noop_waker();

    assert_eq!(connection.handler.remote_added, vec![vec!["/foo"]]);
    assert!(connection.handler.remote_removed.is_empty());

    // Second, it adds a protocol but also still includes the first one.
    connection
        .handler
        .remote_adds_support_for(&["/foo", "/bar"]);
    let _ = connection.poll_noop_waker();

    assert_eq!(
        connection.handler.remote_added,
        vec![vec!["/foo"], vec!["/bar"]],
        " expect to only receive an event for the newly added protocol "
    );
    assert!(connection.handler.remote_removed.is_empty());

    // Third, the remote "removes" a protocol it never advertised. We can't
    // control what handlers report, so this needs to be handled gracefully:
    // no event is expected.
    connection.handler.remote_removes_support_for(&["/baz"]);
    let _ = connection.poll_noop_waker();

    assert_eq!(
        connection.handler.remote_added,
        vec![vec!["/foo"], vec!["/bar"]]
    );
    assert!(connection.handler.remote_removed.is_empty());

    // Fourth, the remote removes a protocol that was previously supported.
    connection.handler.remote_removes_support_for(&["/bar"]);
    let _ = connection.poll_noop_waker();

    assert_eq!(
        connection.handler.remote_added,
        vec![vec!["/foo"], vec!["/bar"]]
    );
    assert_eq!(connection.handler.remote_removed, vec![vec!["/bar"]]);
}
2023-09-20 04:02:29 +05:30
/// With `dummy::ConnectionHandler` as the handler, the connection is pending on
/// the first poll and reports `KeepAliveTimeout` once the idle timeout has
/// elapsed.
#[tokio::test]
async fn idle_timeout_with_keep_alive_no() {
    let idle_timeout = Duration::from_millis(100);

    let mut connection = Connection::new(
        StreamMuxerBox::new(PendingStreamMuxer),
        dummy::ConnectionHandler,
        None,
        0,
        idle_timeout,
    );

    let before_timeout = connection.poll_noop_waker();
    assert!(before_timeout.is_pending());

    tokio::time::sleep(idle_timeout).await;

    let after_timeout = connection.poll_noop_waker();
    assert!(matches!(
        after_timeout,
        Poll::Ready(Err(ConnectionError::KeepAliveTimeout))
    ));
}
/// When the handler's `KeepAlive::Until` deadline (twice the idle timeout) lies
/// beyond the idle timeout, the connection is still pending after one idle
/// timeout and only times out after the second.
#[tokio::test]
async fn idle_timeout_with_keep_alive_until_greater_than_idle_timeout() {
    let idle_timeout = Duration::from_millis(100);

    let mut connection = Connection::new(
        StreamMuxerBox::new(PendingStreamMuxer),
        KeepAliveUntilConnectionHandler {
            until: Instant::now() + idle_timeout * 2,
        },
        None,
        0,
        idle_timeout,
    );

    let initial = connection.poll_noop_waker();
    assert!(initial.is_pending());

    tokio::time::sleep(idle_timeout).await;

    let after_first_sleep = connection.poll_noop_waker();
    assert!(
        after_first_sleep.is_pending(),
        " `KeepAlive::Until` is greater than idle-timeout, continue sleeping "
    );

    tokio::time::sleep(idle_timeout).await;

    let after_second_sleep = connection.poll_noop_waker();
    assert!(matches!(
        after_second_sleep,
        Poll::Ready(Err(ConnectionError::KeepAliveTimeout))
    ));
}
/// When the handler's `KeepAlive::Until` deadline (half the idle timeout) lies
/// before the idle timeout, the idle timeout still applies: the connection is
/// pending at the handler's deadline and times out after the full idle timeout.
#[tokio::test]
async fn idle_timeout_with_keep_alive_until_less_than_idle_timeout() {
    let idle_timeout = Duration::from_millis(100);

    let mut connection = Connection::new(
        StreamMuxerBox::new(PendingStreamMuxer),
        KeepAliveUntilConnectionHandler {
            until: Instant::now() + idle_timeout / 2,
        },
        None,
        0,
        idle_timeout,
    );

    let initial = connection.poll_noop_waker();
    assert!(initial.is_pending());

    tokio::time::sleep(idle_timeout / 2).await;

    let at_handler_deadline = connection.poll_noop_waker();
    assert!(
        at_handler_deadline.is_pending(),
        " `KeepAlive::Until` is less than idle-timeout, honor idle-timeout "
    );

    tokio::time::sleep(idle_timeout / 2).await;

    let at_idle_timeout = connection.poll_noop_waker();
    assert!(matches!(
        at_idle_timeout,
        Poll::Ready(Err(ConnectionError::KeepAliveTimeout))
    ));
}
2023-09-27 17:38:03 +10:00
/// `checked_add_fraction` must return a duration that can be added to the
/// start instant without overflow, even when asked for `u64::MAX` seconds.
#[test]
fn checked_add_fraction_can_add_u64_max() {
    let _ = env_logger::try_init();

    let start = Instant::now();
    let requested = Duration::from_secs(u64::MAX);

    let addable = checked_add_fraction(start, requested);

    assert!(start.checked_add(addable).is_some());
}
2023-09-20 04:02:29 +05:30
struct KeepAliveUntilConnectionHandler {
until : Instant ,
}
impl ConnectionHandler for KeepAliveUntilConnectionHandler {
type FromBehaviour = Void ;
type ToBehaviour = Void ;
type Error = Void ;
type InboundProtocol = DeniedUpgrade ;
type OutboundProtocol = DeniedUpgrade ;
type InboundOpenInfo = ( ) ;
type OutboundOpenInfo = Void ;
fn listen_protocol (
& self ,
) -> SubstreamProtocol < Self ::InboundProtocol , Self ::InboundOpenInfo > {
SubstreamProtocol ::new ( DeniedUpgrade , ( ) )
}
fn connection_keep_alive ( & self ) -> KeepAlive {
KeepAlive ::Until ( self . until )
}
fn poll (
& mut self ,
_ : & mut Context < '_ > ,
) -> Poll <
ConnectionHandlerEvent <
Self ::OutboundProtocol ,
Self ::OutboundOpenInfo ,
Self ::ToBehaviour ,
Self ::Error ,
> ,
> {
Poll ::Pending
}
fn on_behaviour_event ( & mut self , _ : Self ::FromBehaviour ) { }
fn on_connection_event (
& mut self ,
_ : ConnectionEvent <
Self ::InboundProtocol ,
Self ::OutboundProtocol ,
Self ::InboundOpenInfo ,
Self ::OutboundOpenInfo ,
> ,
) {
}
}
2022-09-21 23:02:21 +10:00
struct DummyStreamMuxer {
counter : Arc < ( ) > ,
}
impl StreamMuxer for DummyStreamMuxer {
type Substream = PendingSubstream ;
type Error = Void ;
fn poll_inbound (
self : Pin < & mut Self > ,
_ : & mut Context < '_ > ,
) -> Poll < Result < Self ::Substream , Self ::Error > > {
Poll ::Ready ( Ok ( PendingSubstream ( Arc ::downgrade ( & self . counter ) ) ) )
}
fn poll_outbound (
self : Pin < & mut Self > ,
_ : & mut Context < '_ > ,
) -> Poll < Result < Self ::Substream , Self ::Error > > {
Poll ::Pending
}
fn poll_close ( self : Pin < & mut Self > , _ : & mut Context < '_ > ) -> Poll < Result < ( ) , Self ::Error > > {
Poll ::Ready ( Ok ( ( ) ) )
}
fn poll (
self : Pin < & mut Self > ,
_ : & mut Context < '_ > ,
) -> Poll < Result < StreamMuxerEvent , Self ::Error > > {
Poll ::Pending
}
}
/// A [`StreamMuxer`] which never returns a stream.
struct PendingStreamMuxer ;
impl StreamMuxer for PendingStreamMuxer {
type Substream = PendingSubstream ;
type Error = Void ;
fn poll_inbound (
self : Pin < & mut Self > ,
_ : & mut Context < '_ > ,
) -> Poll < Result < Self ::Substream , Self ::Error > > {
Poll ::Pending
}
fn poll_outbound (
self : Pin < & mut Self > ,
_ : & mut Context < '_ > ,
) -> Poll < Result < Self ::Substream , Self ::Error > > {
Poll ::Pending
}
fn poll_close ( self : Pin < & mut Self > , _ : & mut Context < '_ > ) -> Poll < Result < ( ) , Self ::Error > > {
Poll ::Pending
}
fn poll (
self : Pin < & mut Self > ,
_ : & mut Context < '_ > ,
) -> Poll < Result < StreamMuxerEvent , Self ::Error > > {
Poll ::Pending
}
}
/// A substream on which every read, write, flush and close stays pending.
///
/// The wrapped `Weak` is never read; it only exists so that the number of
/// live substreams can be observed through the weak count of the source `Arc`.
struct PendingSubstream(Weak<()>);

impl AsyncRead for PendingSubstream {
    /// Never yields data.
    fn poll_read(
        self: Pin<&mut Self>,
        _: &mut Context<'_>,
        _: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        Poll::Pending
    }
}

impl AsyncWrite for PendingSubstream {
    /// Never accepts data.
    fn poll_write(
        self: Pin<&mut Self>,
        _: &mut Context<'_>,
        _: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        Poll::Pending
    }

    /// Flushing never completes.
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Poll::Pending
    }

    /// Closing never completes.
    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Poll::Pending
    }
}
struct MockConnectionHandler {
outbound_requested : bool ,
2023-05-08 10:55:17 +02:00
error : Option < StreamUpgradeError < Void > > ,
2022-09-21 23:02:21 +10:00
upgrade_timeout : Duration ,
}
impl MockConnectionHandler {
fn new ( upgrade_timeout : Duration ) -> Self {
Self {
outbound_requested : false ,
error : None ,
upgrade_timeout ,
}
}
fn open_new_outbound ( & mut self ) {
self . outbound_requested = true ;
}
}
2023-05-08 16:36:30 +02:00
/// Test handler whose inbound protocol set and remote-protocol reports can be
/// driven by the test, and which records every protocol change it is told about.
#[derive(Default)]
struct ConfigurableProtocolConnectionHandler {
    /// Events queued by the `remote_*` helpers.
    events: Vec<ConnectionHandlerEvent<DeniedUpgrade, (), Void, Void>>,
    /// Protocols currently configured via `listen_on`.
    active_protocols: HashSet<StreamProtocol>,
    /// One entry per recorded "local protocols added" notification.
    local_added: Vec<Vec<StreamProtocol>>,
    /// One entry per recorded "local protocols removed" notification.
    local_removed: Vec<Vec<StreamProtocol>>,
    /// One entry per recorded "remote protocols added" notification.
    remote_added: Vec<Vec<StreamProtocol>>,
    /// One entry per recorded "remote protocols removed" notification.
    remote_removed: Vec<Vec<StreamProtocol>>,
}

impl ConfigurableProtocolConnectionHandler {
    /// Replaces the set of active inbound protocols with `protocols`.
    fn listen_on(&mut self, protocols: &[&'static str]) {
        self.active_protocols = protocols
            .iter()
            .map(|&name| StreamProtocol::new(name))
            .collect();
    }

    /// Queues a report that the remote added support for `protocols`.
    fn remote_adds_support_for(&mut self, protocols: &[&'static str]) {
        let added = protocols
            .iter()
            .map(|&name| StreamProtocol::new(name))
            .collect();
        let report = ConnectionHandlerEvent::ReportRemoteProtocols(ProtocolSupport::Added(added));
        self.events.push(report);
    }

    /// Queues a report that the remote removed support for `protocols`.
    fn remote_removes_support_for(&mut self, protocols: &[&'static str]) {
        let removed = protocols
            .iter()
            .map(|&name| StreamProtocol::new(name))
            .collect();
        let report =
            ConnectionHandlerEvent::ReportRemoteProtocols(ProtocolSupport::Removed(removed));
        self.events.push(report);
    }
}
2022-09-21 23:02:21 +10:00
impl ConnectionHandler for MockConnectionHandler {
2023-05-14 12:58:08 +02:00
type FromBehaviour = Void ;
type ToBehaviour = Void ;
2022-09-21 23:02:21 +10:00
type Error = Void ;
type InboundProtocol = DeniedUpgrade ;
type OutboundProtocol = DeniedUpgrade ;
type InboundOpenInfo = ( ) ;
type OutboundOpenInfo = ( ) ;
fn listen_protocol (
& self ,
) -> SubstreamProtocol < Self ::InboundProtocol , Self ::InboundOpenInfo > {
SubstreamProtocol ::new ( DeniedUpgrade , ( ) ) . with_timeout ( self . upgrade_timeout )
}
2023-01-12 11:21:02 +00:00
fn on_connection_event (
2022-09-21 23:02:21 +10:00
& mut self ,
2023-01-12 11:21:02 +00:00
event : ConnectionEvent <
Self ::InboundProtocol ,
Self ::OutboundProtocol ,
Self ::InboundOpenInfo ,
Self ::OutboundOpenInfo ,
> ,
2022-09-21 23:02:21 +10:00
) {
2023-01-12 11:21:02 +00:00
match event {
ConnectionEvent ::FullyNegotiatedInbound ( FullyNegotiatedInbound {
protocol ,
..
} ) = > void ::unreachable ( protocol ) ,
ConnectionEvent ::FullyNegotiatedOutbound ( FullyNegotiatedOutbound {
protocol ,
..
} ) = > void ::unreachable ( protocol ) ,
ConnectionEvent ::DialUpgradeError ( DialUpgradeError { error , .. } ) = > {
self . error = Some ( error )
}
2023-05-08 16:36:30 +02:00
ConnectionEvent ::AddressChange ( _ )
| ConnectionEvent ::ListenUpgradeError ( _ )
| ConnectionEvent ::LocalProtocolsChange ( _ )
| ConnectionEvent ::RemoteProtocolsChange ( _ ) = > { }
2023-01-12 11:21:02 +00:00
}
2022-09-21 23:02:21 +10:00
}
2023-05-14 12:58:08 +02:00
fn on_behaviour_event ( & mut self , event : Self ::FromBehaviour ) {
2022-09-21 23:02:21 +10:00
void ::unreachable ( event )
}
fn connection_keep_alive ( & self ) -> KeepAlive {
KeepAlive ::Yes
}
fn poll (
& mut self ,
_ : & mut Context < '_ > ,
) -> Poll <
ConnectionHandlerEvent <
Self ::OutboundProtocol ,
Self ::OutboundOpenInfo ,
2023-05-14 12:58:08 +02:00
Self ::ToBehaviour ,
2022-09-21 23:02:21 +10:00
Self ::Error ,
> ,
> {
if self . outbound_requested {
self . outbound_requested = false ;
return Poll ::Ready ( ConnectionHandlerEvent ::OutboundSubstreamRequest {
protocol : SubstreamProtocol ::new ( DeniedUpgrade , ( ) )
. with_timeout ( self . upgrade_timeout ) ,
} ) ;
}
Poll ::Pending
}
}
2023-05-08 16:36:30 +02:00
impl ConnectionHandler for ConfigurableProtocolConnectionHandler {
2023-05-14 12:58:08 +02:00
type FromBehaviour = Void ;
type ToBehaviour = Void ;
2023-05-08 16:36:30 +02:00
type Error = Void ;
type InboundProtocol = ManyProtocolsUpgrade ;
type OutboundProtocol = DeniedUpgrade ;
type InboundOpenInfo = ( ) ;
type OutboundOpenInfo = ( ) ;
fn listen_protocol (
& self ,
) -> SubstreamProtocol < Self ::InboundProtocol , Self ::InboundOpenInfo > {
SubstreamProtocol ::new (
ManyProtocolsUpgrade {
protocols : Vec ::from_iter ( self . active_protocols . clone ( ) ) ,
} ,
( ) ,
)
}
fn on_connection_event (
& mut self ,
event : ConnectionEvent <
Self ::InboundProtocol ,
Self ::OutboundProtocol ,
Self ::InboundOpenInfo ,
Self ::OutboundOpenInfo ,
> ,
) {
match event {
ConnectionEvent ::LocalProtocolsChange ( ProtocolsChange ::Added ( added ) ) = > {
self . local_added . push ( added . cloned ( ) . collect ( ) )
}
ConnectionEvent ::LocalProtocolsChange ( ProtocolsChange ::Removed ( removed ) ) = > {
self . local_removed . push ( removed . cloned ( ) . collect ( ) )
}
ConnectionEvent ::RemoteProtocolsChange ( ProtocolsChange ::Added ( added ) ) = > {
self . remote_added . push ( added . cloned ( ) . collect ( ) )
}
ConnectionEvent ::RemoteProtocolsChange ( ProtocolsChange ::Removed ( removed ) ) = > {
self . remote_removed . push ( removed . cloned ( ) . collect ( ) )
}
_ = > { }
}
}
2023-05-14 12:58:08 +02:00
fn on_behaviour_event ( & mut self , event : Self ::FromBehaviour ) {
2023-05-08 16:36:30 +02:00
void ::unreachable ( event )
}
fn connection_keep_alive ( & self ) -> KeepAlive {
KeepAlive ::Yes
}
fn poll (
& mut self ,
_ : & mut Context < '_ > ,
) -> Poll <
ConnectionHandlerEvent <
Self ::OutboundProtocol ,
Self ::OutboundOpenInfo ,
2023-05-14 12:58:08 +02:00
Self ::ToBehaviour ,
2023-05-08 16:36:30 +02:00
Self ::Error ,
> ,
> {
if let Some ( event ) = self . events . pop ( ) {
return Poll ::Ready ( event ) ;
}
Poll ::Pending
}
}
/// Upgrade that advertises an arbitrary list of protocols.
struct ManyProtocolsUpgrade {
    /// Protocols reported by `protocol_info`.
    protocols: Vec<StreamProtocol>,
}

impl UpgradeInfo for ManyProtocolsUpgrade {
    type Info = StreamProtocol;
    type InfoIter = std::vec::IntoIter<Self::Info>;

    /// Returns an owning iterator over a copy of the configured protocols.
    fn protocol_info(&self) -> Self::InfoIter {
        let advertised = self.protocols.to_vec();
        advertised.into_iter()
    }
}
impl < C > InboundUpgrade < C > for ManyProtocolsUpgrade {
type Output = C ;
type Error = Void ;
type Future = future ::Ready < Result < Self ::Output , Self ::Error > > ;
fn upgrade_inbound ( self , stream : C , _ : Self ::Info ) -> Self ::Future {
future ::ready ( Ok ( stream ) )
}
}
impl < C > OutboundUpgrade < C > for ManyProtocolsUpgrade {
type Output = C ;
type Error = Void ;
type Future = future ::Ready < Result < Self ::Output , Self ::Error > > ;
fn upgrade_outbound ( self , stream : C , _ : Self ::Info ) -> Self ::Future {
future ::ready ( Ok ( stream ) )
}
}
2022-09-21 23:02:21 +10:00
}
2023-01-18 19:56:32 +11:00
/// Endpoint role of a peer-to-peer connection that is still pending.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum PendingPoint {
    /// The socket comes from a dialer.
    ///
    /// A pending outgoing connection has no single address: addresses are
    /// dialed in parallel, and the connection's address only becomes known
    /// once the first dial succeeds.
    Dialer {
        /// Same as the `role_override` of [`ConnectedPoint::Dialer`].
        role_override: Endpoint,
    },
    /// The socket comes from a listener.
    Listener {
        /// Local connection address.
        local_addr: Multiaddr,
        /// Address used to send data back to the remote.
        send_back_addr: Multiaddr,
    },
}
impl From < ConnectedPoint > for PendingPoint {
fn from ( endpoint : ConnectedPoint ) -> Self {
match endpoint {
ConnectedPoint ::Dialer { role_override , .. } = > PendingPoint ::Dialer { role_override } ,
ConnectedPoint ::Listener {
local_addr ,
send_back_addr ,
} = > PendingPoint ::Listener {
local_addr ,
send_back_addr ,
} ,
}
}
}