Skip to content

Commit

Permalink
[th2-5149] Read-db should create event with query details, unique que…
Browse files Browse the repository at this point in the history
…ry id and start/end times when user execute query (#16)

* gRPC execute method generates unique id for each execution and puts it into related event and messages.
* added readme for oracle redo log
* [TH2-5152] Catch exceptions during query execution to notify the listener
* gRPC server implementation skips columns with null value after fix.

---------

Co-authored-by: Oleg <oleg.smirnov@exactprosystems.com>
  • Loading branch information
Nikita-Smirnov-Exactpro and OptimumCode authored Jan 26, 2024
1 parent 5a979f4 commit ff5fb71
Show file tree
Hide file tree
Showing 26 changed files with 1,333 additions and 52 deletions.
67 changes: 63 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2-read-db 0.6.0
# th2-read-db 0.7.0

The read-db is a component for extracting data from databases using JDBC technology. If database has JDBC driver the read can work with the database

Expand Down Expand Up @@ -44,6 +44,9 @@ publication:
queueSize: 1000
maxDelayMillis: 1000
maxBatchSize: 100
eventPublication:
maxBatchSizeInItems: 100
maxFlushTime: 1000
```
## Parameters
Expand All @@ -65,6 +68,7 @@ The list of queries that can be executed by read-db.
It might contain parameters in the following format: `${<name>[:<type>]}`.
The **type** part can be omitted if the type is `varchar`.
Examples: `${id:integer}`, `${registration_time:timestamp}`, `${first_name}`
[Types](https://docs.oracle.com/javase/8/docs/api/java/sql/JDBCType.html): bit, tinyint, smallint, integer, bigint, float, real, double, numeric, decimal, char, varchar, longvarchar, date, time, timestamp, binary, varbinary, longvarbinary, null, other, java_object, distinct, struct, array, blob, clob, ref, datalink, boolean, rowid, nchar, nvarchar, longnvarchar, nclob, sqlxml, ref_cursor, time_with_timezone, timestamp_with_timezone
+ defaultParameters - the default values for parameters. They will be used if the parameter was not specified in the request
+ messageType - the message type that should be associated with this query.
If it is set the read-db will set a property `th2.csv.override_message_type` with specified value
Expand Down Expand Up @@ -126,7 +130,10 @@ You can interact with read-db via gRPC. It supports executing direct queries and
# Publication

The read-db publishes all extracted data to MQ as raw messages in CSV format. The alias matches the **data source id**.
Message might contain property `th2.csv.override_message_type` with value that should be used as message type for the row message
Message might contain properties
* `th2.csv.override_message_type` with value that should be used as message type for the row message
* `th2.read-db.execute.uid` with unique identifier of query execution
* `th2.pull_task.update_hash` with hash of source and query configuration used pull query execution

# gRPC

Expand All @@ -135,6 +142,39 @@ Message might contain property `th2.csv.override_message_type` with value that s
Pull task tries to load the last message published to Cradle instead of initialise from the start
if you connect read-db to a data-provider using `com.exactpro.th2.dataprovider.lw.grpc.DataProviderService`.

## Server

### Execute method

User can trigger a query execution on a data source using this method. the method includes the activities:
* generation of growing unique id.
* query execution.
* publication results of the query execution to MQ where each message has `th2.read-db.execute.uid` property with the unique id
* publication event with data source, query, request parameters and the unique id.
Start/End even times correspond to the beginning/ending the query execution.
Body example:
```json
[
{
"dataSource": {
"url":"jdbc url for data base connection",
"username":"user name"
},
"query": {
"query":"SQL query text"
},
"parameters": {
"parameter": [
"parameter value"
]
},
"executionId": 123
}
]
```
NOTE: the event hasn't got attached message because the query can produce a lot of rows.
* streaming results of the query execution with the unique id as gRPC response.

# CR example
## infra 1
```yaml
Expand All @@ -144,7 +184,7 @@ metadata:
name: read-db
spec:
image-name: ghcr.io/th2-net/th2-read-db
image-version: 0.0.1
image-version: 0.7.0-dev
type: th2-read
custom-config:
dataSources:
Expand Down Expand Up @@ -182,6 +222,9 @@ spec:
queueSize: 1000
maxDelayMillis: 1000
maxBatchSize: 100
eventPublication:
maxBatchSizeInItems: 100
maxFlushTime: 1000
useTransport: true
pins:
- name: client
Expand Down Expand Up @@ -217,7 +260,7 @@ metadata:
name: read-db
spec:
imageName: ghcr.io/th2-net/th2-read-db
imageVersion: 0.0.1
imageVersion: 0.7.0-dev
type: th2-read
customConfig:
dataSources:
Expand Down Expand Up @@ -256,6 +299,9 @@ spec:
queueSize: 1000
maxDelayMillis: 1000
maxBatchSize: 100
eventPublication:
maxBatchSizeInItems: 100
maxFlushTime: 1000
useTransport: true
pins:
mq:
Expand Down Expand Up @@ -288,8 +334,21 @@ spec:
cpu: 50m
```

### Oracle redo logs
[How to configure th2-read-db to pull data from redo log](oracle-log-miner.md)

## Changes

### 0.7.0

#### Feature:

+ gRPC execute method generates unique id for each execution and puts it into related event and messages.

#### Fix:

+ gRPC Execute method doesn't respond rows with null values. gRPC server implementation skips columns with null value after fix.

### 0.6.0

#### Feature:
Expand Down
2 changes: 1 addition & 1 deletion app/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
kotlin.code.style=official
release_version=0.6.0
release_version=0.7.0
description=read-db component for extracting data from databases using JDBC technology
4 changes: 4 additions & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-databind"

testImplementation "org.junit.jupiter:junit-jupiter:5.10.0"
testImplementation "org.jetbrains.kotlin:kotlin-test-junit"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:$coroutines_version"
testImplementation "org.mockito.kotlin:mockito-kotlin:5.1.0"
testImplementation "io.strikt:strikt-core:0.34.1"
Expand All @@ -66,6 +67,9 @@ dependencies {
testImplementation "org.testcontainers:testcontainers"
testImplementation "org.testcontainers:mysql"
testImplementation "org.testcontainers:oracle-xe"
testImplementation "io.grpc:grpc-testing"

testImplementation 'com.exactpro.th2:junit-jupiter-integration:0.0.1-master-6956603819-5241ee5-SNAPSHOT'

testRuntimeOnly("com.mysql:mysql-connector-j:8.1.0") {
because("mysql support")
Expand Down
2 changes: 1 addition & 1 deletion core/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
kotlin.code.style=official
release_version=0.6.0
release_version=0.7.0
description=core part of read db to create an application with required JDBC drivers in the classpath
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,21 @@ class DataBaseReader(
) {
scope.launch {
with(request) {
dataBaseService.executeQuery(sourceId, before, queryId, after, parameters)
.onCompletion {
if (it == null) {
listener.onComplete()
} else {
listener.onError(it)
try {
dataBaseService.executeQuery(sourceId, before, queryId, after, parameters)
.onCompletion {
if (it == null) {
listener.onComplete()
} else {
listener.onError(it)
}
}.collect {
rowTransformer(it).transferTo(sourceId, listener, rowListener)
}
}.collect {
rowTransformer(it).transferTo(sourceId, listener, rowListener)
}
} catch (ex: Exception) {
LOGGER.error(ex) { "cannot execute query '${request.queryId}' for '${request.sourceId}'" }
listener.onError(ex)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class DataBaseReaderConfiguration(
val queries: Map<QueryId, QueryConfiguration>,
val startupTasks: List<StartupTaskConfiguration> = emptyList(),
val publication: PublicationConfiguration = PublicationConfiguration(),
val eventPublication: EventPublicationConfiguration = EventPublicationConfiguration(),
val useTransport: Boolean = false
)

Expand All @@ -40,6 +41,11 @@ class PublicationConfiguration(
val maxBatchSize: Int = 100,
)

class EventPublicationConfiguration(
val maxBatchSizeInItems: Int = 100,
val maxFlushTime: Long = 1000,
)

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/kotlin/com/exactpro/th2/read/db/app/Requests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ package com.exactpro.th2.read.db.app
import com.exactpro.th2.read.db.core.DataSourceId
import com.exactpro.th2.read.db.core.QueryId
import com.exactpro.th2.read.db.core.QueryParametersValues
import com.fasterxml.jackson.annotation.JsonInclude
import java.time.Duration

@JsonInclude(JsonInclude.Include.NON_EMPTY)
class ExecuteQueryRequest(
val sourceId: DataSourceId,
val before: List<QueryId>,
val before: List<QueryId> = emptyList(),
val queryId: QueryId,
val after: List<QueryId>,
val parameters: QueryParametersValues,
val after: List<QueryId> = emptyList(),
val parameters: QueryParametersValues = emptyMap(),
)

class PullTableRequest(
Expand Down
51 changes: 44 additions & 7 deletions core/src/main/kotlin/com/exactpro/th2/read/db/bootstrap/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.exactpro.th2.read.db.bootstrap

import com.exactpro.th2.common.grpc.Direction.FIRST
import com.exactpro.th2.common.grpc.EventBatch
import com.exactpro.th2.common.message.direction
import com.exactpro.th2.common.message.plusAssign
import com.exactpro.th2.common.message.sequence
Expand All @@ -33,7 +34,9 @@ import com.exactpro.th2.common.schema.message.QueueAttribute
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage
import com.exactpro.th2.common.utils.event.EventBatcher
import com.exactpro.th2.common.utils.message.transport.toGroup
import com.exactpro.th2.common.utils.shutdownGracefully
import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService
import com.exactpro.th2.lwdataprovider.MessageSearcher
import com.exactpro.th2.read.db.app.DataBaseReader
Expand Down Expand Up @@ -132,6 +135,9 @@ internal fun setupApp(

val appScope = createScope(closeResource)
val componentBookName = factory.boxConfiguration.bookName
val rootEventId = factory.rootEventId
val maxEventBatchSize = factory.cradleManager.storage.entitiesFactory.maxTestEventBatchSize

val messageLoader: MessageLoader = createMessageLoader(factory, componentBookName)
val reader = if (cfg.useTransport) {
val messageRouter: MessageRouter<GroupBatch> = factory.transportGroupBatchRouter
Expand Down Expand Up @@ -174,7 +180,17 @@ internal fun setupApp(
createReader(cfg, appScope, messageQueue, closeResource, TableRow::toProtoMessage, messageLoader)
}

val handler = DataBaseReaderGrpcServer(reader)
val eventBatcher = configureEventStoring(cfg, maxEventBatchSize, closeResource, factory.eventBatchRouter::send)

val handler = DataBaseReaderGrpcServer(
reader,
{ cfg.dataSources[it] ?: error("'$it' data source isn't found in custom config") },
{ cfg.queries[it] ?: error("'$it' query isn't found in custom config") },
) { event, parentEventId ->
eventBatcher.onEvent(
event.toProto(parentEventId ?: rootEventId)
)
}

val server = factory.grpcRouter.startServer(handler)
.start()
Expand Down Expand Up @@ -271,6 +287,32 @@ private fun createScope(closeResource: (name: String, resource: () -> Unit) -> U
return appScope
}

private fun configureEventStoring(
cfg: DataBaseReaderConfiguration,
maxEventBatchSize: Int,
closeResource: (name: String, resource: () -> Unit) -> Unit,
send: (EventBatch) -> Unit
): EventBatcher {
val executor = Executors.newSingleThreadScheduledExecutor(
ThreadFactoryBuilder()
.setNameFormat("event-saver-%d")
.build()
)

closeResource("event storing") {
LOGGER.info { "Shutdown executor" }
executor.shutdownGracefully(1, TimeUnit.MINUTES)
}

return EventBatcher(
maxEventBatchSize.toLong(),
cfg.eventPublication.maxBatchSizeInItems,
cfg.eventPublication.maxFlushTime,
executor,
send
)
}

private fun <BUILDER, DIRECTION> configureTransportMessageStoring(
cfg: DataBaseReaderConfiguration,
keyExtractor: (BUILDER) -> SessionKey<DIRECTION>,
Expand Down Expand Up @@ -305,12 +347,7 @@ private fun <BUILDER, DIRECTION> configureTransportMessageStoring(
LOGGER.error(ex) { "cannot complete drain task in specified timeout" }
}
LOGGER.info { "Shutdown executor" }
executor.shutdown()
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
LOGGER.error { "executor was not shutdown during specified timeout. Force shutdown" }
val runnables = executor.shutdownNow()
LOGGER.error { "${runnables.size} task(s) left" }
}
executor.shutdownGracefully(1, TimeUnit.MINUTES)
}
}
return messagesQueue
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,7 @@ import java.math.BigDecimal
import com.exactpro.th2.common.grpc.RawMessage as ProtoRawMessage

internal const val TH2_CSV_OVERRIDE_MESSAGE_TYPE_PROPERTY = "th2.csv.override_message_type"
private const val TH2_READ_DB_UNIQUE_ID = "th2.read-db.execute.uid"
private const val SEPARATOR = ','

internal fun TableRow.toProtoMessage(dataSourceId: DataSourceId, properties: Map<String, String>): ProtoRawMessage.Builder {
Expand Down Expand Up @@ -65,6 +66,9 @@ internal fun TableRow.toTransportMessage(dataSourceId: DataSourceId, properties:
if (associatedMessageType != null) {
builder.addMetadataProperty(TH2_CSV_OVERRIDE_MESSAGE_TYPE_PROPERTY, associatedMessageType)
}
if (executionId != null) {
builder.addMetadataProperty(TH2_READ_DB_UNIQUE_ID, executionId.toString())
}
properties.forEach(builder::addMetadataProperty)

return builder
Expand Down Expand Up @@ -124,7 +128,7 @@ internal fun MessageSearchResponse.toTableRow(): TableRow {
}
}

private fun Any.toStringValue(): String = when (this) {
internal fun Any.toStringValue(): String = when (this) {
is BigDecimal -> stripTrailingZeros().toPlainString()
is Double -> toBigDecimal().toStringValue()
is Float -> toBigDecimal().toStringValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@

package com.exactpro.th2.read.db.core

import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.annotation.JsonProperty

@JsonInclude(JsonInclude.Include.NON_EMPTY)
data class DataSourceConfiguration(
val url: String,
val username: String? = null,
@JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
val password: String? = null,
val properties: Map<String, String> = emptyMap(),
)
Loading

0 comments on commit ff5fb71

Please sign in to comment.