Skip to content

Commit

Permalink
fix(wasm): execute filter plugins in a consistent order (#13946)
Browse files Browse the repository at this point in the history
Before this change, execution order of filter plugins was subject to
ordering returned by the underling DB implementation of
`kong.db.plugins:each()`.

This adds an extra step that sorts all discovered filter plugins by name
so that execution order is more consistent.
  • Loading branch information
flrgh authored and ProBrian committed Dec 13, 2024
1 parent 7a2dded commit 4277143
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 17 deletions.
50 changes: 37 additions & 13 deletions kong/runloop/wasm.lua
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ local ipairs = ipairs
local type = type
local assert = assert
local insert = table.insert
local sort = table.sort
local cjson_encode = cjson.encode
local cjson_decode = cjson.decode
local fmt = string.format
Expand Down Expand Up @@ -116,6 +117,21 @@ local STATUS_ENABLED = "wasm support is enabled"
local ENABLED = false
local STATUS = STATUS_DISABLED

local function filter_plugin_compare(a, b)
if a.name ~= b.name then
return a.name < b.name
end

if a.updated_at and b.updated_at and a.updated_at ~= b.updated_at then
return a.updated_at < b.updated_at
end

if a.created_at and b.created_at and a.created_at ~= b.created_at then
return a.created_at < b.created_at
end

return a.id < b.id
end

local hash_chain
do
Expand Down Expand Up @@ -485,28 +501,36 @@ local function rebuild_state(db, version, old_state)

local plugin_pagesize = db.plugins.pagination.max_page_size

local filter_plugins = {}

for plugin, err in db.plugins:each(plugin_pagesize, GLOBAL_QUERY_OPTS) do
if err then
return nil, "failed iterating plugins: " .. tostring(err)
end

if _M.filters_by_name[plugin.name] and plugin.enabled then
local chain = get_or_insert_chain(chains, {
id = uuid.uuid(),
enabled = true,
route = plugin.route,
service = plugin.service,
filters = {},
})

insert(chain.filters, {
name = plugin.name,
enabled = true,
config = serialize_configuration(plugin.config),
})
insert(filter_plugins, plugin)
end
end

sort(filter_plugins, filter_plugin_compare)

for _, plugin in ipairs(filter_plugins) do
local chain = get_or_insert_chain(chains, {
id = uuid.uuid(),
enabled = true,
route = plugin.route,
service = plugin.service,
filters = {},
})

insert(chain.filters, {
name = plugin.name,
enabled = true,
config = serialize_configuration(plugin.config),
})
end

local routes = db.routes
local select_route = routes.select

Expand Down
202 changes: 198 additions & 4 deletions spec/02-integration/20-wasm/12-filters-as-plugins_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,23 @@ describe("#wasm filters as plugins (#" .. strategy .. ")", function()


lazy_setup(function()
assert(helpers.file.copy(FILTER_PATH .. "/tests.wasm",
FILTER_PATH .. "/tests-01.wasm"))
assert(helpers.file.copy(FILTER_PATH .. "/tests.wasm",
FILTER_PATH .. "/tests-02.wasm"))

require("kong.runloop.wasm").enable({
{ name = "response_transformer",
path = FILTER_PATH .. "/response_transformer.wasm",
},
{
name = "tests-01",
path = FILTER_PATH .. "/tests-01.wasm",
},
{
name = "tests-02",
path = FILTER_PATH .. "/tests-02.wasm",
},
})

bp, db = helpers.get_db_utils(strategy, {
Expand All @@ -130,14 +143,14 @@ describe("#wasm filters as plugins (#" .. strategy .. ")", function()
"plugins",
})

helpers.start_kong({
assert(helpers.start_kong({
database = strategy,
nginx_conf = "spec/fixtures/custom_nginx.template",
nginx_main_worker_processes = "2",
wasm = true,
wasm_filters = "response_transformer",
wasm_filters = "response_transformer,tests-01,tests-02",
plugins = "response-transformer",
})
}))

admin = helpers.admin_client()
proxy = helpers.proxy_client()
Expand All @@ -154,6 +167,8 @@ describe("#wasm filters as plugins (#" .. strategy .. ")", function()
end

helpers.stop_kong()
helpers.file.delete(FILTER_PATH .. "/tests-01.wasm")
helpers.file.delete(FILTER_PATH .. "/tests-02.wasm")
end)

before_each(function()
Expand Down Expand Up @@ -229,7 +244,6 @@ describe("#wasm filters as plugins (#" .. strategy .. ")", function()

local expected = 4
assert.equals(expected, #json.data)
helpers.intercept(json.data)
local found = 0

for _, plugin in ipairs(json.data) do
Expand Down Expand Up @@ -339,6 +353,186 @@ describe("#wasm filters as plugins (#" .. strategy .. ")", function()
assert.equals(fc_value, assert.response(res).has.header(FILTER_CHAIN_HEADER))
end)
end)

describe("order of execution", function()
it("filter plugins execute at the end of any existing filter chain", function()
local lua_plugin = {
name = "response-transformer",
route = { id = route.id },
config = {
add = {
headers = {
"X-Added-By-Lua-Plugin:1",
"X-Replace-Me:lua",
"X-Append-Me:lua",
"X-Remove-Me:lua",
},
}
}
}

local plugin = {
name = "response_transformer",
route = { id = route.id },
config = cjson.encode({
add = {
headers = {
"X-Added-First:plugin",
"X-Added-By-Filter-Plugin:1",
"X-Not-Removed-By-Filter-Chain:plugin",
},
},
append = {
headers = {
"X-Append-Me:plugin",
},
},
replace = {
headers = {
"X-Replace-Me:plugin",
"X-Replaced-By-Filter-Plugin:plugin",
},
},
remove = {
headers = {
"X-Remove-Me",
"X-Removed-By-Filter-Plugin",
},
},
}),
}

local res, header, assert_no_header
do
function header(name)
return assert.response(res).has.header(name)
end

function assert_no_header(name)
return assert.response(res).has.no.header(name)
end
end

create_plugin(plugin)
create_plugin(lua_plugin)

helpers.wait_for_all_config_update()
res = proxy:get("/status/200")
assert.response(res).has.status(200)

-- sanity
assert.equals("1", header("X-Added-By-Filter-Plugin"))
assert.equals("1", header("X-Added-By-Lua-Plugin"))
assert_no_header("X-Remove-Me")

assert.equals("plugin", header("X-Added-First"))

-- added by Lua plugin, filter plugin appends
assert.same({ "lua", "plugin" }, header("X-Append-Me"))

-- replaced last by filter plugin
assert.same("plugin", header("X-Replace-Me"))

-- not replaced, because it was not added
assert_no_header("X-Replaced-By-Filter-Plugin")

local filter_chain = {
route = { id = route.id },
filters = {
{
name = "response_transformer",
config = cjson.encode({
add = {
headers = {
"X-Added-First:filter-chain",
"X-Added-By-Filter-Chain:1",
"X-Removed-By-Filter-Plugin:filter-chain",
"X-Replaced-By-Filter-Plugin:filter-chain",
},
},
append = {
headers = {
"X-Append-Me:filter-chain",
},
},
replace = {
headers = {
"X-Replace-Me:filter-chain",
"X-Replaced-By-Filter-Chain:filter-chain",
},
},
remove = {
headers = {
"X-Not-Removed-By-Filter-Chain",
},
},
}),
}
}
}

create_filter_chain(filter_chain)
helpers.wait_for_all_config_update()
res = proxy:get("/status/200")
assert.response(res).has.status(200)

-- sanity
assert.equals("1", header("X-Added-By-Filter-Plugin"))
assert.equals("1", header("X-Added-By-Lua-Plugin"))
assert.equals("1", header("X-Added-By-Filter-Chain"))
assert_no_header("X-Remove-Me")

-- added first by the filter chain
assert.equals("filter-chain", header("X-Added-First"))

-- added by Lua, appended to by filter chain and filter plugin
assert.same({ "lua", "filter-chain", "plugin" }, header("X-Append-Me"))
-- added after the filter chain tried to remove it
assert.same("plugin", header("X-Not-Removed-By-Filter-Chain"))

-- replaced last by filter plugin
assert.same("plugin", header("X-Replace-Me"))

assert_no_header("X-Removed-By-Filter-Plugin")
assert.same("plugin", header("X-Replaced-By-Filter-Plugin"))
end)

it("filter plugins execute in a consistent order", function()
-- should always run first because `tests-01` < `tests-02`
local plugin_1 = {
name = "tests-01",
config = "name=first",
route = { id = route.id },
}

local plugin_2 = {
name = "tests-02",
config = "name=last",
route = { id = route.id },
}

for _, order_added in ipairs({
{ plugin_1, plugin_2 },
{ plugin_2, plugin_1 },
}) do
bp.plugins:truncate()

create_plugin(order_added[1])
create_plugin(order_added[2])

helpers.wait_for_all_config_update()
local res = proxy:get("/status/200", {
headers = {
["X-PW-Phase"] = "request_headers",
["X-PW-Test"] = "dump_config",
}
})

local body = assert.res_status(200, res)
assert.equals("name=first", body)
end
end)
end)
end)

end -- each strategy
4 changes: 4 additions & 0 deletions spec/fixtures/proxy_wasm_filters/tests/src/test_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ impl TestHttp {
return self.send_http_dispatch(config);
}
"update_metrics" => self.update_metrics(),
"dump_config" => {
let res = self.config.as_ref().map(|config| config.to_string());
self.send_plain_response(StatusCode::OK, res.as_deref());
}
_ => (),
}
}
Expand Down
10 changes: 10 additions & 0 deletions spec/fixtures/proxy_wasm_filters/tests/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ impl FromStr for TestConfig {
}
}

impl std::fmt::Display for TestConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut values: Vec<String> = self.map.iter().map(|(k, v)| format!("{k}={v}")).collect();

values.sort();

write!(f, "{}", values.join(" "))
}
}

#[derive(Debug, Eq, PartialEq, enum_utils::FromStr)]
#[enumeration(rename_all = "snake_case")]
pub enum TestPhase {
Expand Down

0 comments on commit 4277143

Please sign in to comment.