From 2a5a5180e9d1144843264e63ce370f77c04a2ff5 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 24 Nov 2022 04:46:43 +1100 Subject: [PATCH] fix(kad): Don't wait for behaviour after receiving `AddProvider` (#3152) Previously, we would erroneously always go into the `WaitingUser` (now called `WaitingBehaviour`) state after receiving a message on an inbound stream. However, the `AddProvider` message does not warrant a "response" from the behaviour. Thus, any incoming `AddProvider` message would result in a stale substream that would eventually be dropped as soon as more than 32 inbound streams have been opened. With this patch, we inline the message handling into the upper match block and perform the correct state transition upon each message. For `AddProvider`, we go back into `WaitingMessage` because the spec mandates that we need to be ready to receive more messages on an inbound stream. Fixes #3048. --- protocols/kad/CHANGELOG.md | 9 ++- protocols/kad/src/handler.rs | 125 ++++++++++++++++++++--------------- 2 files changed, 80 insertions(+), 54 deletions(-) diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 98e055f6..e4e29259 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -4,17 +4,22 @@ - Update to `libp2p-swarm` `v0.41.0`. -- Replace `Kademlia`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods. +- Replace `Kademlia`'s `NetworkBehaviour` implementation `inject_*` methods with the new `on_*` methods. See [PR 3011]. -- Replace `KademliaHandler`'s `ConnectionHandler` implemention `inject_*` methods with the new `on_*` methods. +- Replace `KademliaHandler`'s `ConnectionHandler` implementation `inject_*` methods with the new `on_*` methods. See [PR 3085]. - Update `rust-version` to reflect the actual MSRV: 1.62.0. See [PR 3090]. +- Fix bad state transition on incoming `AddProvider` messages. + This would eventually lead to warning that says: "New inbound substream to PeerId exceeds inbound substream limit. No older substream waiting to be reused." + See [PR 3152]. + [PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085 [PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 [PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090 +[PR 3152]: https://github.com/libp2p/rust-libp2p/pull/3152 # 0.41.0 diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index a4380470..345762d4 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -170,8 +170,8 @@ enum InboundSubstreamState { connection_id: UniqueConnecId, substream: KadInStreamSink, }, - /// Waiting for the user to send a [`KademliaHandlerIn`] event containing the response. - WaitingUser( + /// Waiting for the behaviour to send a [`KademliaHandlerIn`] event containing the response. + WaitingBehaviour( UniqueConnecId, KadInStreamSink, Option, @@ -206,7 +206,7 @@ impl InboundSubstreamState { phantom: PhantomData, }, ) { - InboundSubstreamState::WaitingUser(conn_id, substream, mut waker) + InboundSubstreamState::WaitingBehaviour(conn_id, substream, mut waker) if conn_id == id.connec_unique_id => { *self = InboundSubstreamState::PendingSend(conn_id, substream, msg); @@ -233,7 +233,7 @@ impl InboundSubstreamState { }, ) { InboundSubstreamState::WaitingMessage { substream, .. } - | InboundSubstreamState::WaitingUser(_, substream, _) + | InboundSubstreamState::WaitingBehaviour(_, substream, _) | InboundSubstreamState::PendingSend(_, substream, _) | InboundSubstreamState::PendingFlush(_, substream) | InboundSubstreamState::Closing(substream) => { @@ -655,7 +655,7 @@ where .inbound_substreams .iter_mut() .find(|state| match state { - InboundSubstreamState::WaitingUser(conn_id, _, _) => { + InboundSubstreamState::WaitingBehaviour(conn_id, _, _) => { conn_id == &request_id.connec_unique_id } _ => false, @@ -1004,14 +1004,68 @@ where connection_id, mut substream, } => match substream.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(msg))) => { - if let Ok(ev) = process_kad_request(msg, connection_id) { - *this = - InboundSubstreamState::WaitingUser(connection_id, substream, None); - return Poll::Ready(Some(ConnectionHandlerEvent::Custom(ev))); - } else { - *this = InboundSubstreamState::Closing(substream); - } + Poll::Ready(Some(Ok(KadRequestMsg::Ping))) => { + log::warn!("Kademlia PING messages are unsupported"); + + *this = InboundSubstreamState::Closing(substream); + } + Poll::Ready(Some(Ok(KadRequestMsg::FindNode { key }))) => { + *this = + InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); + return Poll::Ready(Some(ConnectionHandlerEvent::Custom( + KademliaHandlerEvent::FindNodeReq { + key, + request_id: KademliaRequestId { + connec_unique_id: connection_id, + }, + }, + ))); + } + Poll::Ready(Some(Ok(KadRequestMsg::GetProviders { key }))) => { + *this = + InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); + return Poll::Ready(Some(ConnectionHandlerEvent::Custom( + KademliaHandlerEvent::GetProvidersReq { + key, + request_id: KademliaRequestId { + connec_unique_id: connection_id, + }, + }, + ))); + } + Poll::Ready(Some(Ok(KadRequestMsg::AddProvider { key, provider }))) => { + *this = InboundSubstreamState::WaitingMessage { + first: false, + connection_id, + substream, + }; + return Poll::Ready(Some(ConnectionHandlerEvent::Custom( + KademliaHandlerEvent::AddProvider { key, provider }, + ))); + } + Poll::Ready(Some(Ok(KadRequestMsg::GetValue { key }))) => { + *this = + InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); + return Poll::Ready(Some(ConnectionHandlerEvent::Custom( + KademliaHandlerEvent::GetRecord { + key, + request_id: KademliaRequestId { + connec_unique_id: connection_id, + }, + }, + ))); + } + Poll::Ready(Some(Ok(KadRequestMsg::PutValue { record }))) => { + *this = + InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); + return Poll::Ready(Some(ConnectionHandlerEvent::Custom( + KademliaHandlerEvent::PutRecord { + record, + request_id: KademliaRequestId { + connec_unique_id: connection_id, + }, + }, + ))); } Poll::Pending => { *this = InboundSubstreamState::WaitingMessage { @@ -1029,9 +1083,12 @@ where return Poll::Ready(None); } }, - InboundSubstreamState::WaitingUser(id, substream, _) => { - *this = - InboundSubstreamState::WaitingUser(id, substream, Some(cx.waker().clone())); + InboundSubstreamState::WaitingBehaviour(id, substream, _) => { + *this = InboundSubstreamState::WaitingBehaviour( + id, + substream, + Some(cx.waker().clone()), + ); return Poll::Pending; } @@ -1080,42 +1137,6 @@ where } } -/// Processes a Kademlia message that's expected to be a request from a remote. -fn process_kad_request( - event: KadRequestMsg, - connec_unique_id: UniqueConnecId, -) -> Result, io::Error> { - match event { - KadRequestMsg::Ping => { - // TODO: implement; although in practice the PING message is never - // used, so we may consider removing it altogether - Err(io::Error::new( - io::ErrorKind::InvalidData, - "the PING Kademlia message is not implemented", - )) - } - KadRequestMsg::FindNode { key } => Ok(KademliaHandlerEvent::FindNodeReq { - key, - request_id: KademliaRequestId { connec_unique_id }, - }), - KadRequestMsg::GetProviders { key } => Ok(KademliaHandlerEvent::GetProvidersReq { - key, - request_id: KademliaRequestId { connec_unique_id }, - }), - KadRequestMsg::AddProvider { key, provider } => { - Ok(KademliaHandlerEvent::AddProvider { key, provider }) - } - KadRequestMsg::GetValue { key } => Ok(KademliaHandlerEvent::GetRecord { - key, - request_id: KademliaRequestId { connec_unique_id }, - }), - KadRequestMsg::PutValue { record } => Ok(KademliaHandlerEvent::PutRecord { - record, - request_id: KademliaRequestId { connec_unique_id }, - }), - } -} - /// Process a Kademlia message that's supposed to be a response to one of our requests. fn process_kad_response( event: KadResponseMsg,