Skip to content

Commit

Permalink
feat: implement field renaming mapper (#14204)
Browse files Browse the repository at this point in the history
  • Loading branch information
subodh1810 committed Oct 8, 2024
1 parent 4351c7f commit 44544ed
Show file tree
Hide file tree
Showing 10 changed files with 349 additions and 38 deletions.
1 change: 1 addition & 0 deletions airbyte-api/server-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11610,6 +11610,7 @@ components:
type: string
enum:
- hashing
- field-renaming
ConfiguredStreamMapper:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.airbyte.commons.json.Jsons

object MapperOperationName {
const val HASHING = "hashing"
const val FIELD_RENAMING = "field-renaming"
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ interface AirbyteRecord {

fun remove(fieldName: String)

fun rename(
oldFieldName: String,
newFieldName: String,
)

fun <T : Any> set(
fieldName: String,
value: T,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,29 @@ class JsonValueAdapter(private val node: JsonNode) : Value {
}

data class AirbyteJsonRecordAdapter(private val message: AirbyteMessage) : AirbyteRecord {
override val asProtocol: AirbyteMessage
get() = message
override val streamDescriptor: StreamDescriptor = StreamDescriptor().withNamespace(message.record.namespace).withName(message.record.stream)
override val asProtocol: AirbyteMessage = message
override val streamDescriptor: StreamDescriptor =
StreamDescriptor()
.withNamespace(message.record.namespace)
.withName(message.record.stream)
private val data: ObjectNode = message.record.data as ObjectNode

override fun has(fieldName: String): Boolean = data.has(fieldName)

override fun get(fieldName: String): Value = JsonValueAdapter(data.get(fieldName))
override fun get(fieldName: String): Value = JsonValueAdapter(data[fieldName])

override fun remove(fieldName: String) {
data.remove(fieldName)
}

override fun rename(
oldFieldName: String,
newFieldName: String,
) {
data.set<JsonNode>(newFieldName, data[oldFieldName])
data.remove(oldFieldName)
}

override fun <T : Any> set(
fieldName: String,
value: T,
Expand All @@ -51,16 +61,11 @@ data class AirbyteJsonRecordAdapter(private val message: AirbyteMessage) : Airby
.withField(fieldName)
.withReason(reason.toProtocol())

// handling all the cascading layers of potential null objects
// very thread-unsafe
if (message.record != null) {
if (message.record.meta == null) {
message.record.withMeta(AirbyteRecordMessageMeta().withChanges(mutableListOf()))
}
if (message.record.meta.changes == null) {
message.record.meta.withChanges(mutableListOf())
}
message.record.meta.changes.add(metaChange)
// Ensure thread-safe modification of shared mutable state
synchronized(message.record) {
val meta = message.record.meta ?: AirbyteRecordMessageMeta().also { message.record.withMeta(it) }
val changes = meta.changes ?: mutableListOf<AirbyteRecordMessageMetaChange>().also { meta.withChanges(it) }
changes.add(metaChange)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package io.airbyte.mappers.helpers

import com.google.common.base.Preconditions
import io.airbyte.config.ConfiguredAirbyteCatalog
import io.airbyte.config.ConfiguredAirbyteStream
import io.airbyte.config.ConfiguredMapper
import io.airbyte.config.Field
import io.airbyte.config.MapperOperationName
import io.airbyte.mappers.transformations.FieldRenamingMapper.Companion.NEW_FIELD_NAME
import io.airbyte.mappers.transformations.FieldRenamingMapper.Companion.ORIGINAL_FIELD_NAME
import io.airbyte.mappers.transformations.HashingMapper

internal const val DEFAULT_HASHING_METHOD = HashingMapper.SHA256
Expand Down Expand Up @@ -36,33 +39,91 @@ fun getHashedFieldName(mapper: ConfiguredMapper): String {

/**
* Validates that the configured mappers in a configured catalog are valid.
* For now this only checks that the hashing mapper is correctly configured, but we should move to using mapper specs in the future.
*/
fun validateConfiguredMappers(configuredCatalog: ConfiguredAirbyteCatalog) {
for (configuredStream in configuredCatalog.streams) {
val fields = configuredStream.fields
for (mapper in configuredStream.mappers) {
Preconditions.checkArgument(MapperOperationName.HASHING == mapper.name, "Mapping operation %s is not supported.", mapper.name)
Preconditions.checkNotNull(fields, "Fields must be set in order to use mappers.")
validateConfiguredStream(configuredStream)
}
}

val mappedField = getHashedFieldName(mapper)
Preconditions.checkArgument(
fields!!.stream().anyMatch { f: Field -> f.name == mappedField },
"Hashed field %s not found in stream %s.",
mappedField,
configuredStream.stream.name,
)
private fun validateConfiguredStream(configuredStream: ConfiguredAirbyteStream) {
val fields = configuredStream.fields
Preconditions.checkNotNull(fields, "Fields must be set in order to use mappers.")

for (mapper in configuredStream.mappers) {
validateMapper(configuredStream, mapper, fields!!)
}
}

val mappedFieldName =
getHashedFieldName(
mapper,
) + mapper.config.getOrDefault(HashingMapper.FIELD_NAME_SUFFIX_CONFIG_KEY, DEFAULT_HASHING_SUFFIX)
private fun validateMapper(
configuredStream: ConfiguredAirbyteStream,
mapper: ConfiguredMapper,
fields: List<Field>,
) {
when (mapper.name) {
MapperOperationName.HASHING -> validateHashingMapper(configuredStream, mapper, fields)
MapperOperationName.FIELD_RENAMING -> validateFieldRenamingMapper(configuredStream, mapper, fields)
else ->
Preconditions.checkArgument(
fields.stream().noneMatch { f: Field -> f.name == mappedFieldName },
"Hashed field %s already exists in stream %s.",
mappedFieldName,
configuredStream.stream.name,
false,
"Mapping operation %s is not supported.",
mapper.name,
)
}
}
}

private fun validateHashingMapper(
configuredStream: ConfiguredAirbyteStream,
mapper: ConfiguredMapper,
fields: List<Field>,
) {
val mappedField = getHashedFieldName(mapper)

Preconditions.checkArgument(
fields.any { it.name == mappedField },
"Hashed field '%s' not found in stream '%s'.",
mappedField,
configuredStream.stream.name,
)

val suffix =
mapper.config.getOrDefault(
HashingMapper.FIELD_NAME_SUFFIX_CONFIG_KEY,
DEFAULT_HASHING_SUFFIX,
)
val mappedFieldName = "$mappedField$suffix"

Preconditions.checkArgument(
fields.none { it.name == mappedFieldName },
"Hashed field '%s' already exists in stream '%s'.",
mappedFieldName,
configuredStream.stream.name,
)
}

private fun validateFieldRenamingMapper(
configuredStream: ConfiguredAirbyteStream,
mapper: ConfiguredMapper,
fields: List<Field>,
) {
val originalFieldName =
mapper.config[ORIGINAL_FIELD_NAME]
?: throw IllegalArgumentException("Config missing required key: originalFieldName")
val newFieldName =
mapper.config[NEW_FIELD_NAME]
?: throw IllegalArgumentException("Config missing required key: newFieldName")

Preconditions.checkArgument(
fields.any { it.name == originalFieldName },
"Original field '%s' not found in stream '%s'.",
originalFieldName,
configuredStream.stream.name,
)

Preconditions.checkArgument(
fields.none { it.name == newFieldName },
"New field '%s' already exists in stream '%s'.",
newFieldName,
configuredStream.stream.name,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class DestinationCatalogGenerator(
}
}

internal fun applyCatalogMapperTransformations(stream: ConfiguredAirbyteStream): Map<ConfiguredMapper, MapperError> {
private fun applyCatalogMapperTransformations(stream: ConfiguredAirbyteStream): Map<ConfiguredMapper, MapperError> {
val (updateFields, _, errors) = applyMapperToFields(stream)

val jsonSchema =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.airbyte.mappers.transformations

import io.airbyte.config.ConfiguredMapper
import io.airbyte.config.MapperOperationName
import io.airbyte.config.MapperSpecification
import io.airbyte.config.MapperSpecificationFieldString
import io.airbyte.config.adapters.AirbyteRecord
import jakarta.inject.Named
import jakarta.inject.Singleton

@Singleton
@Named("FieldRenamingMapper")
class FieldRenamingMapper : Mapper {
companion object {
const val ORIGINAL_FIELD_NAME = "originalFieldName"
const val NEW_FIELD_NAME = "newFieldName"
}

override val name: String
get() = MapperOperationName.FIELD_RENAMING

override fun spec(): MapperSpecification {
return MapperSpecification(
name = name,
documentationUrl = "",
config =
mapOf(
ORIGINAL_FIELD_NAME to
MapperSpecificationFieldString(
title = "Original Field Name",
description = "The current name of the field to rename.",
),
NEW_FIELD_NAME to
MapperSpecificationFieldString(
title = "New Field Name",
description = "The new name for the field after renaming.",
),
),
)
}

override fun schema(
config: ConfiguredMapper,
slimStream: SlimStream,
): SlimStream {
val (originalFieldName, newFieldName) = getConfigValues(config.config)

return slimStream
.deepCopy()
.apply { redefineField(originalFieldName, newFieldName) }
}

private fun getConfigValues(config: Map<String, String>): FieldRenamingConfig {
val originalFieldName =
config[ORIGINAL_FIELD_NAME]
?: throw IllegalArgumentException("Config missing required key: $ORIGINAL_FIELD_NAME")
val newFieldName =
config[NEW_FIELD_NAME]
?: throw IllegalArgumentException("Config missing required key: $NEW_FIELD_NAME")
return FieldRenamingConfig(originalFieldName, newFieldName)
}

override fun map(
config: ConfiguredMapper,
record: AirbyteRecord,
) {
val (originalFieldName, newFieldName) = getConfigValues(config.config)

if (record.has(originalFieldName)) {
try {
record.rename(originalFieldName, newFieldName)
} catch (e: Exception) {
record.trackFieldError(
newFieldName,
AirbyteRecord.Change.NULLED,
AirbyteRecord.Reason.PLATFORM_SERIALIZATION_ERROR,
)
}
}
}

data class FieldRenamingConfig(
val originalFieldName: String,
val newFieldName: String,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ class TestRecordAdapter(override val streamDescriptor: StreamDescriptor, data: M
data.remove(fieldName)
}

override fun rename(
oldFieldName: String,
newFieldName: String,
) {
data[newFieldName] = data[oldFieldName] as Any
data.remove(oldFieldName)
}

override fun <T : Any> set(
fieldName: String,
value: T,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class MapperHelperTest {
assertThrows<IllegalArgumentException> {
validateConfiguredMappers(catalog)
}
Assertions.assertEquals("Hashed field missing_field not found in stream my_stream.", exception.message)
Assertions.assertEquals("Hashed field 'missing_field' not found in stream 'my_stream'.", exception.message)
}

@Test
Expand All @@ -116,7 +116,7 @@ class MapperHelperTest {
assertThrows<IllegalArgumentException> {
validateConfiguredMappers(catalog)
}
Assertions.assertEquals("Hashed field my_field_hashed already exists in stream my_stream.", exception.message)
Assertions.assertEquals("Hashed field 'my_field_hashed' already exists in stream 'my_stream'.", exception.message)
}

@Test
Expand Down
Loading

0 comments on commit 44544ed

Please sign in to comment.