Skip to content

Commit

Permalink
[TH2-5239] Reduced required memory for executing sse event request wi…
Browse files Browse the repository at this point in the history
…th `limitForParent` parameter (#370)
  • Loading branch information
Nikita-Smirnov-Exactpro authored Sep 20, 2024
1 parent 28a1fc3 commit 1429f91
Show file tree
Hide file tree
Showing 10 changed files with 360 additions and 65 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Report data provider (5.13.1)
# Report data provider (5.13.2)

# Overview
This component serves as a backend for rpt-viewer. It will connect to the cassandra database via cradle api and expose the data stored in there as REST resources.
Expand Down Expand Up @@ -297,6 +297,9 @@ spec:

# Release notes

## 5.13.2
* Reduced required memory for executing sse event request with `limitForParent` parameter

## 5.13.1
* Fixed the problem data provider can't handle `messageIds` request with `messageId` but without `startTimestamp` arguments

Expand Down
4 changes: 1 addition & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ application {


test {
// FIXME: the tests were temporary disabled since they're not compatible with new api

// useJUnitPlatform()
useJUnitPlatform()
}

dependencyCheck {
Expand Down
18 changes: 1 addition & 17 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
################################################################################
# Copyright 2009-2024 Exactpro (Exactpro Systems Limited)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

kotlin.code.style=official
release_version=5.13.1
release_version=5.13.2
docker_image_name=
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.rptdataprovider.handlers

import com.exactpro.th2.rptdataprovider.entities.responses.BaseEventEntity
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

internal interface IParentEventCounter {
/**
* This method use parent event id or event id to limit number of child events.
* WARNING: event id isn't grantee event unique then this method can't be used for strict limitation.
* @return false if limit exceeded otherwise true
*/
fun checkCountAndGet(event: BaseEventEntity): Boolean

private object NoLimitedParentEventCounter : IParentEventCounter {
override fun checkCountAndGet(event: BaseEventEntity): Boolean = true
}

private class LimitedParentEventCounter(
private val limitForParent: Long
) : IParentEventCounter {
private val parentEventCounter = ConcurrentHashMap<String, AtomicLong>()

override fun checkCountAndGet(event: BaseEventEntity): Boolean {
if (event.parentEventId == null) {
return true
}

val value = parentEventCounter.compute(event.parentEventId.eventId.id) { _, value ->
if (value == null) {
AtomicLong(1)
} else {
if (value === MAX_EVENT_COUNTER) {
parentEventCounter.putIfAbsent(event.id.eventId.id, MAX_EVENT_COUNTER)
MAX_EVENT_COUNTER
} else {
if (value.incrementAndGet() > limitForParent) {
parentEventCounter.putIfAbsent(event.id.eventId.id, MAX_EVENT_COUNTER)
MAX_EVENT_COUNTER
} else {
value
}
}
}
}

return value !== MAX_EVENT_COUNTER
}
}

companion object {
private val MAX_EVENT_COUNTER = AtomicLong(Long.MAX_VALUE)

fun create(limitForParent: Long? = null): IParentEventCounter =
limitForParent?.let { LimitedParentEventCounter(it) } ?: NoLimitedParentEventCounter
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import com.exactpro.th2.rptdataprovider.minInstant
import com.exactpro.th2.rptdataprovider.producers.EventProducer
import com.exactpro.th2.rptdataprovider.services.cradle.CradleService
import com.exactpro.th2.rptdataprovider.tryToGetTestEvents
import io.github.oshai.kotlinlogging.KotlinLogging
import io.prometheus.client.Counter
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
Expand All @@ -61,11 +62,9 @@ import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Instant
import java.time.LocalTime
import java.time.ZoneOffset
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
Expand All @@ -85,34 +84,6 @@ class SearchEventsHandler(context: Context<*, *, *, *>) {
private val eventSearchChunkSize: Int = context.configuration.eventSearchChunkSize.value.toInt()
private val keepAliveTimeout: Long = context.configuration.keepAliveTimeout.value.toLong()


private data class ParentEventCounter private constructor(
private val parentEventCounter: ConcurrentHashMap<String, AtomicLong>?,
val limitForParent: Long?
) {

constructor(limitForParent: Long?) : this(
parentEventCounter = limitForParent?.let { ConcurrentHashMap<String, AtomicLong>() },
limitForParent = limitForParent
)

fun checkCountAndGet(event: BaseEventEntity): BaseEventEntity? {
if (limitForParent == null || event.parentEventId == null)
return event

return parentEventCounter!!.getOrPut(event.parentEventId.toString(), { AtomicLong(1) }).let { parentCount ->
if (parentCount.get() <= limitForParent) {
parentCount.incrementAndGet()
event
} else {
parentEventCounter.putIfAbsent(event.id.toString(), AtomicLong(Long.MAX_VALUE))
null
}
}
}
}


private suspend fun keepAlive(
writer: StreamWriter<*, *>,
lastScannedObjectInfo: LastScannedObjectInfo,
Expand Down Expand Up @@ -323,7 +294,7 @@ class SearchEventsHandler(context: Context<*, *, *, *>) {
requireNotNull(resumeTimestamp) { "timestamp for $resumeProviderId cannot be extracted" }
}
val timeIntervals = getTimeIntervals(request, sseEventSearchStep, startTimestamp)
val parentEventCounter = ParentEventCounter(request.limitForParent)
val parentEventCounter = IParentEventCounter.create(request.limitForParent)

flow {
for ((start, end) in timeIntervals) {
Expand Down Expand Up @@ -359,14 +330,7 @@ class SearchEventsHandler(context: Context<*, *, *, *>) {
lastScannedObject.update(event, scanCnt)
processedEventCount.inc()
}
.filter { request.filterPredicate.apply(it) }
.let {
if (parentEventCounter.limitForParent != null) {
it.filter { event -> parentEventCounter.checkCountAndGet(event) != null }
} else {
it
}
}
.filter { request.filterPredicate.apply(it) && parentEventCounter.checkCountAndGet(it) }
.let { fl -> request.resultCountLimit?.let { fl.take(it) } ?: fl }
.onStart {
launch {
Expand Down
Loading

0 comments on commit 1429f91

Please sign in to comment.