From fc811a3387a9821f60be712ad59a48088c9f5cdd Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Tue, 1 Aug 2023 15:57:45 +0400 Subject: [PATCH] [TH2-5004] Migrate to statistics --- .../services/cradle/CradleService.kt | 118 +++++++++--------- 1 file changed, 57 insertions(+), 61 deletions(-) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/cradle/CradleService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/cradle/CradleService.kt index 0f28d2a4..fee20315 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/cradle/CradleService.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/cradle/CradleService.kt @@ -19,8 +19,8 @@ package com.exactpro.th2.rptdataprovider.services.cradle import com.exactpro.cradle.BookId import com.exactpro.cradle.CradleManager -import com.exactpro.cradle.Order -import com.exactpro.cradle.PageInfo +import com.exactpro.cradle.Direction +import com.exactpro.cradle.FrameType import com.exactpro.cradle.cassandra.CassandraStorageSettings import com.exactpro.cradle.counters.Interval import com.exactpro.cradle.messages.GroupedMessageFilter @@ -66,12 +66,6 @@ class CradleService(configuration: Configuration, cradleManager: CradleManager) private val getTestEventsAsyncMetric: Metrics = Metrics("get_test_events_async", "getTestEventsAsync") private val getTestEventAsyncMetric: Metrics = Metrics("get_test_event_async", "getTestEventAsync") private val getStreamsMetric: Metrics = Metrics("get_streams", "getStreams") - - // source interval changes to exclude intersection to next or previous page - private fun PageInfo.toInterval(): Interval = Interval( - started?.plusNanos(1) ?: Instant.MIN, - ended?.minusNanos(1) ?: Instant.MAX - ) } private val manager = CacheManagerBuilder.newCacheManagerBuilder().build(true) @@ -112,31 +106,33 @@ class CradleService(configuration: Configuration, cradleManager: CradleManager) (logMetrics(getMessagesBatches) { logTime("getMessagesBatches (filter=${filter.convertToString()})") { if (searchBySessionGroup) { - val group = getSessionGroupSuspend(filter) - val groupedMessageFilter = filter.toGroupedMessageFilter(group).also { - logger.debug { "Start searching group batches by $it" } - } - storage.getGroupedMessageBatchesAsync(groupedMessageFilter).await().asSequence() - .map { batch -> - batch.messages.asSequence().filter { message -> - filter.sessionAlias == message.sessionAlias - && filter.direction == message.direction - && filter.timestampFrom?.check(message.timestamp) ?: true - && filter.timestampTo?.check(message.timestamp) ?: true - }.toList() - .run { - if (isEmpty()) { - StoredMessageBatch() - } else { - StoredMessageBatch( - this, - storage.findPage(batch.bookId, batch.recDate).id, - batch.recDate - ) + getSessionGroupSuspend(filter)?.let { group -> + val groupedMessageFilter = filter.toGroupedMessageFilter(group).also { + logger.debug { "Start searching group batches by $it" } + } + storage.getGroupedMessageBatchesAsync(groupedMessageFilter).await().asSequence() + .map { batch -> + batch.messages.asSequence().filter { message -> + filter.sessionAlias == message.sessionAlias + && filter.direction == message.direction + && filter.timestampFrom?.check(message.timestamp) ?: true + && filter.timestampTo?.check(message.timestamp) ?: true + }.toList() + .run { + if (isEmpty()) { + StoredMessageBatch() + } else { + StoredMessageBatch( + this, + storage.findPage(batch.bookId, batch.recDate).id, + batch.recDate + ) + } } - } - }.filterNot(StoredMessageBatch::isEmpty) - .asIterable() + }.filterNot(StoredMessageBatch::isEmpty) + .asIterable() + + } ?: listOf() } else { storage.getMessageBatchesAsync(filter).await().asIterable() } @@ -171,7 +167,8 @@ class CradleService(configuration: Configuration, cradleManager: CradleManager) bookId: BookId, from: Instant?, to: Instant?, - sessionAlias: String + sessionAlias: String, + direction: Direction ): String? { val cache = bookToCache.computeIfAbsent(bookId.name) { manager.createCache( @@ -182,7 +179,6 @@ class CradleService(configuration: Configuration, cradleManager: CradleManager) ResourcePoolsBuilder.heap(aliasToGroupCacheSize) ).build() ) - } return cache.get(sessionAlias) ?: run { @@ -190,52 +186,51 @@ class CradleService(configuration: Configuration, cradleManager: CradleManager) logMetrics(getMapAliasToGroupAsyncMetric) { logTime("getSessionGroup (book=${bookId.name}, from=${from}, to=${to}, session alias=${sessionAlias})") { val interval = Interval(from ?: Instant.MIN, to ?: Instant.MAX) - logger.debug { "Strat searching '$sessionAlias' session alias in cradle in [${interval.start}, ${interval.end}] interval" } - // getPagesAsync method is used instead of getPage because the first one return all pages touched by interval - storage.getPagesAsync(bookId, interval).get().asSequence() - .map { pageInfo -> pageInfo.toInterval() } - .filter { pageInterval -> - storage.getSessionAliases(bookId, pageInterval).asSequence() - .any { alias -> alias == sessionAlias } - }.firstOrNull() - ?.let searchInGroups@ { pageInterval -> + + logger.debug { "Strat searching '$sessionAlias' session alias counters in cradle in [${interval.start}, ${interval.end}] interval, ${FrameType.TYPE_100MS} frame type" } + storage.getMessageCountersAsync(bookId, sessionAlias, direction, FrameType.TYPE_100MS, interval).await() + .asSequence() + .firstOrNull() + ?.let searchInGroups@ { counter -> cache.get(sessionAlias)?.let { group -> - logger.debug { "Another coroutine has dound session '$sessionAlias' alias to '$group' group pair" } + logger.debug { "Another coroutine has dound '$sessionAlias' session alias to '$group' group pair" } return@searchInGroups group } - logger.debug { "Strat searching session group by '$sessionAlias' alias in cradle in (${pageInterval.start}, ${pageInterval.end}) interval" } - storage.getSessionGroups(bookId, pageInterval).asSequence() + val shortInterval = Interval(counter.frameStart, counter.frameStart.plusMillis(100)) + logger.debug { "Strat searching session group by '$sessionAlias' alias in cradle in (${shortInterval.start}, ${shortInterval.end}) interval" } + storage.getSessionGroupsAsync(bookId, shortInterval).await().asSequence() .flatMap { group -> storage.getGroupedMessageBatches( GroupedMessageFilter.builder() .bookId(bookId) - .timestampFrom().isGreaterThan(pageInterval.start) - .timestampTo().isLessThan(pageInterval.end) + .timestampFrom().isGreaterThanOrEqualTo(shortInterval.start) + .timestampTo().isLessThanOrEqualTo(shortInterval.end) .groupName(group) - .order(Order.DIRECT) .build() ).asSequence() }.forEach searchInBatch@{ batch -> cache.get(sessionAlias)?.let { group -> - logger.debug { "Another coroutine has dound session '$sessionAlias' alias to '$group' group pair" } + logger.debug { "Another coroutine has dound '$sessionAlias' session alias to '$group' group pair" } return@searchInGroups group } logger.debug { "Search session group by '$sessionAlias' alias in grouped batch" } batch.messages.forEach { message -> cache.putIfAbsent(message.sessionAlias, batch.group) ?: run { - logger.info { "Put session '${message.sessionAlias}' alias to '${batch.group}' group to cache" } - } - if (sessionAlias == message.sessionAlias) { - logger.debug { "Found session '${message.sessionAlias}' alias to '${batch.group}' group pair" } - return@searchInGroups batch.group + logger.info { "Put '${message.sessionAlias}' session alias to '${batch.group}' group to cache" } + if (sessionAlias == message.sessionAlias) { + logger.debug { "Found '${message.sessionAlias}' session alias to '${batch.group}' group pair" } + } } } } - error("Mapping between a session group and the '${sessionAlias}' session alias isn't found, book: ${bookId.name}, [from: $from, to: $to]") + cache.get(sessionAlias)?.let { group -> + logger.debug { "Another coroutine has dound '$sessionAlias' session alias to '$group' group pair" } + return@searchInGroups group + } ?: error("Mapping between a session group and the '${sessionAlias}' alias isn't found, book: ${bookId.name}, [from: $from, to: $to]") } ?: run { - logger.debug { "'$sessionAlias' session alias isn't in [${interval.start}, ${interval.end}] interval interval" } - null - } + logger.info { "'$sessionAlias' session alias isn't in [${interval.start}, ${interval.end}] interval" } + null + } } } } @@ -248,12 +243,13 @@ class CradleService(configuration: Configuration, cradleManager: CradleManager) filter.bookId, filter.timestampFrom?.value, filter.timestampTo?.value, - filter.sessionAlias + filter.sessionAlias, + filter.direction ) private suspend fun getSessionGroupSuspend( id: StoredMessageId - ): String? = getSessionGroupSuspend(id.bookId, id.timestamp, id.timestamp, id.sessionAlias) + ): String? = getSessionGroupSuspend(id.bookId, id.timestamp, id.timestamp, id.sessionAlias, id.direction) suspend fun getMessageSuspend(id: StoredMessageId): StoredMessage? { return withContext(cradleDispatcher) {