-
Notifications
You must be signed in to change notification settings - Fork 4.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
c77a178
commit 419e823
Showing
9 changed files
with
635 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
local helpers = require("spec.helpers") | ||
local server = require("spec.helpers.rpc_mock.server") | ||
local client = require("spec.helpers.rpc_mock.client") | ||
local get_node_id = helpers.get_node_id | ||
|
||
local function trigger_change() | ||
-- the initial sync is flaky. let's trigger a sync by creating a service | ||
local admin_client = helpers.admin_client() | ||
assert.res_status(201, admin_client:send { | ||
method = "POST", | ||
path = "/services/", | ||
body = { | ||
url = "http://example.com", | ||
}, | ||
headers = { | ||
["Content-Type"] = "application/json", | ||
}, | ||
}) | ||
end | ||
|
||
describe("rpc mock/hook", function() | ||
local recover | ||
lazy_setup(function() | ||
if not kong.worker_events then | ||
helpers.patch_worker_events() | ||
recover = true | ||
end | ||
end) | ||
|
||
lazy_teardown(function() | ||
if recover then | ||
kong.worker_events = nil | ||
end | ||
end) | ||
|
||
describe("server side", function() | ||
local server_mock | ||
|
||
lazy_setup(function() | ||
helpers.get_db_utils() | ||
|
||
server_mock = server.new() | ||
assert(server_mock:start()) | ||
|
||
assert(helpers.start_kong({ | ||
database = "off", | ||
role = "data_plane", | ||
cluster_mtls = "shared", | ||
cluster_cert = "spec/fixtures/kong_clustering.crt", | ||
cluster_cert_key = "spec/fixtures/kong_clustering.key", | ||
nginx_conf = "spec/fixtures/custom_nginx.template", | ||
cluster_rpc = "on", | ||
cluster_rpc_sync = "on", | ||
log_level = "debug", | ||
cluster_control_plane = "127.0.0.1:8005", | ||
})) | ||
end) | ||
|
||
lazy_teardown(function() | ||
server_mock:stop(true) | ||
helpers.stop_kong(nil, true) | ||
end) | ||
|
||
it("recording", function() | ||
trigger_change() | ||
|
||
local record = server_mock:wait_for_call() | ||
assert.is_table(record.response.result.default.deltas) | ||
end) | ||
|
||
it("mock", function() | ||
local client_version | ||
server_mock:mock("kong.sync.v2.get_delta", function(node_id, payload) | ||
client_version = payload.default.version | ||
return { default = { version = 100, deltas = {} } } | ||
end) | ||
server_mock:attach_debugger() | ||
|
||
local dp_id = get_node_id("servroot") | ||
|
||
server_mock:wait_for_node(dp_id) | ||
|
||
assert(server_mock:call(dp_id, "kong.sync.v2.notify_new_version", { default = { new_version = 100, } })) | ||
|
||
-- the mock should have been called | ||
helpers.wait_until(function() | ||
return client_version | ||
end, 20) | ||
end) | ||
end) | ||
|
||
describe("client side", function() | ||
local client_mock | ||
local called = false | ||
|
||
lazy_setup(function() | ||
helpers.get_db_utils() | ||
|
||
client_mock = assert(client.new()) | ||
assert(helpers.start_kong({ | ||
role = "control_plane", | ||
cluster_mtls = "shared", | ||
cluster_cert = "spec/fixtures/kong_clustering.crt", | ||
cluster_cert_key = "spec/fixtures/kong_clustering.key", | ||
nginx_conf = "spec/fixtures/custom_nginx.template", | ||
cluster_rpc = "on", | ||
cluster_rpc_sync = "on", | ||
})) | ||
|
||
client_mock.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, payload) | ||
called = true | ||
end) | ||
|
||
client_mock:start() | ||
client_mock:wait_until_connected() | ||
end) | ||
|
||
lazy_teardown(function() | ||
helpers.stop_kong(nil, true) | ||
client_mock:stop() | ||
end) | ||
|
||
it("client->CP", function() | ||
local res, err = client_mock:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},}) | ||
assert.is_nil(err) | ||
assert.is_table(res and res.default and res.default.deltas) | ||
|
||
local res, err = client_mock:call("control_plane", "kong.sync.v2.unknown", { default = { },}) | ||
assert.is_string(err) | ||
assert.is_nil(res) | ||
end) | ||
|
||
it("CP->client", function() | ||
-- this registers the data plane node | ||
local res, err = client_mock:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},}) | ||
assert.is_nil(err) | ||
assert.is_table(res and res.default and res.default.deltas) | ||
|
||
trigger_change() | ||
|
||
helpers.wait_until(function() | ||
return called | ||
end, 20) | ||
end) | ||
end) | ||
end) |
76 changes: 76 additions & 0 deletions
76
spec/fixtures/custom_plugins/kong/plugins/rpc-debug/handler.lua
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
local kong_meta = require("kong.meta") | ||
|
||
local _M = { | ||
PRIORITY = 1000, | ||
VERSION = kong_meta.version, | ||
} | ||
|
||
local original_callbacks = {} | ||
local inc_id = 0 | ||
|
||
function _M.init_worker() | ||
kong.rpc.callbacks:register("kong.rpc.debug.register", function(node_id, register_payload) | ||
local proxy_apis = register_payload.proxy_apis | ||
|
||
for _, proxy_api in ipairs(proxy_apis) do | ||
-- unregister and save the original callback | ||
local original_cb | ||
if not original_callbacks[proxy_api] then | ||
original_callbacks[proxy_api] = kong.rpc.callbacks.callbacks[proxy_api] | ||
end | ||
original_cb = original_callbacks[proxy_api] | ||
kong.rpc.callbacks.callbacks[proxy_api] = nil | ||
|
||
kong.log.info("hooking registering RPC proxy API: ", proxy_api) | ||
kong.rpc.callbacks:register(proxy_api, function(client_id, payload) | ||
local id = inc_id | ||
inc_id = inc_id + 1 | ||
kong.log.info("hooked proxy API ", proxy_api, " called by node: ", client_id) | ||
kong.log.info("forwarding to node: ", node_id) | ||
local res, err = kong.rpc:call(node_id, "kong.rpc.debug.mock", { call_id = id, method = proxy_api, node_id = client_id, payload = payload }) | ||
if not res then | ||
return nil, "Failed to proxy(" .. node_id .. "): " .. err | ||
end | ||
|
||
if res.error then | ||
return nil, res.error | ||
end | ||
|
||
if res.prehook or res.posthook then | ||
if res.prehook then | ||
payload = res.args | ||
end | ||
|
||
local origin_res, origin_err = original_cb(client_id, payload) | ||
|
||
if res.posthook then | ||
res, err = kong.rpc:call(node_id, "kong.rpc.debug.posthook", { call_id = id, method = proxy_api, node_id = client_id, payload = {result = origin_res, error = origin_err} }) | ||
if not res then | ||
return nil, "Failed to call post hook(" .. node_id .. "): " .. err | ||
end | ||
|
||
return res.result, res.error | ||
end | ||
elseif res.mock then | ||
return res.result, res.error | ||
end | ||
|
||
return nil, "invalid response from proxy" | ||
end) | ||
end | ||
|
||
return true | ||
end) | ||
|
||
kong.rpc.callbacks:register("kong.rpc.debug.call", function(node_id, payload) | ||
local res, err = kong.rpc:call(payload.node_id, payload.method, payload.args) | ||
return res, err | ||
end) | ||
|
||
kong.rpc.callbacks:register("kong.rpc.debug.lua_code", function(node_id, payload) | ||
local code = assert(loadstring(payload)) | ||
return code() | ||
end) | ||
end | ||
|
||
return _M |
12 changes: 12 additions & 0 deletions
12
spec/fixtures/custom_plugins/kong/plugins/rpc-debug/schema.lua
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
return { | ||
name = "rpc-debug", | ||
fields = { | ||
{ | ||
config = { | ||
type = "record", | ||
fields = { | ||
}, | ||
}, | ||
}, | ||
}, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
-- by importing helpers, we ensure the kong PDK module is initialized | ||
local helpers = require "spec.helpers" | ||
local rpc_mgr = require("kong.clustering.rpc.manager") | ||
local default_cert = require("spec.helpers.rpc_mock.default").default_cert | ||
local uuid = require "kong.tools.uuid" | ||
|
||
|
||
local _M = {} | ||
|
||
|
||
local default_dp_conf = { | ||
role = "data_plane", | ||
cluster_control_plane = "localhost:8005", | ||
} | ||
|
||
setmetatable(default_dp_conf, { __index = default_cert }) | ||
local default_meta = { __index = default_dp_conf, } | ||
|
||
|
||
local function do_nothing() end | ||
|
||
|
||
local function client_stop(rpc_mgr) | ||
-- a hacky way to stop rpc_mgr from reconnecting | ||
rpc_mgr.try_connect = do_nothing | ||
|
||
-- this will stop all connections | ||
for _, socket in pairs(rpc_mgr.clients) do | ||
for conn in pairs(socket) do | ||
pcall(conn.stop, conn) | ||
end | ||
end | ||
end | ||
|
||
|
||
local function client_is_connected(rpc_mgr) | ||
for _, socket in pairs(rpc_mgr.clients) do | ||
if next(socket) then | ||
return true | ||
end | ||
end | ||
return false | ||
end | ||
|
||
|
||
local function client_wait_until_connected(rpc_mgr, timeout) | ||
return helpers.wait_until(function() | ||
return rpc_mgr:is_connected() | ||
end, timeout or 15) | ||
end | ||
|
||
|
||
-- TODO: let client not emits logs as it's expected to fail to connect for the first few seconds | ||
function _M.new(opts) | ||
opts = opts or {} | ||
setmetatable(opts, default_meta) | ||
local ret = rpc_mgr.new(default_dp_conf, opts.name or uuid.uuid()) | ||
|
||
ret.stop = client_stop | ||
ret.is_connected = client_is_connected | ||
ret.start = ret.try_connect | ||
ret.wait_until_connected = client_wait_until_connected | ||
|
||
return ret | ||
end | ||
|
||
|
||
return _M |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
local default_cert = { | ||
cluster_mtls = "shared", | ||
cluster_cert = "spec/fixtures/kong_clustering.crt", | ||
cluster_cert_key = "spec/fixtures/kong_clustering.key", | ||
nginx_conf = "spec/fixtures/custom_nginx.template", | ||
} | ||
|
||
return { | ||
default_cert = default_cert, | ||
} |
Oops, something went wrong.