Skip to content

Commit

Permalink
Bulk load cdk: split the destination process stuff into multiple file…
Browse files Browse the repository at this point in the history
…s in their own package (#46362)
  • Loading branch information
edgao authored Oct 4, 2024
1 parent 679a5c4 commit 4c84f58
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import io.airbyte.cdk.test.util.FakeDataDumper
import io.airbyte.cdk.test.util.IntegrationTest
import io.airbyte.cdk.test.util.NoopDestinationCleaner
import io.airbyte.cdk.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.test.util.TestDeploymentMode
import io.airbyte.cdk.test.util.destination_process.TestDeploymentMode
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import java.nio.charset.StandardCharsets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import com.deblock.jsondiff.matcher.LenientJsonObjectPartialMatcher
import com.deblock.jsondiff.matcher.StrictJsonArrayPartialMatcher
import com.deblock.jsondiff.matcher.StrictPrimitivePartialMatcher
import com.deblock.jsondiff.viewer.OnlyErrorDiffViewer
import io.airbyte.cdk.test.util.DestinationProcessFactory
import io.airbyte.cdk.test.util.FakeDataDumper
import io.airbyte.cdk.test.util.IntegrationTest
import io.airbyte.cdk.test.util.NoopDestinationCleaner
import io.airbyte.cdk.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.test.util.TestDeploymentMode
import io.airbyte.cdk.test.util.destination_process.DestinationProcessFactory
import io.airbyte.cdk.test.util.destination_process.TestDeploymentMode
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import java.nio.file.Files
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.message.DestinationMessage
import io.airbyte.cdk.test.util.destination_process.DestinationProcessFactory
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.test.util.destination_process

import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog

/**
* Represents a destination process, whether running in-JVM via micronaut, or as a separate Docker
* container. The general lifecycle is:
* 1. `val dest = DestinationProcessFactory.createDestinationProcess(...)`
* 2. `launch { dest.run() }`
* 3. [sendMessage] as many times as you want
* 4. [readMessages] as needed (e.g. to check that state messages are emitted during the sync)
* 5. [shutdown] once you have no more messages to send to the destination
*/
interface DestinationProcess {
/**
* Run the destination process. Callers who want to interact with the destination should
* `launch` this method.
*/
fun run()

/** Send [message] to the destination process. */
fun sendMessage(message: AirbyteMessage)

/** Return all messages the destination emitted since the last call to [readMessages]. */
fun readMessages(): List<AirbyteMessage>

/**
* Wait for the destination to terminate. Call this once you have no more messages to send.
*
* This method returns no value.
* NOTE(review): an earlier version of this comment said the remaining messages are returned
* here, which the signature does not support; presumably callers should invoke [readMessages]
* afterwards to collect any remaining output - confirm.
*/
fun shutdown()
}

/**
 * Deployment mode a test asks the destination to be created under; see the `deploymentMode`
 * parameter of [DestinationProcessFactory.createDestinationProcess], which defaults to [OSS].
 */
enum class TestDeploymentMode { CLOUD, OSS }

/** Factory for [DestinationProcess] instances. */
interface DestinationProcessFactory {
/**
* Create a [DestinationProcess]. The returned process is not started by the signature alone;
* see the lifecycle described on [DestinationProcess].
*
* @param command the command the destination should execute
* (presumably an Airbyte CLI operation name - TODO confirm against implementations).
* @param config optional connector configuration; defaults to `null`.
* @param catalog optional configured catalog; defaults to `null`.
* @param deploymentMode deployment mode to create the destination under; defaults to
* [TestDeploymentMode.OSS].
* @return the created [DestinationProcess].
*/
fun createDestinationProcess(
command: String,
config: ConfigurationSpecification? = null,
catalog: ConfiguredAirbyteCatalog? = null,
deploymentMode: TestDeploymentMode = TestDeploymentMode.OSS,
): DestinationProcess
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.test.util.destination_process

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import java.io.ByteArrayOutputStream
import java.io.InputStream

// TODO define a factory for this class + @Require(env = CI_master_merge)
/**
 * Placeholder [DestinationProcess] meant to run the destination in a Docker container.
 *
 * Nothing is implemented yet: every lifecycle method throws [NotImplementedError] via [TODO].
 *
 * @property command the command the destination should execute.
 * @property config connector configuration as JSON, or `null`.
 * @property catalog configured catalog, or `null`.
 */
class DockerizedDestination(
    val command: String,
    val config: JsonNode?,
    val catalog: ConfiguredAirbyteCatalog?,
) : DestinationProcess {
    /** Intended to launch the docker container. */
    override fun run(): Unit = TODO("launch a docker container")

    /** Intended to push [message] to the docker process' stdin. */
    override fun sendMessage(message: AirbyteMessage): Unit = TODO("Not yet implemented")

    /** Intended to read everything from the process' stdout. */
    override fun readMessages(): List<AirbyteMessage> = TODO("Not yet implemented")

    /** Intended to close stdin and wait until the process exits. */
    override fun shutdown(): Unit = TODO("Not yet implemented")
}

// This is currently unused, but we'll need it for the Docker version.
// It exists right now because it was written prior to the CliRunner retooling.
/**
 * Non-blocking, line-oriented reader over an [InputStream].
 *
 * There doesn't seem to be a built-in equivalent to this: Scanner and BufferedReader both have
 * `hasNextLine`-style methods which block until the stream has data to read, which we don't want
 * to do.
 *
 * This class buffers the bytes of the current line in memory until it reaches a newline (`\n`) or
 * end-of-stream, and decodes each completed line as UTF-8. The newline itself is not included in
 * the returned line.
 *
 * Caveat: data is only consumed while [InputStream.available] is non-zero. A stream that reports
 * `0` once it is exhausted will therefore keep yielding [NoLine.NOT_YET_AVAILABLE] rather than
 * [NoLine.EOF]; callers need another signal (e.g. process exit) to detect termination in that
 * case.
 */
private class LazyInputStreamReader(private val input: InputStream) {
    private val buffer: ByteArrayOutputStream = ByteArrayOutputStream()
    private var eof = false

    /**
     * Returns the next line of data without blocking on an empty stream.
     *
     * @return a [Line] when a full line (or a final unterminated line at end-of-stream) has been
     * read; [NoLine.NOT_YET_AVAILABLE] when the stream currently has no more bytes and the line
     * is still incomplete (already-read bytes stay buffered for the next call); [NoLine.EOF] once
     * end-of-stream has been observed and no buffered data remains.
     */
    fun nextLine(): MaybeLine {
        if (eof) {
            return NoLine.EOF
        }
        while (input.available() != 0) {
            when (val read = input.read()) {
                -1 -> {
                    eof = true
                    // A stream that ends right after a newline has no pending partial line;
                    // report EOF directly instead of inventing a trailing empty line.
                    return if (buffer.size() == 0) NoLine.EOF else drainBuffer()
                }
                '\n'.code -> return drainBuffer()
                else -> buffer.write(read)
            }
        }
        return NoLine.NOT_YET_AVAILABLE
    }

    /** Decodes the buffered bytes as UTF-8, clears the buffer, and wraps the text in a [Line]. */
    private fun drainBuffer(): Line {
        val text = buffer.toByteArray().toString(Charsets.UTF_8)
        buffer.reset()
        return Line(text)
    }

    companion object {
        /** Result of [nextLine]: either a [Line] or one of the [NoLine] markers. */
        sealed interface MaybeLine

        /** Markers for "no line returned". */
        enum class NoLine : MaybeLine {
            /** End-of-stream was observed; no further lines will be produced. */
            EOF,

            /** The stream has no readable bytes right now; try again later. */
            NOT_YET_AVAILABLE,
        }

        /** A single line of text, without its trailing newline. */
        data class Line(val line: String) : MaybeLine
    }
}
Loading

0 comments on commit 4c84f58

Please sign in to comment.