Skip to content

Commit

Permalink
feat(prometheus): Proxy-Wasm metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
casimiro committed Oct 31, 2024
1 parent 30fcffb commit 636717d
Show file tree
Hide file tree
Showing 8 changed files with 608 additions and 2 deletions.
4 changes: 4 additions & 0 deletions changelog/unreleased/kong/prometheus-wasmx-metrics.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
message: |
**Prometheus**: Added support for Proxy-Wasm metrics.
type: feature
scope: Plugin
1 change: 1 addition & 0 deletions kong-3.9.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ build = {
["kong.plugins.prometheus.prometheus"] = "kong/plugins/prometheus/prometheus.lua",
["kong.plugins.prometheus.serve"] = "kong/plugins/prometheus/serve.lua",
["kong.plugins.prometheus.schema"] = "kong/plugins/prometheus/schema.lua",
["kong.plugins.prometheus.wasmx"] = "kong/plugins/prometheus/wasmx.lua",

["kong.plugins.session.handler"] = "kong/plugins/session/handler.lua",
["kong.plugins.session.schema"] = "kong/plugins/session/schema.lua",
Expand Down
8 changes: 6 additions & 2 deletions kong/plugins/prometheus/exporter.lua
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
local balancer = require "kong.runloop.balancer"
local yield = require("kong.tools.yield").yield
local wasm = require "kong.plugins.prometheus.wasmx"


local kong = kong
local ngx = ngx
local get_phase = ngx.get_phase
local lower = string.lower
local ngx_timer_pending_count = ngx.timer.pending_count
local ngx_timer_running_count = ngx.timer.running_count
local balancer = require("kong.runloop.balancer")
local yield = require("kong.tools.yield").yield
local get_all_upstreams = balancer.get_all_upstreams
if not balancer.get_all_upstreams then -- API changed since after Kong 2.5
get_all_upstreams = require("kong.runloop.balancer.upstreams").get_all_upstreams
Expand Down Expand Up @@ -517,6 +520,7 @@ local function metric_data(write_fn)
-- notify the function if prometheus plugin is enabled,
-- so that it can avoid exporting unnecessary metrics if not
prometheus:metric_data(write_fn, not IS_PROMETHEUS_ENABLED)
wasm.metrics_data()
end

local function collect()
Expand Down
230 changes: 230 additions & 0 deletions kong/plugins/prometheus/wasmx.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
local buffer = require "string.buffer"
local wasm = require "kong.runloop.wasm"
local wasmx_shm


local str_sub = string.sub
local table_insert = table.insert
local table_sort = table.sort
local buf_new = buffer.new
local ngx_say = ngx.say
local ngx_re_match = ngx.re.match


local _M = {}


local FLUSH_EVERY = 100
local GET_METRIC_OPTS = { prefix = false }


local metrics_data_buf = buf_new()
local labels_serialization_buf = buf_new()
local sum_lines_buf = buf_new()
local count_lines_buf = buf_new()


local function sorted_iter(ctx, i)
i = i + 1

local v = ctx.t[ctx.sorted_keys[i]]

if v ~= nil then
return i, v
end
end


local function sorted_pairs(t)
local sorted_keys = {}

for k, _ in pairs(t) do
table_insert(sorted_keys, k)
end

table_sort(sorted_keys)

return sorted_iter, { t = t, sorted_keys = sorted_keys }, 0
end

--
-- Convert a pw_key into a pair of metric name and labels
--
-- pw_key follows the form `pw.<filter_name>.<metric_name>`
-- `<metric_name>` might contain labels, e.g. a_metric_label1="v1";
-- if it does, the position of the first label corresponds to the end of the
-- metric name and is used to discard labels from <metric_name>.
local function parse_pw_key(pw_key)
local m_name = pw_key
local m_labels = {}
local m_1st_label_pos = #pw_key

local matches = ngx_re_match(pw_key, [[pw\.(\w+)\.]])
local f_name = matches[1]
local f_meta = wasm.filter_meta[f_name] or {}
local l_patterns = f_meta.metrics and f_meta.metrics.label_patterns or {}

for _, pair in ipairs(l_patterns) do
local ctx = {}

matches = ngx_re_match(pw_key, pair.pattern, "", ctx)

if matches then
local l_pos, value = ctx.pos - #matches[1], matches[2]

table_insert(m_labels, { pair.label, value })

m_1st_label_pos = (l_pos < m_1st_label_pos) and l_pos or m_1st_label_pos
end
end

if m_1st_label_pos ~= #pw_key then
-- discarding labels from m_name
m_name = str_sub(pw_key, 1, m_1st_label_pos - 1)
end

return m_name, m_labels
end


local function parse_key(key)
-- TODO: parse wa. (WasmX metrics) and lua. (metrics defined in Lua land)
local name = key
local labels

if #key > 3 and key:sub(0, 3) == "pw." then
name, labels = parse_pw_key(key)
end

name = name:gsub("%.", "_")

return name, labels or {}
end


local function serialize_labels(labels)
labels_serialization_buf:reset()

for _, pair in ipairs(labels) do
labels_serialization_buf:putf(',%s="%s"', pair[1], pair[2])
end

labels_serialization_buf:skip(1) -- discard leading comma

return "{" .. labels_serialization_buf:get() .. "}"
end


local function serialize_metric(m, buf)
buf:putf("# HELP %s\n# TYPE %s %s", m.name, m.name, m.type)

if m.type == "histogram" then
sum_lines_buf:reset()
count_lines_buf:reset()

for _, pair in ipairs(m.labels) do
local count, sum = 0, 0
local labels, labeled_m = pair[1], pair[2]
local slabels, blabels = "", "{"

if #labels > 0 then
slabels = serialize_labels(labels)
blabels = slabels:sub(1, #slabels - 1) .. ","
end

for _, bin in ipairs(labeled_m.value) do
count = count + bin.count

buf:putf('\n%s%sle="%s"} %s',
m.name,
blabels,
(bin.ub ~= 4294967295 and bin.ub or "+Inf"),
count)
end

sum = sum + labeled_m.sum

sum_lines_buf:putf("\n%s_sum%s %s", m.name, slabels, sum)
count_lines_buf:putf("\n%s_count%s %s", m.name, slabels, count)
end

buf:put(sum_lines_buf:get())
buf:put(count_lines_buf:get())

else
assert(m.type == "gauge" or m.type == "counter", "unknown metric type")

for _, pair in ipairs(m.labels) do
local labels, labeled_m = pair[1], pair[2]
local slabels = (#labels > 0) and serialize_labels(labels) or ""

buf:putf("\n%s%s %s", m.name, slabels, labeled_m.value)
end
end

buf:put("\n")
end


local function require_wasmx()
if not wasmx_shm then
local ok, _wasmx_shm = pcall(require, "resty.wasmx.shm")
if ok then
wasmx_shm = _wasmx_shm
end
end
end


_M.metrics_data = function()
if not wasm.enabled() then
return
end

local metrics = {}
local parsed = {}

-- delayed require of the WasmX module, to ensure it is loaded
-- after ngx_wasm_module.so is loaded.
require_wasmx()

if not wasmx_shm then
return
end

wasmx_shm.metrics:lock()

for key in wasmx_shm.metrics:iterate_keys() do
local pair = { key, wasmx_shm.metrics:get_by_name(key, GET_METRIC_OPTS)}
table_insert(metrics, pair)
end

wasmx_shm.metrics:unlock()

-- in WasmX the different labels of a metric are stored as separate metrics;
-- aggregate those separate metrics into a single one.
for _, pair in ipairs(metrics) do
local key = pair[1]
local m = pair[2]
local name, labels = parse_key(key)

parsed[name] = parsed[name] or { name = name, type = m.type, labels = {} }

table_insert(parsed[name].labels, { labels, m })
end

metrics_data_buf:reset()

for i, metric_by_label in sorted_pairs(parsed) do
metrics_data_buf:put(serialize_metric(metric_by_label, metrics_data_buf))

if i % FLUSH_EVERY == 0 then
ngx_say(metrics_data_buf:get())
end
end

ngx_say(metrics_data_buf:get())
end


return _M
Loading

0 comments on commit 636717d

Please sign in to comment.