Skip to content

Commit

Permalink
[TH2-5004] support transport protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Jul 28, 2023
1 parent 3e4a533 commit 9ccc20b
Show file tree
Hide file tree
Showing 55 changed files with 1,700 additions and 865 deletions.
52 changes: 35 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,17 +184,18 @@ id: event / message id | null | null | null


# Configuration

schema component description example (rpt-data-provider.yml):
```
apiVersion: th2.exactpro.com/v1
apiVersion: th2.exactpro.com/v2
kind: Th2CoreBox
metadata:
name: rpt-data-provider
spec:
image-name: ghcr.io/th2-net/th2-rpt-data-provider
image-version: 2.2.5 // change this line if you want to use a newer version
imageName: ghcr.io/th2-net/th2-rpt-data-provider
imageVersion: 5.7.0 // change this line if you want to use a newer version
type: th2-rpt-data-provider
custom-config:
customConfig:
hostname: localhost
port: 8080
responseTimeout: 60000 // maximum request processing time in milliseconds
Expand Down Expand Up @@ -244,28 +245,44 @@ spec:
searchBySessionGroup: true // if true data-provider uses the session alias to group cache and translates http / gRPC requests by session alias to group th2 storage request
aliasToGroupCacheSize: 1000 // the size of cache for the mapping between session alias and group.
useTransportMode: true // if true data-provider uses th2 transport protocol to interact with thw codecs
pins: // pins are used to communicate with codec components to parse message data
- name: to_codec
connection-type: mq
attributes:
- to_codec
- raw
- publish
- name: from_codec
connection-type: mq
attributes:
mq:
subscribers:
- name: from_codec
attributes:
- from_codec
- parsed
- transport-group
- subscribe
extended-settings:
chart-cfg:
linkTo:
- box: codec-fix
pin: out_codec_decode
publishers:
- name: to_codec_fix
attributes:
- to_codec
- transport-group
- publish
filters:
- metadata:
- fieldName: "session_group"
expectedValue: "fix*"
operation: WILDCARD
extendedSettings:
chartCfg:
ref: schema-stable
path: custom-component
service:
enabled: false
nodePort: '31275'
envVariables:
JAVA_TOOL_OPTIONS: "-XX:+ExitOnOutOfMemoryError -Ddatastax-java-driver.advanced.connection.init-query-timeout=\"5000 milliseconds\""
JAVA_TOOL_OPTIONS: >
-XX:+ExitOnOutOfMemoryError
-Ddatastax-java-driver.advanced.connection.init-query-timeout="5000 milliseconds"
-XX:+UseContainerSupport
-XX:MaxRAMPercentage=85
resources:
limits:
memory: 2000Mi
Expand All @@ -283,6 +300,7 @@ spec:
### Feature
+ Added optional session alias to group cache to translate cradle queries from `messages` to `grouped_messages` tables.
User can enable this feature by `searchBySessionGroup` option and set cache size in entries by `aliasToGroupCacheSize` option
+ Added mode to interact with codec by th2 transport protocol

### Updated
+ gradle
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ dependencies {
exclude group: 'com.exactpro.th2', module: 'cradle-core'
exclude group: 'com.exactpro.th2', module: 'cradle-cassandra'
}
implementation 'com.exactpro.th2:common-utils:2.1.0-transport-protocol-+'

implementation "com.exactpro.th2:cradle-core:${cradleVersion}"

Expand Down
22 changes: 15 additions & 7 deletions src/main/kotlin/com/exactpro/th2/rptdataprovider/Extensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.exactpro.cradle.testevents.*
import com.exactpro.th2.common.grpc.ConnectionID
import com.exactpro.th2.common.grpc.MessageID
import com.exactpro.th2.common.message.toTimestamp
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageId
import com.fasterxml.jackson.databind.ObjectMapper
import io.prometheus.client.Gauge
import io.prometheus.client.Histogram
Expand Down Expand Up @@ -200,12 +201,19 @@ fun StoredTestEventBatch.tryToGetTestEvents(parentEventId: StoredTestEventId? =
}


fun StoredMessageId.convertToProto(): MessageID {
return MessageID.newBuilder()
fun StoredMessageId.convertToProto(): MessageID = MessageID.newBuilder()
.setSequence(this.sequence)
.setDirection(cradleDirectionToGrpc(direction))
.setConnectionId(ConnectionID.newBuilder().setSessionAlias(this.sessionAlias))
.setTimestamp(timestamp.toTimestamp())
.setBookName(bookId.name)
.build()

fun StoredMessageId.convertToTransport(): MessageId {
return MessageId.builder()
.setSequence(this.sequence)
.setDirection(cradleDirectionToGrpc(direction))
.setConnectionId(ConnectionID.newBuilder().setSessionAlias(this.sessionAlias))
.setTimestamp(timestamp.toTimestamp())
.setBookName(bookId.name)
.setDirection(cradleDirectionToTransport(direction))
.setSessionAlias(this.sessionAlias)
.setTimestamp(timestamp)
.build()
}
}
134 changes: 93 additions & 41 deletions src/main/kotlin/com/exactpro/th2/rptdataprovider/Main.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
/*
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -12,17 +12,16 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
*/
package com.exactpro.th2.rptdataprovider

import com.exactpro.th2.common.grpc.EventBatch
import com.exactpro.th2.common.metrics.LIVENESS_MONITOR
import com.exactpro.th2.common.metrics.READINESS_MONITOR
import com.exactpro.th2.common.metrics.liveness
import com.exactpro.th2.common.metrics.readiness
import com.exactpro.th2.common.schema.factory.CommonFactory
import com.exactpro.th2.rptdataprovider.entities.configuration.Configuration
import com.exactpro.th2.rptdataprovider.entities.configuration.CustomConfigurationClass
import com.exactpro.th2.rptdataprovider.entities.mappers.ProtoMessageMapper
import com.exactpro.th2.rptdataprovider.entities.mappers.TransportMessageMapper
import com.exactpro.th2.rptdataprovider.server.GrpcServer
import com.exactpro.th2.rptdataprovider.server.HttpServer
import com.exactpro.th2.rptdataprovider.server.ServerType
Expand All @@ -44,43 +43,61 @@ import kotlin.system.exitProcess

private val logger = KotlinLogging.logger {}

class Main {
class Main @InternalAPI constructor(args: Array<String>) {

private val commonFactory: CommonFactory
private val configuration: Configuration

private val context: Context
private val protoContext: ProtoContext?
private val transportContext: TransportContext?

private val resources: Deque<AutoCloseable> = ConcurrentLinkedDeque()
private val lock = ReentrantLock()
private val condition: Condition = lock.newCondition()

@InternalAPI
constructor(args: Array<String>) {

init {
configureShutdownHook(resources, lock, condition)

commonFactory = CommonFactory.createFromArguments(*args)
resources += commonFactory

val configuration =
configuration =
Configuration(commonFactory.getCustomConfiguration(CustomConfigurationClass::class.java))

context = Context(
configuration,

serverType = ServerType.valueOf(configuration.serverType.value),

cradleManager = commonFactory.cradleManager.also {
resources += it
},
messageRouterRawBatch = commonFactory.messageRouterMessageGroupBatch.also {
resources += it
},
messageRouterParsedBatch = commonFactory.messageRouterMessageGroupBatch.also {
resources += it
},
grpcConfig = commonFactory.grpcConfiguration
)
if (configuration.useTransportMode.value.toBoolean()) {
protoContext = ProtoContext(
configuration,

serverType = ServerType.valueOf(configuration.serverType.value),

cradleManager = commonFactory.cradleManager.also {
resources += it
},
protoMessageRouterPublisher = commonFactory.messageRouterMessageGroupBatch.also {
resources += it
},
protoMessageRouterSubscriber = commonFactory.messageRouterMessageGroupBatch.also {
resources += it
},
grpcConfig = commonFactory.grpcConfiguration
)
transportContext = null
} else {
protoContext = null
transportContext = TransportContext(
configuration,

serverType = ServerType.valueOf(configuration.serverType.value),

cradleManager = commonFactory.cradleManager.also {
resources += it
},
transportMessageRouterPublisher = commonFactory.transportGroupBatchRouter.also {
resources += it
},
transportMessageRouterSubscriber = commonFactory.transportGroupBatchRouter.also {
resources += it
},
grpcConfig = commonFactory.grpcConfiguration
)
}
}


Expand Down Expand Up @@ -108,19 +125,54 @@ class Main {
@InternalAPI
private fun startServer() {

System.setProperty(IO_PARALLELISM_PROPERTY_NAME, context.configuration.ioDispatcherThreadPoolSize.value)
if (configuration.useTransportMode.value.toBoolean()) {
requireNotNull(transportContext) {
"Transport context can't be null"
}

System.setProperty(
IO_PARALLELISM_PROPERTY_NAME,
transportContext.configuration.ioDispatcherThreadPoolSize.value
)

when (transportContext.serverType) {
HTTP -> {
HttpServer(transportContext, TransportMessageMapper::convertToHttpMessage).run()
}
GRPC -> {
val grpcRouter = commonFactory.grpcRouter
resources += grpcRouter

when (context.serverType) {
HTTP -> {
HttpServer(context).run()
val grpcServer = GrpcServer(transportContext, grpcRouter, TransportMessageMapper::convertToTransportMessageData)

resources += AutoCloseable { grpcServer.stop() }
grpcServer.start()
}
}
} else {
requireNotNull(protoContext) {
"Protobuf context can't be null"
}
GRPC -> {
val grpcRouter = commonFactory.grpcRouter
resources += grpcRouter

val grpcServer = GrpcServer(context, grpcRouter)
resources += AutoCloseable { grpcServer.stop() }
grpcServer.start()
System.setProperty(
IO_PARALLELISM_PROPERTY_NAME,
protoContext.configuration.ioDispatcherThreadPoolSize.value
)

when (protoContext.serverType) {
HTTP -> {
HttpServer(protoContext, ProtoMessageMapper::convertToHttpMessage).run()
}

GRPC -> {
val grpcRouter = commonFactory.grpcRouter
resources += grpcRouter

val grpcServer = GrpcServer(protoContext, grpcRouter, ProtoMessageMapper::convertToGrpcMessageData)

resources += AutoCloseable { grpcServer.stop() }
grpcServer.start()
}
}
}
}
Expand Down
13 changes: 10 additions & 3 deletions src/main/kotlin/com/exactpro/th2/rptdataprovider/Utils.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright 2021-2021 Exactpro (Exactpro Systems Limited)
/*
* Copyright 2021-2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -12,7 +12,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
*/

package com.exactpro.th2.rptdataprovider

Expand All @@ -29,6 +29,13 @@ fun cradleDirectionToGrpc(direction: Direction): com.exactpro.th2.common.grpc.Di
com.exactpro.th2.common.grpc.Direction.SECOND
}

fun cradleDirectionToTransport(direction: Direction): com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction {
return if (direction == Direction.FIRST)
com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction.INCOMING
else
com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction.OUTGOING
}

fun grpcDirectionToCradle(direction: com.exactpro.th2.common.grpc.Direction): Direction {
return if (direction == com.exactpro.th2.common.grpc.Direction.FIRST)
Direction.FIRST
Expand Down
Loading

0 comments on commit 9ccc20b

Please sign in to comment.