Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TH2-5234] Fixed the problem data provider can't handle messageIds #368

Merged
merged 6 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Report data provider (5.13.0)
# Report data provider (5.13.1)

# Overview
This component serves as a backend for rpt-viewer. It will connect to the cassandra database via cradle api and expose the data stored in there as REST resources.
Expand Down Expand Up @@ -297,6 +297,9 @@ spec:

# Release notes

## 5.13.1
* Fixed the problem data provider can't handle `messageIds` request with `messageId` but without `startTimestamp` arguments

## 5.13.0
* Provided ability to limit `messageIds` request by `lookupLimitDays` argument or `messageIdsLookupLimitDays` option
* Updated:
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
################################################################################

kotlin.code.style=official
release_version=5.13.0
release_version=5.13.1
docker_image_name=
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@ package com.exactpro.th2.rptdataprovider.entities.requests
import com.exactpro.cradle.Direction
import com.exactpro.cradle.BookId
import com.exactpro.cradle.TimeRelation
import com.exactpro.cradle.TimeRelation.AFTER
import com.exactpro.cradle.TimeRelation.BEFORE
import com.exactpro.cradle.messages.StoredMessageId
import com.exactpro.th2.common.util.toInstant
import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest
Expand Down Expand Up @@ -96,7 +98,7 @@ data class SseMessageSearchRequest<RM, PM>(

searchDirection = request.searchDirection.let {
when (it) {
PREVIOUS -> TimeRelation.BEFORE
PREVIOUS -> BEFORE
else -> searchDirection
}
},
Expand Down Expand Up @@ -138,7 +140,7 @@ data class SseMessageSearchRequest<RM, PM>(
constructor(parameters: Map<String, List<String>>, filterPredicate: FilterPredicate<MessageWithMetadata<RM, PM>>) : this(
parameters = parameters,
filterPredicate = filterPredicate,
searchDirection = TimeRelation.AFTER
searchDirection = AFTER
)

constructor(
Expand All @@ -148,19 +150,24 @@ data class SseMessageSearchRequest<RM, PM>(
) : this(
request = request,
filterPredicate = filterPredicate,
searchDirection = TimeRelation.AFTER,
searchDirection = AFTER,
bookId = bookId
)

private fun checkEndTimestamp() {
if (endTimestamp == null || startTimestamp == null) return

if (searchDirection == TimeRelation.AFTER) {
if (startTimestamp.isAfter(endTimestamp))
throw InvalidRequestException("startTimestamp: $startTimestamp > endTimestamp: $endTimestamp")
} else {
if (startTimestamp.isBefore(endTimestamp))
throw InvalidRequestException("startTimestamp: $startTimestamp < endTimestamp: $endTimestamp")
when(searchDirection) {
BEFORE -> {
if (startTimestamp < endTimestamp) {
throw InvalidRequestException("startTimestamp: $startTimestamp < endTimestamp: $endTimestamp")
}
}
AFTER -> {
if (startTimestamp > endTimestamp) {
throw InvalidRequestException("startTimestamp: $startTimestamp > endTimestamp: $endTimestamp")
}
}
}
}

Expand All @@ -176,8 +183,26 @@ data class SseMessageSearchRequest<RM, PM>(
}

private fun checkTimestampAndId() {
if (startTimestamp != null && resumeFromIdsList.isNotEmpty())
throw InvalidRequestException("You cannot specify resume Id and start timestamp at the same time")
if (startTimestamp != null && resumeFromIdsList.isNotEmpty()) {
OptimumCode marked this conversation as resolved.
Show resolved Hide resolved
when(searchDirection) {
BEFORE -> {
val pointers = resumeFromIdsList.filter { it.timestamp > startTimestamp }
if (pointers.isNotEmpty()) {
throw InvalidRequestException(
"You cannot specify resume Ids $pointers with timestamp greater than startTimestamp $startTimestamp"
)
}
}
AFTER -> {
val pointers = resumeFromIdsList.filter { it.timestamp < startTimestamp }
if (pointers.isNotEmpty()) {
throw InvalidRequestException(
"You cannot specify resume Ids $pointers with timestamp less than startTimestamp $startTimestamp"
)
}
}
}
}
}

private fun checkResumeIds() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import com.exactpro.th2.rptdataprovider.entities.internal.PipelineKeepAlive
import com.exactpro.th2.rptdataprovider.entities.internal.PipelineRawBatch
import com.exactpro.th2.rptdataprovider.entities.internal.StreamEndObject
import com.exactpro.th2.rptdataprovider.entities.internal.StreamName
import com.exactpro.th2.rptdataprovider.entities.internal.StreamPointer
import com.exactpro.th2.rptdataprovider.entities.mappers.TimeRelationMapper
import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest
import com.exactpro.th2.rptdataprovider.entities.responses.StreamInfo
Expand Down Expand Up @@ -74,6 +75,14 @@ abstract class SearchMessagesHandler<B, G, RM, PM>(
private val searchMessageRequests =
Counter.build("th2_search_messages", "Count of search message requests")
.register()

private fun StreamPointer.toStoredMessageId() = StoredMessageId(
stream.bookId,
stream.name,
stream.direction,
timestamp,
sequence
)
}


Expand Down Expand Up @@ -131,26 +140,13 @@ abstract class SearchMessagesHandler<B, G, RM, PM>(
}

suspend fun getIds(request: SseMessageSearchRequest<RM, PM>, lookupLimitDays: Long): Map<String, List<StreamInfo>> {
require(request.startTimestamp != null && request.endTimestamp == null) {
"startTimestamp must be not null and endTimestamp be null in request: $request"
require((request.startTimestamp != null || request.resumeFromIdsList.isNotEmpty()) && request.endTimestamp == null) {
"(startTimestamp must not be null or resumeFromIdsList must not be empty) and endTimestamp must be null in request: $request"
}
searchMessageRequests.inc()
val resumeId = request.resumeFromIdsList.firstOrNull()
val messageId = resumeId?.let {
StoredMessageId(
it.stream.bookId,
it.stream.name,
it.stream.direction,
it.timestamp,
it.sequence
)
}
val resultRequest = resumeId?.let {
request.copy(startTimestamp = resumeId.timestamp)
} ?: request

val before = getIds(resultRequest, messageId, lookupLimitDays, BEFORE)
val after = getIds(resultRequest, messageId, lookupLimitDays, AFTER)
val before = getIds(request, request.resumeFromIdsList, lookupLimitDays, BEFORE)
val after = getIds(request, request.resumeFromIdsList, lookupLimitDays, AFTER)

return mapOf(
TimeRelationMapper.toHttp(BEFORE) to before,
Expand All @@ -166,17 +162,24 @@ abstract class SearchMessagesHandler<B, G, RM, PM>(

private suspend fun getIds(
request: SseMessageSearchRequest<RM, PM>,
messageId: StoredMessageId?,
messageIds: List<StreamPointer>,
lookupLimitDays: Long,
searchDirection: TimeRelation
): MutableList<StreamInfo> {
val messageId = when(searchDirection) {
BEFORE -> messageIds.maxByOrNull(StreamPointer::timestamp)
AFTER -> messageIds.minByOrNull(StreamPointer::timestamp)
}?.toStoredMessageId()

val lookupLimit = request.lookupLimitDays ?: lookupLimitDays
val resultRequest = request.run {
val calculatedStartTimestamp = startTimestamp ?: messageId?.timestamp
copy(
searchDirection = searchDirection,
startTimestamp = calculatedStartTimestamp,
endTimestamp = when (searchDirection) {
BEFORE -> startTimestamp?.minus(lookupLimit, DAYS)
AFTER -> startTimestamp?.plus(lookupLimit, DAYS)
BEFORE -> calculatedStartTimestamp?.minus(lookupLimit, DAYS)
AFTER -> calculatedStartTimestamp?.plus(lookupLimit, DAYS)
}
).also(SseMessageSearchRequest<*, *>::checkIdsRequest)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package com.exactpro.th2.rptdataprovider.handlers.messages

import com.exactpro.cradle.Order
import com.exactpro.cradle.TimeRelation
import com.exactpro.cradle.Order.DIRECT
import com.exactpro.cradle.Order.REVERSE
import com.exactpro.cradle.TimeRelation.AFTER
import com.exactpro.cradle.TimeRelation.BEFORE
import com.exactpro.cradle.messages.MessageFilterBuilder
import com.exactpro.cradle.messages.StoredMessage
import com.exactpro.cradle.messages.StoredMessageBatch
Expand All @@ -42,6 +44,7 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.time.Instant
import com.exactpro.cradle.Order as CradleOrder


class MessageExtractor<B, G, RM, PM>(
Expand All @@ -66,10 +69,9 @@ class MessageExtractor<B, G, RM, PM>(
private var lastElement: StoredMessageId? = null
private var lastTimestamp: Instant? = null

private val order = if (request.searchDirection == TimeRelation.AFTER) {
Order.DIRECT
} else {
Order.REVERSE
private val order: CradleOrder = when(request.searchDirection) {
BEFORE -> REVERSE
AFTER -> DIRECT
}

init {
Expand All @@ -86,10 +88,9 @@ class MessageExtractor<B, G, RM, PM>(
}

private fun getMessagesFromBatch(batch: StoredMessageBatch): Collection<StoredMessage> {
return if (order == Order.DIRECT) {
batch.messages
} else {
batch.messagesReverse
return when(order) {
DIRECT -> batch.messages
REVERSE -> batch.messagesReverse
}
}

Expand All @@ -102,32 +103,29 @@ class MessageExtractor<B, G, RM, PM>(
if (resumeFromId?.sequence != null) {
val startSeq = resumeFromId.sequence
dropWhile {
if (order == Order.DIRECT) {
it.sequence < startSeq
} else {
it.sequence > startSeq
when(order) {
DIRECT -> it.sequence < startSeq
REVERSE -> it.sequence > startSeq
}
}
} else {
this // nothing to filter by sequence
}
}.dropWhile { //trim messages that do not strictly match time filter
request.startTimestamp?.let { startTimestamp ->
if (order == Order.DIRECT) {
it.timestamp.isBefore(startTimestamp)
} else {
it.timestamp.isAfter(startTimestamp)
when(order) {
DIRECT -> it.timestamp.isBefore(startTimestamp)
REVERSE -> it.timestamp.isAfter(startTimestamp)
}
} ?: false
}
}

private fun trimMessagesListTail(message: StoredMessage): Boolean {
return request.endTimestamp?.let { endTimestamp ->
if (order == Order.DIRECT) {
message.timestamp.isAfterOrEqual(endTimestamp)
} else {
message.timestamp.isBeforeOrEqual(endTimestamp)
when(order) {
DIRECT -> message.timestamp.isAfterOrEqual(endTimestamp)
REVERSE -> message.timestamp.isBeforeOrEqual(endTimestamp)
}
} ?: false
}
Expand All @@ -149,9 +147,14 @@ class MessageExtractor<B, G, RM, PM>(

streamName!!

val resumeFromId = request.resumeFromIdsList.firstOrNull {
it.stream.name == streamName.name && it.stream.direction == streamName.direction
}
val resumeFromId = request.resumeFromIdsList.asSequence()
.filter { it.stream.name == streamName.name && it.stream.direction == streamName.direction }
.run {
when(order) {
DIRECT -> minByOrNull(StreamPointer::sequence)
REVERSE -> maxByOrNull(StreamPointer::sequence)
}
}

logger.debug { "acquiring cradle iterator for stream $streamName" }

Expand All @@ -170,20 +173,22 @@ class MessageExtractor<B, G, RM, PM>(
.also { builder ->
if (resumeFromId != null) {
builder.sequence().let {
if (order == Order.DIRECT) {
it.isGreaterThanOrEqualTo(resumeFromId.sequence)
} else {
it.isLessThanOrEqualTo(resumeFromId.sequence)
when(order) {
DIRECT -> it.isGreaterThanOrEqualTo(resumeFromId.sequence)
REVERSE -> it.isLessThanOrEqualTo(resumeFromId.sequence)
}
}
}
// always need to make sure that we send messages within the specified timestamp (in case the resume ID points to the past)
if (order == Order.DIRECT) {
request.startTimestamp?.let { builder.timestampFrom().isGreaterThanOrEqualTo(it) }
request.endTimestamp?.let { builder.timestampTo().isLessThan(it) }
} else {
request.startTimestamp?.let { builder.timestampTo().isLessThanOrEqualTo(it) }
request.endTimestamp?.let { builder.timestampFrom().isGreaterThan(it) }
when(order) {
DIRECT -> {
request.startTimestamp?.let { builder.timestampFrom().isGreaterThanOrEqualTo(it) }
request.endTimestamp?.let { builder.timestampTo().isLessThan(it) }
}
REVERSE -> {
request.startTimestamp?.let { builder.timestampTo().isLessThanOrEqualTo(it) }
request.endTimestamp?.let { builder.timestampFrom().isGreaterThan(it) }
}
}
}.build()
)
Expand All @@ -207,8 +212,14 @@ class MessageExtractor<B, G, RM, PM>(
}
}

val firstMessage = if (order == Order.DIRECT) batch.messages.first() else batch.messages.last()
val lastMessage = if (order == Order.DIRECT) batch.messages.last() else batch.messages.first()
val firstMessage = when(order) {
DIRECT -> batch.messages.first()
REVERSE -> batch.messages.last()
}
val lastMessage = when(order) {
DIRECT -> batch.messages.last()
REVERSE -> batch.messages.first()
}

logger.trace {
"batch ${batch.id.sequence} of stream $streamName has been trimmed (targetStartTimestamp=${request.startTimestamp} targetEndTimestamp=${request.endTimestamp} targetId=${resumeFromId?.sequence}) - ${trimmedMessages.size} of ${batch.messages.size} messages left (firstId=${firstMessage.id.sequence} firstTimestamp=${firstMessage.timestamp} lastId=${lastMessage.id.sequence} lastTimestamp=${lastMessage.timestamp})"
Expand Down Expand Up @@ -254,10 +265,9 @@ class MessageExtractor<B, G, RM, PM>(
}

isStreamEmpty = true
lastTimestamp = if (order == Order.DIRECT) {
Instant.ofEpochMilli(Long.MAX_VALUE)
} else {
Instant.ofEpochMilli(Long.MIN_VALUE)
lastTimestamp = when(order) {
DIRECT -> Instant.ofEpochMilli(Long.MAX_VALUE)
REVERSE -> Instant.ofEpochMilli(Long.MIN_VALUE)
}

logger.debug { "no more data for stream $streamName (lastId=${lastElement.toString()} lastTimestamp=${lastTimestamp})" }
Expand Down
Loading