diff --git a/core/src/main/scala/kafka/server/AbstractAsyncFetcher.scala b/core/src/main/scala/kafka/server/AbstractAsyncFetcher.scala index 1547d24b0bff..c794ae4af0e7 100644 --- a/core/src/main/scala/kafka/server/AbstractAsyncFetcher.scala +++ b/core/src/main/scala/kafka/server/AbstractAsyncFetcher.scala @@ -17,6 +17,11 @@ package kafka.server +import java.nio.ByteBuffer +import java.util.Optional +import java.util.concurrent.TimeUnit +import java.util.function.Consumer + import kafka.cluster.BrokerEndPoint import kafka.common.ClientIdAndBroker import kafka.log.LogAppendInfo @@ -31,19 +36,50 @@ import org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, OffsetsForLeaderEpochRequest} import org.apache.kafka.common.{InvalidRecordException, TopicPartition} -import java.nio.ByteBuffer -import java.util.Optional -import java.util.concurrent.TimeUnit -import java.util.function.Consumer import scala.collection.JavaConverters._ import scala.collection.{Map, Seq, Set, mutable} import scala.math.min +sealed trait FetcherEvent extends Comparable[FetcherEvent] { + def priority: Int // an event with a higher priority value is more important + def state: FetcherState + override def compareTo(other: FetcherEvent): Int = { + // an event with a higher proirity value should be dequeued first in a PriorityQueue + other.priority.compareTo(this.priority) + } +} + +// TODO: merge the AddPartitions and RemovePartitions into a single event +case class AddPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch], future: KafkaFutureImpl[Void]) extends FetcherEvent { + override def priority = 2 + override def state = FetcherState.AddPartitions +} + +case class RemovePartitions(topicPartitions: Set[TopicPartition], future: KafkaFutureImpl[Void]) extends FetcherEvent { + override def priority = 2 + override def state = FetcherState.RemovePartitions +} + +case class GetPartitionCount(future: KafkaFutureImpl[Int]) extends FetcherEvent { + override def priority = 2 + override def state = FetcherState.GetPartitionCount +} + +case object TruncateAndFetch extends FetcherEvent { + override def priority = 1 + override def state = FetcherState.TruncateAndFetch +} + + +class DelayedFetcherEvent(delay: Long, val fetcherEvent: FetcherEvent) extends DelayedItem(delayMs = delay) { +} + abstract class AbstractAsyncFetcher(clientId: String, val sourceBroker: BrokerEndPoint, failedPartitions: FailedPartitions, - fetchBackOffMs: Int = 0) extends Logging { + fetchBackOffMs: Int = 0, + fetcherEventBus: FetcherEventBus) extends FetcherEventProcessor with Logging { type FetchData = FetchResponse.PartitionData[Records] type EpochData = OffsetsForLeaderEpochRequest.PartitionData @@ -82,6 +118,15 @@ abstract class AbstractAsyncFetcher(clientId: String, protected def isOffsetForLeaderEpochSupported: Boolean + def truncateAndFetch(): Unit = { + maybeTruncate() + if (maybeFetch()) { + fetcherEventBus.schedule(new DelayedFetcherEvent(fetchBackOffMs, TruncateAndFetch)) + } else { + fetcherEventBus.put(TruncateAndFetch) + } + } + /** * maybeFetch returns whether the fetching logic should back off */ @@ -113,6 +158,21 @@ abstract class AbstractAsyncFetcher(clientId: String, } } + override def process(event: FetcherEvent): Unit = { + event match { + case AddPartitions(initialFetchStates, future) => + addPartitions(initialFetchStates) + future.complete(null) + case RemovePartitions(topicPartitions, future) => + removePartitions(topicPartitions) + future.complete(null) + case GetPartitionCount(future) => + future.complete(partitionStates.size()) + case TruncateAndFetch => + truncateAndFetch() + } + } + /** * Builds offset for leader epoch requests for partitions that are in the truncating phase based * on latest epochs of the future replicas (the one that is fetching) diff --git a/core/src/main/scala/kafka/server/FetcherEventBus.scala b/core/src/main/scala/kafka/server/FetcherEventBus.scala new file mode 100644 index 000000000000..687fdf19659e --- /dev/null +++ b/core/src/main/scala/kafka/server/FetcherEventBus.scala @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.utils.CoreUtils.inLock +import org.apache.kafka.common.utils.Time +import java.util.{Comparator, PriorityQueue} +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.locks.{Condition, Lock, ReentrantLock} + +import kafka.utils.DelayedItem + +/** + * A QueuedFetcherEvent can be put into the PriorityQueue of the FetcherEventBus + * and polled according to the priority of the FetcherEvent. + * If two events have the same priority, the one with the smaller sequence number + * will be polled first. + * @param event + * @param enqueueTimeMs + * @param sequenceNumber + */ +class QueuedFetcherEvent(val event: FetcherEvent, + val enqueueTimeMs: Long, + val sequenceNumber: Long) extends Comparable[QueuedFetcherEvent] { + override def compareTo(other: QueuedFetcherEvent): Int = { + val priorityDiff = event.compareTo(other.event) + if (priorityDiff != 0) { + priorityDiff + } else { + // the event with the smaller sequenceNumber will be polled first + this.sequenceNumber.compareTo(other.sequenceNumber) + } + } +} + +/** + * The SimpleScheduler is not thread safe + */ +class SimpleScheduler[T <: DelayedItem] { + private val delayedQueue = new PriorityQueue[T](Comparator.naturalOrder[T]()) + + def schedule(item: T) : Unit = { + delayedQueue.add(item) + } + + /** + * peek can be used to get the earliest item that has become current. + * There are 3 cases when peek() is called + * 1. There are no items whatsoever. peek would return (None, Long.MaxValue) to indicate that the caller needs to wait + * indefinitely until an item is inserted. + * 2. There are items, and yet none has become current. peek would return (None, delay) where delay represents + * the time to wait before the earliest item becomes current. + * 3. Some item has become current. peek would return (Some(item), 0L) + */ + def peek(): (Option[T], Long) = { + if (delayedQueue.isEmpty) { + (None, Long.MaxValue) + } else { + val delayedEvent = delayedQueue.peek() + val delayMs = delayedEvent.getDelay(TimeUnit.MILLISECONDS) + if (delayMs == 0) { + (Some(delayedQueue.peek()), 0L) + } else { + (None, delayMs) + } + } + } + + /** + * poll() unconditionally removes the earliest item + * If there are no items, poll() has no effect. + */ + def poll(): Unit = { + delayedQueue.poll() + } + + def size = delayedQueue.size +} + +/** + * the ConditionFactory trait is defined such that a MockCondition can be + * created for the purpose of testing + */ +trait ConditionFactory { + def createCondition(lock: Lock): Condition +} + +object DefaultConditionFactory extends ConditionFactory { + override def createCondition(lock: Lock): Condition = lock.newCondition() +} + +/** + * The FetcherEventBus supports queued events and delayed events. + * Queued events are inserted via the {@link #put} method, and delayed events + * are inserted via the {@link #schedule} method. + * Events are polled via the {@link #getNextEvent} method, which returns + * either a queued event or a scheduled event. + * @param time + */ +class FetcherEventBus(time: Time, conditionFactory: ConditionFactory = DefaultConditionFactory) { + private val eventLock = new ReentrantLock() + private val newEventCondition = conditionFactory.createCondition(eventLock) + + private val queue = new PriorityQueue[QueuedFetcherEvent] + private val nextSequenceNumber = new AtomicLong() + private val scheduler = new SimpleScheduler[DelayedFetcherEvent] + @volatile private var shutdownInitialized = false + + def eventQueueSize() = queue.size + + def scheduledEventQueueSize() = scheduler.size + + /** + * close should be called in a thread different from the one calling getNextEvent() + */ + def close(): Unit = { + shutdownInitialized = true + inLock(eventLock) { + newEventCondition.signalAll() + } + } + + def put(event: FetcherEvent): Unit = { + inLock(eventLock) { + queue.add(new QueuedFetcherEvent(event, time.milliseconds(), nextSequenceNumber.getAndIncrement())) + newEventCondition.signalAll() + } + } + + def schedule(delayedEvent: DelayedFetcherEvent): Unit = { + inLock(eventLock) { + scheduler.schedule(delayedEvent) + newEventCondition.signalAll() + } + } + + /** + * There are 2 cases when the getNextEvent() method is called + * 1. There is at least one delayed event that has become current or at least one queued event. We return either + * the delayed event with the earliest due time or the queued event, depending on their priority. + * 2. There are neither delayed events that have become current, nor queued events. We block until the earliest delayed + * event becomes current. A special case is that there are no delayed events at all, under which the call would block + * indefinitely until it is waken up by a new delayed or queued event. + * + * @return Either a QueuedFetcherEvent or a DelayedFetcherEvent that has become current. A special case is that the + * FetcherEventBus is shutdown before an event can be polled, under which null will be returned. + */ + def getNextEvent(): QueuedFetcherEvent = { + inLock(eventLock) { + var result : QueuedFetcherEvent = null + + while (!shutdownInitialized && result == null) { + // check if any delayed event has become current. If so, move it to the queue + val (delayedFetcherEvent, delayMs) = scheduler.peek() + if (delayedFetcherEvent.nonEmpty) { + scheduler.poll() + queue.add(new QueuedFetcherEvent(delayedFetcherEvent.get.fetcherEvent, time.milliseconds(), nextSequenceNumber.getAndIncrement())) + } + + if (!queue.isEmpty) { + result = queue.poll() + } else { + newEventCondition.await(delayMs, TimeUnit.MILLISECONDS) + } + } + + result + } + } +} diff --git a/core/src/main/scala/kafka/server/FetcherEventManager.scala b/core/src/main/scala/kafka/server/FetcherEventManager.scala new file mode 100644 index 000000000000..d83e50685785 --- /dev/null +++ b/core/src/main/scala/kafka/server/FetcherEventManager.scala @@ -0,0 +1,148 @@ +package kafka.server + + +import java.util.concurrent.TimeUnit + +import com.yammer.metrics.core.Gauge +import kafka.cluster.BrokerEndPoint +import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} +import kafka.utils.ShutdownableThread +import org.apache.kafka.common.internals.KafkaFutureImpl +import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.{KafkaFuture, TopicPartition} + +import scala.collection.{Map, Set} + +trait FetcherEventProcessor { + def process(event: FetcherEvent) + def fetcherStats: AsyncFetcherStats + def fetcherLagStats : AsyncFetcherLagStats + def sourceBroker: BrokerEndPoint + def close(): Unit +} + +object FetcherEventManager { + val EventQueueTimeMetricName = "EventQueueTimeMs" + val EventQueueSizeMetricName = "EventQueueSize" + val ScheduledEventQueueSizeMetricName = "ScheduledEventQueueSize" +} + +/** + * The FetcherEventManager can spawn a FetcherEventThread, whose main job is to take events from a + * FetcherEventBus and executes them in a FetcherEventProcessor. + * @param name + * @param fetcherEventBus + * @param processor + * @param time + */ +class FetcherEventManager(name: String, + fetcherEventBus: FetcherEventBus, + processor: FetcherEventProcessor, + time: Time) extends KafkaMetricsGroup { + + import FetcherEventManager._ + + val rateAndTimeMetrics: Map[FetcherState, KafkaTimer] = FetcherState.values.flatMap { state => + state.rateAndTimeMetricName.map { metricName => + state -> new KafkaTimer(newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) + } + }.toMap + + @volatile private var _state: FetcherState = FetcherState.Idle + private[server] val thread = new FetcherEventThread(name) + + def fetcherStats: AsyncFetcherStats = processor.fetcherStats + def fetcherLagStats : AsyncFetcherLagStats = processor.fetcherLagStats + def sourceBroker: BrokerEndPoint = processor.sourceBroker + def isThreadFailed: Boolean = thread.isThreadFailed + + private val eventQueueTimeHist = newHistogram(EventQueueTimeMetricName) + + newGauge( + EventQueueSizeMetricName, + new Gauge[Int] { + def value: Int = { + fetcherEventBus.eventQueueSize() + } + } + ) + + newGauge( + ScheduledEventQueueSizeMetricName, + new Gauge[Int] { + def value: Int = { + fetcherEventBus.scheduledEventQueueSize() + } + } + ) + + def state: FetcherState = _state + + def start(): Unit = { + fetcherEventBus.put(TruncateAndFetch) + thread.start() + } + + def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): KafkaFuture[Void] = { + val future = new KafkaFutureImpl[Void] {} + fetcherEventBus.put(AddPartitions(initialFetchStates, future)) + future + } + + def removePartitions(topicPartitions: Set[TopicPartition]): KafkaFuture[Void] = { + val future = new KafkaFutureImpl[Void] {} + fetcherEventBus.put(RemovePartitions(topicPartitions, future)) + future + } + + def getPartitionsCount(): KafkaFuture[Int] = { + val future = new KafkaFutureImpl[Int]{} + fetcherEventBus.put(GetPartitionCount(future)) + future + } + + def close(): Unit = { + try { + thread.initiateShutdown() + fetcherEventBus.close() + thread.awaitShutdown() + } finally { + removeMetric(EventQueueTimeMetricName) + removeMetric(EventQueueSizeMetricName) + removeMetric(ScheduledEventQueueSizeMetricName) + } + + processor.close() + } + + + class FetcherEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) { + logIdent = s"[FetcherEventThread fetcherId=$name] " + + + /** + * This method is repeatedly invoked until the thread shuts down or this method throws an exception + */ + override def doWork(): Unit = { + val nextEvent = fetcherEventBus.getNextEvent() + if (nextEvent == null) { + // a null value will be returned when the fetcherEventBus has started shutting down + return + } + + val fetcherEvent = nextEvent.event + _state = fetcherEvent.state + eventQueueTimeHist.update(time.milliseconds() - nextEvent.enqueueTimeMs) + + try { + rateAndTimeMetrics(state).time { + processor.process(fetcherEvent) + } + } catch { + case e: Exception => error(s"Uncaught error processing event $fetcherEvent", e) + } + + _state = FetcherState.Idle + } + } +} diff --git a/core/src/main/scala/kafka/server/FetcherState.scala b/core/src/main/scala/kafka/server/FetcherState.scala new file mode 100644 index 000000000000..21dd0c43a0fe --- /dev/null +++ b/core/src/main/scala/kafka/server/FetcherState.scala @@ -0,0 +1,35 @@ +package kafka.server + +sealed abstract class FetcherState { + def value: Byte + + def rateAndTimeMetricName: Option[String] = + if (hasRateAndTimeMetric) Some(s"${toString}RateAndTimeMs") else None + + protected def hasRateAndTimeMetric: Boolean = true +} + +object FetcherState { + case object Idle extends FetcherState { + def value = 0 + override protected def hasRateAndTimeMetric: Boolean = false + } + + case object AddPartitions extends FetcherState { + def value = 1 + } + + case object RemovePartitions extends FetcherState { + def value = 2 + } + + case object GetPartitionCount extends FetcherState { + def value = 3 + } + + case object TruncateAndFetch extends FetcherState { + def value = 4 + } + + val values: Seq[FetcherState] = Seq(Idle, AddPartitions, RemovePartitions, GetPartitionCount, TruncateAndFetch) +} diff --git a/core/src/test/scala/integration/kafka/server/FetcherEventManagerTest.scala b/core/src/test/scala/integration/kafka/server/FetcherEventManagerTest.scala new file mode 100644 index 000000000000..2ba7b7511d92 --- /dev/null +++ b/core/src/test/scala/integration/kafka/server/FetcherEventManagerTest.scala @@ -0,0 +1,83 @@ +package integration.kafka.server + +import kafka.cluster.BrokerEndPoint +import kafka.server._ +import org.apache.kafka.common.internals.KafkaFutureImpl +import org.apache.kafka.common.utils.Time +import org.easymock.EasyMock.{createMock, expect, replay, verify} +import org.junit.Assert.assertEquals +import org.junit.Test + +class FetcherEventManagerTest { + + @Test + def testInitialState(): Unit = { + val time = Time.SYSTEM + val fetcherEventBus: FetcherEventBus = createMock(classOf[FetcherEventBus]) + expect(fetcherEventBus.put(TruncateAndFetch)).andVoid() + expect(fetcherEventBus.close()).andVoid() + replay(fetcherEventBus) + + val processor : FetcherEventProcessor = createMock(classOf[FetcherEventProcessor]) + val fetcherEventManager = new FetcherEventManager("thread-1", fetcherEventBus, processor, time) + + fetcherEventManager.start() + fetcherEventManager.close() + + verify(fetcherEventBus) + } + + @Test + def testEventExecution(): Unit = { + val time = Time.SYSTEM + val fetcherEventBus = new FetcherEventBus(time) + + @volatile var addPartitionsProcessed = 0 + @volatile var removePartitionsProcessed = 0 + @volatile var getPartitionsProcessed = 0 + @volatile var truncateAndFetchProcessed = 0 + val processor : FetcherEventProcessor = new FetcherEventProcessor { + override def process(event: FetcherEvent): Unit = { + event match { + case AddPartitions(initialFetchStates, future) => + addPartitionsProcessed += 1 + future.asInstanceOf[KafkaFutureImpl[Void]].complete(null) + case RemovePartitions(topicPartitions, future) => + removePartitionsProcessed += 1 + future.asInstanceOf[KafkaFutureImpl[Void]].complete(null) + case GetPartitionCount(future) => + getPartitionsProcessed += 1 + future.asInstanceOf[KafkaFutureImpl[Int]].complete(1) + case TruncateAndFetch => + truncateAndFetchProcessed += 1 + } + + } + + override def fetcherStats: AsyncFetcherStats = ??? + + override def fetcherLagStats: AsyncFetcherLagStats = ??? + + override def sourceBroker: BrokerEndPoint = ??? + + override def close(): Unit = {} + } + + val fetcherEventManager = new FetcherEventManager("thread-1", fetcherEventBus, processor, time) + val addPartitionsFuture = fetcherEventManager.addPartitions(Map.empty) + val removePartitionsFuture = fetcherEventManager.removePartitions(Set.empty) + val getPartitionCountFuture = fetcherEventManager.getPartitionsCount() + + fetcherEventManager.start() + addPartitionsFuture.get() + removePartitionsFuture.get() + getPartitionCountFuture.get() + + assertEquals(1, addPartitionsProcessed) + assertEquals(1, removePartitionsProcessed) + assertEquals(1, getPartitionsProcessed) + fetcherEventManager.close() + } + +} + diff --git a/core/src/test/scala/unit/kafka/server/FetcherEventBusTest.scala b/core/src/test/scala/unit/kafka/server/FetcherEventBusTest.scala new file mode 100644 index 000000000000..62d00204989e --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/FetcherEventBusTest.scala @@ -0,0 +1,204 @@ +package unit.kafka.server + +import java.util.Date +import java.util.concurrent.locks.{Condition, Lock} +import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} + +import kafka.server._ +import kafka.utils.MockTime +import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.Test + +import scala.collection.Map +import scala.collection.mutable.ArrayBuffer + + +class FetcherEventBusTest { + @Test + def testGetWhileEmpty(): Unit = { + // test the getNextEvent method while the fetcherEventBus is empty + val fetcherEventBus = new FetcherEventBus(new MockTime()) + + val service = Executors.newSingleThreadExecutor() + @volatile var counter = 0 + val runnableFinished = new CountDownLatch(1) + service.submit(new Runnable { + override def run(): Unit = { + // trying to call get will block indefinitely + fetcherEventBus.getNextEvent() + counter = 1 + runnableFinished.countDown() + } + }) + + // the runnable should still be blocked + assertTrue(counter == 0) + + // put a event to unblock the runnable + fetcherEventBus.put(AddPartitions(Map.empty, null)) + service.shutdown() + runnableFinished.await() + assertTrue(counter == 1) + } + + @Test + def testQueuedEvent(): Unit = { + val fetcherEventBus = new FetcherEventBus(new MockTime()) + val addPartitions = AddPartitions(Map.empty, null) + fetcherEventBus.put(addPartitions) + assertTrue(fetcherEventBus.getNextEvent().event == addPartitions) + } + + + @Test + def testQueuedEventsWithDifferentPriorities(): Unit = { + val fetcherEventBus = new FetcherEventBus(new MockTime()) + + val lowPriorityTask = TruncateAndFetch + fetcherEventBus.put(lowPriorityTask) + + val highPriorityTask = AddPartitions(Map.empty, null) + fetcherEventBus.put(highPriorityTask) + + val expectedSequence = Seq(highPriorityTask, lowPriorityTask) + + val actualSequence = ArrayBuffer[FetcherEvent]() + + for (_ <- 0 until 2) { + actualSequence += fetcherEventBus.getNextEvent().event + } + + assertEquals(expectedSequence, actualSequence) + } + + @Test + def testQueuedEventsWithSamePriority(): Unit = { + // Two queued events with the same priority should be polled + // according to their sequence numbers in a FIFO manner + val fetcherEventBus = new FetcherEventBus(new MockTime()) + + val task1 = AddPartitions(Map.empty, null) + fetcherEventBus.put(task1) + + val task2 = AddPartitions(Map.empty, null) + fetcherEventBus.put(task2) + + val expectedSequence = Seq(task1, task2) + + val actualSequence = ArrayBuffer[FetcherEvent]() + + for (_ <- 0 until 2) { + actualSequence += fetcherEventBus.getNextEvent().event + } + + assertEquals(expectedSequence, actualSequence) + } + + class MockCondition extends Condition { + override def await(): Unit = ??? + + override def awaitUninterruptibly(): Unit = ??? + + override def awaitNanos(nanosTimeout: Long): Long = ??? + + override def await(time: Long, unit: TimeUnit): Boolean = { + awaitCalled = true + false // false indicates that no further waiting is needed + } + + override def awaitUntil(deadline: Date): Boolean = ??? + + override def signal(): Unit = ??? + + override def signalAll(): Unit = {} + + var awaitCalled = false + } + + class MockConditionFactory(condition: MockCondition) extends ConditionFactory { + override def createCondition(lock: Lock): Condition = condition + } + + @Test + def testDelayedEvent(): Unit = { + val time = new MockTime() + val condition = new MockCondition + val fetcherEventBus = new FetcherEventBus(time, new MockConditionFactory(condition)) + val addPartitions = AddPartitions(Map.empty, null) + val delay = 1000 + fetcherEventBus.schedule(new DelayedFetcherEvent(delay, addPartitions)) + assertEquals(1, fetcherEventBus.scheduledEventQueueSize()) + val service = Executors.newSingleThreadExecutor() + + val future = service.submit(new Runnable { + override def run(): Unit = { + assertTrue(fetcherEventBus.getNextEvent().event == addPartitions) + assertTrue(condition.awaitCalled) + assertEquals(0, fetcherEventBus.scheduledEventQueueSize()) + } + }) + + future.get() + service.shutdown() + } + + @Test + def testDelayedEventsWithDifferentDueTimes(): Unit = { + val time = new MockTime() + val condition = new MockCondition + val fetcherEventBus = new FetcherEventBus(time, new MockConditionFactory(condition)) + val secondTask = AddPartitions(Map.empty, null) + fetcherEventBus.schedule(new DelayedFetcherEvent(200, secondTask)) + + val firstTask = RemovePartitions(Set.empty, null) + fetcherEventBus.schedule(new DelayedFetcherEvent(100, firstTask)) + + val service = Executors.newSingleThreadExecutor() + + val expectedSequence = Seq(firstTask, secondTask) + + val actualSequence = ArrayBuffer[FetcherEvent]() + val future = service.submit(new Runnable { + override def run(): Unit = { + for (_ <- 0 until 2) { + actualSequence += fetcherEventBus.getNextEvent().event + } + } + }) + + future.get() + assertEquals(expectedSequence, actualSequence) + service.shutdown() + } + + @Test + def testBothDelayedAndQueuedEvent(): Unit = { + val time = new MockTime() + val condition = new MockCondition + val fetcherEventBus = new FetcherEventBus(time, new MockConditionFactory(condition)) + + val queuedEvent = RemovePartitions(Set.empty, null) + fetcherEventBus.put(queuedEvent) + + val delay = 10 + val scheduledEvent = AddPartitions(Map.empty, null) + fetcherEventBus.schedule(new DelayedFetcherEvent(delay, scheduledEvent)) + + val service = Executors.newSingleThreadExecutor() + + @volatile var receivedEvents = 0 + val expectedEvents = 2 + val future = service.submit(new Runnable { + override def run(): Unit = { + for (_ <- 0 until expectedEvents) { + fetcherEventBus.getNextEvent() + receivedEvents += 1 + } + } + }) + + future.get() + assertTrue(receivedEvents == expectedEvents) + service.shutdown() + } +}