diff --git a/README.md b/README.md
index fafcc88..e8256c6 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# th2-read-db 0.4.0
+# th2-read-db 0.5.0
The read-db is a component for extracting data from databases using JDBC technology. If database has JDBC driver the read can work with the database
@@ -88,7 +88,13 @@ The read tasks tries to read all data from the specified data source using speci
Pulls updates from the specified data source using the specified queries.
+ dataSource - the id of the source that should be used
-+ startFromLastReadRow - task tries to load previous state via `data-provider` if this option is `true`
++ startFromLastReadRow - task tries to load previous state via `data-provider` if this option is `true`
++ resetStateParameters - optional parameters to scheduled reset internal state and re-init task.
+ + afterDate - optional parameter with date time in [ISO_INSTANT](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_INSTANT) format, for example: `"2023-11-14T12:12:34.567890123Z"`
+ The option is used to set single reset at date.
+ + afterTime - optional parameter with time in [ISO_LOCAL_TIME](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_LOCAL_TIME) format, for example: `"12:12:34.567890123"`
+ The time value must be specified in the UTC zone.
+ The option is used to set every day reset at time.
+ initQueryId - the id of the query that should be used to retrieve the current state of the database.
NOTE: this parameter is used to initialize state and read-db doesn't publish retrieved messages to MQ router.
+ initParameters - the parameters that should be used in the init query. Also, The task uses these parameters to configure the first `updateQuery` execution if `initQuery` parameter is not specified
@@ -102,8 +108,9 @@ Pulls updates from the specified data source using the specified queries.
This type of task work by the algorithm:
1) Initialize parameters for the first `updateQuery`
- * task tris to load the last message with `th2.pull_task.update_hash` property published to Cradle if startFromLastReadRow is `true`.
- NOTE: if read-db isn't connected to a data-provider [Go to gRPC client configuration](#client), the task failures.
+ * task tris to load the last message with `th2.pull_task.update_hash` property published to Cradle if startFromLastReadRow is `true`.
+ the time boundary for message loading is the nearest reset time calculated by `resetStateParameters` option if set, otherwise the execution time minus one day
+ **NOTE**: if read-db isn't connected to a data-provider [Go to gRPC client configuration](#client), the task failures.
* if `startFromLastReadRow` is `false` or no one message hasn't been published into Cradle by related session alias, task tries to execute init query.
* if init query is `null`, task uses `initProperties` to initialize property for the first `updateQuery` run.
NOTE: if `initProperties` doesn't defined, the first `updateQuery` is run with `NULL` value for all used parameters
@@ -283,6 +290,16 @@ spec:
## Changes
+### 0.5.0
+
+#### Feature:
+
++ added the `reset state parameters` option to configure static or dynamic dates of reset
+
+#### Update:
+
++ grpc-read-db: `0.0.5`
+
### 0.4.0
#### Feature:
@@ -292,6 +309,7 @@ spec:
+ pull task optionally loads the last message for initialisation from a data-provider via gRPC
#### Update:
+
+ common: `5.7.1-dev`
+ grpc-service-generator: `3.5.1`
+ grpc-read-db: `0.0.4`
diff --git a/app/gradle.properties b/app/gradle.properties
index 3926126..6bd63e9 100644
--- a/app/gradle.properties
+++ b/app/gradle.properties
@@ -1,3 +1,3 @@
kotlin.code.style=official
-release_version=0.4.0
+release_version=0.5.0
description=read-db component for extracting data from databases using JDBC technology
\ No newline at end of file
diff --git a/core/gradle.properties b/core/gradle.properties
index 6272792..eed2de6 100644
--- a/core/gradle.properties
+++ b/core/gradle.properties
@@ -1,3 +1,3 @@
kotlin.code.style=official
-release_version=0.4.0
+release_version=0.5.0
description=core part of read db to create an application with required JDBC drivers in the classpath
\ No newline at end of file
diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReader.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReader.kt
index 4886ac9..fec0382 100644
--- a/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReader.kt
+++ b/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReader.kt
@@ -37,6 +37,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.launch
import mu.KotlinLogging
+import java.time.Clock
import java.time.Duration
class DataBaseReader(
@@ -67,6 +68,7 @@ class DataBaseReader(
scope.submitTask(
task.dataSource,
task.startFromLastReadRow,
+ task.resetStateParameters,
task.initQueryId,
task.initParameters,
task.useColumns,
@@ -113,6 +115,7 @@ class DataBaseReader(
scope.submitTask(
dataSourceId,
startFromLastReadRow,
+ resetStateParameters,
initQueryId,
initParameters,
useColumns,
@@ -187,12 +190,13 @@ class DataBaseReader(
pullingListener: UpdateListener,
rowListener: RowListener,
messageLoader: MessageLoader,
+ clock: Clock = Clock.systemDefaultZone()
): DataBaseReader {
val sourceProvider: DataSourceProvider = BaseDataSourceProvider(configuration.dataSources)
val queryProvider: QueryProvider = BaseQueryProvider(configuration.queries)
val dataBaseService: DataBaseService = DataBaseServiceImpl(sourceProvider, queryProvider)
val hashService: HashService = BaseHashServiceImpl(sourceProvider, queryProvider)
- val monitorService: DataBaseMonitorService = DataBaseMonitorServiceImpl(dataBaseService, hashService)
+ val monitorService: DataBaseMonitorService = DataBaseMonitorServiceImpl(dataBaseService, hashService, clock)
return DataBaseReader(
dataBaseService,
monitorService,
diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfiguration.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfiguration.kt
index 341a88d..0ae938e 100644
--- a/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfiguration.kt
+++ b/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfiguration.kt
@@ -23,6 +23,8 @@ import com.exactpro.th2.read.db.core.QueryId
import com.exactpro.th2.read.db.core.QueryParametersValues
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.annotation.JsonTypeName
+import java.time.Instant
+import java.time.LocalTime
class DataBaseReaderConfiguration(
val dataSources: Map,
@@ -57,6 +59,7 @@ data class ReadTaskConfiguration(
data class PullTaskConfiguration(
val dataSource: DataSourceId,
val startFromLastReadRow: Boolean = false,
+ val resetStateParameters: ResetState = ResetState(),
val initQueryId: QueryId?,
val initParameters: QueryParametersValues = emptyMap(),
val updateQueryId: QueryId,
@@ -65,6 +68,11 @@ data class PullTaskConfiguration(
val interval: Long,
) : StartupTaskConfiguration
+data class ResetState(
+ val afterDate: Instant? = null,
+ val afterTime: LocalTime? = null,
+)
+
fun DataBaseReaderConfiguration.validate(): List {
val sourceIDs = dataSources.keys
val queryIDs = queries.keys
diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/app/Requests.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/app/Requests.kt
index 3d7f438..f22a143 100644
--- a/core/src/main/kotlin/com/exactpro/th2/read/db/app/Requests.kt
+++ b/core/src/main/kotlin/com/exactpro/th2/read/db/app/Requests.kt
@@ -30,6 +30,7 @@ class ExecuteQueryRequest(
class PullTableRequest(
val dataSourceId: DataSourceId,
val startFromLastReadRow: Boolean,
+ val resetStateParameters: ResetState,
val initQueryId: QueryId?,
val initParameters: QueryParametersValues,
val useColumns: Set,
diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/bootstrap/Main.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/bootstrap/Main.kt
index d28f832..5efaa68 100644
--- a/core/src/main/kotlin/com/exactpro/th2/read/db/bootstrap/Main.kt
+++ b/core/src/main/kotlin/com/exactpro/th2/read/db/bootstrap/Main.kt
@@ -45,6 +45,8 @@ import com.exactpro.th2.read.db.core.TableRow
import com.exactpro.th2.read.db.core.UpdateListener
import com.exactpro.th2.read.db.impl.grpc.DataBaseReaderGrpcServer
import com.google.common.util.concurrent.ThreadFactoryBuilder
+import com.google.protobuf.Timestamp
+import com.google.protobuf.util.Timestamps
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
@@ -195,14 +197,23 @@ private fun createMessageLoader(
componentBookName: String
) = runCatching {
MessageSearcher.create(factory.grpcRouter.getService(DataProviderService::class.java)).run {
- MessageLoader { dataSourceId, properties ->
+ MessageLoader { dataSourceId, horizon, properties ->
+ val horizonTimestamp: Timestamp? = horizon?.toTimestamp()
+ val searcherInterval: Duration = horizon?.let {
+ // FIXME: add overload with explicit end time for findLastOrNull method
+ Duration.between(it, Instant.now()).plusHours(1)
+ } ?: Duration.ofDays(1)
+
findLastOrNull(
book = componentBookName,
sessionAlias = dataSourceId.id,
direction = FIRST,
- searchInterval = Duration.ofDays(1),
+ searchInterval = searcherInterval,
) {
- properties.all { (key, value) -> it.message.getMessagePropertiesOrDefault(key, null) == value }
+ with(it.message) {
+ (horizon?.let { Timestamps.compare(horizonTimestamp, messageId.timestamp) < 0 } ?: true)
+ && properties.all { (key, value) -> getMessagePropertiesOrDefault(key, null) == value }
+ }
}?.toTableRow()
}
}
diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/core/DataBaseMonitorService.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/core/DataBaseMonitorService.kt
index 2810da4..33e0dac 100644
--- a/core/src/main/kotlin/com/exactpro/th2/read/db/core/DataBaseMonitorService.kt
+++ b/core/src/main/kotlin/com/exactpro/th2/read/db/core/DataBaseMonitorService.kt
@@ -16,13 +16,20 @@
package com.exactpro.th2.read.db.core
+import com.exactpro.th2.read.db.app.ResetState
import kotlinx.coroutines.CoroutineScope
+import mu.KotlinLogging
import java.time.Duration
+import java.time.Instant
+import java.time.LocalTime
+import java.time.ZoneOffset
+import java.time.temporal.ChronoUnit
interface DataBaseMonitorService : AutoCloseable {
fun CoroutineScope.submitTask(
dataSourceId: DataSourceId,
startFromLastReadRow: Boolean,
+ resetStateParameters: ResetState,
initQueryId: QueryId?,
initParameters: QueryParametersValues,
useColumns: Set,
@@ -36,6 +43,47 @@ interface DataBaseMonitorService : AutoCloseable {
suspend fun cancelTask(id: TaskId)
companion object {
+ private val LOGGER = KotlinLogging.logger { }
internal const val TH2_PULL_TASK_UPDATE_HASH_PROPERTY = "th2.pull_task.update_hash"
+
+ /**
+ * Calculates the nearest reset date before or equal [current]. [current] time is excluded
+ * @return null when:
+ * * [ResetState.afterDate] and [ResetState.afterTime] are null
+ * * [ResetState.afterDate] is after [current] and [ResetState.afterTime] is null
+ *
+ * otherwise [Instant]
+ */
+ internal fun ResetState.calculateNearestResetDate(current: Instant): Instant? {
+ val boundaryByAfterDate: Instant = afterDate?.let {
+ if (it > current) {
+ null
+ } else {
+ it
+ }
+ } ?: Instant.MIN
+ val boundaryAfterTime: Instant = afterTime?.let {
+ val currentRestDate = current.withTime(it)
+ if (currentRestDate > current) {
+ currentRestDate.minus(1L, ChronoUnit.DAYS)
+ } else {
+ currentRestDate
+ }
+
+ } ?: Instant.MIN
+
+ val result: Instant = maxOf(boundaryByAfterDate, boundaryAfterTime)
+
+ return (if (result == Instant.MIN) null else result).also {
+ LOGGER.trace { "Calculated nearest reset date, result: $it, data boundary: $boundaryByAfterDate, time boundary: $boundaryAfterTime, after date: $afterDate, after time: $afterTime, now: $current" }
+ }
+ }
+
+ private fun Instant.withTime(time: LocalTime): Instant = atZone(ZoneOffset.UTC)
+ .withHour(time.hour)
+ .withMinute(time.minute)
+ .withSecond(time.second)
+ .withNano(time.nano)
+ .toInstant()
}
}
\ No newline at end of file
diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/core/MessageLoader.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/core/MessageLoader.kt
index 8791595..8e0d966 100644
--- a/core/src/main/kotlin/com/exactpro/th2/read/db/core/MessageLoader.kt
+++ b/core/src/main/kotlin/com/exactpro/th2/read/db/core/MessageLoader.kt
@@ -16,17 +16,21 @@
package com.exactpro.th2.read.db.core
+import java.time.Instant
+
fun interface MessageLoader {
/**
* Loads the most recent message from the Cradle produced by data source with [dataSourceId]
- * that has a specified set of [properties] in its metadata
+ * that message timestamp is after or equal [lowerBoundary] and has a specified set of [properties] in its metadata
* @param dataSourceId is used for session alias determination related to the data source
+ * @param lowerBoundary the time boundary for message loading. Only messages with a timestamp greater or equal to this value will be loaded if they exist.
+ * Must be lower than the current time if set
* @param properties is used for filtering messages to find the first suitable
*/
- fun load(dataSourceId: DataSourceId, properties: Map): TableRow?
+ fun load(dataSourceId: DataSourceId, lowerBoundary: Instant?, properties: Map): TableRow?
companion object {
@JvmField
- val DISABLED = MessageLoader { _, _ -> error("Message loader doesn't configured to execute request") }
+ val DISABLED = MessageLoader { _, _, _ -> error("Message loader doesn't configured to execute request") }
}
}
\ No newline at end of file
diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/core/impl/DataBaseMonitorServiceImpl.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/core/impl/DataBaseMonitorServiceImpl.kt
index eb52c68..8c775f5 100644
--- a/core/src/main/kotlin/com/exactpro/th2/read/db/core/impl/DataBaseMonitorServiceImpl.kt
+++ b/core/src/main/kotlin/com/exactpro/th2/read/db/core/impl/DataBaseMonitorServiceImpl.kt
@@ -16,9 +16,11 @@
package com.exactpro.th2.read.db.core.impl
+import com.exactpro.th2.read.db.app.ResetState
import com.exactpro.th2.read.db.core.DataSourceId
import com.exactpro.th2.read.db.core.DataBaseMonitorService
import com.exactpro.th2.read.db.core.DataBaseMonitorService.Companion.TH2_PULL_TASK_UPDATE_HASH_PROPERTY
+import com.exactpro.th2.read.db.core.DataBaseMonitorService.Companion.calculateNearestResetDate
import com.exactpro.th2.read.db.core.DataBaseService
import com.exactpro.th2.read.db.core.HashService
import com.exactpro.th2.read.db.core.HashService.Companion.calculateHash
@@ -39,13 +41,16 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import mu.KotlinLogging
+import java.time.Clock
import java.time.Duration
+import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
class DataBaseMonitorServiceImpl(
private val dataBaseService: DataBaseService,
private val hashService: HashService,
+ private val clock: Clock,
) : DataBaseMonitorService {
private val ids = AtomicInteger(1)
private val runningTasks: MutableMap = ConcurrentHashMap()
@@ -54,6 +59,7 @@ class DataBaseMonitorServiceImpl(
override fun CoroutineScope.submitTask(
dataSourceId: DataSourceId,
startFromLastReadRow: Boolean,
+ resetStateParameters: ResetState,
initQueryId: QueryId?,
initParameters: QueryParametersValues,
useColumns: Set,
@@ -73,11 +79,12 @@ class DataBaseMonitorServiceImpl(
poolUpdates(
dataSourceId,
startFromLastReadRow,
+ resetStateParameters,
initQueryId,
initParameters,
useColumns,
updateParameters,
- interval,
+ interval.toMillis(),
updateQueryId,
updateListener,
messageLoader
@@ -108,11 +115,12 @@ class DataBaseMonitorServiceImpl(
private suspend fun poolUpdates(
dataSourceId: DataSourceId,
startFromLastReadRow: Boolean,
+ resetStateParameters: ResetState,
initQueryId: QueryId?,
initParameters: QueryParametersValues,
useColumns: Set,
updateParameters: QueryParametersValues,
- interval: Duration,
+ intervalMilliseconds: Long,
updateQueryId: QueryId,
updateListener: UpdateListener,
messageLoader: MessageLoader
@@ -120,32 +128,47 @@ class DataBaseMonitorServiceImpl(
val properties = mapOf(
TH2_PULL_TASK_UPDATE_HASH_PROPERTY to hashService.calculateHash(dataSourceId, updateQueryId).toString()
)
+ val finalParameters: MutableMap> = hashMapOf()
+ var lastResetTime = Instant.MIN
+ var firstIteration = true
- val lastRow: TableRow? = when(startFromLastReadRow) {
- true -> messageLoader.load(dataSourceId, properties)
- else -> null
- } ?: initQueryId?.let { queryId ->
- dataBaseService.executeQuery(
- dataSourceId,
- queryId,
- initParameters,
- ).lastOrNull()
- }
-
- fun updateParameters(lastRow: TableRow): QueryParametersValues {
+ fun extractParameters(lastRow: TableRow): QueryParametersValues {
return useColumns.associateWith {
val value = lastRow.columns[it] ?: error("Missing required parameter $it from init query result $lastRow")
listOf(value.toString())
}
}
- val finalParameters: MutableMap> = if (lastRow == null) {
- updateParameters.toMutableMap().apply { putAll(initParameters) }
- } else {
- updateParameters.toMutableMap().apply { putAll(updateParameters(lastRow)) }
- }
do {
- delay(interval.toMillis())
+ val now = clock.instant()
+ val resetDate = resetStateParameters.calculateNearestResetDate(now)
+ if (firstIteration || (resetDate != null && lastResetTime < resetDate)) {
+ LOGGER.info { "Initialize update query parameters, is first: $firstIteration, nearest reset: $resetDate, last reset: $lastResetTime" }
+ firstIteration = false
+ lastResetTime = now
+
+ finalParameters.apply {
+ clear()
+ putAll(updateParameters)
+ }
+
+ val lastRow: TableRow? = when(startFromLastReadRow) {
+ true -> messageLoader.load(dataSourceId, resetDate, properties)
+ else -> null
+ } ?: initQueryId?.let { queryId ->
+ dataBaseService.executeQuery(
+ dataSourceId,
+ queryId,
+ initParameters,
+ ).lastOrNull()
+ }
+
+ if (lastRow == null) {
+ finalParameters.putAll(initParameters)
+ } else {
+ finalParameters.putAll(extractParameters(lastRow))
+ }
+ }
try {
dataBaseService.executeQuery(
@@ -157,11 +180,13 @@ class DataBaseMonitorServiceImpl(
}.onCompletion { reason ->
reason?.also { updateListener.onError(dataSourceId, it) }
}.lastOrNull()?.also {
- finalParameters.putAll(updateParameters(it))
+ finalParameters.putAll(extractParameters(it))
}
} catch (ex: Exception) {
updateListener.onError(dataSourceId, ex)
}
+
+ delay(intervalMilliseconds)
} while (currentCoroutineContext().isActive)
}
diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/DataBaseReaderGrpcServer.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/DataBaseReaderGrpcServer.kt
index 813fe30..a5d7f34 100644
--- a/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/DataBaseReaderGrpcServer.kt
+++ b/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/DataBaseReaderGrpcServer.kt
@@ -72,6 +72,7 @@ class DataBaseReaderGrpcServer(
PullTableRequest(
sourceId.toModel(),
startFromLastReadRow,
+ resetStateParameters.toModel(),
if (hasInitQueryId()) initQueryId.toModel() else null,
initParameters.toModel(),
useColumnsList.toSet(),
diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/util/GrpcUtil.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/util/GrpcUtil.kt
index d4eb1c6..6b4c1bb 100644
--- a/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/util/GrpcUtil.kt
+++ b/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/util/GrpcUtil.kt
@@ -16,13 +16,21 @@
package com.exactpro.th2.read.db.impl.grpc.util
+import com.exactpro.th2.common.util.toInstant
+import com.exactpro.th2.read.db.app.ResetState
import com.exactpro.th2.read.db.core.DataSourceId
import com.exactpro.th2.read.db.core.QueryParametersValues
import com.exactpro.th2.read.db.core.TaskId
+import com.exactpro.th2.read.db.grpc.DbPullResetState
import com.exactpro.th2.read.db.grpc.PullTaskId
import com.exactpro.th2.read.db.grpc.QueryId
import com.exactpro.th2.read.db.grpc.QueryParameters
import com.exactpro.th2.read.db.grpc.SourceId
+import com.google.protobuf.util.Timestamps
+import java.time.Instant
+import java.time.LocalTime
+import java.time.ZoneOffset
+import java.time.temporal.ChronoField
fun SourceId.toModel(): DataSourceId = DataSourceId(id)
@@ -32,4 +40,22 @@ fun QueryParameters.toModel(): QueryParametersValues = valuesMap.mapValues { lis
fun PullTaskId.toModel(): TaskId = TaskId(id)
-fun TaskId.toGrpc(): PullTaskId = PullTaskId.newBuilder().setId(id).build()
\ No newline at end of file
+fun TaskId.toGrpc(): PullTaskId = PullTaskId.newBuilder().setId(id).build()
+
+fun DbPullResetState.toModel(): ResetState {
+ val afterDate = if (hasAfterDate()) afterDate.toInstant() else null
+ val afterTime = if (hasAfterTime()) {
+ val instant: Instant = afterTime.toInstant()
+ check(
+ instant.get(ChronoField.DAY_OF_MONTH) == 1
+ && instant.get(ChronoField.MONTH_OF_YEAR) == 1
+ && instant.get(ChronoField.YEAR) == 1970
+ ) {
+ "'afterTime' field contains date units, expected '1970-01-01 ...', actual '${Timestamps.toString(afterTime)}'"
+ }
+ LocalTime.ofInstant(instant, ZoneOffset.UTC)
+ } else {
+ null
+ }
+ return ResetState(afterDate, afterTime)
+}
\ No newline at end of file
diff --git a/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfigurationTest.kt b/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfigurationTest.kt
index 4a0d263..dfb3011 100644
--- a/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfigurationTest.kt
+++ b/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfigurationTest.kt
@@ -36,6 +36,8 @@ import strikt.assertions.isEqualTo
import strikt.assertions.single
import java.io.InputStream
import java.nio.file.Path
+import java.time.Instant
+import java.time.LocalTime
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
internal class DataBaseReaderConfigurationTest {
@@ -126,6 +128,10 @@ internal class DataBaseReaderConfigurationTest {
PullTaskConfiguration(
dataSource,
true,
+ ResetState(
+ Instant.parse("2023-11-14T12:12:34.567890123Z"),
+ LocalTime.parse("12:12:34.567890123")
+ ),
queryId,
mapOf(
"param1" to listOf("value1")
@@ -250,6 +256,7 @@ internal class DataBaseReaderConfigurationTest {
PullTaskConfiguration(
dataSource,
true,
+ ResetState(),
null,
mapOf(
"param1" to listOf("value1")
@@ -332,6 +339,10 @@ internal class DataBaseReaderConfigurationTest {
{
"type": "pull",
"dataSource": "test",
+ "resetStateParameters": {
+ "afterDate": "2023-11-14T12:12:34.567890123Z",
+ "afterTime": "12:12:34.567890123"
+ },
"startFromLastReadRow": true,
"initQueryId": "test_query",
"initParameters": {
diff --git a/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderIntegrationTest.kt b/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderIntegrationTest.kt
index b95c124..ddd5405 100644
--- a/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderIntegrationTest.kt
+++ b/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderIntegrationTest.kt
@@ -34,9 +34,9 @@ import com.exactpro.th2.read.db.core.impl.BaseDataSourceProvider
import com.exactpro.th2.read.db.core.impl.BaseHashServiceImpl
import com.exactpro.th2.read.db.core.impl.BaseQueryProvider
import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.delay
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.advanceUntilIdle
+import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import mu.KotlinLogging
import org.junit.jupiter.api.AfterAll
@@ -48,24 +48,29 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
+import org.mockito.Mockito.timeout
import org.mockito.kotlin.any
import org.mockito.kotlin.argumentCaptor
+import org.mockito.kotlin.clearInvocations
import org.mockito.kotlin.eq
+import org.mockito.kotlin.isNull
import org.mockito.kotlin.mock
import org.mockito.kotlin.never
import org.mockito.kotlin.same
import org.mockito.kotlin.spy
-import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.mockito.kotlin.verifyNoInteractions
+import org.mockito.kotlin.whenever
import strikt.api.expectThat
import strikt.assertions.containsExactly
import java.io.ByteArrayInputStream
import java.sql.Connection
import java.sql.Date
+import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.time.LocalDate
+import java.time.LocalTime
import java.time.ZoneOffset
import java.time.temporal.ChronoUnit
@@ -200,6 +205,7 @@ internal class DataBaseReaderIntegrationTest {
PullTableRequest(
DataSourceId("persons"),
false,
+ ResetState(),
QueryId("current_state"),
emptyMap(),
setOf("id"),
@@ -216,11 +222,10 @@ internal class DataBaseReaderIntegrationTest {
insertData(newData)
advanceTimeBy(interval.toMillis() * 2)
- delay(interval.toMillis() * 2)
- genericUpdateListener.assertCaptured(newData)
+ genericUpdateListener.assertCaptured(newData, interval.toMillis() * 2)
listener.assertCaptured(newData)
- verify(messageLoader, never()).load(any(), any())
+ verify(messageLoader, never()).load(any(), isNull(), any())
verifyNoInteractions(genericRowListener)
reader.stopPullTask(taskId)
@@ -261,6 +266,7 @@ internal class DataBaseReaderIntegrationTest {
PullTableRequest(
DataSourceId("persons"),
false,
+ ResetState(),
null,
mapOf("id" to listOf(startId.toString())),
setOf("id"),
@@ -279,11 +285,10 @@ internal class DataBaseReaderIntegrationTest {
val pulledData = (persons + newData).drop(startId)
advanceTimeBy(interval.toMillis() * 2)
- delay(interval.toMillis() * 2)
- genericUpdateListener.assertCaptured(pulledData)
+ genericUpdateListener.assertCaptured(pulledData, interval.toMillis() * 2)
listener.assertCaptured(pulledData)
- verify(messageLoader, never()).load(any(), any())
+ verify(messageLoader, never()).load(any(), isNull(), any())
verifyNoInteractions(genericRowListener)
reader.stopPullTask(taskId)
@@ -297,7 +302,7 @@ internal class DataBaseReaderIntegrationTest {
val genericUpdateListener = mock { }
val genericRowListener = mock { }
val messageLoader = mock {
- on { load(any(), any()) }.thenReturn(persons[startId - 1].toTableRow(startId))
+ on { load(any(), isNull(), any()) }.thenReturn(persons[startId - 1].toTableRow(startId))
}
val interval = Duration.ofMillis(100)
@@ -335,6 +340,7 @@ internal class DataBaseReaderIntegrationTest {
PullTableRequest(
dataSourceId,
true,
+ ResetState(),
null,
emptyMap(),
setOf("id"),
@@ -353,12 +359,12 @@ internal class DataBaseReaderIntegrationTest {
val pulledData = (persons + newData).drop(startId)
advanceTimeBy(interval.toMillis() * 2)
- delay(interval.toMillis() * 2)
- genericUpdateListener.assertCaptured(pulledData)
+ genericUpdateListener.assertCaptured(pulledData, interval.toMillis() * 2)
listener.assertCaptured(pulledData)
verify(messageLoader).load(
same(dataSourceId),
+ isNull(),
eq(
mapOf(
TH2_PULL_TASK_UPDATE_HASH_PROPERTY to hashService.calculateHash(dataSourceId, queryId).toString()
@@ -372,10 +378,171 @@ internal class DataBaseReaderIntegrationTest {
}
}
- private fun UpdateListener.assertCaptured(persons: List) {
+ @Test
+ fun `reset internal state by afterDate`() {
+ val instant = Instant.now()
+ val resetDate = instant.plus(1, ChronoUnit.MINUTES)
+ val interval = Duration.ofMillis(100)
+
+ val genericUpdateListener = mock { }
+ val genericRowListener = mock { }
+ val messageLoader = mock { }
+ val clock = mock {
+ on { instant() }.thenReturn(instant)
+ }
+
+ runTest {
+ val reader = DataBaseReader.createDataBaseReader(
+ DataBaseReaderConfiguration(
+ mapOf(
+ DataSourceId("persons") to DataSourceConfiguration(
+ mysql.jdbcUrl,
+ mysql.username,
+ mysql.password,
+ )
+ ),
+ mapOf(
+ QueryId("updates") to QueryConfiguration(
+ "SELECT * FROM test_data.person WHERE id > \${id:integer}"
+ )
+ )
+ ),
+ this,
+ genericUpdateListener,
+ genericRowListener,
+ messageLoader,
+ clock
+ )
+ val listener = mock { }
+ val taskId = reader.submitPullTask(
+ PullTableRequest(
+ DataSourceId("persons"),
+ false,
+ ResetState(
+ afterDate = resetDate
+ ),
+ null,
+ mapOf("id" to listOf(persons.size.toString())),
+ setOf("id"),
+ QueryId("updates"),
+ emptyMap(),
+ interval,
+ ),
+ listener,
+ )
+
+ runCurrent()
+
+ val newData: List = (1..10).map { Person("new$it", Instant.now().truncatedTo(ChronoUnit.DAYS), "test-new-data-$it".toByteArray()) }
+ insertData(newData)
+
+ advanceTimeBy(interval.toMillis() * 2)
+
+ genericUpdateListener.assertCaptured(newData, interval.toMillis())
+ listener.assertCaptured(newData)
+ verify(messageLoader, never()).load(any(), isNull(), any())
+ verifyNoInteractions(genericRowListener)
+
+ clearInvocations(genericUpdateListener)
+ clearInvocations(listener)
+ whenever(clock.instant()).thenReturn(resetDate)
+ advanceTimeBy(interval.toMillis() * 2)
+
+ genericUpdateListener.assertCaptured(newData, interval.toMillis())
+ listener.assertCaptured(newData)
+ verify(messageLoader, never()).load(any(), isNull(), any())
+ verifyNoInteractions(genericRowListener)
+
+ reader.stopPullTask(taskId)
+ advanceUntilIdle()
+ }
+ }
+
+ @Test
+ fun `reset internal state by afterTime`() {
+ val instant = Instant.now()
+ val resetDate = instant.plus(1, ChronoUnit.MINUTES)
+ val resetTime = LocalTime.ofInstant(resetDate, ZoneOffset.UTC)
+ val interval = Duration.ofMillis(100)
+
+ val genericUpdateListener = mock { }
+ val genericRowListener = mock { }
+ val messageLoader = mock { }
+ val clock = mock {
+ on { instant() }.thenReturn(instant)
+ }
+
+ runTest {
+ val reader = DataBaseReader.createDataBaseReader(
+ DataBaseReaderConfiguration(
+ mapOf(
+ DataSourceId("persons") to DataSourceConfiguration(
+ mysql.jdbcUrl,
+ mysql.username,
+ mysql.password,
+ )
+ ),
+ mapOf(
+ QueryId("updates") to QueryConfiguration(
+ "SELECT * FROM test_data.person WHERE id > \${id:integer}"
+ )
+ )
+ ),
+ this,
+ genericUpdateListener,
+ genericRowListener,
+ messageLoader,
+ clock
+ )
+ val listener = mock { }
+ val taskId = reader.submitPullTask(
+ PullTableRequest(
+ DataSourceId("persons"),
+ false,
+ ResetState(
+ afterTime = resetTime
+ ),
+ null,
+ mapOf("id" to listOf(persons.size.toString())),
+ setOf("id"),
+ QueryId("updates"),
+ emptyMap(),
+ interval,
+ ),
+ listener,
+ )
+
+ runCurrent()
+
+ val newData: List = (1..10).map { Person("new$it", Instant.now().truncatedTo(ChronoUnit.DAYS), "test-new-data-$it".toByteArray()) }
+ insertData(newData)
+
+ advanceTimeBy(interval.toMillis() * 2)
+
+ genericUpdateListener.assertCaptured(newData, interval.toMillis())
+ listener.assertCaptured(newData)
+ verify(messageLoader, never()).load(any(), isNull(), any())
+ verifyNoInteractions(genericRowListener)
+
+ clearInvocations(genericUpdateListener)
+ clearInvocations(listener)
+ whenever(clock.instant()).thenReturn(resetDate)
+ advanceTimeBy(interval.toMillis() * 2)
+
+ genericUpdateListener.assertCaptured(newData, interval.toMillis())
+ listener.assertCaptured(newData)
+ verify(messageLoader, never()).load(any(), isNull(), any())
+ verifyNoInteractions(genericRowListener)
+
+ reader.stopPullTask(taskId)
+ advanceUntilIdle()
+ }
+ }
+
+ private fun UpdateListener.assertCaptured(persons: List, timeout: Long = 0) {
val tableRawCaptor = argumentCaptor()
val propertiesCaptor = argumentCaptor