Add LiMinSegmentRollMs to prevent log rolls from being too fast (#521)
Today, the log roll time can be very short if maxCompactionLagMs and logRollTimeJitterMillis are not set properly. For example, if logRollTimeJitterMillis is larger than maxCompactionLagMs, the log roll time can be as short as 0 milliseconds, so a new segment is rolled on every message. This causes "too many open file handles" errors and crashes the process.

This fix adds a minimum interval between segment rolls, configurable on the server side via the LiMinSegmentRollMs config. In this PR we set the default to 15 minutes, so no new segment can be rolled within 15 minutes of the previous one.
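A minimal sketch of the resulting behavior (Scala; only the names come from this patch, and the helper function itself is hypothetical, not part of the patch):

// Sketch: the effective time-based roll threshold now has a floor.
def effectiveRollThresholdMs(maxSegmentMs: Long, rollJitterMs: Long, liMinLogRollMs: Long): Long =
  math.max(maxSegmentMs - rollJitterMs, liMinLogRollMs)

// Pathological case from the description above: jitter as large as the roll interval.
effectiveRollThresholdMs(60000L, 60000L, 0L)               // 0 ms: rolls on every append
effectiveRollThresholdMs(60000L, 60000L, 15 * 60 * 1000L)  // 900000 ms: 15-minute floor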
CCisGG authored Sep 4, 2024
1 parent cfa6f94 commit e6b20e6
Showing 6 changed files with 53 additions and 7 deletions.
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/log/Log.scala
@@ -187,15 +187,18 @@ case class RollParams(maxSegmentMs: Long,
maxTimestampInMessages: Long,
maxOffsetInMessages: Long,
messagesSize: Int,
- now: Long)
+ now: Long,
+ liMinLogRollMs: Long = 0)

object RollParams {
def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = {
new RollParams(config.maxSegmentMs,
config.segmentSize,
appendInfo.maxTimestamp,
appendInfo.lastOffset,
- messagesSize, now)
+ messagesSize,
+ now,
+ config.liMinSegmentRollMs)
}
}

@@ -2052,7 +2055,6 @@ class Log(@volatile private var _dir: File,
)
}
// FIXME: this code path involves not only data plane segments but also KRaft metadata logs. Should find a way to distinguish after moving to KRaft.
-
// XXX: An internal dashboard depends on parsing this warn log line. Get SRE reviews before changing the format.
warn(s"Attempted truncating to offset $targetOffset. Resulted in truncated to $offsetTruncatedTo from the original log end offset $originalLogEndOffset, " +
s"with $messagesTruncated messages and $bytesTruncated bytes truncated")
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/log/LogConfig.scala
@@ -44,6 +44,7 @@ object Defaults {
val SegmentSize = kafka.server.Defaults.LogSegmentBytes
val SegmentMs = kafka.server.Defaults.LogRollHours * 60 * 60 * 1000L
val SegmentJitterMs = kafka.server.Defaults.LogRollJitterHours * 60 * 60 * 1000L
+ val LiMinSegmentRollMs = kafka.server.Defaults.LiMinSegmentRollMs
val FlushInterval = kafka.server.Defaults.LogFlushIntervalMessages
val FlushMs = kafka.server.Defaults.LogFlushSchedulerIntervalMs
val RetentionSize = kafka.server.Defaults.LogRetentionBytes
@@ -87,6 +88,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
val segmentSize = getInt(LogConfig.SegmentBytesProp)
val segmentMs = getLong(LogConfig.SegmentMsProp)
val segmentJitterMs = getLong(LogConfig.SegmentJitterMsProp)
+ val liMinSegmentRollMs = getLong(KafkaConfig.LiMinLogRollTimeMillisProp)
val maxIndexSize = getInt(LogConfig.SegmentIndexBytesProp)
val flushInterval = getLong(LogConfig.FlushMessagesProp)
val flushMs = getLong(LogConfig.FlushMsProp)
@@ -312,6 +314,8 @@ object LogConfig {
KafkaConfig.LogRollTimeMillisProp)
.define(SegmentJitterMsProp, LONG, Defaults.SegmentJitterMs, atLeast(0), MEDIUM, SegmentJitterMsDoc,
KafkaConfig.LogRollTimeJitterMillisProp)
+ .define(KafkaConfig.LiMinLogRollTimeMillisProp, LONG, Defaults.LiMinSegmentRollMs, atLeast(0), MEDIUM, KafkaConfig.LiMinLogRollTimeMillisDoc,
+ KafkaConfig.LiMinLogRollTimeMillisProp)
.define(SegmentIndexBytesProp, INT, Defaults.MaxIndexSize, atLeast(0), MEDIUM, MaxIndexSizeDoc,
KafkaConfig.LogIndexSizeMaxBytesProp)
.define(FlushMessagesProp, LONG, Defaults.FlushInterval, atLeast(0), MEDIUM, FlushIntervalDoc,
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/log/LogSegment.scala
@@ -66,7 +66,8 @@ class LogSegment private[log] (val log: FileRecords,
def timeIndex: TimeIndex = lazyTimeIndex.get

def shouldRoll(rollParams: RollParams): Boolean = {
- val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
+ val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) >
+   math.max(rollParams.maxSegmentMs - rollJitterMs, rollParams.liMinLogRollMs)
size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
(size > 0 && reachedRollMs) ||
offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages)
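For context, a hedged usage sketch of the new parameter (only the RollParams field names come from this diff; all values are hypothetical). The floor gates only the time-based condition, so size- and index-based rolls still fire immediately:

// Sketch: how liMinLogRollMs feeds into shouldRoll (values hypothetical).
val now = System.currentTimeMillis()
val params = RollParams(
  maxSegmentMs = 60 * 60 * 1000L,        // 1 hour time-based roll interval
  maxSegmentBytes = 1024 * 1024 * 1024,  // 1 GiB size-based roll limit
  maxTimestampInMessages = now,
  maxOffsetInMessages = 42L,
  messagesSize = 1024,
  now = now,
  liMinLogRollMs = 15 * 60 * 1000L)      // 15-minute floor on time-based rolls
// segment.shouldRoll(params) satisfies the time-based condition only after
// math.max(maxSegmentMs - rollJitterMs, liMinLogRollMs) has elapsed; a segment
// near maxSegmentBytes or with a full index still rolls right away.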
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -138,6 +138,7 @@ object Defaults {
val LogRollHours = 24 * 7
val LogRollJitterHours = 0
val LogRetentionHours = 24 * 7
+ val LiMinSegmentRollMs = 15 * 60 * 1000L

val LogRetentionBytes = -1L
val LogCleanupIntervalMs = 5 * 60 * 1000L
@@ -515,6 +516,8 @@ object KafkaConfig {
val LogRollTimeJitterMillisProp = "log.roll.jitter.ms"
val LogRollTimeJitterHoursProp = "log.roll.jitter.hours"

+ val LiMinLogRollTimeMillisProp = "li.min.log.roll.ms"
+
val LogRetentionTimeMillisProp = "log.retention.ms"
val LogRetentionTimeMinutesProp = "log.retention.minutes"
val LogRetentionTimeHoursProp = "log.retention.hours"
@@ -919,6 +922,8 @@ object KafkaConfig {
val LogRollTimeJitterMillisDoc = "The maximum jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the value in " + LogRollTimeJitterHoursProp + " is used"
val LogRollTimeJitterHoursDoc = "The maximum jitter to subtract from logRollTimeMillis (in hours), secondary to " + LogRollTimeJitterMillisProp + " property"

+ val LiMinLogRollTimeMillisDoc = "The minimum interval (in milliseconds) before a log segment can be forced to roll over due to time-based rolling. Segments may still roll faster due to size limits."
+
val LogRetentionTimeMillisDoc = "The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in " + LogRetentionTimeMinutesProp + " is used. If set to -1, no time limit is applied."
val LogRetentionTimeMinsDoc = "The number of minutes to keep a log file before deleting it (in minutes), secondary to " + LogRetentionTimeMillisProp + " property. If not set, the value in " + LogRetentionTimeHoursProp + " is used"
val LogRetentionTimeHoursDoc = "The number of hours to keep a log file before deleting it (in hours), tertiary to " + LogRetentionTimeMillisProp + " property"
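As a usage sketch (the property name, LiMinLogRollTimeMillisProp, and the LogConfig(props) constructor appear in this diff; the surrounding wiring is assumed), overriding the floor in a log config would look like:

import java.util.Properties
import kafka.log.LogConfig
import kafka.server.KafkaConfig

// Sketch: build a LogConfig with a custom roll floor, mirroring LogTestUtils below.
val logProps = new Properties()
logProps.put(KafkaConfig.LiMinLogRollTimeMillisProp, (10 * 60 * 1000L): java.lang.Long)
val config = LogConfig(logProps)
assert(config.liMinSegmentRollMs == 10 * 60 * 1000L)  // "li.min.log.roll.ms" = 600000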
36 changes: 34 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -541,7 +541,7 @@ class LogTest {
@Test
def testTimeBasedLogRoll(): Unit = {
def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L)
+ val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, liMinSegmentRollMs = 0L)

// create a log
val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60)
@@ -1665,7 +1665,7 @@ class LogTest {
var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
val maxJitter = 20 * 60L
// create a log
- val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter)
+ val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter, liMinSegmentRollMs = 0L)
val log = createLog(logDir, logConfig)
assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.")
log.appendAsLeader(set, leaderEpoch = 0)
@@ -1682,6 +1682,38 @@
"Log should roll after segmentMs adjusted by random jitter")
}

+ /**
+  * Test time-based log roll with jitter when liMinSegmentRollMs is non-zero
+  */
+ @Test
+ def testTimeBasedLogRollJitterWithMinSegmentRollMs(): Unit = {
+ var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
+ val maxJitter = 20 * 60L
+ // create a log
+ val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter,
+ liMinSegmentRollMs = 10 * 60 * 60L)
+ val log = createLog(logDir, logConfig)
+ assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.")
+ log.appendAsLeader(set, leaderEpoch = 0)
+
+ mockTime.sleep(log.config.segmentMs - maxJitter)
+ set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
+ log.appendAsLeader(set, leaderEpoch = 0)
+ assertEquals(1, log.numberOfSegments,
+ "Log does not roll on this append because it occurs earlier than max jitter")
+ mockTime.sleep(maxJitter - log.activeSegment.rollJitterMs + 1)
+ set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
+ log.appendAsLeader(set, leaderEpoch = 0)
+ assertEquals(1, log.numberOfSegments,
+ "Log should not roll before liMinSegmentRollMs")
+
+ mockTime.sleep(log.config.liMinSegmentRollMs)
+ set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
+ log.appendAsLeader(set, leaderEpoch = 0)
+ assertEquals(2, log.numberOfSegments,
+ "Log should roll as it passes liMinSegmentRollMs")
+ }
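Worked timeline for the new test above, derived from its constants (the mock clock counts milliseconds):

// segmentMs          = 1 * 60 * 60L  = 3600 ms
// maxJitter          = 20 * 60L      = 1200 ms
// liMinSegmentRollMs = 10 * 60 * 60L = 36000 ms
// 1st sleep: segmentMs - maxJitter = 2400 ms, below every threshold: no roll.
// 2nd sleep: just past segmentMs - rollJitterMs, so beyond the jittered interval,
//            but total elapsed (~3600 ms) is still under the 36000 ms floor: no roll.
// 3rd sleep: another liMinSegmentRollMs (36000 ms) passes the floor: the log rolls.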

/**
* Test that appending more than the maximum segment size rolls the log
*/
4 changes: 3 additions & 1 deletion core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -20,7 +20,7 @@ package kafka.log
import java.io.File
import java.util.Properties
import kafka.server.checkpoints.LeaderEpochCheckpointFile
- import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, LogDirFailureChannel}
+ import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, KafkaConfig, LogDirFailureChannel}
import kafka.utils.{Scheduler, TestUtils}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
@@ -48,6 +48,7 @@ object LogTestUtils {

def createLogConfig(segmentMs: Long = Defaults.SegmentMs,
segmentBytes: Int = Defaults.SegmentSize,
+ liMinSegmentRollMs: Long = Defaults.LiMinSegmentRollMs,
retentionMs: Long = Defaults.RetentionMs,
retentionBytes: Long = Defaults.RetentionSize,
localRetentionMs: Long = Defaults.LocalRetentionMs,
@@ -75,6 +76,7 @@
logProps.put(LogConfig.RemoteLogStorageEnableProp, remoteLogStorageEnable: java.lang.Boolean)
logProps.put(LogConfig.LocalLogRetentionMsProp, localRetentionMs: java.lang.Long)
logProps.put(LogConfig.LocalLogRetentionBytesProp, localRetentionBytes: java.lang.Long)
+ logProps.put(KafkaConfig.LiMinLogRollTimeMillisProp, liMinSegmentRollMs: java.lang.Long)
LogConfig(logProps)
}

