Skip to content

Commit

Permalink
Bulk Load CDK Stream Incomplete Prefactor 2: Drop Queue Reader and Si…
Browse files Browse the repository at this point in the history
…zed Queue
  • Loading branch information
johnny-schmidt committed Oct 5, 2024
1 parent fe77c7d commit 04a512e
Show file tree
Hide file tree
Showing 11 changed files with 239 additions and 343 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
package io.airbyte.cdk.message

import io.airbyte.cdk.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.state.MemoryManager
import io.github.oshai.kotlinlogging.KotlinLogging
import io.airbyte.cdk.state.Reserved
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantLock
import kotlinx.coroutines.runBlocking

interface Sized {
val sizeBytes: Long
}

/**
* Wrapper for record messages published to the message queue, containing metadata like index and
Expand All @@ -37,56 +38,25 @@ data class StreamCompleteWrapped(
override val sizeBytes: Long = 0L
}

class DestinationRecordQueue : ChannelMessageQueue<Reserved<DestinationRecordWrapped>>()

/**
* Message queue to which @[DestinationRecordWrapped] messages can be published on a @
* [DestinationStream] key.
*
* It maintains a map of @[QueueChannel]s by stream, and tracks the memory usage across all
* channels, blocking when the maximum is reached.
*
* This maximum is expected to be low, as the assumption is that data will be spooled to disk as
* quickly as possible.
* A supplier of message queues to which ([MemoryManager.reserveBlocking]'d) @
* [DestinationRecordWrapped] messages can be published on a @ [DestinationStream] key. The queues
* themselves do not manage memory.
*/
@Singleton
class DestinationMessageQueue(
catalog: DestinationCatalog,
config: DestinationConfiguration,
private val memoryManager: MemoryManager,
private val queueChannelFactory: QueueChannelFactory<DestinationRecordWrapped>
) : MessageQueue<DestinationStream.Descriptor, DestinationRecordWrapped> {
private val channels:
ConcurrentHashMap<DestinationStream.Descriptor, QueueChannel<DestinationRecordWrapped>> =
ConcurrentHashMap()

private val totalQueueSizeBytes = AtomicLong(0L)
private val reservedMemory: MemoryManager.Reservation
private val reservedMemoryManager: MemoryManager
private val memoryLock = ReentrantLock()
private val memoryLockCondition = memoryLock.newCondition()
@Secondary
class DestinationRecordQueueSupplier(catalog: DestinationCatalog) :
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationRecordWrapped>> {
private val queues = ConcurrentHashMap<DestinationStream.Descriptor, DestinationRecordQueue>()

init {
catalog.streams.forEach { channels[it.descriptor] = queueChannelFactory.make(this) }
val adjustedRatio =
config.maxMessageQueueMemoryUsageRatio /
(1.0 + config.estimatedRecordMemoryOverheadRatio)
reservedMemory = runBlocking { memoryManager.reserveRatio(adjustedRatio) }
reservedMemoryManager = reservedMemory.getReservationManager()
catalog.streams.forEach { queues[it.descriptor] = DestinationRecordQueue() }
}

override suspend fun acquireQueueBytesBlocking(bytes: Long) {
reservedMemoryManager.reserveBlocking(bytes)
override fun get(key: DestinationStream.Descriptor): DestinationRecordQueue {
return queues[key]
?: throw IllegalArgumentException("Reading from non-existent record stream: $key")
}

override suspend fun releaseQueueBytes(bytes: Long) {
reservedMemoryManager.release(bytes)
}

override suspend fun getChannel(
key: DestinationStream.Descriptor,
): QueueChannel<DestinationRecordWrapped> {
return channels[key]
?: throw IllegalArgumentException("Reading from non-existent QueueChannel: ${key}")
}

private val log = KotlinLogging.logger {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,80 +4,26 @@

package io.airbyte.cdk.message

import jakarta.inject.Singleton
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow

interface Sized {
val sizeBytes: Long
}

interface MessageQueue<K, T : Sized> {
suspend fun acquireQueueBytesBlocking(bytes: Long)
suspend fun releaseQueueBytes(bytes: Long)
suspend fun getChannel(key: K): QueueChannel<T>
}

interface QueueChannel<T : Sized> {
interface MessageQueue<T> {
suspend fun publish(message: T)
suspend fun consume(): Flow<T>
suspend fun close()
suspend fun isClosed(): Boolean
suspend fun send(message: T)
suspend fun receive(): T
}

/** A channel that blocks when its parent queue has no available memory. */
interface BlockingQueueChannel<T : Sized> : QueueChannel<T> {
val messageQueue: MessageQueue<*, T>
val channel: Channel<T>

override suspend fun send(message: T) {
if (isClosed()) {
throw IllegalStateException("Send to closed QueueChannel")
}
val estimatedSize = message.sizeBytes
messageQueue.acquireQueueBytesBlocking(estimatedSize)
channel.send(message)
}

override suspend fun receive(): T {
if (isClosed()) {
throw IllegalStateException("Receive from closed QueueChannel")
}
val message = channel.receive()
val estimatedSize = message.sizeBytes
messageQueue.releaseQueueBytes(estimatedSize)
return message
}
}

interface QueueChannelFactory<T : Sized> {
fun make(messageQueue: MessageQueue<*, T>): QueueChannel<T>
}

/**
* The default queue channel is just a dumb wrapper around an unlimited kotlin channel of wrapped
* records.
*
* Note: we wrap channel closedness in an atomic boolean because the @[Channel.isClosedForSend] and
* @[Channel.isClosedForReceive] apis are marked as delicate/experimental.
*/
class DefaultQueueChannel(override val messageQueue: MessageQueue<*, DestinationRecordWrapped>) :
BlockingQueueChannel<DestinationRecordWrapped> {
override val channel = Channel<DestinationRecordWrapped>(Channel.UNLIMITED)
private val closed = AtomicBoolean(false)
abstract class ChannelMessageQueue<T> : MessageQueue<T> {
val channel = Channel<T>(Channel.UNLIMITED)

override suspend fun publish(message: T) = channel.send(message)
override suspend fun consume(): Flow<T> = channel.receiveAsFlow()
override suspend fun close() {
if (closed.compareAndSet(false, true)) {
channel.close()
}
channel.close()
}

override suspend fun isClosed(): Boolean = closed.get()
}

@Singleton
class DefaultQueueChannelFactory : QueueChannelFactory<DestinationRecordWrapped> {
override fun make(
messageQueue: MessageQueue<*, DestinationRecordWrapped>
): QueueChannel<DestinationRecordWrapped> = DefaultQueueChannel(messageQueue)
interface MessageQueueSupplier<K, T> {
fun get(key: K): MessageQueue<T>
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ package io.airbyte.cdk.message

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.state.CheckpointManager
import io.airbyte.cdk.state.MemoryManager
import io.airbyte.cdk.state.Reserved
import io.airbyte.cdk.state.SyncManager
import jakarta.inject.Singleton
import kotlinx.coroutines.runBlocking

/** A publishing interface for writing messages to a message queue. */
interface MessageQueueWriter<T : Any> {
Expand All @@ -28,12 +32,26 @@ interface MessageQueueWriter<T : Any> {
)
@Singleton
class DestinationMessageQueueWriter(
private val config: DestinationConfiguration,
private val catalog: DestinationCatalog,
private val messageQueue: MessageQueue<DestinationStream.Descriptor, DestinationRecordWrapped>,
private val queueSupplier:
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationRecordWrapped>>,
private val syncManager: SyncManager,
private val checkpointManager:
CheckpointManager<DestinationStream.Descriptor, CheckpointMessage>
CheckpointManager<DestinationStream.Descriptor, CheckpointMessage>,
systemMemoryManager: MemoryManager
) : MessageQueueWriter<DestinationMessage> {
private val queueReservation = runBlocking {
systemMemoryManager.reserveRatio(config.maxMessageQueueMemoryUsageRatio, this)
}
private val memoryManager = queueReservation.getReservationManager()

private suspend fun reserve(sized: DestinationRecordWrapped) =
memoryManager.reserveBlocking(
(sized.sizeBytes * config.estimatedRecordMemoryOverheadRatio).toLong(),
sized
)

/**
* Deserialize and route the message to the appropriate channel.
*
Expand All @@ -53,14 +71,16 @@ class DestinationMessageQueueWriter(
sizeBytes = sizeBytes,
record = message
)
messageQueue.getChannel(message.stream).send(wrapped)
queueSupplier.get(message.stream).publish(reserve(wrapped))
}

/* If an end-of-stream marker. */
is DestinationStreamComplete,
is DestinationStreamIncomplete -> {
val wrapped = StreamCompleteWrapped(index = manager.markEndOfStream())
messageQueue.getChannel(message.stream).send(wrapped)
val queue = queueSupplier.get(message.stream)
queue.publish(memoryManager.reserveBlocking(0L, wrapped))
queue.close()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,31 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
* Releasable reservation of memory. For large blocks (ie, from [MemoryManager.reserveRatio],
* provides a submanager that can be used to manage allocating the reservation).
*/
class Reserved<T>(
private val memoryManager: MemoryManager,
val bytesReserved: Long,
val value: T,
) : CloseableCoroutine {
private var released = AtomicBoolean(false)

suspend fun release() {
if (!released.compareAndSet(false, true)) {
return
}
memoryManager.release(bytesReserved)
}

fun getReservationManager(): MemoryManager = MemoryManager(bytesReserved)

override suspend fun close() {
release()
}
}

/**
* Manages memory usage for the destination.
*
Expand All @@ -36,32 +61,11 @@ class MemoryManager(availableMemoryProvider: AvailableMemoryProvider) {
private val mutex = Mutex()
private val syncChannel = Channel<Unit>(Channel.UNLIMITED)

/**
* Releasable reservation of memory. For large blocks (ie, from [reserveRatio], provides a
* submanager that can be used to manage allocating the reservation).
*/
inner class Reservation(val bytes: Long) : CloseableCoroutine {
private var released = AtomicBoolean(false)

suspend fun release() {
if (!released.compareAndSet(false, true)) {
return
}
release(bytes)
}

fun getReservationManager(): MemoryManager = MemoryManager(bytes)

override suspend fun close() {
release()
}
}

val remainingMemoryBytes: Long
get() = totalMemoryBytes - usedMemoryBytes.get()

/* Attempt to reserve memory. If enough memory is not available, waits until it is, then reserves. */
suspend fun reserveBlocking(memoryBytes: Long): Reservation {
suspend fun <T> reserveBlocking(memoryBytes: Long, reservedFor: T): Reserved<T> {
if (memoryBytes > totalMemoryBytes) {
throw IllegalArgumentException(
"Requested ${memoryBytes}b memory exceeds ${totalMemoryBytes}b total"
Expand All @@ -74,14 +78,13 @@ class MemoryManager(availableMemoryProvider: AvailableMemoryProvider) {
}
usedMemoryBytes.addAndGet(memoryBytes)

return Reservation(memoryBytes)
return Reserved(this, memoryBytes, reservedFor)
}
}

suspend fun reserveRatio(ratio: Double): Reservation {
suspend fun <T> reserveRatio(ratio: Double, reservedFor: T): Reserved<T> {
val estimatedSize = (totalMemoryBytes.toDouble() * ratio).toLong()
reserveBlocking(estimatedSize)
return Reservation(estimatedSize)
return reserveBlocking(estimatedSize, reservedFor)
}

suspend fun release(memoryBytes: Long) {
Expand Down
Loading

0 comments on commit 04a512e

Please sign in to comment.