Skip to content

Commit

Permalink
clean sync/rpc.lua
Browse files Browse the repository at this point in the history
  • Loading branch information
chronolaw committed Sep 29, 2024
1 parent 61550f3 commit 59f7194
Showing 1 changed file with 38 additions and 26 deletions.
64 changes: 38 additions & 26 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }


local pairs = pairs
local fmt = string.format
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
Expand Down Expand Up @@ -42,6 +43,10 @@ function _M:init_cp(manager)
-- { { namespace = "default", current_version = 1000, }, }
local purge_delay = manager.conf.cluster_data_plane_purge_delay

local function gen_delta_result(res, wipe)
return { default = { deltas = res, wipe = wipe, }, }
end

manager.callbacks:register("kong.sync.v2.get_delta", function(node_id, current_versions)
ngx_log(ngx_DEBUG, "[kong.sync.v2] config push (connected client)")

Expand All @@ -61,28 +66,31 @@ function _M:init_cp(manager)
return nil, "default namespace does not exist inside params"
end

local default_namespace_version = default_namespace.version

-- XXX TODO: follow update_sync_status() in control_plane.lua
local ok, err = kong.db.clustering_data_planes:upsert({ id = node_id }, {
last_seen = ngx.time(),
hostname = node_id,
ip = kong.rpc:get_peer_ip(node_id), -- try to get the corret ip
version = "3.8.0.0", -- XXX TODO
ip = kong.rpc:get_peer_ip(node_id), -- try to get the correct ip
version = "3.8.0.0", -- XXX TODO: get from rpc call
sync_status = CLUSTERING_SYNC_STATUS.NORMAL,
config_hash = fmt("%032d", default_namespace.version),
config_hash = fmt("%032d", default_namespace_version),
rpc_capabilities = rpc_peers and rpc_peers[node_id] or {},
}, { ttl = purge_delay })
if not ok then
ngx_log(ngx_ERR, "unable to update clustering data plane status: ", err)
end

-- is the node empty? If so, just do a full sync to bring it up to date faster
if default_namespace.version == 0 or
self.strategy:get_latest_version() - default_namespace.version > FULL_SYNC_THRESHOLD
if default_namespace_version == 0 or
self.strategy:get_latest_version() - default_namespace_version > FULL_SYNC_THRESHOLD
then
-- we need to full sync because holes are found

ngx_log(ngx_INFO, "[kong.sync.v2] database is empty or too far behind for node_id: ", node_id,
", current_version: ", default_namespace.version,
ngx_log(ngx_INFO,
"[kong.sync.v2] database is empty or too far behind for node_id: ", node_id,
", current_version: ", default_namespace_version,
", forcing a full sync")


Expand All @@ -91,41 +99,45 @@ function _M:init_cp(manager)
return nil, err
end

return { default = { deltas = deltas, wipe = true, }, }
-- wipe dp lmdb, full sync
return gen_delta_result(deltas, true)
end

local res, err = self.strategy:get_delta(default_namespace.version)
local res, err = self.strategy:get_delta(default_namespace_version)
if not res then
return nil, err
end

if #res == 0 then
ngx_log(ngx_DEBUG, "[kong.sync.v2] no delta for node_id: ", node_id,
", current_version: ", default_namespace.version,
ngx_log(ngx_DEBUG,
"[kong.sync.v2] no delta for node_id: ", node_id,
", current_version: ", default_namespace_version,
", node is already up to date" )
return { default = { deltas = res, wipe = false, }, }
return gen_delta_result(res, false)
end

-- some deltas are returned, are they contiguous?
if res[1].version ~= default_namespace.version + 1 then
-- we need to full sync because holes are found
-- in the delta, meaning the oldest version is no longer
-- available

ngx_log(ngx_INFO, "[kong.sync.v2] delta for node_id no longer available: ", node_id,
", current_version: ", default_namespace.version,
", forcing a full sync")
if res[1].version == default_namespace.version + 1 then
-- doesn't wipe dp lmdb, incremental sync
return gen_delta_result(res, false)
end

-- we need to full sync because holes are found
-- in the delta, meaning the oldest version is no longer
-- available

local deltas, err = declarative.export_config_sync()
if not deltas then
return nil, err
end
ngx_log(ngx_INFO,
"[kong.sync.v2] delta for node_id no longer available: ", node_id,
", current_version: ", default_namespace_version,
", forcing a full sync")

return { default = { deltas = deltas, wipe = true, }, }
local deltas, err = declarative.export_config_sync()
if not deltas then
return nil, err
end

return { default = { deltas = res, wipe = false, }, }
-- wipe dp lmdb, full sync
return gen_delta_result(deltas, true)
end)
end

Expand Down

0 comments on commit 59f7194

Please sign in to comment.