Skip to content

Commit

Permalink
fix(wasm): execute filter plugins in a consistent order
Browse files Browse the repository at this point in the history
Before this change, execution order of filter plugins was subject to
ordering returned by the underling DB implementation of
`kong.db.plugins:each()`.

This adds an extra step that sorts all discovered filter plugins by name
so that execution order is more consistent.
  • Loading branch information
flrgh committed Nov 28, 2024
1 parent a13ed4a commit a0f67f9
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 17 deletions.
50 changes: 37 additions & 13 deletions kong/runloop/wasm.lua
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ local ipairs = ipairs
local type = type
local assert = assert
local insert = table.insert
local sort = table.sort
local cjson_encode = cjson.encode
local cjson_decode = cjson.decode
local fmt = string.format
Expand Down Expand Up @@ -116,6 +117,21 @@ local STATUS_ENABLED = "wasm support is enabled"
local ENABLED = false
local STATUS = STATUS_DISABLED

local function filter_plugin_compare(a, b)
if a.name ~= b.name then
return a.name < b.name
end

if a.updated_at and b.updated_at and a.updated_at ~= b.updated_at then
return a.updated_at < b.updated_at
end

if a.created_at and b.created_at and a.created_at ~= b.created_at then
return a.created_at < b.created_at
end

return a.id < b.id
end

local hash_chain
do
Expand Down Expand Up @@ -485,28 +501,36 @@ local function rebuild_state(db, version, old_state)

local plugin_pagesize = db.plugins.pagination.max_page_size

local filter_plugins = {}

for plugin, err in db.plugins:each(plugin_pagesize, GLOBAL_QUERY_OPTS) do
if err then
return nil, "failed iterating plugins: " .. tostring(err)
end

if _M.filters_by_name[plugin.name] and plugin.enabled then
local chain = get_or_insert_chain(chains, {
id = uuid.uuid(),
enabled = true,
route = plugin.route,
service = plugin.service,
filters = {},
})

insert(chain.filters, {
name = plugin.name,
enabled = true,
config = serialize_configuration(plugin.config),
})
insert(filter_plugins, plugin)
end
end

sort(filter_plugins, filter_plugin_compare)

for _, plugin in ipairs(filter_plugins) do
local chain = get_or_insert_chain(chains, {
id = uuid.uuid(),
enabled = true,
route = plugin.route,
service = plugin.service,
filters = {},
})

insert(chain.filters, {
name = plugin.name,
enabled = true,
config = serialize_configuration(plugin.config),
})
end

local routes = db.routes
local select_route = routes.select

Expand Down
202 changes: 198 additions & 4 deletions spec/02-integration/20-wasm/12-filters-as-plugins_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,23 @@ describe("#wasm filters as plugins (#" .. strategy .. ")", function()


lazy_setup(function()
assert(helpers.file.copy(FILTER_PATH .. "/tests.wasm",
FILTER_PATH .. "/tests-01.wasm"))
assert(helpers.file.copy(FILTER_PATH .. "/tests.wasm",
FILTER_PATH .. "/tests-02.wasm"))

require("kong.runloop.wasm").enable({
{ name = "response_transformer",
path = FILTER_PATH .. "/response_transformer.wasm",
},
{
name = "tests-01",
path = FILTER_PATH .. "/tests-01.wasm",
},
{
name = "tests-02",
path = FILTER_PATH .. "/tests-02.wasm",
},
})

bp, db = helpers.get_db_utils(strategy, {
Expand All @@ -130,14 +143,14 @@ describe("#wasm filters as plugins (#" .. strategy .. ")", function()
"plugins",
})

helpers.start_kong({
assert(helpers.start_kong({
database = strategy,
nginx_conf = "spec/fixtures/custom_nginx.template",
nginx_main_worker_processes = "2",
wasm = true,
wasm_filters = "response_transformer",
wasm_filters = "response_transformer,tests-01,tests-02",
plugins = "response-transformer",
})
}))

admin = helpers.admin_client()
proxy = helpers.proxy_client()
Expand All @@ -154,6 +167,8 @@ describe("#wasm filters as plugins (#" .. strategy .. ")", function()
end

helpers.stop_kong()
helpers.file.delete(FILTER_PATH .. "/tests-01.wasm")
helpers.file.delete(FILTER_PATH .. "/tests-02.wasm")
end)

before_each(function()
Expand Down Expand Up @@ -229,7 +244,6 @@ describe("#wasm filters as plugins (#" .. strategy .. ")", function()

local expected = 4
assert.equals(expected, #json.data)
helpers.intercept(json.data)
local found = 0

for _, plugin in ipairs(json.data) do
Expand Down Expand Up @@ -339,6 +353,186 @@ describe("#wasm filters as plugins (#" .. strategy .. ")", function()
assert.equals(fc_value, assert.response(res).has.header(FILTER_CHAIN_HEADER))
end)
end)

describe("order of execution", function()
it("filter plugins execute at the end of any existing filter chain", function()
local lua_plugin = {
name = "response-transformer",
route = { id = route.id },
config = {
add = {
headers = {
"X-Added-By-Lua-Plugin:1",
"X-Replace-Me:lua",
"X-Append-Me:lua",
"X-Remove-Me:lua",
},
}
}
}

local plugin = {
name = "response_transformer",
route = { id = route.id },
config = cjson.encode({
add = {
headers = {
"X-Added-First:plugin",
"X-Added-By-Filter-Plugin:1",
"X-Not-Removed-By-Filter-Chain:plugin",
},
},
append = {
headers = {
"X-Append-Me:plugin",
},
},
replace = {
headers = {
"X-Replace-Me:plugin",
"X-Replaced-By-Filter-Plugin:plugin",
},
},
remove = {
headers = {
"X-Remove-Me",
"X-Removed-By-Filter-Plugin",
},
},
}),
}

local res, header, assert_no_header
do
function header(name)
return assert.response(res).has.header(name)
end

function assert_no_header(name)
return assert.response(res).has.no.header(name)
end
end

create_plugin(plugin)
create_plugin(lua_plugin)

helpers.wait_for_all_config_update()
res = proxy:get("/status/200")
assert.response(res).has.status(200)

-- sanity
assert.equals("1", header("X-Added-By-Filter-Plugin"))
assert.equals("1", header("X-Added-By-Lua-Plugin"))
assert_no_header("X-Remove-Me")

assert.equals("plugin", header("X-Added-First"))

-- added by Lua plugin, filter plugin appends
assert.same({ "lua", "plugin" }, header("X-Append-Me"))

-- replaced last by filter plugin
assert.same("plugin", header("X-Replace-Me"))

-- not replaced, because it was not added
assert_no_header("X-Replaced-By-Filter-Plugin")

local filter_chain = {
route = { id = route.id },
filters = {
{
name = "response_transformer",
config = cjson.encode({
add = {
headers = {
"X-Added-First:filter-chain",
"X-Added-By-Filter-Chain:1",
"X-Removed-By-Filter-Plugin:filter-chain",
"X-Replaced-By-Filter-Plugin:filter-chain",
},
},
append = {
headers = {
"X-Append-Me:filter-chain",
},
},
replace = {
headers = {
"X-Replace-Me:filter-chain",
"X-Replaced-By-Filter-Chain:filter-chain",
},
},
remove = {
headers = {
"X-Not-Removed-By-Filter-Chain",
},
},
}),
}
}
}

create_filter_chain(filter_chain)
helpers.wait_for_all_config_update()
res = proxy:get("/status/200")
assert.response(res).has.status(200)

-- sanity
assert.equals("1", header("X-Added-By-Filter-Plugin"))
assert.equals("1", header("X-Added-By-Lua-Plugin"))
assert.equals("1", header("X-Added-By-Filter-Chain"))
assert_no_header("X-Remove-Me")

-- added first by the filter chain
assert.equals("filter-chain", header("X-Added-First"))

-- added by Lua, appended to by filter chain and filter plugin
assert.same({ "lua", "filter-chain", "plugin" }, header("X-Append-Me"))
-- added after the filter chain tried to remove it
assert.same("plugin", header("X-Not-Removed-By-Filter-Chain"))

-- replaced last by filter plugin
assert.same("plugin", header("X-Replace-Me"))

assert_no_header("X-Removed-By-Filter-Plugin")
assert.same("plugin", header("X-Replaced-By-Filter-Plugin"))
end)

it("filter plugins execute in a consistent order", function()
-- should always run first because `tests-01` < `tests-02`
local plugin_1 = {
name = "tests-01",
config = "name=first",
route = { id = route.id },
}

local plugin_2 = {
name = "tests-02",
config = "name=last",
route = { id = route.id },
}

for _, order_added in ipairs({
{ plugin_1, plugin_2 },
{ plugin_2, plugin_1 },
}) do
bp.plugins:truncate()

create_plugin(order_added[1])
create_plugin(order_added[2])

helpers.wait_for_all_config_update()
local res = proxy:get("/status/200", {
headers = {
["X-PW-Phase"] = "request_headers",
["X-PW-Test"] = "dump_config",
}
})

local body = assert.res_status(200, res)
assert.equals("name=first", body)
end
end)
end)
end)

end -- each strategy
4 changes: 4 additions & 0 deletions spec/fixtures/proxy_wasm_filters/tests/src/test_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ impl TestHttp {
return self.send_http_dispatch(config);
}
"update_metrics" => self.update_metrics(),
"dump_config" => {
let res = self.config.as_ref().map(|config| config.to_string());
self.send_plain_response(StatusCode::OK, res.as_deref());
}
_ => (),
}
}
Expand Down
10 changes: 10 additions & 0 deletions spec/fixtures/proxy_wasm_filters/tests/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ impl FromStr for TestConfig {
}
}

impl std::fmt::Display for TestConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut values: Vec<String> = self.map.iter().map(|(k, v)| format!("{k}={v}")).collect();

values.sort();

write!(f, "{}", values.join(" "))
}
}

#[derive(Debug, Eq, PartialEq, enum_utils::FromStr)]
#[enumeration(rename_all = "snake_case")]
pub enum TestPhase {
Expand Down

0 comments on commit a0f67f9

Please sign in to comment.