diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 98e055f6..e4e29259 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -4,17 +4,22 @@ - Update to `libp2p-swarm` `v0.41.0`. -- Replace `Kademlia`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods. +- Replace `Kademlia`'s `NetworkBehaviour` implementation `inject_*` methods with the new `on_*` methods. See [PR 3011]. -- Replace `KademliaHandler`'s `ConnectionHandler` implemention `inject_*` methods with the new `on_*` methods. +- Replace `KademliaHandler`'s `ConnectionHandler` implementation `inject_*` methods with the new `on_*` methods. See [PR 3085]. - Update `rust-version` to reflect the actual MSRV: 1.62.0. See [PR 3090]. +- Fix bad state transition on incoming `AddProvider` messages. + This would eventually lead to warning that says: "New inbound substream to PeerId exceeds inbound substream limit. No older substream waiting to be reused." + See [PR 3152]. + [PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085 [PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 [PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090 +[PR 3152]: https://github.com/libp2p/rust-libp2p/pull/3152 # 0.41.0 diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index a4380470..345762d4 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -170,8 +170,8 @@ enum InboundSubstreamState { connection_id: UniqueConnecId, substream: KadInStreamSink, }, - /// Waiting for the user to send a [`KademliaHandlerIn`] event containing the response. - WaitingUser( + /// Waiting for the behaviour to send a [`KademliaHandlerIn`] event containing the response. + WaitingBehaviour( UniqueConnecId, KadInStreamSink, Option, @@ -206,7 +206,7 @@ impl InboundSubstreamState { phantom: PhantomData, }, ) { - InboundSubstreamState::WaitingUser(conn_id, substream, mut waker) + InboundSubstreamState::WaitingBehaviour(conn_id, substream, mut waker) if conn_id == id.connec_unique_id => { *self = InboundSubstreamState::PendingSend(conn_id, substream, msg); @@ -233,7 +233,7 @@ impl InboundSubstreamState { }, ) { InboundSubstreamState::WaitingMessage { substream, .. } - | InboundSubstreamState::WaitingUser(_, substream, _) + | InboundSubstreamState::WaitingBehaviour(_, substream, _) | InboundSubstreamState::PendingSend(_, substream, _) | InboundSubstreamState::PendingFlush(_, substream) | InboundSubstreamState::Closing(substream) => { @@ -655,7 +655,7 @@ where .inbound_substreams .iter_mut() .find(|state| match state { - InboundSubstreamState::WaitingUser(conn_id, _, _) => { + InboundSubstreamState::WaitingBehaviour(conn_id, _, _) => { conn_id == &request_id.connec_unique_id } _ => false, @@ -1004,14 +1004,68 @@ where connection_id, mut substream, } => match substream.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(msg))) => { - if let Ok(ev) = process_kad_request(msg, connection_id) { - *this = - InboundSubstreamState::WaitingUser(connection_id, substream, None); - return Poll::Ready(Some(ConnectionHandlerEvent::Custom(ev))); - } else { - *this = InboundSubstreamState::Closing(substream); - } + Poll::Ready(Some(Ok(KadRequestMsg::Ping))) => { + log::warn!("Kademlia PING messages are unsupported"); + + *this = InboundSubstreamState::Closing(substream); + } + Poll::Ready(Some(Ok(KadRequestMsg::FindNode { key }))) => { + *this = + InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); + return Poll::Ready(Some(ConnectionHandlerEvent::Custom( + KademliaHandlerEvent::FindNodeReq { + key, + request_id: KademliaRequestId { + connec_unique_id: connection_id, + }, + }, + ))); + } + Poll::Ready(Some(Ok(KadRequestMsg::GetProviders { key }))) => { + *this = + InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); + return Poll::Ready(Some(ConnectionHandlerEvent::Custom( + KademliaHandlerEvent::GetProvidersReq { + key, + request_id: KademliaRequestId { + connec_unique_id: connection_id, + }, + }, + ))); + } + Poll::Ready(Some(Ok(KadRequestMsg::AddProvider { key, provider }))) => { + *this = InboundSubstreamState::WaitingMessage { + first: false, + connection_id, + substream, + }; + return Poll::Ready(Some(ConnectionHandlerEvent::Custom( + KademliaHandlerEvent::AddProvider { key, provider }, + ))); + } + Poll::Ready(Some(Ok(KadRequestMsg::GetValue { key }))) => { + *this = + InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); + return Poll::Ready(Some(ConnectionHandlerEvent::Custom( + KademliaHandlerEvent::GetRecord { + key, + request_id: KademliaRequestId { + connec_unique_id: connection_id, + }, + }, + ))); + } + Poll::Ready(Some(Ok(KadRequestMsg::PutValue { record }))) => { + *this = + InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); + return Poll::Ready(Some(ConnectionHandlerEvent::Custom( + KademliaHandlerEvent::PutRecord { + record, + request_id: KademliaRequestId { + connec_unique_id: connection_id, + }, + }, + ))); } Poll::Pending => { *this = InboundSubstreamState::WaitingMessage { @@ -1029,9 +1083,12 @@ where return Poll::Ready(None); } }, - InboundSubstreamState::WaitingUser(id, substream, _) => { - *this = - InboundSubstreamState::WaitingUser(id, substream, Some(cx.waker().clone())); + InboundSubstreamState::WaitingBehaviour(id, substream, _) => { + *this = InboundSubstreamState::WaitingBehaviour( + id, + substream, + Some(cx.waker().clone()), + ); return Poll::Pending; } @@ -1080,42 +1137,6 @@ where } } -/// Processes a Kademlia message that's expected to be a request from a remote. -fn process_kad_request( - event: KadRequestMsg, - connec_unique_id: UniqueConnecId, -) -> Result, io::Error> { - match event { - KadRequestMsg::Ping => { - // TODO: implement; although in practice the PING message is never - // used, so we may consider removing it altogether - Err(io::Error::new( - io::ErrorKind::InvalidData, - "the PING Kademlia message is not implemented", - )) - } - KadRequestMsg::FindNode { key } => Ok(KademliaHandlerEvent::FindNodeReq { - key, - request_id: KademliaRequestId { connec_unique_id }, - }), - KadRequestMsg::GetProviders { key } => Ok(KademliaHandlerEvent::GetProvidersReq { - key, - request_id: KademliaRequestId { connec_unique_id }, - }), - KadRequestMsg::AddProvider { key, provider } => { - Ok(KademliaHandlerEvent::AddProvider { key, provider }) - } - KadRequestMsg::GetValue { key } => Ok(KademliaHandlerEvent::GetRecord { - key, - request_id: KademliaRequestId { connec_unique_id }, - }), - KadRequestMsg::PutValue { record } => Ok(KademliaHandlerEvent::PutRecord { - record, - request_id: KademliaRequestId { connec_unique_id }, - }), - } -} - /// Process a Kademlia message that's supposed to be a response to one of our requests. fn process_kad_response( event: KadResponseMsg,