Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk Load CDK Stream Incomplete Prefactor 2: Drop Queue Reader and Si… #46516

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
package io.airbyte.cdk.message

import io.airbyte.cdk.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.state.MemoryManager
import io.github.oshai.kotlinlogging.KotlinLogging
import io.airbyte.cdk.state.Reserved
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantLock
import kotlinx.coroutines.runBlocking

/** Implemented by types that can report a size in bytes. */
interface Sized {
    /** Size of this object, expressed in bytes. */
    val sizeBytes: Long
}

/**
* Wrapper for record messages published to the message queue, containing metadata like index and
Expand All @@ -37,56 +38,25 @@ data class StreamCompleteWrapped(
override val sizeBytes: Long = 0L
}

/**
 * Concrete [ChannelMessageQueue] whose elements are [Reserved]-wrapped [DestinationRecordWrapped]
 * messages. It adds no members of its own; it only fixes the element type and provides a
 * constructor that can be called to create a queue.
 */
class DestinationRecordQueue :
    ChannelMessageQueue<Reserved<DestinationRecordWrapped>>()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can this just be a type alias? https://kotlinlang.org/docs/type-aliases.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so: I need to instantiate it when initializing the QueueSupplier, and a type alias for the abstract ChannelMessageQueue could not be instantiated.


/**
* Message queue to which @[DestinationRecordWrapped] messages can be published on a @
* [DestinationStream] key.
*
* It maintains a map of @[QueueChannel]s by stream, and tracks the memory usage across all
* channels, blocking when the maximum is reached.
*
* This maximum is expected to be low, as the assumption is that data will be spooled to disk as
* quickly as possible.
* A supplier of message queues to which ([MemoryManager.reserveBlocking]'d) @
* [DestinationRecordWrapped] messages can be published on a @ [DestinationStream] key. The queues
* themselves do not manage memory.
*/
@Singleton
class DestinationMessageQueue(
catalog: DestinationCatalog,
config: DestinationConfiguration,
private val memoryManager: MemoryManager,
private val queueChannelFactory: QueueChannelFactory<DestinationRecordWrapped>
) : MessageQueue<DestinationStream.Descriptor, DestinationRecordWrapped> {
private val channels:
ConcurrentHashMap<DestinationStream.Descriptor, QueueChannel<DestinationRecordWrapped>> =
ConcurrentHashMap()

private val totalQueueSizeBytes = AtomicLong(0L)
private val reservedMemory: MemoryManager.Reservation
private val reservedMemoryManager: MemoryManager
private val memoryLock = ReentrantLock()
private val memoryLockCondition = memoryLock.newCondition()
@Secondary
class DestinationRecordQueueSupplier(catalog: DestinationCatalog) :
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationRecordWrapped>> {
private val queues = ConcurrentHashMap<DestinationStream.Descriptor, DestinationRecordQueue>()

init {
catalog.streams.forEach { channels[it.descriptor] = queueChannelFactory.make(this) }
val adjustedRatio =
config.maxMessageQueueMemoryUsageRatio /
(1.0 + config.estimatedRecordMemoryOverheadRatio)
reservedMemory = runBlocking { memoryManager.reserveRatio(adjustedRatio) }
reservedMemoryManager = reservedMemory.getReservationManager()
catalog.streams.forEach { queues[it.descriptor] = DestinationRecordQueue() }
}

override suspend fun acquireQueueBytesBlocking(bytes: Long) {
reservedMemoryManager.reserveBlocking(bytes)
override fun get(key: DestinationStream.Descriptor): DestinationRecordQueue {
return queues[key]
?: throw IllegalArgumentException("Reading from non-existent record stream: $key")
}

override suspend fun releaseQueueBytes(bytes: Long) {
reservedMemoryManager.release(bytes)
}

override suspend fun getChannel(
key: DestinationStream.Descriptor,
): QueueChannel<DestinationRecordWrapped> {
return channels[key]
?: throw IllegalArgumentException("Reading from non-existent QueueChannel: ${key}")
}

private val log = KotlinLogging.logger {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,80 +4,26 @@

package io.airbyte.cdk.message

import jakarta.inject.Singleton
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow

/** Implemented by types that can report a size in bytes. */
interface Sized {
    /** Size of this object, expressed in bytes. */
    val sizeBytes: Long
}

interface MessageQueue<K, T : Sized> {
suspend fun acquireQueueBytesBlocking(bytes: Long)
suspend fun releaseQueueBytes(bytes: Long)
suspend fun getChannel(key: K): QueueChannel<T>
}

interface QueueChannel<T : Sized> {
interface MessageQueue<T> {
suspend fun publish(message: T)
suspend fun consume(): Flow<T>
suspend fun close()
suspend fun isClosed(): Boolean
suspend fun send(message: T)
suspend fun receive(): T
}

/**
 * A [QueueChannel] that accounts for message sizes against its parent [MessageQueue]: bytes are
 * acquired from the parent before a message is sent and released back to it after a message is
 * received.
 */
interface BlockingQueueChannel<T : Sized> : QueueChannel<T> {
    /** Parent queue from which bytes are acquired and to which they are released. */
    val messageQueue: MessageQueue<*, T>

    /** Underlying channel that carries the messages. */
    val channel: Channel<T>

    /**
     * Acquires [Sized.sizeBytes] of [message] from [messageQueue], then sends it on [channel].
     *
     * @throws IllegalStateException if this channel reports itself closed.
     */
    override suspend fun send(message: T) {
        check(!isClosed()) { "Send to closed QueueChannel" }
        // Presumably suspends until the parent has room, going by the method name.
        messageQueue.acquireQueueBytesBlocking(message.sizeBytes)
        channel.send(message)
    }

    /**
     * Receives the next message from [channel] and releases its [Sized.sizeBytes] back to
     * [messageQueue] before returning it.
     *
     * @throws IllegalStateException if this channel reports itself closed.
     */
    override suspend fun receive(): T {
        check(!isClosed()) { "Receive from closed QueueChannel" }
        return channel.receive().also { received ->
            messageQueue.releaseQueueBytes(received.sizeBytes)
        }
    }
}

/** Factory that builds a [QueueChannel] for a given parent [MessageQueue]. */
interface QueueChannelFactory<T : Sized> {
    /** Returns a [QueueChannel] created for [messageQueue]. */
    fun make(messageQueue: MessageQueue<*, T>): QueueChannel<T>
}

/**
* The default queue channel is just a dumb wrapper around an unlimited kotlin channel of wrapped
* records.
*
* Note: we wrap channel closedness in an atomic boolean because the @[Channel.isClosedForSend] and
* @[Channel.isClosedForReceive] apis are marked as delicate/experimental.
*/
class DefaultQueueChannel(override val messageQueue: MessageQueue<*, DestinationRecordWrapped>) :
BlockingQueueChannel<DestinationRecordWrapped> {
override val channel = Channel<DestinationRecordWrapped>(Channel.UNLIMITED)
private val closed = AtomicBoolean(false)
abstract class ChannelMessageQueue<T> : MessageQueue<T> {
val channel = Channel<T>(Channel.UNLIMITED)

override suspend fun publish(message: T) = channel.send(message)
override suspend fun consume(): Flow<T> = channel.receiveAsFlow()
override suspend fun close() {
if (closed.compareAndSet(false, true)) {
channel.close()
}
channel.close()
}

override suspend fun isClosed(): Boolean = closed.get()
}

@Singleton
class DefaultQueueChannelFactory : QueueChannelFactory<DestinationRecordWrapped> {
override fun make(
messageQueue: MessageQueue<*, DestinationRecordWrapped>
): QueueChannel<DestinationRecordWrapped> = DefaultQueueChannel(messageQueue)
interface MessageQueueSupplier<K, T> {
fun get(key: K): MessageQueue<T>
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ package io.airbyte.cdk.message

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.state.CheckpointManager
import io.airbyte.cdk.state.MemoryManager
import io.airbyte.cdk.state.Reserved
import io.airbyte.cdk.state.SyncManager
import jakarta.inject.Singleton
import kotlinx.coroutines.runBlocking

/** A publishing interface for writing messages to a message queue. */
interface MessageQueueWriter<T : Any> {
Expand All @@ -28,12 +32,26 @@ interface MessageQueueWriter<T : Any> {
)
@Singleton
class DestinationMessageQueueWriter(
private val config: DestinationConfiguration,
private val catalog: DestinationCatalog,
private val messageQueue: MessageQueue<DestinationStream.Descriptor, DestinationRecordWrapped>,
private val queueSupplier:
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationRecordWrapped>>,
private val syncManager: SyncManager,
private val checkpointManager:
CheckpointManager<DestinationStream.Descriptor, CheckpointMessage>
CheckpointManager<DestinationStream.Descriptor, CheckpointMessage>,
systemMemoryManager: MemoryManager
) : MessageQueueWriter<DestinationMessage> {
private val queueReservation = runBlocking {
systemMemoryManager.reserveRatio(config.maxMessageQueueMemoryUsageRatio, this)
}
private val memoryManager = queueReservation.getReservationManager()

private suspend fun reserve(sized: DestinationRecordWrapped) =
memoryManager.reserveBlocking(
(sized.sizeBytes * config.estimatedRecordMemoryOverheadRatio).toLong(),
sized
)

/**
* Deserialize and route the message to the appropriate channel.
*
Expand All @@ -53,14 +71,16 @@ class DestinationMessageQueueWriter(
sizeBytes = sizeBytes,
record = message
)
messageQueue.getChannel(message.stream).send(wrapped)
queueSupplier.get(message.stream).publish(reserve(wrapped))
}

/* If an end-of-stream marker. */
is DestinationStreamComplete,
is DestinationStreamIncomplete -> {
val wrapped = StreamCompleteWrapped(index = manager.markEndOfStream())
messageQueue.getChannel(message.stream).send(wrapped)
val queue = queueSupplier.get(message.stream)
queue.publish(memoryManager.reserveBlocking(0L, wrapped))
queue.close()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,31 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
 * Releasable reservation of memory, carried together with the [value] it was reserved for.
 *
 * For large blocks (i.e., from [MemoryManager.reserveRatio]), [getReservationManager] provides a
 * sub-manager that can be used to manage allocating the reservation.
 *
 * @param memoryManager manager that [bytesReserved] is handed back to on release.
 * @param bytesReserved number of bytes this reservation accounts for.
 * @param value the object this reservation was made for.
 */
class Reserved<T>(
    private val memoryManager: MemoryManager,
    val bytesReserved: Long,
    val value: T,
) : CloseableCoroutine {
    // The reference is never reassigned; the AtomicBoolean itself holds the mutable flag.
    private val released = AtomicBoolean(false)

    /**
     * Hands [bytesReserved] back to the owning manager. Only the first call has an effect;
     * subsequent calls return without releasing again.
     */
    suspend fun release() {
        if (released.compareAndSet(false, true)) {
            memoryManager.release(bytesReserved)
        }
    }

    /** Creates a new [MemoryManager] constructed from [bytesReserved]; each call builds a fresh one. */
    fun getReservationManager(): MemoryManager = MemoryManager(bytesReserved)

    /** Same as [release]. */
    override suspend fun close() {
        release()
    }
}

/**
* Manages memory usage for the destination.
*
Expand All @@ -36,32 +61,11 @@ class MemoryManager(availableMemoryProvider: AvailableMemoryProvider) {
private val mutex = Mutex()
private val syncChannel = Channel<Unit>(Channel.UNLIMITED)

/**
 * Releasable reservation of memory. For large blocks (i.e., from [reserveRatio]),
 * [getReservationManager] provides a sub-manager that can be used to manage allocating the
 * reservation.
 *
 * @param bytes number of bytes this reservation accounts for.
 */
inner class Reservation(val bytes: Long) : CloseableCoroutine {
    // The reference is never reassigned; the AtomicBoolean itself holds the mutable flag.
    private val released = AtomicBoolean(false)

    /**
     * Releases [bytes]. Only the first call has an effect; subsequent calls return without
     * releasing again.
     */
    suspend fun release() {
        if (released.compareAndSet(false, true)) {
            // One-argument overload: not this class's own no-arg release().
            release(bytes)
        }
    }

    /** Creates a new [MemoryManager] constructed from [bytes]; each call builds a fresh one. */
    fun getReservationManager(): MemoryManager = MemoryManager(bytes)

    /** Same as [release]. */
    override suspend fun close() {
        release()
    }
}

val remainingMemoryBytes: Long
get() = totalMemoryBytes - usedMemoryBytes.get()

/* Attempt to reserve memory. If enough memory is not available, waits until it is, then reserves. */
suspend fun reserveBlocking(memoryBytes: Long): Reservation {
suspend fun <T> reserveBlocking(memoryBytes: Long, reservedFor: T): Reserved<T> {
if (memoryBytes > totalMemoryBytes) {
throw IllegalArgumentException(
"Requested ${memoryBytes}b memory exceeds ${totalMemoryBytes}b total"
Expand All @@ -74,14 +78,13 @@ class MemoryManager(availableMemoryProvider: AvailableMemoryProvider) {
}
usedMemoryBytes.addAndGet(memoryBytes)

return Reservation(memoryBytes)
return Reserved(this, memoryBytes, reservedFor)
}
}

suspend fun reserveRatio(ratio: Double): Reservation {
suspend fun <T> reserveRatio(ratio: Double, reservedFor: T): Reserved<T> {
val estimatedSize = (totalMemoryBytes.toDouble() * ratio).toLong()
reserveBlocking(estimatedSize)
return Reservation(estimatedSize)
return reserveBlocking(estimatedSize, reservedFor)
}

suspend fun release(memoryBytes: Long) {
Expand Down
Loading
Loading