From 443a34f46442374c0274669fffd130b1f686d21a Mon Sep 17 00:00:00 2001 From: "Joseph (Ting-Chou) Lin" Date: Thu, 20 Apr 2023 15:30:27 -0700 Subject: [PATCH] [LI-HOTFIX] Fix race condition that blocks alterIsr request queue (#451) TICKET = LIKAFKA-52126 LIKAFKA-52185 EXIT_CRITERIA = Until rebasing to a release that includes apache/kafka#13159 The patch is included as a part of apache/kafka#13159. There's a race condition that can cause an unchecked exception thrown ``` 2023/04/11 00:45:31.342 ERROR [NetworkClient] [BrokerToControllerChannelManager broker=5678 name=alterIsr] [kafka-server] [] [BrokerToControllerChannelManager broker=5678 name=alterIsr] Uncaught error in request completion: java.lang.IllegalStateException: No entry found for connection 5674 ``` in this loop ```scala activeControllerAddress().foreach { controllerAddress => { networkClient.disconnect(controllerAddress.idString) }} ``` When this happens, the following line is not executed, ```scala requestQueue.putFirst(queueItem) ``` and the `queueItem` is lost forever. This cause the corresponding [`clearInFlightRequest()`](https://github.com/linkedin/kafka/blob/bb63ee6a4d375d7dd2cef7109acf21562640e17a/core/src/main/scala/kafka/server/BrokerToControllerRequestManager.scala#L135) never executed, and the **whole alterIsrRequest queue is blocked forever**. This basically ruins the foundation of Kafka's monitoring and operations, because almost everything relies on ISR expansion, and fail to expand ISR basically prevents us from doing any kind of surgery. --- .../kafka/server/BrokerToControllerChannelManager.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala index 05f50b720b1f..1ee74f686b4f 100644 --- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala @@ -342,7 +342,12 @@ class BrokerToControllerRequestThread( } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) { // just close the controller connection and wait for metadata cache update in doWork activeControllerAddress().foreach { controllerAddress => { - networkClient.disconnect(controllerAddress.idString) + try { + // We don't care if disconnect has an error, just log it and get a new network client + networkClient.disconnect(controllerAddress.idString) + } catch { + case t: Throwable => error("Had an error while disconnecting from NetworkClient.", t) + } }} requestQueue.putFirst(queueItem)