From 05c9ff8917e2661ca0ff8653a8c51c2f719373cf Mon Sep 17 00:00:00 2001 From: Hao Geng Date: Tue, 11 Jun 2024 13:25:37 -0700 Subject: [PATCH] Revert "[LI-HOTFIX] Return valid data during throttling (#514)" (#515) This reverts commit d4f6f914f4bb711d7af502ed934ca31fe9688a68. For the "consumer get stuck during throttling" issue, we realized there is a safer solution than the above commit. The alternative solution is to increase the maxThrottleTime by increase the quota window size, which should effectively reduce the overall throughput. We will go with that path first. If the issue is still not mitigated, we can fallback to this solution and re-apply this patch. --- .../kafka/clients/FetchSessionHandler.java | 18 +++++++++++------- .../main/scala/kafka/server/KafkaApis.scala | 13 +++++++------ 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java index 61b3555f732a..b516e01923c5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java @@ -410,16 +410,20 @@ public boolean handleResponse(FetchResponse response) { return false; } if (nextMetadata.isFull()) { - if (response.throttleTimeMs() > 0) { - // [LIKAFKA-59133] To avoid stuck consumer, we made a server side change to return valid fetch responses - // even when the request is throttled. To honor the server side change, we log the throttling and still - // handle the fetch response. + if (response.responseData().isEmpty() && response.throttleTimeMs() > 0) { + // Normally, an empty full fetch response would be invalid. However, KIP-219 + // specifies that if the broker wants to throttle the client, it will respond + // to a full fetch request with an empty response and a throttleTimeMs + // value set. We don't want to log this with a warning, since it's not an error. + // However, the empty full fetch response can't be processed, so it's still appropriate + // to return false here. if (log.isDebugEnabled()) { - log.debug("Node {} sent a response indicate the request is throttled for {} ms.", node, - response.throttleTimeMs()); + log.debug("Node {} sent a empty full fetch response to indicate that this " + + "client should be throttled for {} ms.", node, response.throttleTimeMs()); } + nextMetadata = FetchMetadata.INITIAL; + return false; } - String problem = verifyFullFetchResponsePartitions(response); if (problem != null) { log.info("Node {} sent an invalid full fetch response with {}", node, problem); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9a71e7884f5d..9fb4a74cd416 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -990,23 +990,24 @@ class KafkaApis(val requestChannel: RequestChannel, val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs) val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs) - - // [LIKAFKA-59133] We made a change here to actually fill in the data to the fetch response even when throttling happens. - // This prevents the consumers completely getting stuck when throttling happens intensively. - unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions) - // [LIKAFKA-45345] even if the throttleTimeMs is 0, we still record it so that // the throttle-time sensor does not expire before the byte-rate sensor in quotas.fetch // or the request-time sensor in quotas.request. val (effectiveBandwidthThrottleTime, effectiveRequestThrottleTime) = if (maxThrottleTimeMs > 0) { request.apiThrottleTimeMs = maxThrottleTimeMs - + // Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value + // from the fetch quota because we are going to return an empty response. + quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs) + // If throttling is required, return an empty response. + unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs) if (bandwidthThrottleTimeMs > requestThrottleTimeMs) { (bandwidthThrottleTimeMs, 0) } else { (0, requestThrottleTimeMs) } } else { + // Get the actual response. This will update the fetch context. + unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions) trace(s"Sending Fetch response with partitions.size=$responseSize, metadata=${unconvertedFetchResponse.sessionId}") (0, 0) }