Skip to content

Commit

Permalink
[TH2-5123] read-db pull task should be able to invalidate state read …
Browse files Browse the repository at this point in the history
…data (#14)

Co-authored-by: Oleg Smirnov <oleg.smirnov@exactprosystems.com>
  • Loading branch information
Nikita-Smirnov-Exactpro and OptimumCode authored Nov 24, 2023
1 parent f40e6c3 commit 224c77f
Show file tree
Hide file tree
Showing 18 changed files with 501 additions and 52 deletions.
26 changes: 22 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2-read-db 0.4.0
# th2-read-db 0.5.0

The read-db is a component for extracting data from databases using JDBC technology. If database has JDBC driver the read can work with the database

Expand Down Expand Up @@ -88,7 +88,13 @@ The read tasks tries to read all data from the specified data source using speci
Pulls updates from the specified data source using the specified queries.

+ dataSource - the id of the source that should be used
+ startFromLastReadRow - task tries to load previous state via `data-provider` if this option is `true`
+ startFromLastReadRow - task tries to load previous state via `data-provider` if this option is `true`
+ resetStateParameters - optional parameters to scheduled reset internal state and re-init task.
+ afterDate - optional parameter with date time in [ISO_INSTANT](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_INSTANT) format, for example: `"2023-11-14T12:12:34.567890123Z"`
The option is used to set single reset at date.
+ afterTime - optional parameter with time in [ISO_LOCAL_TIME](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_LOCAL_TIME) format, for example: `"12:12:34.567890123"`
The time value must be specified in the UTC zone.
The option is used to set every day reset at time.
+ initQueryId - the id of the query that should be used to retrieve the current state of the database.
NOTE: this parameter is used to initialize state and read-db doesn't publish retrieved messages to MQ router.
+ initParameters - the parameters that should be used in the init query. Also, The task uses these parameters to configure the first `updateQuery` execution if `initQuery` parameter is not specified
Expand All @@ -102,8 +108,9 @@ Pulls updates from the specified data source using the specified queries.
This type of task work by the algorithm:

1) Initialize parameters for the first `updateQuery`
* task tris to load the last message with `th2.pull_task.update_hash` property published to Cradle if startFromLastReadRow is `true`.<br>
NOTE: if read-db isn't connected to a data-provider [Go to gRPC client configuration](#client), the task failures.
* task tris to load the last message with `th2.pull_task.update_hash` property published to Cradle if startFromLastReadRow is `true`.
the time boundary for message loading is the nearest reset time calculated by `resetStateParameters` option if set, otherwise the execution time minus one day<br>
**NOTE**: if read-db isn't connected to a data-provider [Go to gRPC client configuration](#client), the task failures.
* if `startFromLastReadRow` is `false` or no one message hasn't been published into Cradle by related session alias, task tries to execute init query.
* if init query is `null`, task uses `initProperties` to initialize property for the first `updateQuery` run.<br>
NOTE: if `initProperties` doesn't defined, the first `updateQuery` is run with `NULL` value for all used parameters
Expand Down Expand Up @@ -283,6 +290,16 @@ spec:

## Changes

### 0.5.0

#### Feature:

+ added the `reset state parameters` option to configure static or dynamic dates of reset

#### Update:

+ grpc-read-db: `0.0.5`

### 0.4.0

#### Feature:
Expand All @@ -292,6 +309,7 @@ spec:
+ pull task optionally loads the last message for initialisation from a data-provider via gRPC

#### Update:

+ common: `5.7.1-dev`
+ grpc-service-generator: `3.5.1`
+ grpc-read-db: `0.0.4`
Expand Down
2 changes: 1 addition & 1 deletion app/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
kotlin.code.style=official
release_version=0.4.0
release_version=0.5.0
description=read-db component for extracting data from databases using JDBC technology
2 changes: 1 addition & 1 deletion core/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
kotlin.code.style=official
release_version=0.4.0
release_version=0.5.0
description=core part of read db to create an application with required JDBC drivers in the classpath
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.launch
import mu.KotlinLogging
import java.time.Clock
import java.time.Duration

class DataBaseReader(
Expand Down Expand Up @@ -67,6 +68,7 @@ class DataBaseReader(
scope.submitTask(
task.dataSource,
task.startFromLastReadRow,
task.resetStateParameters,
task.initQueryId,
task.initParameters,
task.useColumns,
Expand Down Expand Up @@ -113,6 +115,7 @@ class DataBaseReader(
scope.submitTask(
dataSourceId,
startFromLastReadRow,
resetStateParameters,
initQueryId,
initParameters,
useColumns,
Expand Down Expand Up @@ -187,12 +190,13 @@ class DataBaseReader(
pullingListener: UpdateListener,
rowListener: RowListener,
messageLoader: MessageLoader,
clock: Clock = Clock.systemDefaultZone()
): DataBaseReader {
val sourceProvider: DataSourceProvider = BaseDataSourceProvider(configuration.dataSources)
val queryProvider: QueryProvider = BaseQueryProvider(configuration.queries)
val dataBaseService: DataBaseService = DataBaseServiceImpl(sourceProvider, queryProvider)
val hashService: HashService = BaseHashServiceImpl(sourceProvider, queryProvider)
val monitorService: DataBaseMonitorService = DataBaseMonitorServiceImpl(dataBaseService, hashService)
val monitorService: DataBaseMonitorService = DataBaseMonitorServiceImpl(dataBaseService, hashService, clock)
return DataBaseReader(
dataBaseService,
monitorService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import com.exactpro.th2.read.db.core.QueryId
import com.exactpro.th2.read.db.core.QueryParametersValues
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.annotation.JsonTypeName
import java.time.Instant
import java.time.LocalTime

class DataBaseReaderConfiguration(
val dataSources: Map<DataSourceId, DataSourceConfiguration>,
Expand Down Expand Up @@ -57,6 +59,7 @@ data class ReadTaskConfiguration(
data class PullTaskConfiguration(
val dataSource: DataSourceId,
val startFromLastReadRow: Boolean = false,
val resetStateParameters: ResetState = ResetState(),
val initQueryId: QueryId?,
val initParameters: QueryParametersValues = emptyMap(),
val updateQueryId: QueryId,
Expand All @@ -65,6 +68,11 @@ data class PullTaskConfiguration(
val interval: Long,
) : StartupTaskConfiguration

data class ResetState(
val afterDate: Instant? = null,
val afterTime: LocalTime? = null,
)

fun DataBaseReaderConfiguration.validate(): List<String> {
val sourceIDs = dataSources.keys
val queryIDs = queries.keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class ExecuteQueryRequest(
class PullTableRequest(
val dataSourceId: DataSourceId,
val startFromLastReadRow: Boolean,
val resetStateParameters: ResetState,
val initQueryId: QueryId?,
val initParameters: QueryParametersValues,
val useColumns: Set<String>,
Expand Down
17 changes: 14 additions & 3 deletions core/src/main/kotlin/com/exactpro/th2/read/db/bootstrap/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import com.exactpro.th2.read.db.core.TableRow
import com.exactpro.th2.read.db.core.UpdateListener
import com.exactpro.th2.read.db.impl.grpc.DataBaseReaderGrpcServer
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.google.protobuf.Timestamp
import com.google.protobuf.util.Timestamps
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand Down Expand Up @@ -195,14 +197,23 @@ private fun createMessageLoader(
componentBookName: String
) = runCatching {
MessageSearcher.create(factory.grpcRouter.getService(DataProviderService::class.java)).run {
MessageLoader { dataSourceId, properties ->
MessageLoader { dataSourceId, horizon, properties ->
val horizonTimestamp: Timestamp? = horizon?.toTimestamp()
val searcherInterval: Duration = horizon?.let {
// FIXME: add overload with explicit end time for findLastOrNull method
Duration.between(it, Instant.now()).plusHours(1)
} ?: Duration.ofDays(1)

findLastOrNull(
book = componentBookName,
sessionAlias = dataSourceId.id,
direction = FIRST,
searchInterval = Duration.ofDays(1),
searchInterval = searcherInterval,
) {
properties.all { (key, value) -> it.message.getMessagePropertiesOrDefault(key, null) == value }
with(it.message) {
(horizon?.let { Timestamps.compare(horizonTimestamp, messageId.timestamp) < 0 } ?: true)
&& properties.all { (key, value) -> getMessagePropertiesOrDefault(key, null) == value }
}
}?.toTableRow()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,20 @@

package com.exactpro.th2.read.db.core

import com.exactpro.th2.read.db.app.ResetState
import kotlinx.coroutines.CoroutineScope
import mu.KotlinLogging
import java.time.Duration
import java.time.Instant
import java.time.LocalTime
import java.time.ZoneOffset
import java.time.temporal.ChronoUnit

interface DataBaseMonitorService : AutoCloseable {
fun CoroutineScope.submitTask(
dataSourceId: DataSourceId,
startFromLastReadRow: Boolean,
resetStateParameters: ResetState,
initQueryId: QueryId?,
initParameters: QueryParametersValues,
useColumns: Set<String>,
Expand All @@ -36,6 +43,47 @@ interface DataBaseMonitorService : AutoCloseable {
suspend fun cancelTask(id: TaskId)

companion object {
private val LOGGER = KotlinLogging.logger { }
internal const val TH2_PULL_TASK_UPDATE_HASH_PROPERTY = "th2.pull_task.update_hash"

/**
* Calculates the nearest reset date before or equal [current]. [current] time is excluded
* @return null when:<br>
* * [ResetState.afterDate] and [ResetState.afterTime] are null
* * [ResetState.afterDate] is after [current] and [ResetState.afterTime] is null
*
* otherwise [Instant]
*/
internal fun ResetState.calculateNearestResetDate(current: Instant): Instant? {
val boundaryByAfterDate: Instant = afterDate?.let {
if (it > current) {
null
} else {
it
}
} ?: Instant.MIN
val boundaryAfterTime: Instant = afterTime?.let {
val currentRestDate = current.withTime(it)
if (currentRestDate > current) {
currentRestDate.minus(1L, ChronoUnit.DAYS)
} else {
currentRestDate
}

} ?: Instant.MIN

val result: Instant = maxOf(boundaryByAfterDate, boundaryAfterTime)

return (if (result == Instant.MIN) null else result).also {
LOGGER.trace { "Calculated nearest reset date, result: $it, data boundary: $boundaryByAfterDate, time boundary: $boundaryAfterTime, after date: $afterDate, after time: $afterTime, now: $current" }
}
}

private fun Instant.withTime(time: LocalTime): Instant = atZone(ZoneOffset.UTC)
.withHour(time.hour)
.withMinute(time.minute)
.withSecond(time.second)
.withNano(time.nano)
.toInstant()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@

package com.exactpro.th2.read.db.core

import java.time.Instant

fun interface MessageLoader {
/**
* Loads the most recent message from the Cradle produced by data source with [dataSourceId]
* that has a specified set of [properties] in its metadata
* that message timestamp is after or equal [lowerBoundary] and has a specified set of [properties] in its metadata
* @param dataSourceId is used for session alias determination related to the data source
* @param lowerBoundary the time boundary for message loading. Only messages with a timestamp greater or equal to this value will be loaded if they exist.
* Must be lower than the current time if set
* @param properties is used for filtering messages to find the first suitable
*/
fun load(dataSourceId: DataSourceId, properties: Map<String, String>): TableRow?
fun load(dataSourceId: DataSourceId, lowerBoundary: Instant?, properties: Map<String, String>): TableRow?

companion object {
@JvmField
val DISABLED = MessageLoader { _, _ -> error("Message loader doesn't configured to execute request") }
val DISABLED = MessageLoader { _, _, _ -> error("Message loader doesn't configured to execute request") }
}
}
Loading

0 comments on commit 224c77f

Please sign in to comment.