Skip to content

Commit

Permalink
Revert "[LI-HOTFIX] Return valid data during throttling (#514)" (#515)
Browse files Browse the repository at this point in the history
This reverts commit d4f6f91.

For the "consumer get stuck during throttling" issue, we realized there is a safer solution than the above commit. The alternative solution is to increase the maxThrottleTime by increase the quota window size, which should effectively reduce the overall throughput. We will go with that path first. If the issue is still not mitigated, we can fallback to this solution and re-apply this patch.
  • Loading branch information
CCisGG authored Jun 11, 2024
1 parent d4f6f91 commit 05c9ff8
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -410,16 +410,20 @@ public boolean handleResponse(FetchResponse response) {
return false;
}
if (nextMetadata.isFull()) {
if (response.throttleTimeMs() > 0) {
// [LIKAFKA-59133] To avoid stuck consumer, we made a server side change to return valid fetch responses
// even when the request is throttled. To honor the server side change, we log the throttling and still
// handle the fetch response.
if (response.responseData().isEmpty() && response.throttleTimeMs() > 0) {
// Normally, an empty full fetch response would be invalid. However, KIP-219
// specifies that if the broker wants to throttle the client, it will respond
// to a full fetch request with an empty response and a throttleTimeMs
// value set. We don't want to log this with a warning, since it's not an error.
// However, the empty full fetch response can't be processed, so it's still appropriate
// to return false here.
if (log.isDebugEnabled()) {
log.debug("Node {} sent a response indicate the request is throttled for {} ms.", node,
response.throttleTimeMs());
log.debug("Node {} sent a empty full fetch response to indicate that this " +
"client should be throttled for {} ms.", node, response.throttleTimeMs());
}
nextMetadata = FetchMetadata.INITIAL;
return false;
}

String problem = verifyFullFetchResponsePartitions(response);
if (problem != null) {
log.info("Node {} sent an invalid full fetch response with {}", node, problem);
Expand Down
13 changes: 7 additions & 6 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -990,23 +990,24 @@ class KafkaApis(val requestChannel: RequestChannel,
val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs)

val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)

// [LIKAFKA-59133] We made a change here to actually fill in the data to the fetch response even when throttling happens.
// This prevents the consumers completely getting stuck when throttling happens intensively.
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)

// [LIKAFKA-45345] even if the throttleTimeMs is 0, we still record it so that
// the throttle-time sensor does not expire before the byte-rate sensor in quotas.fetch
// or the request-time sensor in quotas.request.
val (effectiveBandwidthThrottleTime, effectiveRequestThrottleTime) = if (maxThrottleTimeMs > 0) {
request.apiThrottleTimeMs = maxThrottleTimeMs

// Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value
// from the fetch quota because we are going to return an empty response.
quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs)
// If throttling is required, return an empty response.
unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs)
if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
(bandwidthThrottleTimeMs, 0)
} else {
(0, requestThrottleTimeMs)
}
} else {
// Get the actual response. This will update the fetch context.
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
trace(s"Sending Fetch response with partitions.size=$responseSize, metadata=${unconvertedFetchResponse.sessionId}")
(0, 0)
}
Expand Down

0 comments on commit 05c9ff8

Please sign in to comment.