diff --git a/bin/busted b/bin/busted index e0635aabd6f7..e59760322faf 100755 --- a/bin/busted +++ b/bin/busted @@ -19,6 +19,9 @@ local cert_path do busted_cert_content = busted_cert_content .. "\n" .. pl_file.read(system_cert_path) end + local cluster_cert_content = assert(pl_file.read("spec/fixtures/kong_clustering.crt")) + busted_cert_content = busted_cert_content .. "\n" .. cluster_cert_content + pl_file.write(busted_cert_file, busted_cert_content) cert_path = busted_cert_file end diff --git a/spec/01-unit/19-hybrid/04-rpc_spec.lua b/spec/01-unit/19-hybrid/04-rpc_spec.lua new file mode 100644 index 000000000000..880a8b31cf01 --- /dev/null +++ b/spec/01-unit/19-hybrid/04-rpc_spec.lua @@ -0,0 +1,89 @@ +-- by importing helpers, we initialize the kong PDK module +local helpers = require "spec.helpers" +local server = require("spec.helpers.rpc_mock.server") +local client = require("spec.helpers.rpc_mock.client") + +describe("rpc v2", function() + describe("full sync pagination", function() + describe("server side", function() + local server_mock + lazy_setup(function() + helpers.get_db_utils() + + server_mock = server.new() + assert(server_mock:start()) + + assert(helpers.start_kong({ + database = "off", + role = "data_plane", + cluster_mtls = "shared", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_rpc = "on", + cluster_rpc_sync = "on", + log_level = "debug", + cluster_control_plane = "127.0.0.1:8005", + })) + end) + lazy_teardown(function() + server_mock:stop(true) + helpers.stop_kong(nil, true) + end) + + it("works", function() + -- the initial sync is flaky. let's trigger a sync by creating a service + local admin_client = helpers.admin_client() + assert.res_status(201, admin_client:send { + method = "POST", + path = "/services/", + body = { + name = "mockbin", + url = "http://mockbin.org", + }, + headers = { + ["Content-Type"] = "application/json", + }, + }) + + helpers.wait_until(function() + return server_mock.records and next(server_mock.records) + end,20) + end) + end) + + describe("client side", function() + local client_mock + lazy_setup(function() + helpers.get_db_utils() + + client_mock = assert(client.new()) + assert(helpers.start_kong({ + role = "control_plane", + cluster_mtls = "shared", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_rpc = "on", + cluster_rpc_sync = "on", + })) + client_mock:start() + end) + lazy_teardown(function() + helpers.stop_kong(nil, true) + client_mock:stop() + end) + + it("works", function() + client_mock:wait_until_connected() + + local res, err = client_mock:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},}) + assert.is_nil(err) + assert.is_table(res and res.default and res.default.deltas) + + local res, err = client_mock:call("control_plane", "kong.sync.v2.unknown", { default = { },}) + assert.is_string(err) + end) + end) + end) +end) diff --git a/spec/02-integration/01-helpers/05-rpc-mock_spec.lua b/spec/02-integration/01-helpers/05-rpc-mock_spec.lua new file mode 100644 index 000000000000..9139c329245d --- /dev/null +++ b/spec/02-integration/01-helpers/05-rpc-mock_spec.lua @@ -0,0 +1,101 @@ +local helpers = require "spec.helpers" +local server = require("spec.helpers.rpc_mock.server") +local client = require("spec.helpers.rpc_mock.client") + +local function trigger_change() + -- the initial sync is flaky. let's trigger a sync by creating a service + local admin_client = helpers.admin_client() + assert.res_status(201, admin_client:send { + method = "POST", + path = "/services/", + body = { + name = "mockbin", + url = "http://mockbin.org", + }, + headers = { + ["Content-Type"] = "application/json", + }, + }) +end + +describe("rpc mock/hook", function() + describe("server side", function() + local server_mock + lazy_setup(function() + helpers.get_db_utils() + + server_mock = server.new() + assert(server_mock:start()) + + assert(helpers.start_kong({ + database = "off", + role = "data_plane", + cluster_mtls = "shared", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_rpc = "on", + cluster_rpc_sync = "on", + log_level = "debug", + cluster_control_plane = "127.0.0.1:8005", + })) + end) + lazy_teardown(function() + server_mock:stop(true) + helpers.stop_kong(nil, true) + end) + + it("recording", function() + trigger_change() + helpers.wait_until(function() + for _, record in pairs(server_mock.records or {}) do + return record.response and true + end + end,20) + end) + end) + + describe("client side", function() + local client_mock + lazy_setup(function() + helpers.get_db_utils() + + client_mock = assert(client.new()) + assert(helpers.start_kong({ + role = "control_plane", + cluster_mtls = "shared", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_rpc = "on", + cluster_rpc_sync = "on", + })) + client_mock:start() + end) + lazy_teardown(function() + helpers.stop_kong(nil, true) + client_mock:stop() + end) + + it("works", function() + local new_version + client_mock.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, payload) + print(require("pl.pretty").write(payload)) + new_version = payload.default.new_version + return true + end) + client_mock:wait_until_connected() + trigger_change() + helpers.wait_until(function() + return new_version + end) + + local res, err = client_mock:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},}) + assert.is_nil(err) + assert.is_table(res and res.default and res.default.deltas) + + local res, err = client_mock:call("control_plane", "kong.sync.v2.unknown", { default = { },}) + assert.is_string(err) + end) + end) +end) diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/handler.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/handler.lua new file mode 100644 index 000000000000..c2a2f3fffe77 --- /dev/null +++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/handler.lua @@ -0,0 +1,67 @@ +local rpc_mgr = require("kong.clustering.rpc.manager") +local kong_meta = require("kong.meta") + +local _M = { + PRIORITY = 1000, + VERSION = kong_meta.version, +} + +local original_callbacks = {} +local inc_id = 0 + +function _M.init_worker() + kong.rpc.callbacks:register("kong.rpc.debug.register", function(node_id, register_payload) + local proxy_apis = register_payload.proxy_apis + + for _, proxy_api in ipairs(proxy_apis) do + -- unregister and save the original callback + local original_cb + if not original_callbacks[proxy_api] then + original_callbacks[proxy_api] = kong.rpc.callbacks.callbacks[proxy_api] + end + original_cb = original_callbacks[proxy_api] + kong.rpc.callbacks.callbacks[proxy_api] = nil + + kong.log.info("hooking registering RPC proxy API: ", proxy_api) + kong.rpc.callbacks:register(proxy_api, function(client_id, payload) + local id = inc_id + inc_id = inc_id + 1 + kong.log.info("hooked proxy API ", proxy_api, " called by node: ", client_id) + kong.log.info("forwarding to node: ", node_id) + local res, err = kong.rpc:call(node_id, "kong.rpc.debug.mock", { call_id = id, method = proxy_api, node_id = client_id, payload = payload }) + if not res then + return nil, "Failed to proxy(" .. node_id .. "): " .. err + end + + if res.error then + return nil, res.error + end + + if res.prehook or res.posthook then + if res.prehook then + payload = res.args + end + + local origin_res, origin_err = original_cb(client_id, payload) + + if res.posthook then + res, err = kong.rpc:call(node_id, "kong.rpc.debug.posthook", { call_id = id, method = proxy_api, node_id = client_id, payload = {result = origin_res, error = origin_err} }) + if not res then + return nil, "Failed to call post hook(" .. node_id .. "): " .. err + end + + return res.result, res.error + end + elseif res.mock then + return res.result, res.error + end + + return nil, "invalid response from proxy" + end) + end + + return true + end) +end + +return _M diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/schema.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/schema.lua new file mode 100644 index 000000000000..dc57731018bf --- /dev/null +++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/schema.lua @@ -0,0 +1,12 @@ +return { + name = "rpc-debug", + fields = { + { + config = { + type = "record", + fields = { + }, + }, + }, + }, +} diff --git a/spec/helpers.lua b/spec/helpers.lua index 22b67c4434d3..85993a421eb4 100644 --- a/spec/helpers.lua +++ b/spec/helpers.lua @@ -28,6 +28,30 @@ local server = reload_module("spec.internal.server") local client = reload_module("spec.internal.client") local wait = reload_module("spec.internal.wait") +-- redo the patches to apply the kong global patches +local _timerng + +_timerng = require("resty.timerng").new({ + min_threads = 16, + max_threads = 32, +}) + +_timerng:start() + +_G.timerng = _timerng + +_G.ngx.timer.at = function (delay, callback, ...) + return _timerng:at(delay, callback, ...) +end + +_G.ngx.timer.every = function (interval, callback, ...) + return _timerng:every(interval, callback, ...) +end + +if not kong.timer then + kong.timer = _timerng +end + ---------------- -- Variables/constants diff --git a/spec/helpers/rpc_mock/client.lua b/spec/helpers/rpc_mock/client.lua new file mode 100644 index 000000000000..f86becd39c4c --- /dev/null +++ b/spec/helpers/rpc_mock/client.lua @@ -0,0 +1,58 @@ +-- by importing helpers, we ensure the kong PDK module is initialized +local helpers = require "spec.helpers" +local rpc_mgr = require("kong.clustering.rpc.manager") +local default_cert = require("spec.helpers.rpc_mock.default").default_cert + +local _M = {} +local _MT = { __index = _M, } + +local default_dp_conf = { + role = "data_plane", + cluster_control_plane = "localhost:8005", +} + +setmetatable(default_dp_conf, { __index = default_cert }) +local default_meta = { __index = default_dp_conf, } + +local function do_nothing() end + +local function client_stop(rpc_mgr) + -- a hacky way to stop rpc_mgr from reconnecting + rpc_mgr.try_connect = do_nothing + + -- this will stop all connections + for _, socket in pairs(rpc_mgr.clients) do + for conn in pairs(socket) do + pcall(conn.stop, conn) + end + end +end + +local function client_is_connected(rpc_mgr) + for _, socket in pairs(rpc_mgr.clients) do + for conn in pairs(socket) do + return true + end + end + return false +end + +local function client_wait_until_connected(rpc_mgr, timeout) + return helpers.wait_until(function() + return rpc_mgr:is_connected() + end, timeout or 15) +end + +-- TODO: let client not emits logs as it's expected to fail to connect for the first few seconds +function _M.new(opts) + opts = opts or {} + setmetatable(opts, default_meta) + local ret = rpc_mgr.new(default_dp_conf, opts.name or "dp") + ret.stop = client_stop + ret.is_connected = client_is_connected + ret.start = ret.try_connect + ret.wait_until_connected = client_wait_until_connected + return ret +end + +return _M diff --git a/spec/helpers/rpc_mock/default.lua b/spec/helpers/rpc_mock/default.lua new file mode 100644 index 000000000000..caadca8d8a6e --- /dev/null +++ b/spec/helpers/rpc_mock/default.lua @@ -0,0 +1,10 @@ +local default_cert = { + cluster_mtls = "shared", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + nginx_conf = "spec/fixtures/custom_nginx.template", +} + +return { + default_cert = default_cert, +} diff --git a/spec/helpers/rpc_mock/server.lua b/spec/helpers/rpc_mock/server.lua new file mode 100644 index 000000000000..67608849f277 --- /dev/null +++ b/spec/helpers/rpc_mock/server.lua @@ -0,0 +1,199 @@ +local helpers = require("spec.helpers") +local client = require("spec.helpers.rpc_mock.client") +local default_cert = require("spec.helpers.rpc_mock.default").default_cert + +local _M = {} +local _MT = { __index = _M, } + +-- this function will start a control plane server with the given configuration +-- and attach a debugger to it +-- set attaching to true to attach to an existing control plane +function _M.new(opts) + opts = opts or {} + opts.prefix = opts.prefix or "servroot_rpc_tap" + opts.role = "control_plane" + opts.plugins = "bundled,rpc-debug" + opts.cluster_listen = opts.cluster_listen or "127.0.0.1:8005" + opts.mocks = opts.mocks or {} + opts.prehooks = opts.prehooks or {} + opts.posthooks = opts.posthooks or {} + opts.cluster_rpc = "on" + opts.cluster_rpc_sync = opts.cluster_rpc_sync or "on" + if opts.interception == nil then + opts.interception = true + end + + for k, v in pairs(default_cert) do + if opts[k] == nil then + opts[k] = v + end + end + + return setmetatable(opts, _MT) +end + +function _M.start(self) + if not self.attaching then + local bp, db = helpers.get_db_utils(strategy, nil, { "rpc-debug" }) + + local plugin = db.plugins:insert({ + name = "rpc-debug", + config = {}, + }) + + assert(helpers.start_kong(self)) + end + + self.debugger_client = client.new({ + cluster_control_plane = self.cluster_listen, + }) + + -- install default interception handlers + if self.interception then + self:enable_inception() + end + + -- attached control plane will call this method when a hooked/mocked RPC is called. + -- this RPC handles both prehook and mock, and response to the control plane: + -- 1. if the RPC is mocked, return the mock result; + -- 2. if the RPC has a prehook, manipulate the args and returns them, and tell if a posthook is present and pending call + self.debugger_client.callbacks:register("kong.rpc.debug.mock", function(proxy_id, proxy_payload) + local method, node_id, payload, call_id = + proxy_payload.method, proxy_payload.node_id, proxy_payload.payload, proxy_payload.call_id + local cb = self.mocks[method] + if cb then + local res, err = cb(node_id, payload, proxy_id, self) + return { + mock = true, + result = res, + error = err, + } + end + + local prehook = self.prehooks[method] or self.prehooks["*"] + local posthook = self.posthooks[method] or self.posthooks["*"] + local result = { + prehook = prehook and true, + posthook = posthook and true, + } + + if prehook then + local res, err = prehook(node_id, payload, proxy_id, self, method, call_id) + if not res then + return nil, err + end + + result.args = res + end + + return result + end) + + self.debugger_client.callbacks:register("kong.rpc.debug.posthook", function(proxy_id, proxy_payload) + local method, node_id, payload, call_id = + proxy_payload.method, proxy_payload.node_id, proxy_payload.payload, proxy_payload.call_id + local cb = self.posthooks[method] or self.posthooks["*"] + if not cb then + return nil, "no callback registered for method: " .. method + end + + return cb(node_id, payload, proxy_id, self, method, call_id) + end) + + self.debugger_client:start() + self.debugger_client:wait_until_connected() + + if next(self.mocks) or next(self.prehooks) or next(self.posthooks) then + return self:attach_debugger() + end + + return true +end + +function _M:attach_debugger() + local hooked = {} + for api_name, cb in pairs(self.mocks) do + hooked[api_name] = true + end + for api_name, cb in pairs(self.prehooks) do + hooked[api_name] = true + end + for api_name, cb in pairs(self.posthooks) do + hooked[api_name] = true + end + + local hooked_list + if hooked["*"] then + hooked_list = { + "kong.sync.v2.get_delta", + } + else + hooked_list = {} + for api_name, _ in pairs(hooked) do + hooked_list[#hooked_list + 1] = api_name + end + end + + return self.debugger_client:call("control_plane", "kong.rpc.debug.register", { + proxy_apis = hooked_list, + }) +end + +function _M:mock_api(api_name, cb) + self.mocks[api_name] = cb +end + +function _M:prehook_api(api_name, cb) + self.prehooks[api_name] = cb +end + +function _M:posthook_api(api_name, cb) + self.posthooks[api_name] = cb +end + +local function get_records(server) + local records = server.records + if not records then + records = {} + server.records = records + end + return records +end + +-- TODO: add req ID for correlation +local function default_inception_prehook(node_id, payload, proxy_id, server, method, call_id) + local records = get_records(server) + records[call_id] = { + request = payload, + node_id = node_id, + proxy_id = proxy_id, + method = method, + } + return payload +end + +local function default_inception_posthook(node_id, payload, proxy_id, server, method, call_id) + local records = get_records(server) + local record = records[call_id] + if not record then + print("no record found for call_id: ", call_id) + return payload + end + record.response = payload + return payload +end + +function _M:enable_inception() + self.prehooks["*"] = default_inception_prehook + self.posthooks["*"] = default_inception_posthook +end + +function _M:stop(...) + if not self.attaching then + helpers.stop_kong(self.prefix, ...) + end + + self.debugger_client:stop() +end + +return _M