From 2cdeedba77dbdd850742dbf34f975d81199241c4 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Mon, 31 Jul 2023 12:03:21 +0400 Subject: [PATCH] [TH2-5004] Added new files --- .../th2/rptdataprovider/ProtoContext.kt | 162 +++++++++++++++++ .../th2/rptdataprovider/TransportContext.kt | 163 ++++++++++++++++++ .../mappers/TransportMessageMapper.kt | 92 ++++++++++ 3 files changed, 417 insertions(+) create mode 100644 src/main/kotlin/com/exactpro/th2/rptdataprovider/ProtoContext.kt create mode 100644 src/main/kotlin/com/exactpro/th2/rptdataprovider/TransportContext.kt create mode 100644 src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/mappers/TransportMessageMapper.kt diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/ProtoContext.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/ProtoContext.kt new file mode 100644 index 00000000..0ac708bc --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/ProtoContext.kt @@ -0,0 +1,162 @@ +/* + * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.rptdataprovider + + +import com.exactpro.cradle.CradleManager +import com.exactpro.th2.common.grpc.Message +import com.exactpro.th2.common.grpc.MessageGroup +import com.exactpro.th2.common.grpc.MessageGroupBatch +import com.exactpro.th2.common.grpc.RawMessage +import com.exactpro.th2.common.schema.grpc.configuration.GrpcConfiguration +import com.exactpro.th2.common.schema.message.MessageRouter +import com.exactpro.th2.rptdataprovider.cache.EventCache +import com.exactpro.th2.rptdataprovider.cache.MessageCache +import com.exactpro.th2.rptdataprovider.entities.configuration.Configuration +import com.exactpro.th2.rptdataprovider.entities.filters.PredicateFactory +import com.exactpro.th2.rptdataprovider.entities.filters.events.AttachedMessageFilter +import com.exactpro.th2.rptdataprovider.entities.filters.events.EventBodyFilter +import com.exactpro.th2.rptdataprovider.entities.filters.events.EventNameFilter +import com.exactpro.th2.rptdataprovider.entities.filters.events.EventStatusFilter +import com.exactpro.th2.rptdataprovider.entities.filters.events.EventTypeFilter +import com.exactpro.th2.rptdataprovider.entities.filters.events.ParentEventIdFilter +import com.exactpro.th2.rptdataprovider.entities.filters.messages.AttachedEventFilters +import com.exactpro.th2.rptdataprovider.entities.filters.messages.MessageBodyBinaryFilter +import com.exactpro.th2.rptdataprovider.entities.filters.messages.MessageBodyFilter +import com.exactpro.th2.rptdataprovider.entities.filters.messages.MessageTypeFilter +import com.exactpro.th2.rptdataprovider.entities.internal.MessageWithMetadata +import com.exactpro.th2.rptdataprovider.entities.responses.BaseEventEntity +import com.exactpro.th2.rptdataprovider.handlers.ProtoSearchMessagesHandler +import com.exactpro.th2.rptdataprovider.handlers.SearchMessagesHandler +import com.exactpro.th2.rptdataprovider.producers.EventProducer +import com.exactpro.th2.rptdataprovider.producers.MessageProducer +import com.exactpro.th2.rptdataprovider.producers.ProtoMessageProducer +import com.exactpro.th2.rptdataprovider.serialization.InstantBackwardCompatibilitySerializer +import com.exactpro.th2.rptdataprovider.server.ServerType +import com.exactpro.th2.rptdataprovider.services.cradle.CradleService +import com.exactpro.th2.rptdataprovider.services.rabbitmq.RabbitMqService +import com.exactpro.th2.rptdataprovider.services.rabbitmq.ProtoRabbitMqService +import com.fasterxml.jackson.databind.DeserializationFeature +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.module.SimpleModule +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import io.ktor.http.* +import java.time.Instant + +@Suppress("MemberVisibilityCanBePrivate") +class ProtoContext( + configuration: Configuration, + + serverType: ServerType, + + timeout: Long = configuration.responseTimeout.value.toLong(), + cacheTimeout: Long = configuration.serverCacheTimeout.value.toLong(), + + jacksonMapper: ObjectMapper = jacksonObjectMapper() + .registerModule(JavaTimeModule()) + .registerModule(SimpleModule("backward_compatibility").apply { + addSerializer(Instant::class.java, InstantBackwardCompatibilitySerializer) + }) + .enable(DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY) + .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES), + + cradleManager: CradleManager, + + protoMessageRouterPublisher: MessageRouter, + protoMessageRouterSubscriber: MessageRouter, + + grpcConfig: GrpcConfiguration, + + cradleService: CradleService = CradleService( + configuration, + cradleManager + ), + + rabbitMqService: RabbitMqService = ProtoRabbitMqService( + configuration, + protoMessageRouterSubscriber, + protoMessageRouterPublisher + ), + + eventProducer: EventProducer = EventProducer(cradleService, jacksonMapper), + + eventCache: EventCache = EventCache(cacheTimeout, configuration.eventCacheSize.value.toLong(), eventProducer), + + messageProducer: MessageProducer = ProtoMessageProducer( + cradleService, + rabbitMqService + ), + + messageCache: MessageCache = MessageCache(configuration, messageProducer), + + eventFiltersPredicateFactory: PredicateFactory = PredicateFactory( + mapOf( + AttachedMessageFilter.filterInfo to AttachedMessageFilter.Companion::build, + ParentEventIdFilter.filterInfo to ParentEventIdFilter.Companion::build, + EventTypeFilter.filterInfo to EventTypeFilter.Companion::build, + EventNameFilter.filterInfo to EventNameFilter.Companion::build, + EventBodyFilter.filterInfo to EventBodyFilter.Companion::build, + EventStatusFilter.filterInfo to EventStatusFilter.Companion::build + ), cradleService + ), + + messageFiltersPredicateFactory: PredicateFactory> = PredicateFactory( + mapOf( + AttachedEventFilters.filterInfo to AttachedEventFilters.Companion::build, + MessageTypeFilter.filterInfo to MessageTypeFilter.Companion::build, + MessageBodyFilter.filterInfo to MessageBodyFilter.Companion::build, + MessageBodyBinaryFilter.filterInfo to MessageBodyBinaryFilter.Companion::build + ), cradleService + ), + + + enableCaching: Boolean = configuration.enableCaching.value.toBoolean(), + + keepAliveTimeout: Long = configuration.keepAliveTimeout.value.toLong(), + + cacheControlNotModified: CacheControl = configuration.notModifiedObjectsLifetime.value.toInt().let { + cacheControlConfig(it, enableCaching) + }, + + cacheControlRarelyModified: CacheControl = configuration.rarelyModifiedObjects.value.toInt().let { + cacheControlConfig(it, enableCaching) + } +): Context( + configuration, + serverType, + timeout, + cacheTimeout, + jacksonMapper, + cradleManager, + grpcConfig, + cradleService, + rabbitMqService, + eventProducer, + eventCache, + messageProducer, + messageCache, + eventFiltersPredicateFactory, + messageFiltersPredicateFactory, + enableCaching, + keepAliveTimeout, + cacheControlNotModified, + cacheControlRarelyModified +) { + override val searchMessagesHandler: SearchMessagesHandler = + ProtoSearchMessagesHandler(this) +} diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/TransportContext.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/TransportContext.kt new file mode 100644 index 00000000..5811efef --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/TransportContext.kt @@ -0,0 +1,163 @@ +/* + * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.rptdataprovider + + +import com.exactpro.cradle.CradleManager +import com.exactpro.th2.common.schema.grpc.configuration.GrpcConfiguration +import com.exactpro.th2.common.schema.message.MessageRouter +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageGroup +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage +import com.exactpro.th2.rptdataprovider.cache.EventCache +import com.exactpro.th2.rptdataprovider.cache.MessageCache +import com.exactpro.th2.rptdataprovider.entities.configuration.Configuration +import com.exactpro.th2.rptdataprovider.entities.filters.PredicateFactory +import com.exactpro.th2.rptdataprovider.entities.filters.events.AttachedMessageFilter +import com.exactpro.th2.rptdataprovider.entities.filters.events.EventBodyFilter +import com.exactpro.th2.rptdataprovider.entities.filters.events.EventNameFilter +import com.exactpro.th2.rptdataprovider.entities.filters.events.EventStatusFilter +import com.exactpro.th2.rptdataprovider.entities.filters.events.EventTypeFilter +import com.exactpro.th2.rptdataprovider.entities.filters.events.ParentEventIdFilter +import com.exactpro.th2.rptdataprovider.entities.filters.messages.AttachedEventFilters +import com.exactpro.th2.rptdataprovider.entities.filters.messages.MessageBodyBinaryFilter +import com.exactpro.th2.rptdataprovider.entities.filters.messages.MessageBodyFilter +import com.exactpro.th2.rptdataprovider.entities.filters.messages.MessageTypeFilter +import com.exactpro.th2.rptdataprovider.entities.internal.MessageWithMetadata +import com.exactpro.th2.rptdataprovider.entities.responses.BaseEventEntity +import com.exactpro.th2.rptdataprovider.handlers.SearchMessagesHandler +import com.exactpro.th2.rptdataprovider.handlers.TransportSearchMessagesHandler +import com.exactpro.th2.rptdataprovider.producers.EventProducer +import com.exactpro.th2.rptdataprovider.producers.MessageProducer +import com.exactpro.th2.rptdataprovider.producers.TransportMessageProducer +import com.exactpro.th2.rptdataprovider.serialization.InstantBackwardCompatibilitySerializer +import com.exactpro.th2.rptdataprovider.server.ServerType +import com.exactpro.th2.rptdataprovider.services.cradle.CradleService +import com.exactpro.th2.rptdataprovider.services.rabbitmq.RabbitMqService +import com.exactpro.th2.rptdataprovider.services.rabbitmq.TransportRabbitMqService +import com.fasterxml.jackson.databind.DeserializationFeature +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.module.SimpleModule +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import io.ktor.http.* +import java.time.Instant + +@Suppress("MemberVisibilityCanBePrivate") +class TransportContext( + configuration: Configuration, + + serverType: ServerType, + + timeout: Long = configuration.responseTimeout.value.toLong(), + cacheTimeout: Long = configuration.serverCacheTimeout.value.toLong(), + + jacksonMapper: ObjectMapper = jacksonObjectMapper() + .registerModule(JavaTimeModule()) + .registerModule(SimpleModule("backward_compatibility").apply { + addSerializer(Instant::class.java, InstantBackwardCompatibilitySerializer) + }) + .enable(DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY) + .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES), + + cradleManager: CradleManager, + + transportMessageRouterPublisher: MessageRouter, + transportMessageRouterSubscriber: MessageRouter, + + grpcConfig: GrpcConfiguration, + + cradleService: CradleService = CradleService( + configuration, + cradleManager + ), + + rabbitMqService: RabbitMqService = TransportRabbitMqService( + configuration, + transportMessageRouterPublisher, + transportMessageRouterSubscriber + ), + + eventProducer: EventProducer = EventProducer(cradleService, jacksonMapper), + + eventCache: EventCache = EventCache(cacheTimeout, configuration.eventCacheSize.value.toLong(), eventProducer), + + messageProducer: MessageProducer = TransportMessageProducer( + cradleService, + rabbitMqService + ), + + messageCache: MessageCache = MessageCache(configuration, messageProducer), + + eventFiltersPredicateFactory: PredicateFactory = PredicateFactory( + mapOf( + AttachedMessageFilter.filterInfo to AttachedMessageFilter.Companion::build, + ParentEventIdFilter.filterInfo to ParentEventIdFilter.Companion::build, + EventTypeFilter.filterInfo to EventTypeFilter.Companion::build, + EventNameFilter.filterInfo to EventNameFilter.Companion::build, + EventBodyFilter.filterInfo to EventBodyFilter.Companion::build, + EventStatusFilter.filterInfo to EventStatusFilter.Companion::build + ), cradleService + ), + + messageFiltersPredicateFactory: PredicateFactory> = PredicateFactory( + mapOf( + AttachedEventFilters.filterInfo to AttachedEventFilters.Companion::build, + MessageTypeFilter.filterInfo to MessageTypeFilter.Companion::build, + MessageBodyFilter.filterInfo to MessageBodyFilter.Companion::build, + MessageBodyBinaryFilter.filterInfo to MessageBodyBinaryFilter.Companion::build + ), cradleService + ), + + + enableCaching: Boolean = configuration.enableCaching.value.toBoolean(), + + keepAliveTimeout: Long = configuration.keepAliveTimeout.value.toLong(), + + cacheControlNotModified: CacheControl = configuration.notModifiedObjectsLifetime.value.toInt().let { + cacheControlConfig(it, enableCaching) + }, + + cacheControlRarelyModified: CacheControl = configuration.rarelyModifiedObjects.value.toInt().let { + cacheControlConfig(it, enableCaching) + } +): Context( + configuration, + serverType, + timeout, + cacheTimeout, + jacksonMapper, + cradleManager, + grpcConfig, + cradleService, + rabbitMqService, + eventProducer, + eventCache, + messageProducer, + messageCache, + eventFiltersPredicateFactory, + messageFiltersPredicateFactory, + enableCaching, + keepAliveTimeout, + cacheControlNotModified, + cacheControlRarelyModified +) { + override val searchMessagesHandler: SearchMessagesHandler = + TransportSearchMessagesHandler(this) + +} diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/mappers/TransportMessageMapper.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/mappers/TransportMessageMapper.kt new file mode 100644 index 00000000..62ea1017 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/mappers/TransportMessageMapper.kt @@ -0,0 +1,92 @@ +/* + * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.rptdataprovider.entities.mappers + +import com.exactpro.cradle.messages.StoredMessage +import com.exactpro.th2.common.message.toTimestamp +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage +import com.exactpro.th2.common.utils.message.transport.toProto +import com.exactpro.th2.dataprovider.grpc.MessageData +import com.exactpro.th2.rptdataprovider.convertToProto +import com.exactpro.th2.rptdataprovider.convertToTransport +import com.exactpro.th2.rptdataprovider.entities.internal.MessageWithMetadata +import com.exactpro.th2.rptdataprovider.entities.responses.HttpMessage +import mu.KotlinLogging +import java.util.* + +object TransportMessageMapper { + + private val logger = KotlinLogging.logger { } + + fun storedMessageToRawTransport(storedMessage: StoredMessage): RawMessage { + return RawMessage.builder() + .setId(storedMessage.id.convertToTransport()) + .setProtocol(storedMessage.protocol) + .setMetadata(storedMessage.metadata.toMap()) + .also { builder -> + storedMessage.content?.let(builder::setBody) ?: logger.error { + "Received stored message has no content. StoredMessageId: ${storedMessage.id}" + } + }.build() + } + + //FIXME: migrate to grpc interface 1.0.0+ + //FIXME: return raw message body + fun convertToTransportMessageData(messageWithMetadata: MessageWithMetadata): List { + return messageWithMetadata.message.parsedMessageGroup?.map { groupElement -> + MessageData.newBuilder() + .setMessageId(groupElement.id) + .setTimestamp(groupElement.id.timestamp) +// .setBodyBase64(messageWithMetadata.message.rawMessageBody.let { +// Base64.getEncoder().encodeToString(it) +// }) + .setMessageType(groupElement.messageType) + .setMessage( + groupElement.message.toProto( + messageWithMetadata.message.book, + messageWithMetadata.message.sessionGroup + ) + ) + .build() + } ?: listOf(messageWithMetadata.message.let { message -> + MessageData.newBuilder() + .setMessageId(message.id.convertToProto()) + .setTimestamp(message.timestamp.toTimestamp()) +// .setBodyBase64(message.rawMessageBody.let { Base64.getEncoder().encodeToString(it) }) + .build() + }) + } + + + fun convertToHttpMessage(messageWithMetadata: MessageWithMetadata): HttpMessage { + return with(messageWithMetadata) { + val httpMessage = HttpMessage( + timestamp = message.timestamp, + messageType = messageWithMetadata.message.parsedMessageGroup + ?.joinToString("/") { it.messageType } ?: messageWithMetadata.message.imageType ?: "", + direction = message.direction, + sessionId = message.sessionAlias, + attachedEventIds = message.attachedEventIds, + messageId = message.id.toString(), + body = MessageMapper.getBodyMessage(messageWithMetadata), + bodyBase64 = message.rawMessageBody.let { Base64.getEncoder().encodeToString(it) } + ) + httpMessage + } + } +} \ No newline at end of file