Skip to content

Commit

Permalink
[LI-HOTFIX] Event based fetcher part 2/3: adding the FetcherEventBus …
Browse files Browse the repository at this point in the history
…and FetcherEventManager (#123)

TICKET = KAFKA-10734
LI_DESCRIPTION = Part 2 of 3 PRs to change the fetcher into an event-based model
EXIT_CRITERIA = When KAFKA-10734 is closed and the changes are pulled in as a part of a release
  • Loading branch information
gitlw authored Mar 22, 2021
1 parent 770d097 commit e3a5936
Show file tree
Hide file tree
Showing 6 changed files with 721 additions and 5 deletions.
70 changes: 65 additions & 5 deletions core/src/main/scala/kafka/server/AbstractAsyncFetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

package kafka.server

import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.TimeUnit
import java.util.function.Consumer

import kafka.cluster.BrokerEndPoint
import kafka.common.ClientIdAndBroker
import kafka.log.LogAppendInfo
Expand All @@ -31,19 +36,50 @@ import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, OffsetsForLeaderEpochRequest}
import org.apache.kafka.common.{InvalidRecordException, TopicPartition}

import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, Set, mutable}
import scala.math.min


sealed trait FetcherEvent extends Comparable[FetcherEvent] {
def priority: Int // an event with a higher priority value is more important
def state: FetcherState
override def compareTo(other: FetcherEvent): Int = {
// an event with a higher proirity value should be dequeued first in a PriorityQueue
other.priority.compareTo(this.priority)
}
}

// TODO: merge the AddPartitions and RemovePartitions into a single event
case class AddPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch], future: KafkaFutureImpl[Void]) extends FetcherEvent {
override def priority = 2
override def state = FetcherState.AddPartitions
}

case class RemovePartitions(topicPartitions: Set[TopicPartition], future: KafkaFutureImpl[Void]) extends FetcherEvent {
override def priority = 2
override def state = FetcherState.RemovePartitions
}

case class GetPartitionCount(future: KafkaFutureImpl[Int]) extends FetcherEvent {
override def priority = 2
override def state = FetcherState.GetPartitionCount
}

case object TruncateAndFetch extends FetcherEvent {
override def priority = 1
override def state = FetcherState.TruncateAndFetch
}


class DelayedFetcherEvent(delay: Long, val fetcherEvent: FetcherEvent) extends DelayedItem(delayMs = delay) {
}

abstract class AbstractAsyncFetcher(clientId: String,
val sourceBroker: BrokerEndPoint,
failedPartitions: FailedPartitions,
fetchBackOffMs: Int = 0) extends Logging {
fetchBackOffMs: Int = 0,
fetcherEventBus: FetcherEventBus) extends FetcherEventProcessor with Logging {

type FetchData = FetchResponse.PartitionData[Records]
type EpochData = OffsetsForLeaderEpochRequest.PartitionData
Expand Down Expand Up @@ -82,6 +118,15 @@ abstract class AbstractAsyncFetcher(clientId: String,
protected def isOffsetForLeaderEpochSupported: Boolean


def truncateAndFetch(): Unit = {
maybeTruncate()
if (maybeFetch()) {
fetcherEventBus.schedule(new DelayedFetcherEvent(fetchBackOffMs, TruncateAndFetch))
} else {
fetcherEventBus.put(TruncateAndFetch)
}
}

/**
* maybeFetch returns whether the fetching logic should back off
*/
Expand Down Expand Up @@ -113,6 +158,21 @@ abstract class AbstractAsyncFetcher(clientId: String,
}
}

override def process(event: FetcherEvent): Unit = {
event match {
case AddPartitions(initialFetchStates, future) =>
addPartitions(initialFetchStates)
future.complete(null)
case RemovePartitions(topicPartitions, future) =>
removePartitions(topicPartitions)
future.complete(null)
case GetPartitionCount(future) =>
future.complete(partitionStates.size())
case TruncateAndFetch =>
truncateAndFetch()
}
}

/**
* Builds offset for leader epoch requests for partitions that are in the truncating phase based
* on latest epochs of the future replicas (the one that is fetching)
Expand Down
186 changes: 186 additions & 0 deletions core/src/main/scala/kafka/server/FetcherEventBus.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server

import kafka.utils.CoreUtils.inLock
import org.apache.kafka.common.utils.Time
import java.util.{Comparator, PriorityQueue}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}

import kafka.utils.DelayedItem

/**
* A QueuedFetcherEvent can be put into the PriorityQueue of the FetcherEventBus
* and polled according to the priority of the FetcherEvent.
* If two events have the same priority, the one with the smaller sequence number
* will be polled first.
* @param event
* @param enqueueTimeMs
* @param sequenceNumber
*/
class QueuedFetcherEvent(val event: FetcherEvent,
val enqueueTimeMs: Long,
val sequenceNumber: Long) extends Comparable[QueuedFetcherEvent] {
override def compareTo(other: QueuedFetcherEvent): Int = {
val priorityDiff = event.compareTo(other.event)
if (priorityDiff != 0) {
priorityDiff
} else {
// the event with the smaller sequenceNumber will be polled first
this.sequenceNumber.compareTo(other.sequenceNumber)
}
}
}

/**
* The SimpleScheduler is not thread safe
*/
class SimpleScheduler[T <: DelayedItem] {
private val delayedQueue = new PriorityQueue[T](Comparator.naturalOrder[T]())

def schedule(item: T) : Unit = {
delayedQueue.add(item)
}

/**
* peek can be used to get the earliest item that has become current.
* There are 3 cases when peek() is called
* 1. There are no items whatsoever. peek would return (None, Long.MaxValue) to indicate that the caller needs to wait
* indefinitely until an item is inserted.
* 2. There are items, and yet none has become current. peek would return (None, delay) where delay represents
* the time to wait before the earliest item becomes current.
* 3. Some item has become current. peek would return (Some(item), 0L)
*/
def peek(): (Option[T], Long) = {
if (delayedQueue.isEmpty) {
(None, Long.MaxValue)
} else {
val delayedEvent = delayedQueue.peek()
val delayMs = delayedEvent.getDelay(TimeUnit.MILLISECONDS)
if (delayMs == 0) {
(Some(delayedQueue.peek()), 0L)
} else {
(None, delayMs)
}
}
}

/**
* poll() unconditionally removes the earliest item
* If there are no items, poll() has no effect.
*/
def poll(): Unit = {
delayedQueue.poll()
}

def size = delayedQueue.size
}

/**
* the ConditionFactory trait is defined such that a MockCondition can be
* created for the purpose of testing
*/
trait ConditionFactory {
def createCondition(lock: Lock): Condition
}

object DefaultConditionFactory extends ConditionFactory {
override def createCondition(lock: Lock): Condition = lock.newCondition()
}

/**
* The FetcherEventBus supports queued events and delayed events.
* Queued events are inserted via the {@link #put} method, and delayed events
* are inserted via the {@link #schedule} method.
* Events are polled via the {@link #getNextEvent} method, which returns
* either a queued event or a scheduled event.
* @param time
*/
class FetcherEventBus(time: Time, conditionFactory: ConditionFactory = DefaultConditionFactory) {
private val eventLock = new ReentrantLock()
private val newEventCondition = conditionFactory.createCondition(eventLock)

private val queue = new PriorityQueue[QueuedFetcherEvent]
private val nextSequenceNumber = new AtomicLong()
private val scheduler = new SimpleScheduler[DelayedFetcherEvent]
@volatile private var shutdownInitialized = false

def eventQueueSize() = queue.size

def scheduledEventQueueSize() = scheduler.size

/**
* close should be called in a thread different from the one calling getNextEvent()
*/
def close(): Unit = {
shutdownInitialized = true
inLock(eventLock) {
newEventCondition.signalAll()
}
}

def put(event: FetcherEvent): Unit = {
inLock(eventLock) {
queue.add(new QueuedFetcherEvent(event, time.milliseconds(), nextSequenceNumber.getAndIncrement()))
newEventCondition.signalAll()
}
}

def schedule(delayedEvent: DelayedFetcherEvent): Unit = {
inLock(eventLock) {
scheduler.schedule(delayedEvent)
newEventCondition.signalAll()
}
}

/**
* There are 2 cases when the getNextEvent() method is called
* 1. There is at least one delayed event that has become current or at least one queued event. We return either
* the delayed event with the earliest due time or the queued event, depending on their priority.
* 2. There are neither delayed events that have become current, nor queued events. We block until the earliest delayed
* event becomes current. A special case is that there are no delayed events at all, under which the call would block
* indefinitely until it is waken up by a new delayed or queued event.
*
* @return Either a QueuedFetcherEvent or a DelayedFetcherEvent that has become current. A special case is that the
* FetcherEventBus is shutdown before an event can be polled, under which null will be returned.
*/
def getNextEvent(): QueuedFetcherEvent = {
inLock(eventLock) {
var result : QueuedFetcherEvent = null

while (!shutdownInitialized && result == null) {
// check if any delayed event has become current. If so, move it to the queue
val (delayedFetcherEvent, delayMs) = scheduler.peek()
if (delayedFetcherEvent.nonEmpty) {
scheduler.poll()
queue.add(new QueuedFetcherEvent(delayedFetcherEvent.get.fetcherEvent, time.milliseconds(), nextSequenceNumber.getAndIncrement()))
}

if (!queue.isEmpty) {
result = queue.poll()
} else {
newEventCondition.await(delayMs, TimeUnit.MILLISECONDS)
}
}

result
}
}
}
Loading

0 comments on commit e3a5936

Please sign in to comment.