Skip to content

Commit

Permalink
[TH2-5004] Migrate to statistics (#353)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro authored Aug 1, 2023
1 parent d8579ca commit 6121dff
Showing 1 changed file with 57 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package com.exactpro.th2.rptdataprovider.services.cradle

import com.exactpro.cradle.BookId
import com.exactpro.cradle.CradleManager
import com.exactpro.cradle.Order
import com.exactpro.cradle.PageInfo
import com.exactpro.cradle.Direction
import com.exactpro.cradle.FrameType
import com.exactpro.cradle.cassandra.CassandraStorageSettings
import com.exactpro.cradle.counters.Interval
import com.exactpro.cradle.messages.GroupedMessageFilter
Expand Down Expand Up @@ -66,12 +66,6 @@ class CradleService(configuration: Configuration, cradleManager: CradleManager)
private val getTestEventsAsyncMetric: Metrics = Metrics("get_test_events_async", "getTestEventsAsync")
private val getTestEventAsyncMetric: Metrics = Metrics("get_test_event_async", "getTestEventAsync")
private val getStreamsMetric: Metrics = Metrics("get_streams", "getStreams")

// source interval changes to exclude intersection to next or previous page
private fun PageInfo.toInterval(): Interval = Interval(
started?.plusNanos(1) ?: Instant.MIN,
ended?.minusNanos(1) ?: Instant.MAX
)
}

private val manager = CacheManagerBuilder.newCacheManagerBuilder().build(true)
Expand Down Expand Up @@ -112,31 +106,33 @@ class CradleService(configuration: Configuration, cradleManager: CradleManager)
(logMetrics(getMessagesBatches) {
logTime("getMessagesBatches (filter=${filter.convertToString()})") {
if (searchBySessionGroup) {
val group = getSessionGroupSuspend(filter)
val groupedMessageFilter = filter.toGroupedMessageFilter(group).also {
logger.debug { "Start searching group batches by $it" }
}
storage.getGroupedMessageBatchesAsync(groupedMessageFilter).await().asSequence()
.map { batch ->
batch.messages.asSequence().filter { message ->
filter.sessionAlias == message.sessionAlias
&& filter.direction == message.direction
&& filter.timestampFrom?.check(message.timestamp) ?: true
&& filter.timestampTo?.check(message.timestamp) ?: true
}.toList()
.run {
if (isEmpty()) {
StoredMessageBatch()
} else {
StoredMessageBatch(
this,
storage.findPage(batch.bookId, batch.recDate).id,
batch.recDate
)
getSessionGroupSuspend(filter)?.let { group ->
val groupedMessageFilter = filter.toGroupedMessageFilter(group).also {
logger.debug { "Start searching group batches by $it" }
}
storage.getGroupedMessageBatchesAsync(groupedMessageFilter).await().asSequence()
.map { batch ->
batch.messages.asSequence().filter { message ->
filter.sessionAlias == message.sessionAlias
&& filter.direction == message.direction
&& filter.timestampFrom?.check(message.timestamp) ?: true
&& filter.timestampTo?.check(message.timestamp) ?: true
}.toList()
.run {
if (isEmpty()) {
StoredMessageBatch()
} else {
StoredMessageBatch(
this,
storage.findPage(batch.bookId, batch.recDate).id,
batch.recDate
)
}
}
}
}.filterNot(StoredMessageBatch::isEmpty)
.asIterable()
}.filterNot(StoredMessageBatch::isEmpty)
.asIterable()

} ?: listOf()
} else {
storage.getMessageBatchesAsync(filter).await().asIterable()
}
Expand Down Expand Up @@ -171,7 +167,8 @@ class CradleService(configuration: Configuration, cradleManager: CradleManager)
bookId: BookId,
from: Instant?,
to: Instant?,
sessionAlias: String
sessionAlias: String,
direction: Direction
): String? {
val cache = bookToCache.computeIfAbsent(bookId.name) {
manager.createCache(
Expand All @@ -182,60 +179,58 @@ class CradleService(configuration: Configuration, cradleManager: CradleManager)
ResourcePoolsBuilder.heap(aliasToGroupCacheSize)
).build()
)

}
return cache.get(sessionAlias)
?: run {
withContext(cradleDispatcher) {
logMetrics(getMapAliasToGroupAsyncMetric) {
logTime("getSessionGroup (book=${bookId.name}, from=${from}, to=${to}, session alias=${sessionAlias})") {
val interval = Interval(from ?: Instant.MIN, to ?: Instant.MAX)
logger.debug { "Strat searching '$sessionAlias' session alias in cradle in [${interval.start}, ${interval.end}] interval" }
// getPagesAsync method is used instead of getPage because the first one return all pages touched by interval
storage.getPagesAsync(bookId, interval).get().asSequence()
.map { pageInfo -> pageInfo.toInterval() }
.filter { pageInterval ->
storage.getSessionAliases(bookId, pageInterval).asSequence()
.any { alias -> alias == sessionAlias }
}.firstOrNull()
?.let searchInGroups@ { pageInterval ->

logger.debug { "Strat searching '$sessionAlias' session alias counters in cradle in [${interval.start}, ${interval.end}] interval, ${FrameType.TYPE_100MS} frame type" }
storage.getMessageCountersAsync(bookId, sessionAlias, direction, FrameType.TYPE_100MS, interval).await()
.asSequence()
.firstOrNull()
?.let searchInGroups@ { counter ->
cache.get(sessionAlias)?.let { group ->
logger.debug { "Another coroutine has dound session '$sessionAlias' alias to '$group' group pair" }
logger.debug { "Another coroutine has dound '$sessionAlias' session alias to '$group' group pair" }
return@searchInGroups group
}
logger.debug { "Strat searching session group by '$sessionAlias' alias in cradle in (${pageInterval.start}, ${pageInterval.end}) interval" }
storage.getSessionGroups(bookId, pageInterval).asSequence()
val shortInterval = Interval(counter.frameStart, counter.frameStart.plusMillis(100))
logger.debug { "Strat searching session group by '$sessionAlias' alias in cradle in (${shortInterval.start}, ${shortInterval.end}) interval" }
storage.getSessionGroupsAsync(bookId, shortInterval).await().asSequence()
.flatMap { group ->
storage.getGroupedMessageBatches(
GroupedMessageFilter.builder()
.bookId(bookId)
.timestampFrom().isGreaterThan(pageInterval.start)
.timestampTo().isLessThan(pageInterval.end)
.timestampFrom().isGreaterThanOrEqualTo(shortInterval.start)
.timestampTo().isLessThanOrEqualTo(shortInterval.end)
.groupName(group)
.order(Order.DIRECT)
.build()
).asSequence()
}.forEach searchInBatch@{ batch ->
cache.get(sessionAlias)?.let { group ->
logger.debug { "Another coroutine has dound session '$sessionAlias' alias to '$group' group pair" }
logger.debug { "Another coroutine has dound '$sessionAlias' session alias to '$group' group pair" }
return@searchInGroups group
}
logger.debug { "Search session group by '$sessionAlias' alias in grouped batch" }
batch.messages.forEach { message ->
cache.putIfAbsent(message.sessionAlias, batch.group) ?: run {
logger.info { "Put session '${message.sessionAlias}' alias to '${batch.group}' group to cache" }
}
if (sessionAlias == message.sessionAlias) {
logger.debug { "Found session '${message.sessionAlias}' alias to '${batch.group}' group pair" }
return@searchInGroups batch.group
logger.info { "Put '${message.sessionAlias}' session alias to '${batch.group}' group to cache" }
if (sessionAlias == message.sessionAlias) {
logger.debug { "Found '${message.sessionAlias}' session alias to '${batch.group}' group pair" }
}
}
}
}
error("Mapping between a session group and the '${sessionAlias}' session alias isn't found, book: ${bookId.name}, [from: $from, to: $to]")
cache.get(sessionAlias)?.let { group ->
logger.debug { "Another coroutine has dound '$sessionAlias' session alias to '$group' group pair" }
return@searchInGroups group
} ?: error("Mapping between a session group and the '${sessionAlias}' alias isn't found, book: ${bookId.name}, [from: $from, to: $to]")
} ?: run {
logger.debug { "'$sessionAlias' session alias isn't in [${interval.start}, ${interval.end}] interval interval" }
null
}
logger.info { "'$sessionAlias' session alias isn't in [${interval.start}, ${interval.end}] interval" }
null
}
}
}
}
Expand All @@ -248,12 +243,13 @@ class CradleService(configuration: Configuration, cradleManager: CradleManager)
filter.bookId,
filter.timestampFrom?.value,
filter.timestampTo?.value,
filter.sessionAlias
filter.sessionAlias,
filter.direction
)

private suspend fun getSessionGroupSuspend(
id: StoredMessageId
): String? = getSessionGroupSuspend(id.bookId, id.timestamp, id.timestamp, id.sessionAlias)
): String? = getSessionGroupSuspend(id.bookId, id.timestamp, id.timestamp, id.sessionAlias, id.direction)

suspend fun getMessageSuspend(id: StoredMessageId): StoredMessage? {
return withContext(cradleDispatcher) {
Expand Down

0 comments on commit 6121dff

Please sign in to comment.