Skip to content

Commit

Permalink
LIKAFKA-41423: Kafka federation proof of concept using dual-cluster i…
Browse files Browse the repository at this point in the history
…ntegration test. (Code-review fixes: refactor MetadataCache into legacy and federated-subclass variants, and merge ControllerChannelManager's brokerStateInfo and remoteControllerStateInfo maps in favor of simple remoteControllerIds hashset to mark remote controllers.)
  • Loading branch information
groelofs committed Feb 16, 2022
1 parent 5e83597 commit 91da79d
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ public UpdateMetadataRequest build(short version) {
data.setUngroupedPartitionStates(partitionStates);
}

// clusterId == null implies federation is not enabled (though reverse may not be true): no point in
// wasting space on an unused field (TODO? could make it unconditional if useful for debugging purposes)
if (version >= 7 && clusterId != null) {
// clusterId == null implies federation is not enabled (though reverse may not be true); will be ignored
// during serialization (data.toStruct())
if (version >= 7) {
data.setClusterId(clusterId);
}

Expand Down Expand Up @@ -337,8 +337,9 @@ public void rewriteRemoteRequest(String routingClusterId, int controllerId, int
data.setRoutingClusterId(routingClusterId);
data.setControllerId(controllerId);
data.setControllerEpoch(controllerEpoch);
// brokerEpoch apparently gets rewritten by the controller for every receiving broker (somewhere...):
// shouldn't need to mess with it here, right? or should we remove it in the version >= 6 case? FIXME?
// brokerEpoch apparently gets rewritten by the controller for every receiving broker (somewhere...)
// before sending it to them: shouldn't need to mess with it here, right? or should we remove it in
// the version >= 6 case? FIXME?
if (version() >= 6) {
data.setMaxBrokerEpoch(maxBrokerEpoch);
}
Expand Down
42 changes: 15 additions & 27 deletions core/src/main/scala/kafka/controller/ControllerChannelManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{KafkaException, Node, Reconfigurable, TopicPartition}

import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer}
import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
import scala.collection.{Seq, Map, Set, mutable}

object ControllerChannelManager {
Expand All @@ -56,16 +56,15 @@ class ControllerChannelManager(controllerContext: ControllerContext,
import ControllerChannelManager._

protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
protected val remoteControllerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
protected val remoteControllerIds = new HashSet[Int]
private val brokerLock = new Object
this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
val brokerResponseSensors: mutable.Map[ApiKeys, BrokerResponseTimeStats] = mutable.HashMap.empty
newGauge(
"TotalQueueSize",
new Gauge[Int] {
def value: Int = brokerLock synchronized {
brokerStateInfo.values.iterator.map(_.messageQueue.size).sum +
remoteControllerStateInfo.values.iterator.map(_.messageQueue.size).sum
brokerStateInfo.values.iterator.map(_.messageQueue.size).sum
}
}
)
Expand All @@ -74,17 +73,17 @@ class ControllerChannelManager(controllerContext: ControllerContext,
controllerContext.liveOrShuttingDownBrokers.foreach(addNewBroker)

brokerLock synchronized {
info(s"GRR DEBUG: about to iterate brokerStateInfo to start RequestSendThreads "
+ s"(${brokerStateInfo.size - remoteControllerIds.size} local brokers, ${remoteControllerIds.size} "
+ "remote controllers)")
brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1, brokerState._2.requestSendThread))
info("GRR DEBUG: about to iterate remoteControllerStateInfo to start RequestSendThreads")
remoteControllerStateInfo.foreach(remoteState => startRequestSendThread(remoteState._1, remoteState._2.requestSendThread))
}
initBrokerResponseSensors()
}

def shutdown() = {
brokerLock synchronized {
brokerStateInfo.values.toList.foreach(removeExistingBroker)
remoteControllerStateInfo.values.toList.foreach(removeExistingBroker)
}
removeBrokerResponseSensors()
}
Expand All @@ -105,20 +104,14 @@ class ControllerChannelManager(controllerContext: ControllerContext,
def sendRequest(brokerId: Int, request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
callback: AbstractResponse => Unit = null): Unit = {
// GRR FIXME: should we instrument the time spent waiting for this lock (and its other 5 call sites)?
// (would love to see histogram in leader-controller)
// (would love to see histogram in active controller)
brokerLock synchronized {
var stateInfoOpt = brokerStateInfo.get(brokerId)
stateInfoOpt match {
case Some(stateInfo) =>
stateInfo.messageQueue.put(QueueItem(request.apiKey, request, callback, time.milliseconds()))
case None =>
stateInfoOpt = remoteControllerStateInfo.get(brokerId)
stateInfoOpt match {
case Some(stateInfo) =>
stateInfo.messageQueue.put(QueueItem(request.apiKey, request, callback, time.milliseconds()))
case None =>
warn(s"Not sending request $request to broker $brokerId, since it is offline.")
}
warn(s"Not sending request $request to broker $brokerId, since it is offline.")
}
}
}
Expand Down Expand Up @@ -173,9 +166,9 @@ class ControllerChannelManager(controllerContext: ControllerContext,
def addRemoteController(remoteBroker: Broker): Unit = {
info(s"GRR DEBUG: controllerId=${config.brokerId} adding remote controller [${remoteBroker}] for FEDERATION INTER-CLUSTER REQUESTS and starting its RequestSendThread")
brokerLock synchronized {
if (!remoteControllerStateInfo.contains(remoteBroker.id)) {
if (!remoteControllerIds.contains(remoteBroker.id)) {
addNewBroker(remoteBroker, false)
startRequestSendThread(remoteBroker.id, remoteControllerStateInfo(remoteBroker.id).requestSendThread)
startRequestSendThread(remoteBroker.id, brokerStateInfo(remoteBroker.id).requestSendThread)
}
}
}
Expand Down Expand Up @@ -260,15 +253,12 @@ class ControllerChannelManager(controllerContext: ControllerContext,
brokerMetricTags(broker.id)
)

//GRR FIXME: do sanity check whether same ID exists within sibling map (brokerStateInfo/remoteControllerStateInfo)
if (isLocal) {
brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics, reconfigurableChannelBuilder))
} else {
info(s"GRR DEBUG: adding ${brokerNode} info (network client, message queue, request thread, etc.) to new remoteControllerStateInfo map for federation inter-cluster requests")
remoteControllerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics, reconfigurableChannelBuilder))
if (!isLocal) {
info(s"GRR DEBUG: adding remote ${brokerNode} info (network client, message queue, request thread, etc.) to brokerStateInfo map for federation inter-cluster requests")
remoteControllerIds.add(broker.id)
}
brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics, reconfigurableChannelBuilder))
}

private def brokerMetricTags(brokerId: Int) = Map("broker-id" -> brokerId.toString)
Expand All @@ -286,8 +276,6 @@ class ControllerChannelManager(controllerContext: ControllerContext,
removeMetric(QueueSizeMetricName, brokerMetricTags(brokerState.brokerNode.id))
removeMetric(RequestRateAndQueueTimeMetricName, brokerMetricTags(brokerState.brokerNode.id))
brokerStateInfo.remove(brokerState.brokerNode.id)
//GRR FIXME?
remoteControllerStateInfo.remove(brokerState.brokerNode.id) // make conditional on "None" return from prev line?
} catch {
case e: Throwable => error("Error while removing broker by the controller", e)
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()

metadataCache = new MetadataCache(config.brokerId, clusterId, config.liFederationEnable)
metadataCache = if (config.liFederationEnable) new FederatedMetadataCache(config.brokerId, clusterId)
else new MetadataCache(config.brokerId)
// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
Expand Down
Loading

0 comments on commit 91da79d

Please sign in to comment.