From 99ad3b6eaf10dc2dcb931333ad7788fa07fea74b Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 28 Apr 2023 15:37:06 +0200 Subject: [PATCH] refactor(kad): don't use `OutboundOpenInfo` As part of pushing #3268 forward, remove the use of `OutboundOpenInfo` from `libp2p-kad`. Related #3268. Pull-Request: #3760. --- protocols/kad/src/handler_priv.rs | 76 +++++++++++++------------------ 1 file changed, 32 insertions(+), 44 deletions(-) diff --git a/protocols/kad/src/handler_priv.rs b/protocols/kad/src/handler_priv.rs index ed2d0521..b99eb295 100644 --- a/protocols/kad/src/handler_priv.rs +++ b/protocols/kad/src/handler_priv.rs @@ -67,8 +67,7 @@ pub struct KademliaHandler { /// List of outbound substreams that are waiting to become active next. /// Contains the request we want to send, and the user data if we expect an answer. - requested_streams: - VecDeque)>>, + pending_messages: VecDeque<(KadRequestMsg, Option)>, /// List of active inbound substreams with the state they are in. inbound_substreams: SelectAll>, @@ -499,7 +498,7 @@ where inbound_substreams: Default::default(), outbound_substreams: Default::default(), num_requested_outbound_streams: 0, - requested_streams: Default::default(), + pending_messages: Default::default(), keep_alive, protocol_status: ProtocolStatus::Unconfirmed, } @@ -507,19 +506,22 @@ where fn on_fully_negotiated_outbound( &mut self, - FullyNegotiatedOutbound { - protocol, - info: (msg, user_data), - }: FullyNegotiatedOutbound< + FullyNegotiatedOutbound { protocol, info: () }: FullyNegotiatedOutbound< ::OutboundProtocol, ::OutboundOpenInfo, >, ) { - self.outbound_substreams - .push(OutboundSubstreamState::PendingSend( - protocol, msg, user_data, - )); + if let Some((msg, user_data)) = self.pending_messages.pop_front() { + self.outbound_substreams + .push(OutboundSubstreamState::PendingSend( + protocol, msg, user_data, + )); + } else { + debug_assert!(false, "Requested outbound stream without message") + } + self.num_requested_outbound_streams -= 1; + if let ProtocolStatus::Unconfirmed = self.protocol_status { // Upon the first successfully negotiated substream, we know that the // remote is configured with the same protocol name and we want @@ -587,9 +589,7 @@ where fn on_dial_upgrade_error( &mut self, DialUpgradeError { - info: (_, user_data), - error, - .. + info: (), error, .. }: DialUpgradeError< ::OutboundOpenInfo, ::OutboundProtocol, @@ -597,10 +597,12 @@ where ) { // TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't // continue trying - if let Some(user_data) = user_data { + + if let Some((_, Some(user_data))) = self.pending_messages.pop_front() { self.outbound_substreams .push(OutboundSubstreamState::ReportError(error.into(), user_data)); } + self.num_requested_outbound_streams -= 1; } } @@ -614,8 +616,7 @@ where type Error = io::Error; // TODO: better error type? type InboundProtocol = Either; type OutboundProtocol = KademliaProtocolConfig; - // Message of the request to send to the remote, and user data if we expect an answer. - type OutboundOpenInfo = (KadRequestMsg, Option); + type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { @@ -645,10 +646,7 @@ where } KademliaHandlerIn::FindNodeReq { key, user_data } => { let msg = KadRequestMsg::FindNode { key }; - self.requested_streams.push_back(SubstreamProtocol::new( - self.config.protocol_config.clone(), - (msg, Some(user_data)), - )); + self.pending_messages.push_back((msg, Some(user_data))); } KademliaHandlerIn::FindNodeRes { closer_peers, @@ -656,10 +654,7 @@ where } => self.answer_pending_request(request_id, KadResponseMsg::FindNode { closer_peers }), KademliaHandlerIn::GetProvidersReq { key, user_data } => { let msg = KadRequestMsg::GetProviders { key }; - self.requested_streams.push_back(SubstreamProtocol::new( - self.config.protocol_config.clone(), - (msg, Some(user_data)), - )); + self.pending_messages.push_back((msg, Some(user_data))); } KademliaHandlerIn::GetProvidersRes { closer_peers, @@ -674,24 +669,15 @@ where ), KademliaHandlerIn::AddProvider { key, provider } => { let msg = KadRequestMsg::AddProvider { key, provider }; - self.requested_streams.push_back(SubstreamProtocol::new( - self.config.protocol_config.clone(), - (msg, None), - )); + self.pending_messages.push_back((msg, None)); } KademliaHandlerIn::GetRecord { key, user_data } => { let msg = KadRequestMsg::GetValue { key }; - self.requested_streams.push_back(SubstreamProtocol::new( - self.config.protocol_config.clone(), - (msg, Some(user_data)), - )); + self.pending_messages.push_back((msg, Some(user_data))); } KademliaHandlerIn::PutRecord { record, user_data } => { let msg = KadRequestMsg::PutValue { record }; - self.requested_streams.push_back(SubstreamProtocol::new( - self.config.protocol_config.clone(), - (msg, Some(user_data)), - )); + self.pending_messages.push_back((msg, Some(user_data))); } KademliaHandlerIn::GetRecordRes { record, @@ -750,11 +736,13 @@ where let num_in_progress_outbound_substreams = self.outbound_substreams.len() + self.num_requested_outbound_streams; - if num_in_progress_outbound_substreams < MAX_NUM_SUBSTREAMS { - if let Some(protocol) = self.requested_streams.pop_front() { - self.num_requested_outbound_streams += 1; - return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }); - } + if num_in_progress_outbound_substreams < MAX_NUM_SUBSTREAMS + && self.num_requested_outbound_streams < self.pending_messages.len() + { + self.num_requested_outbound_streams += 1; + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(self.config.protocol_config.clone(), ()), + }); } let no_streams = self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty(); @@ -828,7 +816,7 @@ where { type Item = ConnectionHandlerEvent< KademliaProtocolConfig, - (KadRequestMsg, Option), + (), KademliaHandlerEvent, io::Error, >; @@ -964,7 +952,7 @@ where { type Item = ConnectionHandlerEvent< KademliaProtocolConfig, - (KadRequestMsg, Option), + (), KademliaHandlerEvent, io::Error, >;