From 6da08dde389f359dc3292f65570f8894debd208e Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Fri, 18 Feb 2022 17:56:46 -0800 Subject: [PATCH] Forward the UMR inside the ControllerEvent thread --- .../kafka/controller/ControllerState.scala | 7 +- .../kafka/controller/KafkaController.scala | 35 +++++++++- .../main/scala/kafka/server/KafkaApis.scala | 65 +++++-------------- 3 files changed, 54 insertions(+), 53 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala index e83ee0709369..0f1a952a5507 100644 --- a/core/src/main/scala/kafka/controller/ControllerState.scala +++ b/core/src/main/scala/kafka/controller/ControllerState.scala @@ -124,9 +124,14 @@ object ControllerState { def value = 20 } + case object ForwardUpdateMetadataRequest extends ControllerState { + def value = 21 + } + val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion, AlterPartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived, LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable, TopicUncleanLeaderElectionEnable, ListPartitionReassignment, UpdateMetadataResponseReceived, - TopicDeletionFlagChange, PreferredControllerChange, TopicMinInSyncReplicasConfigChange, SkipControlledShutdownSafetyCheck) + TopicDeletionFlagChange, PreferredControllerChange, TopicMinInSyncReplicasConfigChange, SkipControlledShutdownSafetyCheck, + ForwardUpdateMetadataRequest) } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 53ac6f2af663..c3517d5369bb 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -22,14 +22,14 @@ import kafka.admin.{AdminOperationException, AdminUtils} import kafka.api._ import kafka.cluster.Broker import kafka.common._ -import kafka.controller.KafkaController.{AlterReassignmentsCallback, ElectLeadersCallback, ListReassignmentsCallback} +import kafka.controller.KafkaController.{AlterReassignmentsCallback, ElectLeadersCallback, ForwardUpdateMetadataCallback, ListReassignmentsCallback} import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.server._ import kafka.utils._ import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import kafka.zk._ import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChangeHandler} -import org.apache.kafka.common.{ElectionType, KafkaException, TopicPartition, Node} +import org.apache.kafka.common.{ElectionType, KafkaException, Node, TopicPartition} import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, NotEnoughReplicasException, PolicyViolationException, StaleBrokerEpochException} import org.apache.kafka.common.message.UpdateMetadataResponseData import org.apache.kafka.common.metrics.Metrics @@ -57,6 +57,7 @@ object KafkaController extends Logging { type ElectLeadersCallback = Map[TopicPartition, Either[ApiError, Int]] => Unit type ListReassignmentsCallback = Either[Map[TopicPartition, ReplicaAssignment], ApiError] => Unit type AlterReassignmentsCallback = Either[Map[TopicPartition, ApiError], ApiError] => Unit + type ForwardUpdateMetadataCallback = UpdateMetadataResponse => Unit def satisfiesLiCreateTopicPolicy(createTopicPolicy : Option[CreateTopicPolicy], zkClient : KafkaZkClient, topic : String, partitionsAssignment : collection.Map[Int, ReplicaAssignment]): Boolean = { @@ -419,6 +420,10 @@ class KafkaController(val config: KafkaConfig, zkClient.updateBrokerInfo(newBrokerInfo) } + private[kafka] def forwardUpdateMetadataRequest(umr: UpdateMetadataRequest, callback: ForwardUpdateMetadataCallback): Unit = { + eventManager.put(ForwardUpdateMetadataRequest(umr, callback)) + } + private[kafka] def enableDefaultUncleanLeaderElection(): Unit = { eventManager.put(UncleanLeaderElectionEnable) } @@ -1458,6 +1463,26 @@ class KafkaController(val config: KafkaConfig, controllerContext.skipShutdownSafetyCheck += (id -> brokerEpoch) } + def processForwardUpdateMetadataRequest(umr: UpdateMetadataRequest, callback: ForwardUpdateMetadataCallback): Unit = { + if (!isActive) { + throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown") + } + + info(s"controller for clusterId=${clusterId} has received a remote, non-rewritten UpdateMetadataRequest " + + s"(clusterId=${umr.originClusterId}, routingClusterId=${umr.routingClusterId}): about to validate and rewrite it") + + // Inside KafkaApis, we've already validated that + // 1. the originClusterId is not equal to my local cluster Id + // 2. the routingClusterId is null + umr.rewriteRemoteRequest(clusterId, config.brokerId, + controllerContext.epoch, controllerContext.maxBrokerEpoch) + val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq + sendRemoteUpdateMetadataRequest(liveBrokers, umr) + + // For now, we always return a successful UpdateMetadataResponse + callback(new UpdateMetadataResponse(new UpdateMetadataResponseData().setErrorCode(Errors.NONE.code))) + } + private def safeToShutdown(id: Int, brokerEpoch: Long): Boolean = { // First, check whether or not the broker requesting shutdown has already been told that it is OK to shut down // at this epoch. @@ -2383,6 +2408,8 @@ class KafkaController(val config: KafkaConfig, processStartup() case SkipControlledShutdownSafetyCheck(id, brokerEpoch, callback) => processSkipControlledShutdownSafetyCheck(id, brokerEpoch, callback) + case ForwardUpdateMetadataRequest(umr, callback) => + processForwardUpdateMetadataRequest(umr, callback) } } catch { case e: ControllerMovedException => @@ -2589,6 +2616,10 @@ case class SkipControlledShutdownSafetyCheck(id: Int, brokerEpoch: Long, skipCon def state: ControllerState.SkipControlledShutdownSafetyCheck.type = ControllerState.SkipControlledShutdownSafetyCheck } +case class ForwardUpdateMetadataRequest(umr: UpdateMetadataRequest, callback: ForwardUpdateMetadataCallback) extends ControllerEvent { + def state = ControllerState.ForwardUpdateMetadataRequest +} + case class LeaderAndIsrResponseReceived(leaderAndIsrResponse: LeaderAndIsrResponse, brokerId: Int) extends ControllerEvent { def state = ControllerState.LeaderAndIsrResponseReceived } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ff0a2cf5b4e3..3b7f8027edb4 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -102,7 +102,7 @@ class KafkaApis(val requestChannel: RequestChannel, val quotas: QuotaManagers, val fetchManager: FetchManager, brokerTopicStats: BrokerTopicStats, - val clusterId: String, // GRR FIXME: any guarantee this is non-null? + val clusterId: String, time: Time, val tokenManager: DelegationTokenManager) extends Logging { @@ -132,8 +132,6 @@ class KafkaApis(val requestChannel: RequestChannel, try { trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" + s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}") -// info(s"GRR DEBUG (TEMPORARY): Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" + -// s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}") request.header.apiKey match { case ApiKeys.PRODUCE => handleProduceRequest(request) case ApiKeys.FETCH => handleFetchRequest(request) @@ -222,7 +220,6 @@ class KafkaApis(val requestChannel: RequestChannel, } } - // FIXME? request arg is NOT USED private def doHandleLeaderAndIsrRequest(request: RequestChannel.Request, correlationId: Int, leaderAndIsrRequest: LeaderAndIsrRequest): LeaderAndIsrResponse = { def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]): Unit = { // for each new leader or follower, call coordinator to handle consumer group migration. @@ -265,7 +262,6 @@ class KafkaApis(val requestChannel: RequestChannel, } } - // FIXME? request arg is NOT USED private def doHandleStopReplicaRequest(request: RequestChannel.Request, stopReplicaRequest: StopReplicaRequest): StopReplicaResponse = { val (result, error) = replicaManager.stopReplicas(stopReplicaRequest) // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we @@ -297,14 +293,16 @@ class KafkaApis(val requestChannel: RequestChannel, val updateMetadataRequest = request.body[UpdateMetadataRequest] authorizeClusterOperation(request, CLUSTER_ACTION) - // GRR FIXME: not 100% clear whether staleness criterion should apply to updates coming from other physical - // clusters, but based on KIP-380 description, seems like we probably do need it in order to deal with same - // problems KIP-380 was intended to solve (i.e., "cluster A" controller bounce around same time as "cluster - // B" remote UpdateMetadataRequest); also implies that local controllers must track brokerEpochs of remote - // controllers - // (separate question is why check isn't needed by LI's combined control request, which skips directly to - // doHandleUpdateMetadataRequest(): probable BUG) - if (isBrokerEpochStale(updateMetadataRequest.brokerEpoch, updateMetadataRequest.maxBrokerEpoch)) { + if (shouldForwardUpdateMetadataRequest(updateMetadataRequest)) { + // The metadata propagation follows an eventual consistency model, which means + // an UpdateMetadataRequest is not specific to a particular broker, or to a particular broker epoch. + // For example, if a newly restarted broker accepts an UpdateMetadataRequest intended for its previous + // epoch, there won't be any correctness violations. + // For UpdateMetadataRequests from foreign clusters, there is no need to check the brokerEpoch or maxBrokerEpoch + controller.forwardUpdateMetadataRequest(updateMetadataRequest, updateMetadataResponse => { + sendResponseExemptThrottle(request, updateMetadataResponse) + }) + } else if (isBrokerEpochStale(updateMetadataRequest.brokerEpoch, updateMetadataRequest.maxBrokerEpoch)) { // When the broker restarts very quickly, it is possible for this broker to receive request intended // for its previous generation so the broker should skip the stale request. info("Received update metadata request with stale broker epoch info " + @@ -318,24 +316,12 @@ class KafkaApis(val requestChannel: RequestChannel, } } + private def shouldForwardUpdateMetadataRequest(updateMetadataRequest: UpdateMetadataRequest): Boolean = { + config.liFederationEnable && clusterId.equals(updateMetadataRequest.originClusterId()) && updateMetadataRequest.routingClusterId() == null + } + // [unlike the other "doHandle..." methods, this one DOES use the request arg] private def doHandleUpdateMetadataRequest(request: RequestChannel.Request, correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): UpdateMetadataResponse = { - - // Since handleLiCombinedControlRequest() calls us directly (bypassing handleUpdateMetadataRequest() and its - // stale broker-epoch check), this seems like the most appropriate place for the new federation "router" to - // live: rest of (original) method is the legacy "broker half" logic. - if (!config.liFederationEnable || clusterId.equals(updateMetadataRequest.originClusterId) || clusterId.equals(updateMetadataRequest.routingClusterId)) { - // This is either a local/legacy/non-federated request (from our ZK => originClusterId matches) or one our controller - // has already rewritten (received from a remote controller => routingClusterId matches), so do the normal, - // broker-half processing below. -// info(s"GRR DEBUG: brokerId=${brokerId} received updateMetadataRequest: controllerId=${updateMetadataRequest.controllerId}, originClusterId=${updateMetadataRequest.originClusterId}, routingClusterId=${updateMetadataRequest.routingClusterId}") - if (updateMetadataRequest.originClusterId != null && clusterId.equals(updateMetadataRequest.routingClusterId)) { - info(s"GRR DEBUG: local brokerId=${brokerId} in clusterId=${clusterId} received rewritten updateMetadataRequest from remote clusterId=${updateMetadataRequest.originClusterId}") - } - // [The following block is NOT properly indented in order to simplify upstream merges.] - - - info(s"GRR DEBUG: brokerId=${brokerId} calling maybeUpdateMetadataCache() with correlationId=${correlationId} and updateMetadataRequest from clusterId=${updateMetadataRequest.originClusterId}") val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest) if (deletedPartitions.nonEmpty) groupCoordinator.handleDeletedPartitions(deletedPartitions) @@ -361,17 +347,6 @@ class KafkaApis(val requestChannel: RequestChannel, } } new UpdateMetadataResponse(new UpdateMetadataResponseData().setErrorCode(Errors.NONE.code)) - - - - } else { - // [Federation only.] This is an incoming remote request (i.e., from another physical cluster in the federation), - // so hand it off to our controller half for validation, rewriting, and rerouting. - info(s"GRR DEBUG: local brokerId=${brokerId} in clusterId=${clusterId} received new updateMetadataRequest from remote controllerId=${updateMetadataRequest.controllerId} in clusterId=${updateMetadataRequest.originClusterId}; sending to controller for validation and rewrite") - controller.rewriteAndForwardRemoteUpdateMetadataRequest(updateMetadataRequest) // modifies UMR in place, returns response - // same method ^^^ stuffs the rewritten UMR into the processing queue, which lives in controller's - // ControllerEventManager (KafkaController's eventManager member var) - } } def handleControlledShutdownRequest(request: RequestChannel.Request): Unit = { @@ -3159,11 +3134,6 @@ class KafkaApis(val requestChannel: RequestChannel, val responseData = new LiCombinedControlResponseData() decomposedRequest.leaderAndIsrRequest match { - // GRR FIXME: this code path skips the "stale broker epoch" check that the main path (through - // handleLeaderAndIsrRequest()) makes: BUG? (seems like it) (maybe intention was to add single - // stale-epoch check in handleLiCombinedControlRequest(), but forgot to do so?) - // [separate question: why was LAIR's top-level BrokerEpoch moved into LCCR's LeaderAndIsrPartitionState - // struct? why is MaxBrokerEpoch missing? is LCCR out of date?] case Some(leaderAndIsrRequest) => { val leaderAndIsrResponse = doHandleLeaderAndIsrRequest(request, correlationId, leaderAndIsrRequest) responseData.setLeaderAndIsrErrorCode(leaderAndIsrResponse.errorCode()) @@ -3173,9 +3143,6 @@ class KafkaApis(val requestChannel: RequestChannel, } decomposedRequest.updateMetadataRequest match { - // GRR FIXME: this code path skips the "stale broker epoch" check that the main path (through - // handleUpdateMetadataRequest()) makes: BUG? (seems like it) - // [separate question: why was UMR's top-level BrokerEpoch not copied to LCCR?] case Some(updateMetadataRequest) => { val updateMetadataResponse = doHandleUpdateMetadataRequest(request, correlationId, updateMetadataRequest) responseData.setUpdateMetadataErrorCode(updateMetadataResponse.errorCode()) @@ -3186,8 +3153,6 @@ class KafkaApis(val requestChannel: RequestChannel, val stopReplicaRequests = decomposedRequest.stopReplicaRequests val stopReplicaPartitionErrors = new util.ArrayList[StopReplicaPartitionError]() stopReplicaRequests.foreach{ stopReplicaRequest => { - // GRR FIXME: this code path skips the "stale broker epoch" check that the main path (through - // handleStopReplicaRequest()) makes: BUG? (seems like it) val stopReplicaResponse = doHandleStopReplicaRequest(request, stopReplicaRequest) responseData.setStopReplicaErrorCode(stopReplicaResponse.errorCode()) stopReplicaPartitionErrors.addAll(LiCombinedControlTransformer.transformStopReplicaPartitionErrors(stopReplicaResponse.partitionErrors()))