[LI-HOTFIX] Fix race condition that blocks alterIsr request queue (#451)
TICKET = LIKAFKA-52126 LIKAFKA-52185
EXIT_CRITERIA = Until rebasing to a release that includes apache#13159

The patch is included as a part of apache#13159.

There's a race condition that can cause an unchecked exception to be thrown:
```
2023/04/11 00:45:31.342 ERROR [NetworkClient] [BrokerToControllerChannelManager broker=5678 name=alterIsr] [kafka-server] [] [BrokerToControllerChannelManager broker=5678 name=alterIsr] Uncaught error in request completion:
java.lang.IllegalStateException: No entry found for connection 5674
```

in this loop
```scala
activeControllerAddress().foreach { controllerAddress => {
  networkClient.disconnect(controllerAddress.idString)
}}
```

When this happens, the following line is not executed,
```scala
requestQueue.putFirst(queueItem)
```
and the `queueItem` is lost forever.

This causes the corresponding [`clearInFlightRequest()`](https://github.com/linkedin/kafka/blob/bb63ee6a4d375d7dd2cef7109acf21562640e17a/core/src/main/scala/kafka/server/BrokerToControllerRequestManager.scala#L135) to never be executed, and the **whole alterIsrRequest queue is blocked forever**.

This undermines the foundation of Kafka's monitoring and operations: almost everything relies on ISR expansion, and failing to expand the ISR prevents us from doing any kind of surgery.
lmr3796 authored Apr 20, 2023
1 parent bb63ee6 commit 443a34f
Showing 1 changed file with 6 additions and 1 deletion.
@@ -342,7 +342,12 @@ class BrokerToControllerRequestThread(
       } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
         // just close the controller connection and wait for metadata cache update in doWork
         activeControllerAddress().foreach { controllerAddress => {
-          networkClient.disconnect(controllerAddress.idString)
+          try {
+            // We don't care if disconnect has an error, just log it and get a new network client
+            networkClient.disconnect(controllerAddress.idString)
+          } catch {
+            case t: Throwable => error("Had an error while disconnecting from NetworkClient.", t)
+          }
         }}

         requestQueue.putFirst(queueItem)
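The effect of the guard can be reproduced outside Kafka. The following is a minimal, self-contained sketch, not the actual broker code: `RequeueSketch`, `flakyDisconnect`, and `handleNotController` are hypothetical names introduced for illustration, and the outer `catch` only stands in for the caller that logs "Uncaught error in request completion". It assumes the disconnect always hits the race and throws.

```scala
import java.util.concurrent.LinkedBlockingDeque

object RequeueSketch {
  // Hypothetical stand-in for a disconnect call that hits the race:
  // the connection entry is already gone, so it throws.
  def flakyDisconnect(nodeId: String): Unit =
    throw new IllegalStateException(s"No entry found for connection $nodeId")

  // Mimics the NOT_CONTROLLER branch: disconnect, then put the item back.
  // Returns how many items remain in the queue afterwards.
  def handleNotController(guarded: Boolean): Int = {
    val requestQueue = new LinkedBlockingDeque[String]()
    val queueItem = "alterIsr-request"
    try {
      if (guarded) {
        // Patched behaviour: log the failure and keep going.
        try flakyDisconnect("5674")
        catch { case t: Throwable => println(s"disconnect failed: ${t.getMessage}") }
      } else {
        // Original behaviour: the exception escapes this branch.
        flakyDisconnect("5674")
      }
      requestQueue.putFirst(queueItem)
    } catch {
      // Stand-in for the caller that only logs the uncaught error.
      case e: IllegalStateException => println(s"uncaught in completion: ${e.getMessage}")
    }
    requestQueue.size()
  }

  def main(args: Array[String]): Unit = {
    println(handleNotController(guarded = false)) // queue size 0: item lost
    println(handleNotController(guarded = true))  // queue size 1: item re-queued
  }
}
```

Without the guard the exception skips `putFirst`, so the item never returns to the queue. With the guard the failure is only logged and the item is re-queued, which is the behaviour this patch restores.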