mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-13 01:51:23 +00:00
fix(kad): Don't wait for behaviour after receiving AddProvider
(#3152)
Previously, we would erroneously always go into the `WaitingUser` (now called `WaitingBehaviour`) state after receiving a message on an inbound stream. However, the `AddProvider` message does not warrant a "response" from the behaviour. Thus, any incoming `AddProvider` message would result in a stale substream that would eventually be dropped as soon as more than 32 inbound streams have been opened. With this patch, we inline the message handling into the upper match block and perform the correct state transition upon each message. For `AddProvider`, we go back into `WaitingMessage` because the spec mandates that we need to be ready to receive more messages on an inbound stream. Fixes #3048.
This commit is contained in:
@ -4,17 +4,22 @@
|
||||
|
||||
- Update to `libp2p-swarm` `v0.41.0`.
|
||||
|
||||
- Replace `Kademlia`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods.
|
||||
- Replace `Kademlia`'s `NetworkBehaviour` implementation `inject_*` methods with the new `on_*` methods.
|
||||
See [PR 3011].
|
||||
|
||||
- Replace `KademliaHandler`'s `ConnectionHandler` implemention `inject_*` methods with the new `on_*` methods.
|
||||
- Replace `KademliaHandler`'s `ConnectionHandler` implementation `inject_*` methods with the new `on_*` methods.
|
||||
See [PR 3085].
|
||||
|
||||
- Update `rust-version` to reflect the actual MSRV: 1.62.0. See [PR 3090].
|
||||
|
||||
- Fix bad state transition on incoming `AddProvider` messages.
|
||||
This would eventually lead to warning that says: "New inbound substream to PeerId exceeds inbound substream limit. No older substream waiting to be reused."
|
||||
See [PR 3152].
|
||||
|
||||
[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085
|
||||
[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011
|
||||
[PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090
|
||||
[PR 3152]: https://github.com/libp2p/rust-libp2p/pull/3152
|
||||
|
||||
# 0.41.0
|
||||
|
||||
|
@ -170,8 +170,8 @@ enum InboundSubstreamState<TUserData> {
|
||||
connection_id: UniqueConnecId,
|
||||
substream: KadInStreamSink<NegotiatedSubstream>,
|
||||
},
|
||||
/// Waiting for the user to send a [`KademliaHandlerIn`] event containing the response.
|
||||
WaitingUser(
|
||||
/// Waiting for the behaviour to send a [`KademliaHandlerIn`] event containing the response.
|
||||
WaitingBehaviour(
|
||||
UniqueConnecId,
|
||||
KadInStreamSink<NegotiatedSubstream>,
|
||||
Option<Waker>,
|
||||
@ -206,7 +206,7 @@ impl<TUserData> InboundSubstreamState<TUserData> {
|
||||
phantom: PhantomData,
|
||||
},
|
||||
) {
|
||||
InboundSubstreamState::WaitingUser(conn_id, substream, mut waker)
|
||||
InboundSubstreamState::WaitingBehaviour(conn_id, substream, mut waker)
|
||||
if conn_id == id.connec_unique_id =>
|
||||
{
|
||||
*self = InboundSubstreamState::PendingSend(conn_id, substream, msg);
|
||||
@ -233,7 +233,7 @@ impl<TUserData> InboundSubstreamState<TUserData> {
|
||||
},
|
||||
) {
|
||||
InboundSubstreamState::WaitingMessage { substream, .. }
|
||||
| InboundSubstreamState::WaitingUser(_, substream, _)
|
||||
| InboundSubstreamState::WaitingBehaviour(_, substream, _)
|
||||
| InboundSubstreamState::PendingSend(_, substream, _)
|
||||
| InboundSubstreamState::PendingFlush(_, substream)
|
||||
| InboundSubstreamState::Closing(substream) => {
|
||||
@ -655,7 +655,7 @@ where
|
||||
.inbound_substreams
|
||||
.iter_mut()
|
||||
.find(|state| match state {
|
||||
InboundSubstreamState::WaitingUser(conn_id, _, _) => {
|
||||
InboundSubstreamState::WaitingBehaviour(conn_id, _, _) => {
|
||||
conn_id == &request_id.connec_unique_id
|
||||
}
|
||||
_ => false,
|
||||
@ -1004,14 +1004,68 @@ where
|
||||
connection_id,
|
||||
mut substream,
|
||||
} => match substream.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok(msg))) => {
|
||||
if let Ok(ev) = process_kad_request(msg, connection_id) {
|
||||
*this =
|
||||
InboundSubstreamState::WaitingUser(connection_id, substream, None);
|
||||
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(ev)));
|
||||
} else {
|
||||
*this = InboundSubstreamState::Closing(substream);
|
||||
}
|
||||
Poll::Ready(Some(Ok(KadRequestMsg::Ping))) => {
|
||||
log::warn!("Kademlia PING messages are unsupported");
|
||||
|
||||
*this = InboundSubstreamState::Closing(substream);
|
||||
}
|
||||
Poll::Ready(Some(Ok(KadRequestMsg::FindNode { key }))) => {
|
||||
*this =
|
||||
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
|
||||
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
|
||||
KademliaHandlerEvent::FindNodeReq {
|
||||
key,
|
||||
request_id: KademliaRequestId {
|
||||
connec_unique_id: connection_id,
|
||||
},
|
||||
},
|
||||
)));
|
||||
}
|
||||
Poll::Ready(Some(Ok(KadRequestMsg::GetProviders { key }))) => {
|
||||
*this =
|
||||
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
|
||||
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
|
||||
KademliaHandlerEvent::GetProvidersReq {
|
||||
key,
|
||||
request_id: KademliaRequestId {
|
||||
connec_unique_id: connection_id,
|
||||
},
|
||||
},
|
||||
)));
|
||||
}
|
||||
Poll::Ready(Some(Ok(KadRequestMsg::AddProvider { key, provider }))) => {
|
||||
*this = InboundSubstreamState::WaitingMessage {
|
||||
first: false,
|
||||
connection_id,
|
||||
substream,
|
||||
};
|
||||
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
|
||||
KademliaHandlerEvent::AddProvider { key, provider },
|
||||
)));
|
||||
}
|
||||
Poll::Ready(Some(Ok(KadRequestMsg::GetValue { key }))) => {
|
||||
*this =
|
||||
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
|
||||
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
|
||||
KademliaHandlerEvent::GetRecord {
|
||||
key,
|
||||
request_id: KademliaRequestId {
|
||||
connec_unique_id: connection_id,
|
||||
},
|
||||
},
|
||||
)));
|
||||
}
|
||||
Poll::Ready(Some(Ok(KadRequestMsg::PutValue { record }))) => {
|
||||
*this =
|
||||
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
|
||||
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
|
||||
KademliaHandlerEvent::PutRecord {
|
||||
record,
|
||||
request_id: KademliaRequestId {
|
||||
connec_unique_id: connection_id,
|
||||
},
|
||||
},
|
||||
)));
|
||||
}
|
||||
Poll::Pending => {
|
||||
*this = InboundSubstreamState::WaitingMessage {
|
||||
@ -1029,9 +1083,12 @@ where
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
},
|
||||
InboundSubstreamState::WaitingUser(id, substream, _) => {
|
||||
*this =
|
||||
InboundSubstreamState::WaitingUser(id, substream, Some(cx.waker().clone()));
|
||||
InboundSubstreamState::WaitingBehaviour(id, substream, _) => {
|
||||
*this = InboundSubstreamState::WaitingBehaviour(
|
||||
id,
|
||||
substream,
|
||||
Some(cx.waker().clone()),
|
||||
);
|
||||
|
||||
return Poll::Pending;
|
||||
}
|
||||
@ -1080,42 +1137,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Processes a Kademlia message that's expected to be a request from a remote.
|
||||
fn process_kad_request<TUserData>(
|
||||
event: KadRequestMsg,
|
||||
connec_unique_id: UniqueConnecId,
|
||||
) -> Result<KademliaHandlerEvent<TUserData>, io::Error> {
|
||||
match event {
|
||||
KadRequestMsg::Ping => {
|
||||
// TODO: implement; although in practice the PING message is never
|
||||
// used, so we may consider removing it altogether
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
"the PING Kademlia message is not implemented",
|
||||
))
|
||||
}
|
||||
KadRequestMsg::FindNode { key } => Ok(KademliaHandlerEvent::FindNodeReq {
|
||||
key,
|
||||
request_id: KademliaRequestId { connec_unique_id },
|
||||
}),
|
||||
KadRequestMsg::GetProviders { key } => Ok(KademliaHandlerEvent::GetProvidersReq {
|
||||
key,
|
||||
request_id: KademliaRequestId { connec_unique_id },
|
||||
}),
|
||||
KadRequestMsg::AddProvider { key, provider } => {
|
||||
Ok(KademliaHandlerEvent::AddProvider { key, provider })
|
||||
}
|
||||
KadRequestMsg::GetValue { key } => Ok(KademliaHandlerEvent::GetRecord {
|
||||
key,
|
||||
request_id: KademliaRequestId { connec_unique_id },
|
||||
}),
|
||||
KadRequestMsg::PutValue { record } => Ok(KademliaHandlerEvent::PutRecord {
|
||||
record,
|
||||
request_id: KademliaRequestId { connec_unique_id },
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Process a Kademlia message that's supposed to be a response to one of our requests.
|
||||
fn process_kad_response<TUserData>(
|
||||
event: KadResponseMsg,
|
||||
|
Reference in New Issue
Block a user