[LI-HOTFIX] [Delayed Election PR - Part 2] Create delayed elections in Kafka controller when corrupted brokers start up (#428)

TICKET = LIKAFKA-47837
EXIT_CRITERIA = This PR has a strong dependency on ZooKeeper, and the logic to register a new corrupted broker with the controller will need to change once we migrate to KRaft mode.
LI_DESCRIPTION =
As part of the power outage resiliency work, we want to make sure that when multiple corrupted brokers come up and an offline partition resides on those brokers, the controller elects the broker with the highest offset as that partition's leader.

To achieve this, the Kafka controller must be able to send a ListOffsets request to any broker, so that it can compare the results and elect the broker with the highest offset as the leader. Today the controller can only send requests of type AbstractControlRequest; this PR adds the capability to send ListOffsets requests as well.
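As a rough illustration of how this fits together (not code from this PR; the helper name, its parameters, and the payload wiring are assumptions), the controller side could look something like this once sendRequest accepts a generic AbstractRequest.Builder:

import kafka.controller.ControllerChannelManager
import org.apache.kafka.common.requests.{AbstractResponse, ListOffsetsRequest, ListOffsetsResponse}

// Hypothetical sketch only: issue a ListOffsets request from the controller to one broker
// through the existing ControllerChannelManager. Names and wiring here are illustrative.
def requestOffsetsFromBroker(channelManager: ControllerChannelManager,
                             brokerId: Int,
                             allowedVersion: Short)
                            (onResponse: ListOffsetsResponse => Unit): Unit = {
  // forController (added in this PR) tags the request with CONTROLLER_REPLICA_ID (-100).
  val builder = ListOffsetsRequest.Builder.forController(allowedVersion)
  // The offline partitions of interest would be attached to the builder here; that wiring
  // lives elsewhere in this PR and is omitted from the sketch.
  channelManager.sendRequest(brokerId, builder, (response: AbstractResponse) =>
    onResponse(response.asInstanceOf[ListOffsetsResponse]))
}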

When a corrupted broker is detected, the controller starts a delayed election task, with a specified timeout, for every offline partition hosted on that broker, and then sends a ListOffsets request to the broker. As ListOffsets responses arrive, they are stored in memory. For each partition, the controller waits until either the election task's timeout expires or offsets have been received from all brokers, whichever happens first. At that point, the controller chooses as the partition's leader the broker with the highest broker epoch, and if multiple brokers share that epoch, the one with the highest offset.
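The selection rule itself reduces to an ordering over the collected responses. A minimal sketch of that rule (the case class and method below are illustrative, not part of this change):

// Hypothetical sketch of the per-partition leader choice described above.
case class CandidateOffset(brokerId: Int, brokerEpoch: Long, logEndOffset: Long)

def chooseDelayedElectionLeader(candidates: Seq[CandidateOffset]): Option[Int] = {
  // Prefer the highest broker epoch; break ties by the highest log end offset.
  candidates
    .sortBy(c => (c.brokerEpoch, c.logEndOffset))(Ordering[(Long, Long)].reverse)
    .headOption
    .map(_.brokerId)
}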

Added an integration test for the delayed election scenario. Also tested this on an actual cluster.
wyuka authored Apr 3, 2023
1 parent 91a3559 commit c73b0d9
Showing 12 changed files with 1,038 additions and 59 deletions.
@@ -54,13 +54,19 @@ public class ListOffsetsRequest extends AbstractRequest {

public static final int CONSUMER_REPLICA_ID = -1;
public static final int DEBUGGING_REPLICA_ID = -2;
public static final int CONTROLLER_REPLICA_ID = -100;

private final ListOffsetsRequestData data;
private final Set<TopicPartition> duplicatePartitions;

public static class Builder extends AbstractRequest.Builder<ListOffsetsRequest> {
private final ListOffsetsRequestData data;

public static Builder forController(short allowedVersion) {
return new Builder((short) 0, allowedVersion,
ListOffsetsRequest.CONTROLLER_REPLICA_ID, IsolationLevel.READ_UNCOMMITTED);
}

public static Builder forReplica(short allowedVersion, int replicaId) {
return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
}
95 changes: 72 additions & 23 deletions core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -17,8 +17,7 @@
package kafka.controller

import java.net.SocketTimeoutException
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}

import java.util.concurrent.{BlockingDeque, LinkedBlockingDeque, TimeUnit}
import com.yammer.metrics.core.{Gauge, Timer}
import kafka.api._
import kafka.cluster.Broker
@@ -41,11 +40,18 @@ import org.apache.kafka.common.{KafkaException, Node, Reconfigurable, TopicPartition}

import scala.jdk.CollectionConverters._
import scala.collection.mutable.HashMap
import scala.collection.{Seq, Map, Set, mutable}
import scala.collection.{Map, Seq, Set, mutable}

object ControllerChannelManager {
val QueueSizeMetricName = "QueueSize"
val RequestRateAndQueueTimeMetricName = "RequestRateAndQueueTimeMs"
val SupportedRequestApiKeys: Set[ApiKeys] = Set(
ApiKeys.STOP_REPLICA,
ApiKeys.LEADER_AND_ISR,
ApiKeys.UPDATE_METADATA,
ApiKeys.LI_COMBINED_CONTROL,
ApiKeys.LIST_OFFSETS
)
}

class ControllerChannelManager(controllerContext: ControllerContext,
@@ -84,7 +90,7 @@
}

def initBrokerResponseSensors(): Unit = {
Array(ApiKeys.STOP_REPLICA, ApiKeys.LEADER_AND_ISR, ApiKeys.UPDATE_METADATA, ApiKeys.LI_COMBINED_CONTROL).foreach { k: ApiKeys =>
SupportedRequestApiKeys.foreach { k: ApiKeys =>
brokerResponseSensors.put(k, new BrokerResponseTimeStats(k))
}
}
Expand All @@ -96,7 +102,7 @@ class ControllerChannelManager(controllerContext: ControllerContext,
}
}

def sendRequest(brokerId: Int, request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
def sendRequest(brokerId: Int, request: AbstractRequest.Builder[_ <: AbstractRequest],
callback: AbstractResponse => Unit = null): Unit = {
brokerLock synchronized {
val stateInfoOpt = brokerStateInfo.get(brokerId)
@@ -126,7 +132,7 @@
}

private def addNewBroker(broker: Broker): Unit = {
val messageQueue = new LinkedBlockingQueue[QueueItem]
val messageQueue = new LinkedBlockingDeque[QueueItem]
debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}")
val controllerToBrokerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
@@ -225,14 +231,14 @@
}
}

case class QueueItem(apiKey: ApiKeys, request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
case class QueueItem(apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
callback: AbstractResponse => Unit, enqueueTimeMs: Long)

case class LatestRequestStatus(isInFlight: Boolean, isInQueue: Boolean, enqueueTimeMs: Long)

class RequestSendThread(val controllerId: Int,
val controllerContext: ControllerContext,
val queue: BlockingQueue[QueueItem],
val queue: BlockingDeque[QueueItem],
val networkClient: NetworkClient,
val brokerNode: Node,
val config: KafkaConfig,
@@ -279,7 +285,43 @@
sendAndReceive(requestBuilder, callback)
}

private def nextRequestAndCallback(): (AbstractControlRequest.Builder[_ <: AbstractControlRequest], AbstractResponse => Unit) = {
/**
* This method blocks until an item is available in the request queue. Once an item is available, it takes
* that item from the queue and checks whether it is a control request.
*
* If it is, then this method merges the control request into the controllerRequestMerger.
* If it is not a control request, it puts the item back into the queue.
*
* @return If the request taken from the queue is a control request, return the enqueue
* time and whether to continue merging requests further.
* If the request taken from the queue is found to be a non-control request, return None.
*/
private def takeFromQueueAndMerge(): Option[(Long, Boolean)] = {
val queueItem = queue.take()
val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queueItem
// Request can either be a control request (that can be sent only from a controller to a broker), or a
// generic Kafka request (like ListOffsetsRequest).
requestBuilder match {
// In case of a control request, merge the request to the accumulated list of control requests
case controlRequestBuilder: AbstractControlRequest.Builder[AbstractControlRequest] =>
val continueMerge = mergeControlRequest(enqueueTimeMs, apiKey, controlRequestBuilder, callback)
Some(enqueueTimeMs, continueMerge)
case _ =>
// In case of a non-control request, put the popped item back into the queue and stop merging. It will be
// read back from the queue in a subsequent call to takeSingleItemFromQueue.
queue.putFirst(queueItem)
None
}
}

private def takeSingleItemFromQueue(): (AbstractRequest.Builder[_ <: AbstractRequest], AbstractResponse => Unit) = {
val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take()
latestRequestStatus = LatestRequestStatus(isInFlight = true, isInQueue = false, enqueueTimeMs)
updateMetrics(apiKey, enqueueTimeMs)
(requestBuilder, callback)
}

private def nextRequestAndCallback(): (AbstractRequest.Builder[_ <: AbstractRequest], AbstractResponse => Unit) = {
if (controllerRequestMerger.hasPendingRequests() ||
(config.interBrokerProtocolVersion >= KAFKA_2_4_IV1 &&
config.liCombinedControlRequestEnable &&
@@ -297,33 +339,40 @@
// handle case 4 first
var shouldContinueMerging = true
if (!controllerRequestMerger.hasPendingRequests()) {
val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take()
latestRequestStatus = LatestRequestStatus(isInFlight = true, isInQueue = false, enqueueTimeMs)
shouldContinueMerging = mergeControlRequest(enqueueTimeMs, apiKey, requestBuilder, callback)
takeFromQueueAndMerge() match {
// Control request taken from queue and merged
case Some((enqueueTimeMs, continueMerge)) =>
latestRequestStatus = LatestRequestStatus(isInFlight = true, isInQueue = false, enqueueTimeMs)
shouldContinueMerging = continueMerge
case None =>
// Non-control request was spotted in the queue. We should simply return this request.
shouldContinueMerging = false
return takeSingleItemFromQueue()
}
}

// now we are guaranteed that the controllerRequestMerger is not empty (case 1 or 3)
// drain the queue until the queue is empty
// drain the queue until the queue is empty. If we encounter a non-control request,
// halt and send requests accumulated so far.

// one concurrent access case considering the producer of the queue:
// an item is put to the queue right after the condition check below.
// That behavior does not change correctness since the inserted item will be picked up in the next round
while (!queue.isEmpty && shouldContinueMerging) {
val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take()
shouldContinueMerging = mergeControlRequest(enqueueTimeMs, apiKey, requestBuilder, callback)
// Continue merging only if the item taken from the queue is a control request
// whose merge result indicates that further merging is allowed
shouldContinueMerging = takeFromQueueAndMerge().exists(_._2)
}

val requestBuilder = controllerRequestMerger.pollLatestRequest()
(requestBuilder, controllerRequestMerger.triggerCallback _)
} else {
// use the old behavior of sending each item in the queue as a separate request
val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take()
latestRequestStatus = LatestRequestStatus(isInFlight = true, isInQueue = false, enqueueTimeMs)
updateMetrics(apiKey, enqueueTimeMs)
(requestBuilder, callback)
takeSingleItemFromQueue()
}
}

private def sendAndReceive(requestBuilder: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
private def sendAndReceive(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest],
callback: AbstractResponse => Unit): Unit = {
var remoteTimeMs: Long = 0

@@ -367,9 +416,9 @@
if (clientResponse != null) {
val requestHeader = clientResponse.requestHeader
val api = requestHeader.apiKey
if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA &&
api != ApiKeys.LI_COMBINED_CONTROL)
if (!ControllerChannelManager.SupportedRequestApiKeys.contains(api)) {
throw new KafkaException(s"Unexpected apiKey received: $api")
}


if (api == ApiKeys.UPDATE_METADATA && !requestBuilder.asInstanceOf[UpdateMetadataRequest.Builder].partitionStates().isEmpty) {
@@ -863,7 +912,7 @@

case class ControllerBrokerStateInfo(networkClient: NetworkClient,
brokerNode: Node,
messageQueue: BlockingQueue[QueueItem],
messageQueue: BlockingDeque[QueueItem],
requestSendThread: RequestSendThread,
queueSizeGauge: Gauge[Int],
requestRateAndTimeMetrics: Timer,
12 changes: 12 additions & 0 deletions core/src/main/scala/kafka/controller/ControllerState.scala
@@ -134,6 +134,18 @@ object ControllerState {
def value = 102
}

case object CorruptedBrokerOffsetsReceived extends ControllerState {
def value = 103
}

case object DelayedElectionSuccess extends ControllerState {
def value = 104
}

case object RegisterCorruptedBroker extends ControllerState {
def value = 105
}

val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion,
AlterPartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange,
LeaderAndIsrResponseReceived, LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable,