Skip to content

Commit

Permalink
Merge pull request #304 from metrico/metrics_agg_improvements
Browse files Browse the repository at this point in the history
Metrics agg improvements
  • Loading branch information
akvlad authored Aug 4, 2023
2 parents 1dc0895 + 3292016 commit 0c1fa65
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 2 deletions.
30 changes: 29 additions & 1 deletion lib/db/maintain/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,35 @@ module.exports.overall = [
FROM time_series ARRAY JOIN JSONExtractKeysAndValues(time_series.labels, 'String') as pairs`,

`INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_5'), 'update',
'v3_1', toString(toUnixTimestamp(NOW())), NOW())`
'v3_1', toString(toUnixTimestamp(NOW())), NOW())`,

`CREATE TABLE IF NOT EXISTS metrics_15s (
fingerprint UInt64,
timestamp_ns Int64 CODEC(DoubleDelta),
last AggregateFunction(argMax, Float64, Int64),
max SimpleAggregateFunction(max, Float64),
min SimpleAggregateFunction(min, Float64),
count AggregateFunction(count),
sum SimpleAggregateFunction(sum, Float64),
bytes SimpleAggregateFunction(sum, Float64)
) ENGINE = AggregatingMergeTree
PARTITION BY toDate(toDateTime(intDiv(timestamp_ns, 1000000000)))
ORDER BY (fingerprint, timestamp_ns);`,

`CREATE MATERIALIZED VIEW IF NOT EXISTS metrics_15s_mv TO metrics_15s AS
SELECT fingerprint,
intDiv(samples.timestamp_ns, 15000000000) * 15000000000 as timestamp_ns,
argMaxState(value, samples.timestamp_ns) as last,
maxSimpleState(value) as max,
minSimpleState(value) as min,
countState() as count,
sumSimpleState(value) as sum,
sumSimpleState(length(string)) as bytes
FROM samples_v3 as samples
GROUP BY fingerprint, timestamp_ns;`,

`INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_2'), 'update',
'v3_2', toString(toUnixTimestamp(NOW())), NOW())`
]

module.exports.traces = [
Expand Down
1 change: 1 addition & 0 deletions lib/handlers/query_range.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ async function handler (req, res) {
res.header('Content-Type', pluginOut.type)
return res.send(pluginOut.out)
}
req.query.optimizations = true
try {
const response = await this.scanFingerprints(req.query)
res.code(200)
Expand Down
18 changes: 18 additions & 0 deletions parser/registry/smart_optimizations/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
const optimizations = [require('./optimization_v3_2')]

module.exports = {
/**
*
* @param token {Token}
* @param fromNS {number}
* @param toNS {number}
* @param stepNS {number}
*/
apply: (token, fromNS, toNS, stepNS) => {
const optimization = optimizations.find((opt) => opt.isApplicable(token, fromNS / 1000000))
if (optimization) {
return optimization.apply(token, fromNS, toNS, stepNS)
}
return null
}
}
69 changes: 69 additions & 0 deletions parser/registry/smart_optimizations/log_range_agg_reg_v3_2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
const { getDuration } = require('../common')
const Sql = require('@cloki/clickhouse-sql')
module.exports = {
/**
*
* @param token {Token}
* @param query {Select}
* @returns {Select}
*/
rate: (token, query) => {
const duration = getDuration(token)
return genericRate(new Sql.Raw(`toFloat64(countMerge(count)) * 1000 / ${duration}`), token, query)
},

/**
*
* @param token {Token}
* @param query {Select}
* @returns {Select}
*/
count_over_time: (token, query) => {
return genericRate(new Sql.Raw('toFloat64(countMerge(count))'), token, query)
},

/**
*
* @param token {Token}
* @param query {Select}
* @returns {Select}
*/
bytes_rate: (token, query) => {
const duration = getDuration(token, query)
return genericRate(new Sql.Raw(`toFloat64(sum(bytes) * 1000 / ${duration})`), token, query)
},
/**
*
* @param token {Token}
* @param query {Select}
* @returns {Select}
*/
bytes_over_time: (token, query) => {
return genericRate(new Sql.Raw('toFloat64(sum(bytes))'), token, query)
}
}

/**
 * Shared tail for all log range aggregations over the pre-aggregated table.
 * Buckets samples into `duration`-sized groups per fingerprint and, when the
 * requested step exceeds the duration, wraps the query in an outer
 * downsampling SELECT that keeps one value per step.
 *
 * @param valueExpr {Sql.Raw} SQL expression producing the per-bucket value
 * @param token {Token} log range aggregation token (provides the duration)
 * @param query {Select} base query over metrics_15s, extended in place
 * @returns {Select} the bucketed query, or a downsampling wrapper around it
 */
const genericRate = (valueExpr, token, query) => {
const duration = getDuration(token)
// mark the response as a matrix (range vector) and record the bucket size
query.ctx.matrix = true
query.ctx.duration = duration
// drop any LIMIT inherited from the base query builder
query.limit(undefined, undefined)
// NOTE(review): intDiv(ns, duration * 1e6) * duration rescales timestamp_ns
// out of nanoseconds into `duration` units (presumably milliseconds, if
// getDuration returns ms) — confirm downstream consumers expect that.
const tsGroupingExpr = new Sql.Raw(`intDiv(timestamp_ns, ${duration}000000) * ${duration}`)
query.select([tsGroupingExpr, 'timestamp_ns'], [valueExpr, 'value'])
.groupBy('fingerprint', 'timestamp_ns')
.orderBy(['fingerprint', 'asc'], ['timestamp_ns', 'asc'])
// NOTE(review): ctx.step is set by the optimizer as stepNS / 1e9 (seconds)
// while `duration` appears to be milliseconds — this comparison and the
// intDiv below mix units; verify the intended scale.
const step = query.ctx.step
if (step <= duration) {
// step no larger than the bucket: no downsampling needed
return query
}
// step > duration: keep one value per step by taking the earliest bucket
// (argMin by timestamp) inside each step-sized window
const rateC = (new Sql.Select())
.select(
'labels',
[new Sql.Raw(`intDiv(timestamp_ns, ${step}) * ${step}`), 'timestamp_ns'],
[new Sql.Raw('argMin(rate_b.value, rate_b.timestamp_ns)'), 'value']
)
.from('rate_b')
.groupBy('fingerprint', 'timestamp_ns')
.orderBy(['fingerprint', 'asc'], ['timestamp_ns', 'asc'])
return rateC.with(new Sql.With('rate_b', query))
}
72 changes: 72 additions & 0 deletions parser/registry/smart_optimizations/optimization_v3_2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
const { getDuration } = require('../common')
const reg = require('./log_range_agg_reg_v3_2')
const Sql = require('@cloki/clickhouse-sql')
const { DATABASE_NAME, checkVersion } = require('../../../lib/utils')
const streamSelectorReg = require('../stream_selector_operator_registry')
const aggOpReg = require('../high_level_aggregation_registry')

/**
*
* @param token {Token}
* @param fromMS {number}
* @returns {boolean}
*/
/**
 * Decides whether the v3.2 pre-aggregation optimization can serve this query:
 * it must be a supported log range aggregation without an effective log
 * pipeline, the range start must fall after the v3_2 schema update, and the
 * range duration must be a whole multiple of the 15 s bucket size.
 *
 * @param token {Token}
 * @param fromMS {number} range start in milliseconds
 * @returns {boolean}
 */
module.exports.isApplicable = (token, fromMS) => {
  const fnToken = token.Child('log_range_aggregation_fn')
  if (!fnToken) {
    return false
  }
  const durationMs = getDuration(token)
  return checkVersion('v3_2', fromMS) &&
    !isLogPipeline(token) && reg[fnToken.value] && durationMs >= 15000 && durationMs % 15000 === 0
}

/**
 * Checks whether the query has a log pipeline that can actually filter or
 * transform lines. A pipeline made solely of no-op line filters
 * (`|= ""` / `|= ``) does not count, since it cannot change the
 * aggregation result and the pre-aggregated table remains usable.
 *
 * @param token {Token}
 * @returns {boolean} true when at least one effective pipeline stage exists
 */
function isLogPipeline (token) {
  for (const pipeline of token.Children('log_pipeline')) {
    const op = pipeline.Child('line_filter_operator')
    // a stage is a no-op only when it is `|=` with an empty quoted string
    const isNoopFilter = op && op.value === '|=' &&
      ['""', '``'].includes(pipeline.Child('quoted_str').value)
    if (!isNoopFilter) {
      return true
    }
  }
  return false
}

/**
*
* @param token {Token}
* @param fromNS {number}
* @param toNS {number}
* @param stepNS {number}
*/
/**
 * Builds the optimized query over the pre-aggregated metrics_15s table:
 * base SELECT with time filter and labels join, then the stream selector
 * rules, the log range aggregation, and an optional aggregation operator.
 *
 * @param token {Token}
 * @param fromNS {number} range start, nanoseconds
 * @param toNS {number} range end, nanoseconds (falsy means open-ended)
 * @param stepNS {number} step, nanoseconds
 * @returns {Select}
 */
module.exports.apply = (token, fromNS, toNS, stepNS) => {
  // align the range start with the 15 s pre-aggregation buckets
  const alignedFromNS = Math.floor(fromNS / 15000000000) * 15000000000
  const timeFilter = toNS
    ? Sql.between('samples.timestamp_ns', alignedFromNS, toNS)
    : Sql.Gt('samples.timestamp_ns', alignedFromNS)
  let query = (new Sql.Select())
    .select(['samples.fingerprint', 'fingerprint'])
    .from([`${DATABASE_NAME()}.metrics_15s`, 'samples'])
    .where(timeFilter)
  query.join(`${DATABASE_NAME()}.time_series`, 'left any',
    Sql.Eq('samples.fingerprint', new Sql.Raw('time_series.fingerprint')))
  query.select([new Sql.Raw('any(JSONExtractKeysAndValues(time_series.labels, \'String\'))'), 'labels'])

  // step in seconds, consumed by genericRate for downsampling decisions
  query.ctx = {
    step: stepNS / 1000000000
  }

  for (const rule of token.Children('log_stream_selector_rule')) {
    query = streamSelectorReg[rule.Child('operator').value](rule, query)
  }

  const logRangeAgg = token.Child('log_range_aggregation')
  query = reg[logRangeAgg.Child('log_range_aggregation_fn').value](logRangeAgg, query)

  const aggOp = token.Child('aggregation_operator')
  if (aggOp) {
    query = aggOpReg[aggOp.Child('aggregation_operator_fn').value](aggOp, query)
  }

  return query
}
43 changes: 42 additions & 1 deletion parser/transpiler.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const Sql = require('@cloki/clickhouse-sql')
const { simpleAnd } = require('./registry/stream_selector_operator_registry/stream_selector_operator_registry')
const logger = require('../lib/logger')
const { QrynBadRequest } = require('../lib/handlers/errors')
const optimizations = require('./registry/smart_optimizations')

/**
* @param joinLabels {boolean}
Expand Down Expand Up @@ -74,6 +75,34 @@ module.exports.initQuery = (joinLabels) => {
return q
}

/**
* @param joinLabels {boolean}
* @returns {Select}
*/
/**
 * Creates the base SELECT over the 15 s pre-aggregated metrics table
 * (metrics_15s) — the optimized counterpart of initQuery.
 *
 * @param joinLabels {boolean} when true, joins time_series to expose labels
 * @returns {Select}
 */
module.exports.initQueryV3_2 = (joinLabels) => {
  const from = new Sql.Parameter(sharedParamNames.from)
  const to = new Sql.Parameter(sharedParamNames.to)
  // Lazily rendered time filter: BETWEEN when `to` has a value, otherwise a
  // lower bound only. toString is overridden so the choice happens at render
  // time, after the parameters have been assigned.
  const tsClause = new Sql.Raw('')
  tsClause.toString = () => {
    if (to.get()) {
      return Sql.between('samples.timestamp_ns', from, to).toString()
    }
    return Sql.Gt('samples.timestamp_ns', from).toString()
  }
  const q = (new Sql.Select())
    .select(['samples.fingerprint', 'fingerprint'])
    // Qualify the table with the configured database, consistent with the
    // time_series join below and with optimization_v3_2.apply; the bare
    // table name broke deployments using a non-default database.
    .from([`${DATABASE_NAME()}.metrics_15s`, 'samples'])
    .where(tsClause)
    .addParam(from)
    .addParam(to)
  if (joinLabels) {
    q.join(`${DATABASE_NAME()}.time_series`, 'left any',
      Sql.Eq('samples.fingerprint', new Sql.Raw('time_series.fingerprint')))
    q.select([new Sql.Raw('JSONExtractKeysAndValues(time_series.labels, \'String\')'), 'labels'])
  }
  return q
}

/**
*
* @param request {{
Expand All @@ -89,6 +118,12 @@ module.exports.initQuery = (joinLabels) => {
* @returns {{query: string, stream: (function (DataStream): DataStream)[], matrix: boolean, duration: number | undefined}}
*/
module.exports.transpile = (request) => {
const response = (query) => ({
query: request.rawQuery ? query : query.toString(),
matrix: !!query.ctx.matrix,
duration: query.ctx && query.ctx.duration ? query.ctx.duration : 1000,
stream: getStream(query)
})
const expression = compiler.ParseScript(request.query.trim())
if (!expression) {
throw new QrynBadRequest('invalid request')
Expand All @@ -109,6 +144,12 @@ module.exports.transpile = (request) => {
let end = BigInt(request.end || (BigInt(Date.now()) * BigInt(1e6)))
const step = BigInt(request.step ? Math.floor(parseFloat(request.step) * 1000) : 0) * BigInt(1e6)
*/
if (request.optimizations) {
const query = optimizations.apply(token, start * 1000000, end * 1000000, step * 1000000)
if (query) {
return response(query)
}
}
const joinLabels = ['unwrap_function', 'log_range_aggregation', 'aggregation_operator',
'agg_statement', 'user_macro', 'parser_expression', 'label_filter_pipeline',
'line_format_expression', 'labels_format_expression'].some(t => token.Child(t))
Expand Down Expand Up @@ -254,7 +295,7 @@ module.exports.transpileTail = (request) => {
query.order_expressions = []
query.orderBy(['timestamp_ns', 'asc'])
query.limit(undefined, undefined)
// logger.debug(query.toString())
//logger.debug(query.toString())
return {
query: request.rawRequest ? query : query.toString(),
stream: getStream(query)
Expand Down

0 comments on commit 0c1fa65

Please sign in to comment.