From 224c77f0e2535441fbf0768868cc5fc6d4dea7bc Mon Sep 17 00:00:00 2001 From: Nikita Smirnov <46124551+Nikita-Smirnov-Exactpro@users.noreply.github.com> Date: Fri, 24 Nov 2023 12:38:58 +0400 Subject: [PATCH] [TH2-5123] read-db pull task should be able to invalidate state read data (#14) Co-authored-by: Oleg Smirnov --- README.md | 26 ++- app/gradle.properties | 2 +- core/gradle.properties | 2 +- .../th2/read/db/app/DataBaseReader.kt | 6 +- .../db/app/DataBaseReaderConfiguration.kt | 8 + .../com/exactpro/th2/read/db/app/Requests.kt | 1 + .../exactpro/th2/read/db/bootstrap/Main.kt | 17 +- .../read/db/core/DataBaseMonitorService.kt | 48 +++++ .../th2/read/db/core/MessageLoader.kt | 10 +- .../core/impl/DataBaseMonitorServiceImpl.kt | 67 ++++-- .../db/impl/grpc/DataBaseReaderGrpcServer.kt | 1 + .../th2/read/db/impl/grpc/util/GrpcUtil.kt | 28 ++- .../db/app/DataBaseReaderConfigurationTest.kt | 11 + .../db/app/DataBaseReaderIntegrationTest.kt | 197 ++++++++++++++++-- .../db/core/DataBaseMonitorServiceTest.kt | 115 ++++++++++ grpc/README.md | 5 +- grpc/gradle.properties | 2 +- grpc/proto/th2_grpc_read_db/read_db.proto | 7 + 18 files changed, 501 insertions(+), 52 deletions(-) create mode 100644 core/src/test/kotlin/com/exactpro/th2/read/db/core/DataBaseMonitorServiceTest.kt diff --git a/README.md b/README.md index fafcc88..e8256c6 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-read-db 0.4.0 +# th2-read-db 0.5.0 The read-db is a component for extracting data from databases using JDBC technology. If database has JDBC driver the read can work with the database @@ -88,7 +88,13 @@ The read tasks tries to read all data from the specified data source using speci Pulls updates from the specified data source using the specified queries. + dataSource - the id of the source that should be used -+ startFromLastReadRow - task tries to load previous state via `data-provider` if this option is `true` ++ startFromLastReadRow - task tries to load previous state via `data-provider` if this option is `true` ++ resetStateParameters - optional parameters to scheduled reset internal state and re-init task. + + afterDate - optional parameter with date time in [ISO_INSTANT](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_INSTANT) format, for example: `"2023-11-14T12:12:34.567890123Z"` + The option is used to set single reset at date. + + afterTime - optional parameter with time in [ISO_LOCAL_TIME](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_LOCAL_TIME) format, for example: `"12:12:34.567890123"` + The time value must be specified in the UTC zone. + The option is used to set every day reset at time. + initQueryId - the id of the query that should be used to retrieve the current state of the database. NOTE: this parameter is used to initialize state and read-db doesn't publish retrieved messages to MQ router. + initParameters - the parameters that should be used in the init query. Also, The task uses these parameters to configure the first `updateQuery` execution if `initQuery` parameter is not specified @@ -102,8 +108,9 @@ Pulls updates from the specified data source using the specified queries. This type of task work by the algorithm: 1) Initialize parameters for the first `updateQuery` - * task tris to load the last message with `th2.pull_task.update_hash` property published to Cradle if startFromLastReadRow is `true`.
- NOTE: if read-db isn't connected to a data-provider [Go to gRPC client configuration](#client), the task failures. + * task tris to load the last message with `th2.pull_task.update_hash` property published to Cradle if startFromLastReadRow is `true`. + the time boundary for message loading is the nearest reset time calculated by `resetStateParameters` option if set, otherwise the execution time minus one day
+ **NOTE**: if read-db isn't connected to a data-provider [Go to gRPC client configuration](#client), the task failures. * if `startFromLastReadRow` is `false` or no one message hasn't been published into Cradle by related session alias, task tries to execute init query. * if init query is `null`, task uses `initProperties` to initialize property for the first `updateQuery` run.
NOTE: if `initProperties` doesn't defined, the first `updateQuery` is run with `NULL` value for all used parameters @@ -283,6 +290,16 @@ spec: ## Changes +### 0.5.0 + +#### Feature: + ++ added the `reset state parameters` option to configure static or dynamic dates of reset + +#### Update: + ++ grpc-read-db: `0.0.5` + ### 0.4.0 #### Feature: @@ -292,6 +309,7 @@ spec: + pull task optionally loads the last message for initialisation from a data-provider via gRPC #### Update: + + common: `5.7.1-dev` + grpc-service-generator: `3.5.1` + grpc-read-db: `0.0.4` diff --git a/app/gradle.properties b/app/gradle.properties index 3926126..6bd63e9 100644 --- a/app/gradle.properties +++ b/app/gradle.properties @@ -1,3 +1,3 @@ kotlin.code.style=official -release_version=0.4.0 +release_version=0.5.0 description=read-db component for extracting data from databases using JDBC technology \ No newline at end of file diff --git a/core/gradle.properties b/core/gradle.properties index 6272792..eed2de6 100644 --- a/core/gradle.properties +++ b/core/gradle.properties @@ -1,3 +1,3 @@ kotlin.code.style=official -release_version=0.4.0 +release_version=0.5.0 description=core part of read db to create an application with required JDBC drivers in the classpath \ No newline at end of file diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReader.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReader.kt index 4886ac9..fec0382 100644 --- a/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReader.kt +++ b/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReader.kt @@ -37,6 +37,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.launch import mu.KotlinLogging +import java.time.Clock import java.time.Duration class DataBaseReader( @@ -67,6 +68,7 @@ class DataBaseReader( scope.submitTask( task.dataSource, task.startFromLastReadRow, + task.resetStateParameters, task.initQueryId, task.initParameters, task.useColumns, @@ -113,6 +115,7 @@ class DataBaseReader( scope.submitTask( dataSourceId, startFromLastReadRow, + resetStateParameters, initQueryId, initParameters, useColumns, @@ -187,12 +190,13 @@ class DataBaseReader( pullingListener: UpdateListener, rowListener: RowListener, messageLoader: MessageLoader, + clock: Clock = Clock.systemDefaultZone() ): DataBaseReader { val sourceProvider: DataSourceProvider = BaseDataSourceProvider(configuration.dataSources) val queryProvider: QueryProvider = BaseQueryProvider(configuration.queries) val dataBaseService: DataBaseService = DataBaseServiceImpl(sourceProvider, queryProvider) val hashService: HashService = BaseHashServiceImpl(sourceProvider, queryProvider) - val monitorService: DataBaseMonitorService = DataBaseMonitorServiceImpl(dataBaseService, hashService) + val monitorService: DataBaseMonitorService = DataBaseMonitorServiceImpl(dataBaseService, hashService, clock) return DataBaseReader( dataBaseService, monitorService, diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfiguration.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfiguration.kt index 341a88d..0ae938e 100644 --- a/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfiguration.kt +++ b/core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfiguration.kt @@ -23,6 +23,8 @@ import com.exactpro.th2.read.db.core.QueryId import com.exactpro.th2.read.db.core.QueryParametersValues import com.fasterxml.jackson.annotation.JsonTypeInfo import com.fasterxml.jackson.annotation.JsonTypeName +import java.time.Instant +import java.time.LocalTime class DataBaseReaderConfiguration( val dataSources: Map, @@ -57,6 +59,7 @@ data class ReadTaskConfiguration( data class PullTaskConfiguration( val dataSource: DataSourceId, val startFromLastReadRow: Boolean = false, + val resetStateParameters: ResetState = ResetState(), val initQueryId: QueryId?, val initParameters: QueryParametersValues = emptyMap(), val updateQueryId: QueryId, @@ -65,6 +68,11 @@ data class PullTaskConfiguration( val interval: Long, ) : StartupTaskConfiguration +data class ResetState( + val afterDate: Instant? = null, + val afterTime: LocalTime? = null, +) + fun DataBaseReaderConfiguration.validate(): List { val sourceIDs = dataSources.keys val queryIDs = queries.keys diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/app/Requests.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/app/Requests.kt index 3d7f438..f22a143 100644 --- a/core/src/main/kotlin/com/exactpro/th2/read/db/app/Requests.kt +++ b/core/src/main/kotlin/com/exactpro/th2/read/db/app/Requests.kt @@ -30,6 +30,7 @@ class ExecuteQueryRequest( class PullTableRequest( val dataSourceId: DataSourceId, val startFromLastReadRow: Boolean, + val resetStateParameters: ResetState, val initQueryId: QueryId?, val initParameters: QueryParametersValues, val useColumns: Set, diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/bootstrap/Main.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/bootstrap/Main.kt index d28f832..5efaa68 100644 --- a/core/src/main/kotlin/com/exactpro/th2/read/db/bootstrap/Main.kt +++ b/core/src/main/kotlin/com/exactpro/th2/read/db/bootstrap/Main.kt @@ -45,6 +45,8 @@ import com.exactpro.th2.read.db.core.TableRow import com.exactpro.th2.read.db.core.UpdateListener import com.exactpro.th2.read.db.impl.grpc.DataBaseReaderGrpcServer import com.google.common.util.concurrent.ThreadFactoryBuilder +import com.google.protobuf.Timestamp +import com.google.protobuf.util.Timestamps import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers @@ -195,14 +197,23 @@ private fun createMessageLoader( componentBookName: String ) = runCatching { MessageSearcher.create(factory.grpcRouter.getService(DataProviderService::class.java)).run { - MessageLoader { dataSourceId, properties -> + MessageLoader { dataSourceId, horizon, properties -> + val horizonTimestamp: Timestamp? = horizon?.toTimestamp() + val searcherInterval: Duration = horizon?.let { + // FIXME: add overload with explicit end time for findLastOrNull method + Duration.between(it, Instant.now()).plusHours(1) + } ?: Duration.ofDays(1) + findLastOrNull( book = componentBookName, sessionAlias = dataSourceId.id, direction = FIRST, - searchInterval = Duration.ofDays(1), + searchInterval = searcherInterval, ) { - properties.all { (key, value) -> it.message.getMessagePropertiesOrDefault(key, null) == value } + with(it.message) { + (horizon?.let { Timestamps.compare(horizonTimestamp, messageId.timestamp) < 0 } ?: true) + && properties.all { (key, value) -> getMessagePropertiesOrDefault(key, null) == value } + } }?.toTableRow() } } diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/core/DataBaseMonitorService.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/core/DataBaseMonitorService.kt index 2810da4..33e0dac 100644 --- a/core/src/main/kotlin/com/exactpro/th2/read/db/core/DataBaseMonitorService.kt +++ b/core/src/main/kotlin/com/exactpro/th2/read/db/core/DataBaseMonitorService.kt @@ -16,13 +16,20 @@ package com.exactpro.th2.read.db.core +import com.exactpro.th2.read.db.app.ResetState import kotlinx.coroutines.CoroutineScope +import mu.KotlinLogging import java.time.Duration +import java.time.Instant +import java.time.LocalTime +import java.time.ZoneOffset +import java.time.temporal.ChronoUnit interface DataBaseMonitorService : AutoCloseable { fun CoroutineScope.submitTask( dataSourceId: DataSourceId, startFromLastReadRow: Boolean, + resetStateParameters: ResetState, initQueryId: QueryId?, initParameters: QueryParametersValues, useColumns: Set, @@ -36,6 +43,47 @@ interface DataBaseMonitorService : AutoCloseable { suspend fun cancelTask(id: TaskId) companion object { + private val LOGGER = KotlinLogging.logger { } internal const val TH2_PULL_TASK_UPDATE_HASH_PROPERTY = "th2.pull_task.update_hash" + + /** + * Calculates the nearest reset date before or equal [current]. [current] time is excluded + * @return null when:
+ * * [ResetState.afterDate] and [ResetState.afterTime] are null + * * [ResetState.afterDate] is after [current] and [ResetState.afterTime] is null + * + * otherwise [Instant] + */ + internal fun ResetState.calculateNearestResetDate(current: Instant): Instant? { + val boundaryByAfterDate: Instant = afterDate?.let { + if (it > current) { + null + } else { + it + } + } ?: Instant.MIN + val boundaryAfterTime: Instant = afterTime?.let { + val currentRestDate = current.withTime(it) + if (currentRestDate > current) { + currentRestDate.minus(1L, ChronoUnit.DAYS) + } else { + currentRestDate + } + + } ?: Instant.MIN + + val result: Instant = maxOf(boundaryByAfterDate, boundaryAfterTime) + + return (if (result == Instant.MIN) null else result).also { + LOGGER.trace { "Calculated nearest reset date, result: $it, data boundary: $boundaryByAfterDate, time boundary: $boundaryAfterTime, after date: $afterDate, after time: $afterTime, now: $current" } + } + } + + private fun Instant.withTime(time: LocalTime): Instant = atZone(ZoneOffset.UTC) + .withHour(time.hour) + .withMinute(time.minute) + .withSecond(time.second) + .withNano(time.nano) + .toInstant() } } \ No newline at end of file diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/core/MessageLoader.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/core/MessageLoader.kt index 8791595..8e0d966 100644 --- a/core/src/main/kotlin/com/exactpro/th2/read/db/core/MessageLoader.kt +++ b/core/src/main/kotlin/com/exactpro/th2/read/db/core/MessageLoader.kt @@ -16,17 +16,21 @@ package com.exactpro.th2.read.db.core +import java.time.Instant + fun interface MessageLoader { /** * Loads the most recent message from the Cradle produced by data source with [dataSourceId] - * that has a specified set of [properties] in its metadata + * that message timestamp is after or equal [lowerBoundary] and has a specified set of [properties] in its metadata * @param dataSourceId is used for session alias determination related to the data source + * @param lowerBoundary the time boundary for message loading. Only messages with a timestamp greater or equal to this value will be loaded if they exist. + * Must be lower than the current time if set * @param properties is used for filtering messages to find the first suitable */ - fun load(dataSourceId: DataSourceId, properties: Map): TableRow? + fun load(dataSourceId: DataSourceId, lowerBoundary: Instant?, properties: Map): TableRow? companion object { @JvmField - val DISABLED = MessageLoader { _, _ -> error("Message loader doesn't configured to execute request") } + val DISABLED = MessageLoader { _, _, _ -> error("Message loader doesn't configured to execute request") } } } \ No newline at end of file diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/core/impl/DataBaseMonitorServiceImpl.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/core/impl/DataBaseMonitorServiceImpl.kt index eb52c68..8c775f5 100644 --- a/core/src/main/kotlin/com/exactpro/th2/read/db/core/impl/DataBaseMonitorServiceImpl.kt +++ b/core/src/main/kotlin/com/exactpro/th2/read/db/core/impl/DataBaseMonitorServiceImpl.kt @@ -16,9 +16,11 @@ package com.exactpro.th2.read.db.core.impl +import com.exactpro.th2.read.db.app.ResetState import com.exactpro.th2.read.db.core.DataSourceId import com.exactpro.th2.read.db.core.DataBaseMonitorService import com.exactpro.th2.read.db.core.DataBaseMonitorService.Companion.TH2_PULL_TASK_UPDATE_HASH_PROPERTY +import com.exactpro.th2.read.db.core.DataBaseMonitorService.Companion.calculateNearestResetDate import com.exactpro.th2.read.db.core.DataBaseService import com.exactpro.th2.read.db.core.HashService import com.exactpro.th2.read.db.core.HashService.Companion.calculateHash @@ -39,13 +41,16 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import mu.KotlinLogging +import java.time.Clock import java.time.Duration +import java.time.Instant import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger class DataBaseMonitorServiceImpl( private val dataBaseService: DataBaseService, private val hashService: HashService, + private val clock: Clock, ) : DataBaseMonitorService { private val ids = AtomicInteger(1) private val runningTasks: MutableMap = ConcurrentHashMap() @@ -54,6 +59,7 @@ class DataBaseMonitorServiceImpl( override fun CoroutineScope.submitTask( dataSourceId: DataSourceId, startFromLastReadRow: Boolean, + resetStateParameters: ResetState, initQueryId: QueryId?, initParameters: QueryParametersValues, useColumns: Set, @@ -73,11 +79,12 @@ class DataBaseMonitorServiceImpl( poolUpdates( dataSourceId, startFromLastReadRow, + resetStateParameters, initQueryId, initParameters, useColumns, updateParameters, - interval, + interval.toMillis(), updateQueryId, updateListener, messageLoader @@ -108,11 +115,12 @@ class DataBaseMonitorServiceImpl( private suspend fun poolUpdates( dataSourceId: DataSourceId, startFromLastReadRow: Boolean, + resetStateParameters: ResetState, initQueryId: QueryId?, initParameters: QueryParametersValues, useColumns: Set, updateParameters: QueryParametersValues, - interval: Duration, + intervalMilliseconds: Long, updateQueryId: QueryId, updateListener: UpdateListener, messageLoader: MessageLoader @@ -120,32 +128,47 @@ class DataBaseMonitorServiceImpl( val properties = mapOf( TH2_PULL_TASK_UPDATE_HASH_PROPERTY to hashService.calculateHash(dataSourceId, updateQueryId).toString() ) + val finalParameters: MutableMap> = hashMapOf() + var lastResetTime = Instant.MIN + var firstIteration = true - val lastRow: TableRow? = when(startFromLastReadRow) { - true -> messageLoader.load(dataSourceId, properties) - else -> null - } ?: initQueryId?.let { queryId -> - dataBaseService.executeQuery( - dataSourceId, - queryId, - initParameters, - ).lastOrNull() - } - - fun updateParameters(lastRow: TableRow): QueryParametersValues { + fun extractParameters(lastRow: TableRow): QueryParametersValues { return useColumns.associateWith { val value = lastRow.columns[it] ?: error("Missing required parameter $it from init query result $lastRow") listOf(value.toString()) } } - val finalParameters: MutableMap> = if (lastRow == null) { - updateParameters.toMutableMap().apply { putAll(initParameters) } - } else { - updateParameters.toMutableMap().apply { putAll(updateParameters(lastRow)) } - } do { - delay(interval.toMillis()) + val now = clock.instant() + val resetDate = resetStateParameters.calculateNearestResetDate(now) + if (firstIteration || (resetDate != null && lastResetTime < resetDate)) { + LOGGER.info { "Initialize update query parameters, is first: $firstIteration, nearest reset: $resetDate, last reset: $lastResetTime" } + firstIteration = false + lastResetTime = now + + finalParameters.apply { + clear() + putAll(updateParameters) + } + + val lastRow: TableRow? = when(startFromLastReadRow) { + true -> messageLoader.load(dataSourceId, resetDate, properties) + else -> null + } ?: initQueryId?.let { queryId -> + dataBaseService.executeQuery( + dataSourceId, + queryId, + initParameters, + ).lastOrNull() + } + + if (lastRow == null) { + finalParameters.putAll(initParameters) + } else { + finalParameters.putAll(extractParameters(lastRow)) + } + } try { dataBaseService.executeQuery( @@ -157,11 +180,13 @@ class DataBaseMonitorServiceImpl( }.onCompletion { reason -> reason?.also { updateListener.onError(dataSourceId, it) } }.lastOrNull()?.also { - finalParameters.putAll(updateParameters(it)) + finalParameters.putAll(extractParameters(it)) } } catch (ex: Exception) { updateListener.onError(dataSourceId, ex) } + + delay(intervalMilliseconds) } while (currentCoroutineContext().isActive) } diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/DataBaseReaderGrpcServer.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/DataBaseReaderGrpcServer.kt index 813fe30..a5d7f34 100644 --- a/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/DataBaseReaderGrpcServer.kt +++ b/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/DataBaseReaderGrpcServer.kt @@ -72,6 +72,7 @@ class DataBaseReaderGrpcServer( PullTableRequest( sourceId.toModel(), startFromLastReadRow, + resetStateParameters.toModel(), if (hasInitQueryId()) initQueryId.toModel() else null, initParameters.toModel(), useColumnsList.toSet(), diff --git a/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/util/GrpcUtil.kt b/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/util/GrpcUtil.kt index d4eb1c6..6b4c1bb 100644 --- a/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/util/GrpcUtil.kt +++ b/core/src/main/kotlin/com/exactpro/th2/read/db/impl/grpc/util/GrpcUtil.kt @@ -16,13 +16,21 @@ package com.exactpro.th2.read.db.impl.grpc.util +import com.exactpro.th2.common.util.toInstant +import com.exactpro.th2.read.db.app.ResetState import com.exactpro.th2.read.db.core.DataSourceId import com.exactpro.th2.read.db.core.QueryParametersValues import com.exactpro.th2.read.db.core.TaskId +import com.exactpro.th2.read.db.grpc.DbPullResetState import com.exactpro.th2.read.db.grpc.PullTaskId import com.exactpro.th2.read.db.grpc.QueryId import com.exactpro.th2.read.db.grpc.QueryParameters import com.exactpro.th2.read.db.grpc.SourceId +import com.google.protobuf.util.Timestamps +import java.time.Instant +import java.time.LocalTime +import java.time.ZoneOffset +import java.time.temporal.ChronoField fun SourceId.toModel(): DataSourceId = DataSourceId(id) @@ -32,4 +40,22 @@ fun QueryParameters.toModel(): QueryParametersValues = valuesMap.mapValues { lis fun PullTaskId.toModel(): TaskId = TaskId(id) -fun TaskId.toGrpc(): PullTaskId = PullTaskId.newBuilder().setId(id).build() \ No newline at end of file +fun TaskId.toGrpc(): PullTaskId = PullTaskId.newBuilder().setId(id).build() + +fun DbPullResetState.toModel(): ResetState { + val afterDate = if (hasAfterDate()) afterDate.toInstant() else null + val afterTime = if (hasAfterTime()) { + val instant: Instant = afterTime.toInstant() + check( + instant.get(ChronoField.DAY_OF_MONTH) == 1 + && instant.get(ChronoField.MONTH_OF_YEAR) == 1 + && instant.get(ChronoField.YEAR) == 1970 + ) { + "'afterTime' field contains date units, expected '1970-01-01 ...', actual '${Timestamps.toString(afterTime)}'" + } + LocalTime.ofInstant(instant, ZoneOffset.UTC) + } else { + null + } + return ResetState(afterDate, afterTime) +} \ No newline at end of file diff --git a/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfigurationTest.kt b/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfigurationTest.kt index 4a0d263..dfb3011 100644 --- a/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfigurationTest.kt +++ b/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfigurationTest.kt @@ -36,6 +36,8 @@ import strikt.assertions.isEqualTo import strikt.assertions.single import java.io.InputStream import java.nio.file.Path +import java.time.Instant +import java.time.LocalTime @TestInstance(TestInstance.Lifecycle.PER_CLASS) internal class DataBaseReaderConfigurationTest { @@ -126,6 +128,10 @@ internal class DataBaseReaderConfigurationTest { PullTaskConfiguration( dataSource, true, + ResetState( + Instant.parse("2023-11-14T12:12:34.567890123Z"), + LocalTime.parse("12:12:34.567890123") + ), queryId, mapOf( "param1" to listOf("value1") @@ -250,6 +256,7 @@ internal class DataBaseReaderConfigurationTest { PullTaskConfiguration( dataSource, true, + ResetState(), null, mapOf( "param1" to listOf("value1") @@ -332,6 +339,10 @@ internal class DataBaseReaderConfigurationTest { { "type": "pull", "dataSource": "test", + "resetStateParameters": { + "afterDate": "2023-11-14T12:12:34.567890123Z", + "afterTime": "12:12:34.567890123" + }, "startFromLastReadRow": true, "initQueryId": "test_query", "initParameters": { diff --git a/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderIntegrationTest.kt b/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderIntegrationTest.kt index b95c124..ddd5405 100644 --- a/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderIntegrationTest.kt +++ b/core/src/test/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderIntegrationTest.kt @@ -34,9 +34,9 @@ import com.exactpro.th2.read.db.core.impl.BaseDataSourceProvider import com.exactpro.th2.read.db.core.impl.BaseHashServiceImpl import com.exactpro.th2.read.db.core.impl.BaseQueryProvider import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.delay import kotlinx.coroutines.test.advanceTimeBy import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runCurrent import kotlinx.coroutines.test.runTest import mu.KotlinLogging import org.junit.jupiter.api.AfterAll @@ -48,24 +48,29 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.TestInstance import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import org.mockito.Mockito.timeout import org.mockito.kotlin.any import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.clearInvocations import org.mockito.kotlin.eq +import org.mockito.kotlin.isNull import org.mockito.kotlin.mock import org.mockito.kotlin.never import org.mockito.kotlin.same import org.mockito.kotlin.spy -import org.mockito.kotlin.times import org.mockito.kotlin.verify import org.mockito.kotlin.verifyNoInteractions +import org.mockito.kotlin.whenever import strikt.api.expectThat import strikt.assertions.containsExactly import java.io.ByteArrayInputStream import java.sql.Connection import java.sql.Date +import java.time.Clock import java.time.Duration import java.time.Instant import java.time.LocalDate +import java.time.LocalTime import java.time.ZoneOffset import java.time.temporal.ChronoUnit @@ -200,6 +205,7 @@ internal class DataBaseReaderIntegrationTest { PullTableRequest( DataSourceId("persons"), false, + ResetState(), QueryId("current_state"), emptyMap(), setOf("id"), @@ -216,11 +222,10 @@ internal class DataBaseReaderIntegrationTest { insertData(newData) advanceTimeBy(interval.toMillis() * 2) - delay(interval.toMillis() * 2) - genericUpdateListener.assertCaptured(newData) + genericUpdateListener.assertCaptured(newData, interval.toMillis() * 2) listener.assertCaptured(newData) - verify(messageLoader, never()).load(any(), any()) + verify(messageLoader, never()).load(any(), isNull(), any()) verifyNoInteractions(genericRowListener) reader.stopPullTask(taskId) @@ -261,6 +266,7 @@ internal class DataBaseReaderIntegrationTest { PullTableRequest( DataSourceId("persons"), false, + ResetState(), null, mapOf("id" to listOf(startId.toString())), setOf("id"), @@ -279,11 +285,10 @@ internal class DataBaseReaderIntegrationTest { val pulledData = (persons + newData).drop(startId) advanceTimeBy(interval.toMillis() * 2) - delay(interval.toMillis() * 2) - genericUpdateListener.assertCaptured(pulledData) + genericUpdateListener.assertCaptured(pulledData, interval.toMillis() * 2) listener.assertCaptured(pulledData) - verify(messageLoader, never()).load(any(), any()) + verify(messageLoader, never()).load(any(), isNull(), any()) verifyNoInteractions(genericRowListener) reader.stopPullTask(taskId) @@ -297,7 +302,7 @@ internal class DataBaseReaderIntegrationTest { val genericUpdateListener = mock { } val genericRowListener = mock { } val messageLoader = mock { - on { load(any(), any()) }.thenReturn(persons[startId - 1].toTableRow(startId)) + on { load(any(), isNull(), any()) }.thenReturn(persons[startId - 1].toTableRow(startId)) } val interval = Duration.ofMillis(100) @@ -335,6 +340,7 @@ internal class DataBaseReaderIntegrationTest { PullTableRequest( dataSourceId, true, + ResetState(), null, emptyMap(), setOf("id"), @@ -353,12 +359,12 @@ internal class DataBaseReaderIntegrationTest { val pulledData = (persons + newData).drop(startId) advanceTimeBy(interval.toMillis() * 2) - delay(interval.toMillis() * 2) - genericUpdateListener.assertCaptured(pulledData) + genericUpdateListener.assertCaptured(pulledData, interval.toMillis() * 2) listener.assertCaptured(pulledData) verify(messageLoader).load( same(dataSourceId), + isNull(), eq( mapOf( TH2_PULL_TASK_UPDATE_HASH_PROPERTY to hashService.calculateHash(dataSourceId, queryId).toString() @@ -372,10 +378,171 @@ internal class DataBaseReaderIntegrationTest { } } - private fun UpdateListener.assertCaptured(persons: List) { + @Test + fun `reset internal state by afterDate`() { + val instant = Instant.now() + val resetDate = instant.plus(1, ChronoUnit.MINUTES) + val interval = Duration.ofMillis(100) + + val genericUpdateListener = mock { } + val genericRowListener = mock { } + val messageLoader = mock { } + val clock = mock { + on { instant() }.thenReturn(instant) + } + + runTest { + val reader = DataBaseReader.createDataBaseReader( + DataBaseReaderConfiguration( + mapOf( + DataSourceId("persons") to DataSourceConfiguration( + mysql.jdbcUrl, + mysql.username, + mysql.password, + ) + ), + mapOf( + QueryId("updates") to QueryConfiguration( + "SELECT * FROM test_data.person WHERE id > \${id:integer}" + ) + ) + ), + this, + genericUpdateListener, + genericRowListener, + messageLoader, + clock + ) + val listener = mock { } + val taskId = reader.submitPullTask( + PullTableRequest( + DataSourceId("persons"), + false, + ResetState( + afterDate = resetDate + ), + null, + mapOf("id" to listOf(persons.size.toString())), + setOf("id"), + QueryId("updates"), + emptyMap(), + interval, + ), + listener, + ) + + runCurrent() + + val newData: List = (1..10).map { Person("new$it", Instant.now().truncatedTo(ChronoUnit.DAYS), "test-new-data-$it".toByteArray()) } + insertData(newData) + + advanceTimeBy(interval.toMillis() * 2) + + genericUpdateListener.assertCaptured(newData, interval.toMillis()) + listener.assertCaptured(newData) + verify(messageLoader, never()).load(any(), isNull(), any()) + verifyNoInteractions(genericRowListener) + + clearInvocations(genericUpdateListener) + clearInvocations(listener) + whenever(clock.instant()).thenReturn(resetDate) + advanceTimeBy(interval.toMillis() * 2) + + genericUpdateListener.assertCaptured(newData, interval.toMillis()) + listener.assertCaptured(newData) + verify(messageLoader, never()).load(any(), isNull(), any()) + verifyNoInteractions(genericRowListener) + + reader.stopPullTask(taskId) + advanceUntilIdle() + } + } + + @Test + fun `reset internal state by afterTime`() { + val instant = Instant.now() + val resetDate = instant.plus(1, ChronoUnit.MINUTES) + val resetTime = LocalTime.ofInstant(resetDate, ZoneOffset.UTC) + val interval = Duration.ofMillis(100) + + val genericUpdateListener = mock { } + val genericRowListener = mock { } + val messageLoader = mock { } + val clock = mock { + on { instant() }.thenReturn(instant) + } + + runTest { + val reader = DataBaseReader.createDataBaseReader( + DataBaseReaderConfiguration( + mapOf( + DataSourceId("persons") to DataSourceConfiguration( + mysql.jdbcUrl, + mysql.username, + mysql.password, + ) + ), + mapOf( + QueryId("updates") to QueryConfiguration( + "SELECT * FROM test_data.person WHERE id > \${id:integer}" + ) + ) + ), + this, + genericUpdateListener, + genericRowListener, + messageLoader, + clock + ) + val listener = mock { } + val taskId = reader.submitPullTask( + PullTableRequest( + DataSourceId("persons"), + false, + ResetState( + afterTime = resetTime + ), + null, + mapOf("id" to listOf(persons.size.toString())), + setOf("id"), + QueryId("updates"), + emptyMap(), + interval, + ), + listener, + ) + + runCurrent() + + val newData: List = (1..10).map { Person("new$it", Instant.now().truncatedTo(ChronoUnit.DAYS), "test-new-data-$it".toByteArray()) } + insertData(newData) + + advanceTimeBy(interval.toMillis() * 2) + + genericUpdateListener.assertCaptured(newData, interval.toMillis()) + listener.assertCaptured(newData) + verify(messageLoader, never()).load(any(), isNull(), any()) + verifyNoInteractions(genericRowListener) + + clearInvocations(genericUpdateListener) + clearInvocations(listener) + whenever(clock.instant()).thenReturn(resetDate) + advanceTimeBy(interval.toMillis() * 2) + + genericUpdateListener.assertCaptured(newData, interval.toMillis()) + listener.assertCaptured(newData) + verify(messageLoader, never()).load(any(), isNull(), any()) + verifyNoInteractions(genericRowListener) + + reader.stopPullTask(taskId) + advanceUntilIdle() + } + } + + private fun UpdateListener.assertCaptured(persons: List, timeout: Long = 0) { val tableRawCaptor = argumentCaptor() val propertiesCaptor = argumentCaptor>() - verify(this, times(persons.size)).onUpdate(any(), tableRawCaptor.capture(), propertiesCaptor.capture()) + verify(this, timeout(timeout).times(persons.size)).onUpdate(any(), tableRawCaptor.capture(), propertiesCaptor.capture()) tableRawCaptor.allValues.map { Person( checkNotNull(it.columns["name"]).toString(), @@ -390,9 +557,9 @@ internal class DataBaseReaderIntegrationTest { assertNotNull(it[TH2_PULL_TASK_UPDATE_HASH_PROPERTY]) } } - private fun RowListener.assertCaptured(persons: List) { + private fun RowListener.assertCaptured(persons: List, timeout: Long = 0) { val captor = argumentCaptor() - verify(this, times(persons.size)).onRow(any(), captor.capture()) + verify(this, timeout(timeout).times(persons.size)).onRow(any(), captor.capture()) captor.allValues.map { Person( checkNotNull(it.columns["name"]).toString(), diff --git a/core/src/test/kotlin/com/exactpro/th2/read/db/core/DataBaseMonitorServiceTest.kt b/core/src/test/kotlin/com/exactpro/th2/read/db/core/DataBaseMonitorServiceTest.kt new file mode 100644 index 0000000..3213ba3 --- /dev/null +++ b/core/src/test/kotlin/com/exactpro/th2/read/db/core/DataBaseMonitorServiceTest.kt @@ -0,0 +1,115 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.read.db.core + +import com.exactpro.th2.read.db.app.ResetState +import com.exactpro.th2.read.db.core.DataBaseMonitorService.Companion.calculateNearestResetDate +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Assertions.assertSame +import org.junit.jupiter.api.Test +import java.time.Instant +import java.time.LocalTime +import java.time.ZoneOffset +import java.time.temporal.ChronoUnit.DAYS +import java.time.temporal.ChronoUnit.MINUTES + +class DataBaseMonitorServiceTest { + @Test + fun `is reset required where empty test`() { + assertNull(ResetState().calculateNearestResetDate(Instant.now())) + } + + @Test + fun `is reset required where after-date before now test`() { + val now = Instant.now() + val afterDate = now.minus(1L, MINUTES) + assertSame(afterDate, ResetState(afterDate).calculateNearestResetDate(Instant.now())) + } + + @Test + fun `is reset required where after-date equals now test`() { + val now = Instant.now() + assertSame(now, ResetState(now).calculateNearestResetDate(now)) + } + + @Test + fun `is reset required where after-date after now test`() { + val now = Instant.now() + val afterDate = now.plus(1L, MINUTES) + assertNull(ResetState(afterDate).calculateNearestResetDate(Instant.now())) + } + + @Test + fun `is reset required where after-time before now test`() { + val expected = BASE_DATE.minus(1L, MINUTES) + val afterTime = LocalTime.ofInstant(expected, ZoneOffset.UTC) + + assertEquals(expected, ResetState(afterTime = afterTime).calculateNearestResetDate(BASE_DATE)) + } + + @Test + fun `is reset required where after-time equals now test`() { + val afterTime = LocalTime.ofInstant(BASE_DATE, ZoneOffset.UTC) + assertEquals(BASE_DATE, ResetState(afterTime = afterTime).calculateNearestResetDate(BASE_DATE)) + } + + @Test + fun `is reset required where after-time after now test`() { + val expected = BASE_DATE.plus(1L, MINUTES).minus(1L, DAYS) + val afterTime = LocalTime.ofInstant(expected, ZoneOffset.UTC) + assertEquals(expected, ResetState(afterTime = afterTime).calculateNearestResetDate(BASE_DATE)) + } + + @Test + fun `is reset required where after-date before after-time and both before now test`() { + val expected = BASE_DATE.minus(1L, MINUTES) + val afterDate = BASE_DATE.minus(2L, MINUTES) + val afterTime = LocalTime.ofInstant(expected, ZoneOffset.UTC) + assertEquals(expected, ResetState(afterDate, afterTime).calculateNearestResetDate(BASE_DATE)) + } + + @Test + fun `is reset required where after-date after after-time and both before now test`() { + val afterDate = BASE_DATE.minus(1L, MINUTES) + val afterTime = LocalTime.ofInstant(BASE_DATE.minus(2L, MINUTES), ZoneOffset.UTC) + assertEquals(afterDate, ResetState(afterDate, afterTime).calculateNearestResetDate(BASE_DATE)) + } + + @Test + fun `is reset required where after-date before after-time and both after now test`() { + val expected = BASE_DATE.plus(2L, MINUTES).minus(1L, DAYS) + val afterDate = BASE_DATE.plus(1L, MINUTES) + val afterTime = LocalTime.ofInstant(expected, ZoneOffset.UTC) + assertEquals(expected, ResetState(afterDate, afterTime).calculateNearestResetDate(BASE_DATE)) + } + + @Test + fun `is reset required where after-date after after-time and both after now test`() { + val expected = BASE_DATE.plus(1L, MINUTES).minus(1L, DAYS) + val afterDate = BASE_DATE.plus(2L, MINUTES) + val afterTime = LocalTime.ofInstant(expected, ZoneOffset.UTC) + assertEquals(expected, ResetState(afterDate, afterTime).calculateNearestResetDate(BASE_DATE)) + } + + companion object { + /** + * This date is middle of day to execute after-time tests + */ + private val BASE_DATE = Instant.parse("2023-11-14T12:12:34.567890123Z") + } +} \ No newline at end of file diff --git a/grpc/README.md b/grpc/README.md index a308da5..990a783 100644 --- a/grpc/README.md +++ b/grpc/README.md @@ -1,4 +1,4 @@ -# gRPC for read-db (0.0.4) +# gRPC for read-db (0.0.5) The read-db provides you with gRPC interface for interacting with database. You can: @@ -8,5 +8,8 @@ You can: # Release notes: +## 0.0.5 ++ added DbPullRequest.reset_state_parameters field + ## 0.0.4 + added DbPullRequest.start_from_last_read_row field \ No newline at end of file diff --git a/grpc/gradle.properties b/grpc/gradle.properties index 17c7687..47556d8 100644 --- a/grpc/gradle.properties +++ b/grpc/gradle.properties @@ -1,3 +1,3 @@ kotlin.code.style=official -release_version=0.0.4 +release_version=0.0.5 description=gRPC API for executing requests in read-db application \ No newline at end of file diff --git a/grpc/proto/th2_grpc_read_db/read_db.proto b/grpc/proto/th2_grpc_read_db/read_db.proto index 0ce7505..96fa1aa 100644 --- a/grpc/proto/th2_grpc_read_db/read_db.proto +++ b/grpc/proto/th2_grpc_read_db/read_db.proto @@ -4,6 +4,7 @@ package th2.read_db; import "google/protobuf/duration.proto"; import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; option java_multiple_files = true; option java_package = "com.exactpro.th2.read.db.grpc"; @@ -51,6 +52,12 @@ message DbPullRequest { google.protobuf.Duration pull_interval = 7; AssociatedMessageType associated_message_type = 8; bool start_from_last_read_row = 9; + DbPullResetState reset_state_parameters = 10; +} + +message DbPullResetState { + google.protobuf.Timestamp after_date = 1; + google.protobuf.Timestamp after_time = 2; // Only time units matter, value must contains '1970-01-01' date } message PullTaskId {