diff --git a/.github/workflows/dev-release-docker-publish.yml b/.github/workflows/dev-release-docker-publish.yml deleted file mode 100644 index a0bbd32..0000000 --- a/.github/workflows/dev-release-docker-publish.yml +++ /dev/null @@ -1,19 +0,0 @@ -name: Build and publish Docker distributions to Github Container Registry ghcr.io - -on: - workflow_dispatch: - push: - branches: - - dev-version-* - paths: - - gradle.properties - -jobs: - build-job: - uses: th2-net/.github/.github/workflows/compound-java.yml@main - with: - build-target: 'Docker' - docker-username: ${{ github.actor }} - devRelease: true - secrets: - docker-password: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/build.gradle b/build.gradle index 362718a..8eba1ea 100644 --- a/build.gradle +++ b/build.gradle @@ -18,7 +18,7 @@ apply plugin: 'kotlin-kapt' dependencies { api platform("com.exactpro.th2:bom:4.5.0") implementation "com.exactpro.th2:common:5.4.0-dev" - implementation "com.exactpro.th2:codec:5.3.0-new-proto-+" + implementation "com.exactpro.th2:codec:5.3.0-new-proto-6072683424-03077b6-SNAPSHOT" implementation "com.exactpro.th2:common-utils:2.2.0-dev" implementation "org.apache.commons:commons-lang3" @@ -29,10 +29,4 @@ dependencies { compileOnly "com.google.auto.service:auto-service-annotations:1.1.1" annotationProcessor "com.google.auto.service:auto-service:1.1.1" kapt "com.google.auto.service:auto-service:1.1.1" -} - -configurations { - compileClasspath { - resolutionStrategy.activateDependencyLocking() - } } \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index f40f5e8..1f2e9e1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,2 @@ release_version=5.1.0 -app_main_class=com.exactpro.th2.codec.MainKt -docker_image_name= \ No newline at end of file +app_main_class=com.exactpro.th2.codec.MainKt \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java b/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java index 477b162..29fc44c 100644 --- a/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java +++ b/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java @@ -32,14 +32,7 @@ @AutoService(IPipelineCodecFactory.class) public class CodecFactory implements IPipelineCodecFactory { - public static final String PROTOCOL = "csv"; - private static final Set PROTOCOLS = Collections.singleton(PROTOCOL); - - @NotNull - @Override - public String getProtocol() { - return PROTOCOL; - } + private static final Set PROTOCOLS = Collections.singleton("csv"); @NotNull @Override diff --git a/src/main/kotlin/com/exactpro/th2/codec/csv/AbstractDecoder.kt b/src/main/kotlin/com/exactpro/th2/codec/csv/AbstractDecoder.kt new file mode 100644 index 0000000..6650354 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/codec/csv/AbstractDecoder.kt @@ -0,0 +1,199 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.codec.csv + +import com.csvreader.CsvReader +import com.exactpro.th2.codec.DecodeException +import mu.KotlinLogging +import java.io.ByteArrayInputStream +import java.io.IOException +import java.nio.charset.Charset +import kotlin.math.min + +abstract class AbstractDecoder( + private val charset: Charset, + private val csvDelimiter: Char, + private val defaultHeader: Array?, + private val publishHeader: Boolean, + private val validateLength: Boolean +) { + protected abstract val RAW_MESSAGE.messageMetadata: Map + protected abstract val RAW_MESSAGE.messageSessionAlias: String + protected abstract val RAW_MESSAGE.rawBody: ByteArray + protected abstract val RAW_MESSAGE.messageProtocol: String + protected abstract val RAW_MESSAGE.logId: String + protected abstract val RAW_MESSAGE.logData: String + protected abstract val ANY_MESSAGE.isParsed: Boolean + protected abstract val ANY_MESSAGE.isRaw: Boolean + protected abstract val ANY_MESSAGE.asRaw: RAW_MESSAGE + + protected abstract fun createParsedMessage(sourceMessage: RAW_MESSAGE, outputMessageType: String, body: Map, currentIndex: Int): ANY_MESSAGE + protected abstract fun String.toFieldValue(): BODY_FIELD_VALUE + protected abstract fun Array.toFieldValue(): BODY_FIELD_VALUE + + fun decode(messageGroup: List): List { + val groupBuilder = mutableListOf() // ProtoMessageGroup.newBuilder() + val errors: MutableCollection> = mutableListOf() + for (anyMessage in messageGroup) { + if (anyMessage.isParsed) { + groupBuilder += anyMessage + continue + } + if (!anyMessage.isRaw) { + LOGGER.error { "Message should either have a raw or parsed message but has nothing: $anyMessage" } + continue + } + val rawMessage = anyMessage.asRaw + val protocol = rawMessage.messageProtocol + if ("" != protocol && !"csv".equals(protocol, ignoreCase = true)) { + LOGGER.error { "Wrong protocol: message should have empty or 'csv' protocol but has $protocol" } + continue + } + val data = decodeValues(rawMessage.rawBody) + if (data.isEmpty()) { + LOGGER.error { "The raw message does not contains any data: ${rawMessage.logData}" } + errors.add(ErrorHolder("The raw message does not contains any data", rawMessage)) + continue + } + + decodeCsvData(errors, groupBuilder, rawMessage, data) + } + if (errors.isNotEmpty()) { + throw DecodeException( + "Cannot decode some messages:\n" + errors.joinToString("\n") { + "Message ${it.originalMessage.logId} cannot be decoded because ${it.text}" + } + ) + } + return groupBuilder + } + + private fun decodeCsvData( + errors: MutableCollection>, + groupBuilder: MutableList, + rawMessage: RAW_MESSAGE, + data: Iterable> + ) { + val originalMetadata = rawMessage.messageMetadata + val outputMessageType = originalMetadata.getOrDefault( + OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE, + CSV_MESSAGE_TYPE + ) + var currentIndex = 0 + var header: Array? = defaultHeader + for (strings in data) { + currentIndex++ + if (strings.isEmpty()) { + LOGGER.error { "Empty raw at $currentIndex index (starts with 1). Data: $data" } + + errors.add(ErrorHolder("Empty raw at $currentIndex index (starts with 1)", rawMessage)) + continue + } + trimEachElement(strings) + if (header == null) { + LOGGER.debug { "Set header to: ${strings.contentToString()}" } + header = strings + if (publishHeader) { + //groupBuilder += createHeadersMessage(rawMessage, strings, currentIndex) + groupBuilder += createParsedMessage(rawMessage, + HEADER_MSG_TYPE, mapOf(HEADER_FIELD_NAME to strings.toFieldValue()), currentIndex) + } + continue + } + if (strings.size != header.size && validateLength) { + val msg = String.format( + "Wrong fields count in message. Expected count: %d; actual: %d; session alias: %s", + header.size, strings.size, rawMessage.messageSessionAlias + ) + LOGGER.error(msg) + LOGGER.debug { rawMessage.toString() } + errors.add(ErrorHolder(msg, rawMessage, strings)) + } + + val headerLength = header.size + val rowLength = strings.size + var i = 0 + val body = mutableMapOf() + while (i < headerLength && i < rowLength) { + val extraLength = getHeaderArrayLength(header, i) + if (extraLength == 1) { + body[header[i]] = strings[i].toFieldValue() + i++ + } else { + val values = copyArray(strings, i, i + extraLength) + body[header[i]] = values.toFieldValue() + i += extraLength + } + } + + groupBuilder += createParsedMessage(rawMessage, outputMessageType, body, currentIndex) + } + } + + private fun copyArray(original: Array, from: Int, to: Int) = original.copyOfRange(from, + min(to, original.size) + ) + + private fun getHeaderArrayLength(header: Array, index: Int): Int { + var length = 1 + var i = index + 1 + while (i < header.size && header[i].isEmpty()) { + length++ + i++ + } + return length + } + + private fun decodeValues(body: ByteArray): List> { + try { + ByteArrayInputStream(body).use { + val reader = CsvReader(it, csvDelimiter, charset) + return try { + val result: MutableList> = ArrayList() + while (reader.readRecord()) { + result.add(reader.values) + } + result + } finally { + reader.close() + } + } + } catch (e: IOException) { + throw RuntimeException("cannot read data from raw bytes", e) + } + } + + private class ErrorHolder( + val text: String, + val originalMessage: T, + val currentRow: Array = emptyArray() + ) + + private fun trimEachElement(elements: Array) { + for (i in elements.indices) { + elements[i] = elements[i].trim() + } + } + + companion object { + private val LOGGER = KotlinLogging.logger {} + private const val HEADER_MSG_TYPE = "Csv_Header" + private const val CSV_MESSAGE_TYPE = "Csv_Message" + private const val HEADER_FIELD_NAME = "Header" + private const val OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE = "th2.csv.override_message_type" + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/csv/CsvCodec.kt b/src/main/kotlin/com/exactpro/th2/codec/csv/CsvCodec.kt index 57f1de9..6b5a48c 100644 --- a/src/main/kotlin/com/exactpro/th2/codec/csv/CsvCodec.kt +++ b/src/main/kotlin/com/exactpro/th2/codec/csv/CsvCodec.kt @@ -16,32 +16,13 @@ package com.exactpro.th2.codec.csv -import com.csvreader.CsvReader -import com.exactpro.th2.codec.DecodeException import com.exactpro.th2.codec.api.IPipelineCodec import com.exactpro.th2.codec.api.IReportingContext import com.exactpro.th2.codec.csv.cfg.CsvCodecConfiguration -import com.exactpro.th2.common.grpc.AnyMessage as ProtoAnyMessage -import com.exactpro.th2.common.grpc.Value as ProtoValue import com.exactpro.th2.common.grpc.MessageGroup as ProtoMessageGroup -import com.exactpro.th2.common.grpc.Message as ProtoMessage -import com.exactpro.th2.common.grpc.RawMessage as ProtoRawMessage -import com.exactpro.th2.common.grpc.RawMessage as ProtoParsedMessage -import com.exactpro.th2.common.grpc.MessageMetadata as ProtoMessageMetadata -import com.exactpro.th2.common.grpc.RawMessageMetadata as ProtoRawMessageMetadata -import com.exactpro.th2.common.grpc.MessageID as ProtoMessageID -import com.exactpro.th2.common.message.toJson -import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.* -import com.exactpro.th2.common.utils.message.id -import com.exactpro.th2.common.utils.message.logId -import com.exactpro.th2.common.utils.message.sessionAlias -import com.exactpro.th2.common.utils.message.transport.logId -import com.exactpro.th2.common.value.toValue +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageGroup import mu.KotlinLogging -import java.io.ByteArrayInputStream -import java.io.IOException import java.nio.charset.Charset -import kotlin.math.min class CsvCodec(private val config: CsvCodecConfiguration) : IPipelineCodec { @@ -54,275 +35,17 @@ class CsvCodec(private val config: CsvCodecConfiguration) : IPipelineCodec { private val protoDecoder = ProtoDecoder(charset, config.delimiter, defaultHeader, config.isPublishHeader, config.validateLength) private val transportDecoder = TransportDecoder(charset, config.delimiter, defaultHeader, config.isPublishHeader, config.validateLength) - override fun decode(messageGroup: ProtoMessageGroup, context: IReportingContext): ProtoMessageGroup = decode(messageGroup) - override fun decode(messageGroup: ProtoMessageGroup): ProtoMessageGroup { + override fun decode(messageGroup: ProtoMessageGroup, context: IReportingContext): ProtoMessageGroup { val decodedMessages = protoDecoder.decode(messageGroup.messagesList) return ProtoMessageGroup.newBuilder().addAllMessages(decodedMessages).build() } - override fun decode(messageGroup: MessageGroup, context: IReportingContext): MessageGroup = decode(messageGroup) - override fun decode(messageGroup: MessageGroup): MessageGroup { + override fun decode(messageGroup: MessageGroup, context: IReportingContext): MessageGroup { val decodedMessages = transportDecoder.decode(messageGroup.messages) return MessageGroup(decodedMessages) } - private abstract class Decoder( - private val charset: Charset, - private val csvDelimiter: Char, - private val defaultHeader: Array?, - private val publishHeader: Boolean, - private val validateLength: Boolean - ) { - protected abstract val RAW_MESSAGE.messageMetadata: Map - protected abstract val RAW_MESSAGE.messageSessionAlias: String - protected abstract val RAW_MESSAGE.rawBody: ByteArray - protected abstract val RAW_MESSAGE.messageProtocol: String - protected abstract val RAW_MESSAGE.logId: String - protected abstract val RAW_MESSAGE.logData: String - protected abstract val ANY_MESSAGE.isParsed: Boolean - protected abstract val ANY_MESSAGE.isRaw: Boolean - protected abstract val ANY_MESSAGE.asRaw: RAW_MESSAGE - - protected abstract fun createParsedMessage(sourceMessage: RAW_MESSAGE, outputMessageType: String, body: Map, currentIndex: Int): ANY_MESSAGE - protected abstract fun String.toFieldValue(): BODY_FIELD_VALUE - protected abstract fun Array.toFieldValue(): BODY_FIELD_VALUE - - fun decode(messageGroup: List): List { - val groupBuilder = mutableListOf() // ProtoMessageGroup.newBuilder() - val errors: MutableCollection> = mutableListOf() - for (anyMessage in messageGroup) { - if (anyMessage.isParsed) { - groupBuilder += anyMessage - continue - } - if (!anyMessage.isRaw) { - LOGGER.error { "Message should either have a raw or parsed message but has nothing: $anyMessage" } - continue - } - val rawMessage = anyMessage.asRaw - val protocol = rawMessage.messageProtocol - if ("" != protocol && !"csv".equals(protocol, ignoreCase = true)) { - LOGGER.error { "Wrong protocol: message should have empty or 'csv' protocol but has $protocol" } - continue - } - val data = decodeValues(rawMessage.rawBody) - if (data.isEmpty()) { - LOGGER.error { "The raw message does not contains any data: ${rawMessage.logData}" } - errors.add(ErrorHolder("The raw message does not contains any data", rawMessage)) - continue - } - - decodeCsvData(errors, groupBuilder, rawMessage, data) - } - if (errors.isNotEmpty()) { - throw DecodeException( - "Cannot decode some messages:\n" + errors.joinToString("\n") { - "Message ${it.originalMessage.logId} cannot be decoded because ${it.text}" - } - ) - } - return groupBuilder - } - - private fun decodeCsvData( - errors: MutableCollection>, - groupBuilder: MutableList, - rawMessage: RAW_MESSAGE, - data: Iterable> - ) { - val originalMetadata = rawMessage.messageMetadata - val outputMessageType = originalMetadata.getOrDefault(OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE, CSV_MESSAGE_TYPE) - var currentIndex = 0 - var header: Array? = defaultHeader - for (strings in data) { - currentIndex++ - if (strings.isEmpty()) { - LOGGER.error { "Empty raw at $currentIndex index (starts with 1). Data: $data" } - - errors.add(ErrorHolder("Empty raw at $currentIndex index (starts with 1)", rawMessage)) - continue - } - trimEachElement(strings) - if (header == null) { - LOGGER.debug { "Set header to: ${strings.contentToString()}" } - header = strings - if (publishHeader) { - //groupBuilder += createHeadersMessage(rawMessage, strings, currentIndex) - groupBuilder += createParsedMessage(rawMessage, HEADER_MSG_TYPE, mapOf(HEADER_FIELD_NAME to strings.toFieldValue()), currentIndex) - } - continue - } - if (strings.size != header.size && validateLength) { - val msg = String.format( - "Wrong fields count in message. Expected count: %d; actual: %d; session alias: %s", - header.size, strings.size, rawMessage.messageSessionAlias - ) - LOGGER.error(msg) - LOGGER.debug { rawMessage.toString() } - errors.add(ErrorHolder(msg, rawMessage, strings)) - } - - val headerLength = header.size - val rowLength = strings.size - var i = 0 - val body = mutableMapOf() - while (i < headerLength && i < rowLength) { - val extraLength = getHeaderArrayLength(header, i) - if (extraLength == 1) { - body[header[i]] = strings[i].toFieldValue() - i++ - } else { - val values = copyArray(strings, i, i + extraLength) - body[header[i]] = values.toFieldValue() - i += extraLength - } - } - - groupBuilder += createParsedMessage(rawMessage, outputMessageType, body, currentIndex) - } - } - - private fun copyArray(original: Array, from: Int, to: Int) = original.copyOfRange(from, min(to, original.size)) - - private fun getHeaderArrayLength(header: Array, index: Int): Int { - var length = 1 - var i = index + 1 - while (i < header.size && header[i].isEmpty()) { - length++ - i++ - } - return length - } - - private fun decodeValues(body: ByteArray): List> { - try { - ByteArrayInputStream(body).use { - val reader = CsvReader(it, csvDelimiter, charset) - return try { - val result: MutableList> = ArrayList() - while (reader.readRecord()) { - result.add(reader.values) - } - result - } finally { - reader.close() - } - } - } catch (e: IOException) { - throw RuntimeException("cannot read data from raw bytes", e) - } - } - - private class ErrorHolder( - val text: String, - val originalMessage: T, - val currentRow: Array = emptyArray() - ) - - private fun trimEachElement(elements: Array) { - for (i in elements.indices) { - elements[i] = elements[i].trim() - } - } - - } - - private class ProtoDecoder( - charset: Charset, - csvDelimiter: Char, - defaultHeader: Array?, - publishHeader: Boolean, - validateLength: Boolean - ) : Decoder(charset, csvDelimiter, defaultHeader, publishHeader, validateLength) { - - override val ProtoRawMessage.messageMetadata: Map get() = metadata.propertiesMap - override val ProtoRawMessage.messageSessionAlias: String get() = sessionAlias ?: error("No session alias in message") - - override val ProtoRawMessage.rawBody: ByteArray get() = body.toByteArray() - override val ProtoRawMessage.messageProtocol: String get() = metadata.protocol - override val ProtoRawMessage.logId: String get() = this.id.logId - override val com.exactpro.th2.common.grpc.RawMessage.logData: String get() = toJson() - override val ProtoAnyMessage.isParsed: Boolean get() = hasMessage() - override val ProtoAnyMessage.isRaw: Boolean get() = hasRawMessage() - override val ProtoAnyMessage.asRaw: ProtoRawMessage get() = rawMessage - - override fun createParsedMessage(sourceMessage: ProtoRawMessage, outputMessageType: String, body: Map, currentIndex: Int): ProtoAnyMessage { - val builder = ProtoMessage.newBuilder() - .putAllFields(body) - .setParentEventId(sourceMessage.parentEventId) - - // Not set message type - setMetadata(sourceMessage.metadata, builder, outputMessageType, currentIndex) - return ProtoAnyMessage.newBuilder().setMessage(builder).build() - } - - override fun String.toFieldValue(): ProtoValue = toValue() - override fun Array.toFieldValue(): ProtoValue = toValue() - - private fun setMetadata( - originalMetadata: ProtoRawMessageMetadata, - messageBuilder: ProtoMessage.Builder, - messageType: String, - currentIndex: Int - ) { - messageBuilder.setMetadata( - ProtoMessageMetadata - .newBuilder() - .setId( - ProtoMessageID.newBuilder(originalMetadata.id) - .setTimestamp(originalMetadata.id.timestamp) - .addSubsequence(currentIndex) - .build() - ) - .putAllProperties(originalMetadata.propertiesMap) - .setMessageType(messageType) - ) - } - } - - private class TransportDecoder( - charset: Charset, - csvDelimiter: Char, - defaultHeader: Array?, - publishHeader: Boolean, - validateLength: Boolean - ) : Decoder, RawMessage, ParsedMessage, Any>(charset, csvDelimiter, defaultHeader, publishHeader, validateLength) { - - override val RawMessage.messageMetadata: Map get() = this.metadata - override val RawMessage.messageSessionAlias: String get() = id.sessionAlias - - override val RawMessage.rawBody: ByteArray get() = body.toByteArray() - override val RawMessage.messageProtocol: String get() = protocol - override val RawMessage.logId: String get() = id.logId - override val RawMessage.logData: String get() = this.toString() - override val Message<*>.isParsed: Boolean get() = this is ParsedMessage - override val Message<*>.isRaw: Boolean get() = this is RawMessage - override val Message<*>.asRaw: RawMessage get() = this as RawMessage - - override fun createParsedMessage( - sourceMessage: RawMessage, - outputMessageType: String, - body: Map, - currentIndex: Int - ): Message<*> { - return ParsedMessage( - id = sourceMessage.id.toBuilder().addSubsequence(currentIndex).build(), - eventId = sourceMessage.eventId, - type = outputMessageType, - metadata = sourceMessage.metadata, - body = body - ) - } - - override fun String.toFieldValue(): String = this - override fun Array.toFieldValue(): Any = this - } - companion object { private val LOGGER = KotlinLogging.logger {} - private const val HEADER_MSG_TYPE = "Csv_Header" - private const val CSV_MESSAGE_TYPE = "Csv_Message" - private const val HEADER_FIELD_NAME = "Header" - private const val OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE = "th2.csv.override_message_type" } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/csv/ProtoDecoder.kt b/src/main/kotlin/com/exactpro/th2/codec/csv/ProtoDecoder.kt new file mode 100644 index 0000000..1588289 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/codec/csv/ProtoDecoder.kt @@ -0,0 +1,83 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.codec.csv + +import com.exactpro.th2.common.grpc.Message +import com.exactpro.th2.common.grpc.AnyMessage +import com.exactpro.th2.common.grpc.RawMessage +import com.exactpro.th2.common.grpc.MessageMetadata +import com.exactpro.th2.common.grpc.RawMessageMetadata +import com.exactpro.th2.common.grpc.MessageID +import com.exactpro.th2.common.grpc.Value +import com.exactpro.th2.common.message.toJson +import com.exactpro.th2.common.utils.message.id +import com.exactpro.th2.common.utils.message.logId +import com.exactpro.th2.common.utils.message.sessionAlias +import com.exactpro.th2.common.value.toValue +import java.nio.charset.Charset + +class ProtoDecoder( + charset: Charset, + csvDelimiter: Char, + defaultHeader: Array?, + publishHeader: Boolean, + validateLength: Boolean +) : AbstractDecoder(charset, csvDelimiter, defaultHeader, publishHeader, validateLength) { + + override val RawMessage.messageMetadata: Map get() = metadata.propertiesMap + override val RawMessage.messageSessionAlias: String get() = sessionAlias ?: error("No session alias in message") + + override val RawMessage.rawBody: ByteArray get() = body.toByteArray() + override val RawMessage.messageProtocol: String get() = metadata.protocol + override val RawMessage.logId: String get() = this.id.logId + override val RawMessage.logData: String get() = toJson() + override val AnyMessage.isParsed: Boolean get() = hasMessage() + override val AnyMessage.isRaw: Boolean get() = hasRawMessage() + override val AnyMessage.asRaw: RawMessage get() = rawMessage + + override fun createParsedMessage(sourceMessage: RawMessage, outputMessageType: String, body: Map, currentIndex: Int): AnyMessage { + val builder = Message.newBuilder() + .putAllFields(body) + .setParentEventId(sourceMessage.parentEventId) + + // Not set message type + setMetadata(sourceMessage.metadata, builder, outputMessageType, currentIndex) + return AnyMessage.newBuilder().setMessage(builder).build() + } + + override fun String.toFieldValue(): Value = toValue() + override fun Array.toFieldValue(): Value = toValue() + + private fun setMetadata( + originalMetadata: RawMessageMetadata, + messageBuilder: Message.Builder, + messageType: String, + currentIndex: Int + ) { + messageBuilder.setMetadata( + MessageMetadata.newBuilder() + .setId( + MessageID.newBuilder(originalMetadata.id) + .setTimestamp(originalMetadata.id.timestamp) + .addSubsequence(currentIndex) + .build() + ) + .putAllProperties(originalMetadata.propertiesMap) + .setMessageType(messageType) + ) + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/csv/TransportDecoder.kt b/src/main/kotlin/com/exactpro/th2/codec/csv/TransportDecoder.kt new file mode 100644 index 0000000..b3323cf --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/codec/csv/TransportDecoder.kt @@ -0,0 +1,62 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.codec.csv + +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Message +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.toByteArray +import com.exactpro.th2.common.utils.message.transport.logId +import java.nio.charset.Charset + +class TransportDecoder( + charset: Charset, + csvDelimiter: Char, + defaultHeader: Array?, + publishHeader: Boolean, + validateLength: Boolean +) : AbstractDecoder, RawMessage, ParsedMessage, Any>(charset, csvDelimiter, defaultHeader, publishHeader, validateLength) { + + override val RawMessage.messageMetadata: Map get() = this.metadata + override val RawMessage.messageSessionAlias: String get() = id.sessionAlias + + override val RawMessage.rawBody: ByteArray get() = body.toByteArray() + override val RawMessage.messageProtocol: String get() = protocol + override val RawMessage.logId: String get() = id.logId + override val RawMessage.logData: String get() = this.toString() + override val Message<*>.isParsed: Boolean get() = this is ParsedMessage + override val Message<*>.isRaw: Boolean get() = this is RawMessage + override val Message<*>.asRaw: RawMessage get() = this as RawMessage + + override fun createParsedMessage( + sourceMessage: RawMessage, + outputMessageType: String, + body: Map, + currentIndex: Int + ): Message<*> { + return ParsedMessage( + id = sourceMessage.id.toBuilder().addSubsequence(currentIndex).build(), + eventId = sourceMessage.eventId, + type = outputMessageType, + metadata = sourceMessage.metadata, + body = body + ) + } + + override fun String.toFieldValue(): String = this + override fun Array.toFieldValue(): Any = this +} \ No newline at end of file diff --git a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java index 8b1b5ef..dc56098 100644 --- a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java +++ b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; @@ -27,8 +28,10 @@ import java.util.Map; import java.util.function.Supplier; +import com.exactpro.th2.codec.api.IReportingContext; +import com.exactpro.th2.codec.api.impl.ReportingContext; +import com.exactpro.th2.common.grpc.ConnectionID; import org.apache.commons.lang3.StringUtils; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -46,11 +49,14 @@ import com.google.protobuf.ByteString; class TestCsvCodec { + private final IReportingContext reportingContext = new ReportingContext(); + private static final String TEST_SESSION = "test-session"; + @Nested class TestPositive { @Test -void decodeArrayWithDifferentLength() throws IOException { + void decodeArrayWithDifferentLength() throws IOException { CsvCodecConfiguration configuration = new CsvCodecConfiguration(); configuration.setValidateLength(false); configuration.setPublishHeader(true); @@ -58,7 +64,7 @@ void decodeArrayWithDifferentLength() throws IOException { MessageGroup group = MessageGroup.newBuilder() .addMessages(createCsvMessage("A,B, , ,", "1,2,3,4")) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -102,7 +108,7 @@ void decodeArrayInEnd() throws IOException { MessageGroup group = MessageGroup.newBuilder() .addMessages(createCsvMessage("A,B,C ,", "1,2,3")) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -142,7 +148,7 @@ void decodeArrayInMiddle() throws IOException { MessageGroup group = MessageGroup.newBuilder() .addMessages(createCsvMessage("A,B, ,C", "1,2,3,4")) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -184,7 +190,7 @@ void decodesDataAndSkipsHeader() { .addMessages(createCsvMessage("A,B,C", "1,2,3")) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -223,7 +229,7 @@ void skipsHeaderPublishing() { .addMessages(createCsvMessage("A,B,C", "1,2,3")) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(1, value.getMessagesCount()); Message message = getMessage(value, 0); @@ -256,7 +262,7 @@ void settingMessageTypeFromIncomingMessage() { .addMessages(csvMessage) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(1, value.getMessagesCount()); Message message = getMessage(value, 0); @@ -283,7 +289,7 @@ void trimsEndOfTheLine() { MessageGroup group = MessageGroup.newBuilder() .addMessages(createCsvMessage("A,B,C\n\r1,2,3\n")) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -323,7 +329,7 @@ void decodesDataUsingDefaultHeader() { createCsvMessage("1,2,3") ) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(1, value.getMessagesCount()); Message message = getMessage(value, 0); @@ -349,7 +355,7 @@ void decodesDataWithEscapedCharacters() { createCsvMessage("A,B", "\"1,2\",\"\"\"value\"\"\"") ) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -390,7 +396,7 @@ void decodesDataCustomDelimiter() { createCsvMessage("A;B", "1,2;3") ) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -428,7 +434,7 @@ void trimsWhitespacesDuringDecoding() { createCsvMessage("A, B, C", "1, , 3 3") ) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -464,19 +470,35 @@ class TestNegative { @Test void reportsErrorIfNotDataFound() { CsvCodec codec = createCodec(); - Assertions.assertThrows(DecodeException.class, () -> - codec.decode(MessageGroup.newBuilder().addMessages(createCsvMessage("")).build())); + assertThrows(DecodeException.class, () -> + codec.decode(MessageGroup.newBuilder().addMessages(createCsvMessage("")).build(), reportingContext)); } @Test void reportsErrorIfRawDataIsEmpty() { CsvCodec codec = createCodec(); - Assertions.assertThrows(DecodeException.class, () -> - codec.decode(MessageGroup.newBuilder() - .addMessages(createCsvMessage("A,B,C")) - .addMessages(createCsvMessage("")) - .build()) + assertThrows(DecodeException.class, () -> + codec.decode( + MessageGroup.newBuilder() + .addMessages(createCsvMessage("A,B,C")) + .addMessages(createCsvMessage("")) + .build(), + reportingContext) + ); + } + + @Test + void reportsErrorIfDefaultHeaderAndDataHaveDifferentSize() { + CsvCodecConfiguration configuration = new CsvCodecConfiguration(); + configuration.setDefaultHeader(List.of("A", "B")); + CsvCodec codec = createCodec(configuration); + + assertThrows(DecodeException.class, () -> + codec.decode( + MessageGroup.newBuilder().addMessages(createCsvMessage("1,2,3")).build(), + reportingContext + ) ); } } @@ -499,7 +521,10 @@ private AnyMessage createCsvMessage(Map metadataProps, String... Builder builder = RawMessage.newBuilder() .setBody(ByteString.copyFrom(String.join(StringUtils.LF, data).getBytes(StandardCharsets.UTF_8))); RawMessageMetadata.Builder metadataBuilder = RawMessageMetadata.newBuilder() - .setId(MessageID.newBuilder().setSequence(System.nanoTime()).build()) + .setId(MessageID.newBuilder() + .setSequence(System.nanoTime()) + .setConnectionId(ConnectionID.newBuilder().setSessionAlias(TEST_SESSION).build()) + .build()) .putAllProperties(metadataProps); builder.setMetadata(metadataBuilder.build()); return AnyMessage.newBuilder().setRawMessage(builder).build(); diff --git a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodecTransport.kt b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodecTransport.kt index 79ed76b..d79ed18 100644 --- a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodecTransport.kt +++ b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodecTransport.kt @@ -16,10 +16,12 @@ package com.exactpro.th2.codec.csv +import com.exactpro.th2.codec.api.IReportingContext +import com.exactpro.th2.codec.api.impl.ReportingContext import com.exactpro.th2.codec.csv.cfg.CsvCodecConfiguration import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageGroup -import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageId import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction import io.netty.buffer.Unpooled @@ -30,13 +32,15 @@ import java.nio.charset.StandardCharsets import java.time.Instant class TestCsvCodecTransport { + private val reportingContext: IReportingContext = ReportingContext() + @Test fun decodesDataUsingDefaultHeader() { val configuration = CsvCodecConfiguration() configuration.defaultHeader = listOf("A", "B", "C") val codec = CsvCodec(configuration) val group = MessageGroup(listOf(createCsvMessage("1,2,3"))) - val decodedGroup = codec.decode(group) + val decodedGroup = codec.decode(group,reportingContext) Assertions.assertEquals(1, decodedGroup.messages.size) val message = decodedGroup.messages[0] as ParsedMessage