
Commit

Merge branch 'master' into pnilan/airbyte-cdk/low-code-consolidate-decoders
pnilan authored Oct 4, 2024
2 parents 1b55803 + 4c84f58 commit 75f41b3
Showing 197 changed files with 15,548 additions and 4,626 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/publish-bulk-cdk.yml
@@ -76,6 +76,16 @@ jobs:
          gradle-distribution-sha-256-sum-warning: false
          arguments: --scan :airbyte-cdk:bulk:bulkCdkBuild

      - name: Integration test Bulk CDK
        uses: burrunan/gradle-cache-action@v1
        env:
          CI: true
        with:
          job-id: bulk-cdk-publish
          concurrent: true
          gradle-distribution-sha-256-sum-warning: false
          arguments: --scan :airbyte-cdk:bulk:bulkCdkIntegrationTest

      - name: Publish Poms and Jars to CloudRepo
        uses: burrunan/gradle-cache-action@v1
        env:
1 change: 1 addition & 0 deletions .prettierignore
@@ -1,6 +1,7 @@
airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output
airbyte-ci/connectors/pipelines/tests/test_changelog/result_files
airbyte-integrations/bases/connector-acceptance-test/unit_tests/data/docs
airbyte-integrations/connectors/destination-*/src/test-integration/resources/expected-spec*.json

# Ignore manifest files in manifest-only connectors
# This is done due to prettier being overly opinionated on the formatting of quotes
6 changes: 6 additions & 0 deletions airbyte-cdk/bulk/build.gradle
@@ -61,6 +61,12 @@ allprojects {
    }
}

tasks.register('bulkCdkIntegrationTest').configure {
    // findByName returns the task, or null if no such task exists.
    // we need this because not all submodules have an integrationTest task.
    dependsOn allprojects.collect {it.tasks.findByName('integrationTest')}.findAll {it != null}
}

if (buildNumberFile.exists()) {
    tasks.register('bulkCdkBuild').configure {
        dependsOn allprojects.collect {it.tasks.named('build')}
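
For illustration only (not part of this commit), the same aggregation expressed in Gradle Kotlin DSL, where mapNotNull performs the findByName lookup and the null-filtering in a single step:

// Hypothetical build.gradle.kts equivalent of the Groovy task registered above.
tasks.register("bulkCdkIntegrationTest") {
    // Submodules without an integrationTest task are simply skipped.
    dependsOn(allprojects.mapNotNull { it.tasks.findByName("integrationTest") })
}
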
@@ -7,7 +7,7 @@
<Property name="container-log-pattern">%d{yyyy-MM-dd'T'HH:mm:ss,SSS}{GMT+0}`%replace{%X{log_source}}{^ -}{} > %replace{%m}{$${env:LOG_SCRUB_PATTERN:-\*\*\*\*\*}}{*****}%n</Property>
<!-- Always log INFO by default. -->
<Property name="log-level">${sys:LOG_LEVEL:-${env:LOG_LEVEL:-INFO}}</Property>
<Property name="logDir">target/test-logs/${date:yyyy-MM-dd'T'HH:mm:ss}</Property>
<Property name="logDir">build/test-logs/${date:yyyy-MM-dd'T'HH:mm:ss}</Property>
</Properties>

<Appenders>
@@ -6,6 +6,7 @@ package io.airbyte.cdk.read

import io.airbyte.cdk.command.SourceConfiguration
import io.micronaut.context.annotation.DefaultImplementation
import jakarta.inject.Inject
import jakarta.inject.Singleton
import kotlinx.coroutines.sync.Semaphore

@@ -19,12 +20,13 @@ fun interface Resource<T : Resource.Acquired> {
    fun tryAcquire(): T?
}

-@Singleton
/** A [Resource] used to manage concurrency. */
-class ConcurrencyResource(val config: SourceConfiguration) :
-    Resource<ConcurrencyResource.AcquiredThread> {
+@Singleton
+class ConcurrencyResource(maxConcurrency: Int) : Resource<ConcurrencyResource.AcquiredThread> {
+
+    @Inject constructor(configuration: SourceConfiguration) : this(configuration.maxConcurrency)

-    private val semaphore = Semaphore(config.maxConcurrency)
+    private val semaphore = Semaphore(maxConcurrency)

    val available: Int
        get() = semaphore.availablePermits
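
A minimal sketch (not part of this commit) of what the new Int-based primary constructor enables: a unit test can build the resource directly, while Micronaut continues to inject it through the secondary SourceConfiguration constructor. It assumes tryAcquire() returns null once all permits are taken, which the nullable return type suggests but this hunk does not show.

// Assumes this lives somewhere that can import io.airbyte.cdk.read.ConcurrencyResource.
fun concurrencyResourceDemo() {
    val resource = ConcurrencyResource(maxConcurrency = 2)
    check(resource.tryAcquire() != null) // first permit
    check(resource.tryAcquire() != null) // second and last permit
    check(resource.tryAcquire() == null) // assumed: no permits left
    println("available=${resource.available}")
}
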
28 changes: 28 additions & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
@@ -1,3 +1,16 @@
// simply declaring the source sets is sufficient to populate them with
// src/integrationTest/java+resources + src/integrationTest/kotlin.
sourceSets {
    integrationTest {
    }
}
kotlin {
    sourceSets {
        testIntegration {
        }
    }
}

dependencies {
    implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
    implementation 'org.apache.commons:commons-lang3:3.17.0'
@@ -10,3 +23,18 @@ dependencies {
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
    implementation "org.jetbrains.kotlin:kotlin-reflect:2.0.20"
}

task integrationTest(type: Test) {
    description = 'Runs the integration tests.'
    group = 'verification'
    testClassesDirs = sourceSets.integrationTest.output.classesDirs
    classpath = sourceSets.integrationTest.runtimeClasspath
    useJUnitPlatform()
    mustRunAfter tasks.check
}
configurations {
    integrationTestImplementation.extendsFrom testImplementation
    integrationTestRuntimeOnly.extendsFrom testRuntimeOnly
}
// These tests are lightweight enough to run on every PR.
rootProject.check.dependsOn(integrationTest)
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.test.util.NoopDestinationCleaner
import io.airbyte.cdk.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.test.util.NoopNameMapper
import io.airbyte.cdk.test.write.BasicFunctionalityIntegrationTest

class MockBasicFunctionalityIntegrationTest :
    BasicFunctionalityIntegrationTest(
        MockDestinationSpecification(),
        MockDestinationDataDumper,
        NoopDestinationCleaner,
        NoopExpectedRecordMapper,
        NoopNameMapper
    )
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.test.util.DestinationDataDumper
import io.airbyte.cdk.test.util.OutputRecord
import java.util.concurrent.ConcurrentHashMap

object MockDestinationBackend {
    private val files: MutableMap<String, MutableList<OutputRecord>> = ConcurrentHashMap()

    fun insert(filename: String, vararg records: OutputRecord) {
        getFile(filename).addAll(records)
    }

    fun readFile(filename: String): List<OutputRecord> {
        return getFile(filename)
    }

    private fun getFile(filename: String): MutableList<OutputRecord> {
        return files.getOrPut(filename) { mutableListOf() }
    }
}

object MockDestinationDataDumper : DestinationDataDumper {
    override fun dumpRecords(streamName: String, streamNamespace: String?): List<OutputRecord> {
        return MockDestinationBackend.readFile(
            MockStreamLoader.getFilename(streamNamespace, streamName)
        )
    }
}
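
A small usage sketch (not part of this commit): the backend keys records by the "(namespace,name)" string produced by MockStreamLoader.getFilename, so the dumper returns whatever the loader inserted for that stream. The stream names below are placeholders.

fun dumpUsersStream(): List<OutputRecord> {
    // Equivalent to MockDestinationBackend.readFile("(test_ns,users)")
    return MockDestinationDataDumper.dumpRecords("users", "test_ns")
}
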
@@ -0,0 +1,13 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.check.DestinationChecker
import javax.inject.Singleton

@Singleton
class MockDestinationChecker : DestinationChecker<MockDestinationConfiguration> {
    override fun check(config: MockDestinationConfiguration) {}
}
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationConfigurationFactory
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton

class MockDestinationConfiguration : DestinationConfiguration()

@Singleton class MockDestinationSpecification : ConfigurationSpecification()

@Singleton
class MockDestinationConfigurationFactory :
    DestinationConfigurationFactory<MockDestinationSpecification, MockDestinationConfiguration> {

    override fun makeWithoutExceptionHandling(
        pojo: MockDestinationSpecification
    ): MockDestinationConfiguration {
        return MockDestinationConfiguration()
    }
}

@Factory
class MockDestinationConfigurationProvider(private val config: DestinationConfiguration) {
    @Singleton
    fun get(): MockDestinationConfiguration {
        return config as MockDestinationConfiguration
    }
}
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.data.ObjectValue
import io.airbyte.cdk.message.Batch
import io.airbyte.cdk.message.DestinationRecord
import io.airbyte.cdk.message.SimpleBatch
import io.airbyte.cdk.test.util.OutputRecord
import io.airbyte.cdk.write.DestinationWriter
import io.airbyte.cdk.write.StreamLoader
import java.time.Instant
import java.util.UUID
import javax.inject.Singleton

@Singleton
class MockDestinationWriter : DestinationWriter {
    override fun createStreamLoader(stream: DestinationStream): StreamLoader {
        return MockStreamLoader(stream)
    }
}

class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
    data class LocalBatch(val records: List<DestinationRecord>) : Batch {
        override val state = Batch.State.LOCAL
    }
    data class PersistedBatch(val records: List<DestinationRecord>) : Batch {
        override val state = Batch.State.PERSISTED
    }

    override suspend fun processRecords(
        records: Iterator<DestinationRecord>,
        totalSizeBytes: Long
    ): Batch {
        return LocalBatch(records.asSequence().toList())
    }

    override suspend fun processBatch(batch: Batch): Batch {
        return when (batch) {
            is LocalBatch -> {
                batch.records.forEach {
                    MockDestinationBackend.insert(
                        getFilename(it.stream),
                        OutputRecord(
                            UUID.randomUUID(),
                            Instant.ofEpochMilli(it.emittedAtMs),
                            Instant.ofEpochMilli(System.currentTimeMillis()),
                            stream.generationId,
                            it.data as ObjectValue,
                            OutputRecord.Meta(changes = it.meta?.changes, syncId = stream.syncId),
                        )
                    )
                }
                PersistedBatch(batch.records)
            }
            is PersistedBatch -> SimpleBatch(state = Batch.State.COMPLETE)
            else -> throw IllegalStateException("Unexpected batch type: $batch")
        }
    }

    companion object {
        fun getFilename(stream: DestinationStream.Descriptor) =
            getFilename(stream.namespace, stream.name)
        fun getFilename(namespace: String?, name: String) = "(${namespace},${name})"
    }
}
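
For orientation, a sketch (not part of this commit) of the batch lifecycle the mock models: processRecords produces a LOCAL batch, the first processBatch call writes the records to the in-memory backend and returns a PERSISTED batch, and a second call yields a COMPLETE one.

// Assumes the same imports as the file above (Batch, DestinationRecord).
suspend fun exerciseLifecycle(loader: MockStreamLoader, records: Iterator<DestinationRecord>) {
    val local = loader.processRecords(records, totalSizeBytes = 0L) // Batch.State.LOCAL
    val persisted = loader.processBatch(local)                      // Batch.State.PERSISTED
    val complete = loader.processBatch(persisted)                   // Batch.State.COMPLETE
    check(complete.state == Batch.State.COMPLETE)
}
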
@@ -0,0 +1,5 @@
# This is a minimal metadata.yaml that allows a destination connector to run.
# A real metadata.yaml obviously contains much more stuff, but we don't strictly
# need any of it at runtime.
data:
  dockerRepository: "airbyte/fake-destination"
@@ -53,6 +53,13 @@ interface Batch {
        COMPLETE
    }

    fun isPersisted(): Boolean =
        when (state) {
            State.PERSISTED,
            State.COMPLETE -> true
            else -> false
        }

    val state: State
}

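
A quick illustration (not part of this commit) of the new helper, using the SimpleBatch constructor that appears elsewhere in this diff: both PERSISTED and COMPLETE batches report as persisted.

fun isPersistedDemo() {
    val persisted = SimpleBatch(state = Batch.State.PERSISTED)
    val complete = SimpleBatch(state = Batch.State.COMPLETE)
    check(persisted.isPersisted()) // true
    check(complete.isPersisted())  // true
}
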
@@ -16,7 +16,7 @@ import kotlinx.coroutines.flow.flow
 * terminate when maxBytes has been read, or when the stream is complete.
 */
interface MessageQueueReader<K, T> {
-    suspend fun readChunk(key: K, maxBytes: Long): Flow<T>
+    suspend fun read(key: K): Flow<T>
}

@Singleton
@@ -26,32 +26,22 @@ class DestinationMessageQueueReader(
) : MessageQueueReader<DestinationStream.Descriptor, DestinationRecordWrapped> {
    private val log = KotlinLogging.logger {}

-    override suspend fun readChunk(
-        key: DestinationStream.Descriptor,
-        maxBytes: Long
-    ): Flow<DestinationRecordWrapped> = flow {
-        log.info { "Reading chunk of $maxBytes bytes from stream $key" }
-
-        var totalBytesRead = 0L
-        var recordsRead = 0L
-        while (totalBytesRead < maxBytes) {
-            when (val wrapped = messageQueue.getChannel(key).receive()) {
-                is StreamRecordWrapped -> {
-                    totalBytesRead += wrapped.sizeBytes
-                    emit(wrapped)
-                }
-                is StreamCompleteWrapped -> {
-                    messageQueue.getChannel(key).close()
-                    emit(wrapped)
-                    log.info { "Read end-of-stream for $key" }
-                    return@flow
-                }
-            }
-            recordsRead++
-        }
-
-        log.info { "Read $recordsRead records (${totalBytesRead}b) from stream $key" }
-
-        return@flow
-    }
+    override suspend fun read(key: DestinationStream.Descriptor): Flow<DestinationRecordWrapped> =
+        flow {
+            log.info { "Reading from stream $key" }
+
+            while (true) {
+                when (val wrapped = messageQueue.getChannel(key).receive()) {
+                    is StreamRecordWrapped -> {
+                        emit(wrapped)
+                    }
+                    is StreamCompleteWrapped -> {
+                        messageQueue.getChannel(key).close()
+                        emit(wrapped)
+                        log.info { "Read end-of-stream for $key" }
+                        return@flow
+                    }
+                }
+            }
+        }
}
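
A sketch (not part of this commit) of a caller of the renamed method: the byte budget is gone, and the returned flow completes on its own after the end-of-stream marker is emitted. Types are assumed to be importable from the surrounding packages.

suspend fun drain(
    reader: MessageQueueReader<DestinationStream.Descriptor, DestinationRecordWrapped>,
    key: DestinationStream.Descriptor
) {
    reader.read(key).collect { wrapped ->
        // Handle StreamRecordWrapped here; StreamCompleteWrapped is the final element.
    }
}
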
@@ -59,7 +59,7 @@ class DestinationMessageQueueWriter(
            /* If an end-of-stream marker. */
            is DestinationStreamComplete,
            is DestinationStreamIncomplete -> {
-                val wrapped = StreamCompleteWrapped(index = manager.countEndOfStream())
+                val wrapped = StreamCompleteWrapped(index = manager.markEndOfStream())
                messageQueue.getChannel(message.stream).send(wrapped)
            }
        }
(The remaining changed files in this commit are not shown here.)
