[LI-HOTFIX] Add process time breakdown log for LeaderAndIsrRequest (#378)

TICKET = LIKAFKA-46169
LI_DESCRIPTION =
To track down why control requests take more time to process in 3.0,
we need logs that track how long each part of the processing takes.
EXIT_CRITERIA = When a better mechanism for tracking this is implemented upstream
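
The pattern this commit introduces, in sketch form: record a start time, mark the wall-clock time after each processing phase, store the interval in a per-phase map, and log the map once at the end. The snippet below is an illustrative sketch only; `doPhaseA`/`doPhaseB` are hypothetical stand-ins for the real phases in `becomeLeaderOrFollower`, and it uses `System.currentTimeMillis()` in place of the broker's `Time` instance.

```scala
import scala.collection.mutable
import kafka.utils.CoreUtils

// Hypothetical stand-ins for the real phases timed in becomeLeaderOrFollower.
def doPhaseA(): Unit = Thread.sleep(5)
def doPhaseB(): Unit = Thread.sleep(3)

val breakdown = mutable.Map[String, Long]().withDefaultValue(0L)
val startMs = System.currentTimeMillis()
val intervalMarker = new CoreUtils.TimeIntervalMarker(startMs)

doPhaseA()
breakdown("phaseA") = intervalMarker.markTimeAndReturnLatestInterval(System.currentTimeMillis())

doPhaseB()
breakdown("phaseB") = intervalMarker.markTimeAndReturnLatestInterval(System.currentTimeMillis())

// A single summary line is logged at the end, e.g. Map(phaseA -> 5, phaseB -> 3).
println(breakdown)
```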
lmr3796 authored Aug 25, 2022
1 parent e8f54c6 commit 62905d7
Showing 2 changed files with 53 additions and 0 deletions.
31 changes: 31 additions & 0 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1463,10 +1463,19 @@ class ReplicaManager(val config: KafkaConfig,
def becomeLeaderOrFollower(correlationId: Int,
leaderAndIsrRequest: LeaderAndIsrRequest,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {

val breakdown = mutable.Map[String, Long]().withDefaultValue(0L)

val startMs = time.milliseconds()
val intervalMarker = new CoreUtils.TimeIntervalMarker(startMs)

replicaStateChangeLock synchronized {
breakdown("acquireReplicaStateChangeLock") = intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())

val controllerId = leaderAndIsrRequest.controllerId
val requestPartitionStates = leaderAndIsrRequest.partitionStates.asScala
breakdown("convertLeaderAndIsrRequestPartitionStates") = intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())
stateChangeLogger.info(s"Handling LeaderAndIsr request correlationId $correlationId from controller " +
s"$controllerId for ${requestPartitionStates.size} partitions")
if (stateChangeLogger.isTraceEnabled)
@@ -1502,6 +1511,7 @@ class ReplicaManager(val config: KafkaConfig,
val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
allRequestPartitions.add(topicPartition)

intervalMarker.markTime(time.milliseconds())
val partitionOpt = getPartition(topicPartition) match {
case HostedPartition.Offline =>
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
@@ -1519,6 +1529,7 @@ class ReplicaManager(val config: KafkaConfig,
allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
Some(partition)
}
breakdown("validateTopicPartitionInfo") += intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())

// Next check the topic ID and the partition's leader epoch
partitionOpt.foreach { partition =>
@@ -1577,12 +1588,18 @@ class ReplicaManager(val config: KafkaConfig,
responseMap.put(topicPartition, error)
}
}

breakdown("validateTopicIdInfo") += intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())
}


if (leaderAndIsrRequest.`type`() == LeaderAndIsrRequestType.FULL) {
stateChangeLogger.trace("received FULL LeaderAndISR request")
intervalMarker.markTime(time.milliseconds())
val partitionsToDelete = findStrayPartitions(allRequestPartitions, logManager.allLogs)
breakdown("findStrayPartition") = intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())
deleteStrayReplicas(partitionsToDelete)
breakdown("deleteStrayReplicas") = intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())
} else if (leaderAndIsrRequest.`type`() == LeaderAndIsrRequestType.INCREMENTAL) {
stateChangeLogger.trace("received INCREMENTAL LeaderAndISR request, skip finding of stray partitions")
} else {
@@ -1594,20 +1611,25 @@ class ReplicaManager(val config: KafkaConfig,
}
val partitionsToBeFollower = partitionStates.filter { case (k, _) => !partitionsToBeLeader.contains(k) }

intervalMarker.markTime(time.milliseconds())
val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
breakdown("generateHighWatermarkCheckpoints") = intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())
val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
highWatermarkCheckpoints, topicIdFromRequest)
else
Set.empty[Partition]
breakdown("makeLeaders") = intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())
val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
highWatermarkCheckpoints, topicIdFromRequest)
else
Set.empty[Partition]
breakdown("makeFollowers") = intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())

val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
updateLeaderAndFollowerMetrics(followerTopicSet)
breakdown("updateLeaderAndFollowerMetrics") = intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())

leaderAndIsrRequest.partitionStates.forEach { partitionState =>
val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
@@ -1620,19 +1642,25 @@ class ReplicaManager(val config: KafkaConfig,
if (localLog(topicPartition).isEmpty)
markPartitionOffline(topicPartition)
}
breakdown("markPartitionOffline") = intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())

// we initialize the highwatermark thread after the first LeaderAndIsr request. This ensures that all the partitions
// have been completely populated before starting the checkpointing, thereby avoiding weird race conditions
startHighWatermarkCheckPointThread()
breakdown("startHighWatermarkCheckPointThread") = intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())

maybeAddLogDirFetchers(partitionStates.keySet, highWatermarkCheckpoints, topicIdFromRequest)
breakdown("maybeAddLogDirFetchers") = intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())

replicaFetcherManager.shutdownIdleFetcherThreads()
// replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
breakdown("shutdownIdleFetcherThreads") = intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())

remoteLogManager.foreach(rlm => rlm.onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower, topicIds))
breakdown("remoteLogManagerOnLeadershipChangeCallback") = intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())

onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
breakdown("localLogManagerOnLeadershipChangeCallback") = intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())

val data = new LeaderAndIsrResponseData().setErrorCode(Errors.NONE.code)
/* In version 5 and below, response contains a list of topic-partitions with
@@ -1658,13 +1686,16 @@ class ReplicaManager(val config: KafkaConfig,
.setErrorCode(error.code))
}
}
breakdown("constructLeaderAndIsrResponse") = intervalMarker.markTimeAndReturnLatestInterval(time.milliseconds())
new LeaderAndIsrResponse(data, leaderAndIsrRequest.version)
}
}
val endMs = time.milliseconds()
val elapsedMs = endMs - startMs
stateChangeLogger.info(s"Finished LeaderAndIsr request in ${elapsedMs}ms correlationId $correlationId from controller " +
s"$controllerId for ${requestPartitionStates.size} partitions")
stateChangeLogger.info(s"Breakdown time in milliseconds for LeaderAndIsr request with correlationId $correlationId from controller " +
s"$controllerId: " + breakdown)
response
}
}
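
One detail worth noting in the bookkeeping above: phases that run once per request (e.g. makeLeaders, maybeAddLogDirFetchers) are recorded with a plain assignment, while phases that run once per partition inside the loop (validateTopicPartitionInfo, validateTopicIdInfo) are accumulated with +=, which works on the first iteration because the map was created with withDefaultValue(0L). A minimal sketch of that behaviour, with made-up values:

```scala
import scala.collection.mutable

val breakdown = mutable.Map[String, Long]().withDefaultValue(0L)

// One-shot phase: a single interval, recorded once.
breakdown("makeLeaders") = 42L

// Per-partition phase: += accumulates across loop iterations;
// withDefaultValue(0L) lets the first += start from zero.
breakdown("validateTopicPartitionInfo") += 3L
breakdown("validateTopicPartitionInfo") += 4L

println(breakdown("validateTopicPartitionInfo")) // 7
```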
22 changes: 22 additions & 0 deletions core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -338,4 +338,26 @@ object CoreUtils {
TimeUnit.NANOSECONDS.toMicros(positiveNanos).toDouble / TimeUnit.MILLISECONDS.toMicros(1)
}

/**
 * A simple helper for calculating breakdown times.
 * Deliberately kept minimal; it does not handle thread-safety.
 * @param startTime the initial timestamp, in the same unit as later marks
 */
class TimeIntervalMarker(startTime: Long) {
var previous = startTime
var current = startTime

def latestInterval = current - previous

def markTime(mark: Long): Unit = {
previous = current
current = mark
}

def markTimeAndReturnLatestInterval(mark: Long): Long = {
markTime(mark)
latestInterval
}
}

}
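
For reference, a small usage sketch of TimeIntervalMarker (the timestamps are made up; in the broker code the marks come from time.milliseconds()):

```scala
import kafka.utils.CoreUtils

val marker = new CoreUtils.TimeIntervalMarker(startTime = 1000L)
marker.markTimeAndReturnLatestInterval(1040L) // 40: from start to the first mark
marker.markTimeAndReturnLatestInterval(1100L) // 60: between the first and second marks
marker.latestInterval                         // still 60 until the next mark
```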
