Skip to content

Commit

Permalink
fix(server): Implement SCRIPT GC command (#3431)
Browse files Browse the repository at this point in the history
* fix(server): Implement SCRIPT GC command
  • Loading branch information
dranikpg authored Aug 2, 2024
1 parent f652f10 commit 82298b8
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 3 deletions.
14 changes: 14 additions & 0 deletions src/core/interpreter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,10 @@ void Interpreter::ResetStack() {
lua_settop(lua_, 0);
}

void Interpreter::RunGC() {
lua_gc(lua_, LUA_GCCOLLECT);
}

// Returns number of results, which is always 1 in this case.
// Please note that lua resets the stack once the function returns so no need
// to unwind the stack manually in the function (though lua allows doing this).
Expand Down Expand Up @@ -1104,4 +1108,14 @@ void InterpreterManager::Reset() {
VLOG(1) << "InterpreterManager::Reset ended";
}

void InterpreterManager::Alter(std::function<void(Interpreter*)> modf) {
vector<Interpreter*> taken;
swap(taken, available_); // swap data because modf can preempt

for (Interpreter* ir : taken) {
modf(ir);
Return(ir);
}
}

} // namespace dfly
5 changes: 5 additions & 0 deletions src/core/interpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ class Interpreter {

void ResetStack();

void RunGC();

// fp must point to buffer with at least 41 chars.
// fp[40] will be set to '\0'.
static void FuncSha1(std::string_view body, char* fp);
Expand Down Expand Up @@ -166,6 +168,9 @@ class InterpreterManager {
// Clear all interpreters, keeps capacity. Waits until all are returned.
void Reset();

// Run on all unused interpreters. Those are marked as used at once, so the callback can preempt
void Alter(std::function<void(Interpreter*)> modf);

static Stats& tl_stats();

private:
Expand Down
20 changes: 17 additions & 3 deletions src/server/script_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,17 @@ void ScriptMgr::Run(CmdArgList args, ConnectionContext* cntx) {
"LOAD <script>",
" Load a script into the scripts cache without executing it.",
"FLAGS <sha> [flags ...]",
" Set specific flags for script. Can be called before the sript is loaded."
" Set specific flags for script. Can be called before the sript is loaded.",
" The following flags are possible: ",
" - Use 'allow-undeclared-keys' to allow accessing undeclared keys",
" - Use 'disable-atomicity' to allow running scripts non-atomically",
"LIST",
" Lists loaded scripts.",
"LATENCY",
" Prints latency histograms in usec for every called function.",
"HELP"
"GC",
" Invokes garbage collection on all unused interpreter instances.",
"HELP",
" Prints this help."};
auto rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
return rb->SendSimpleStrArr(kHelp);
Expand All @@ -104,6 +106,9 @@ void ScriptMgr::Run(CmdArgList args, ConnectionContext* cntx) {
if (subcmd == "FLAGS" && args.size() > 2)
return ConfigCmd(args, cntx);

if (subcmd == "GC")
return GCCmd(cntx);

string err = absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd,
"'. Try SCRIPT HELP.");
cntx->SendError(err, kSyntaxErrType);
Expand All @@ -122,7 +127,6 @@ void ScriptMgr::ExistsCmd(CmdArgList args, ConnectionContext* cntx) const {
for (uint8_t v : res) {
rb->SendLong(v);
}
return;
}

void ScriptMgr::FlushCmd(CmdArgList args, ConnectionContext* cntx) {
Expand Down Expand Up @@ -207,6 +211,16 @@ void ScriptMgr::LatencyCmd(ConnectionContext* cntx) const {
}
}

void ScriptMgr::GCCmd(ConnectionContext* cntx) const {
auto cb = [](Interpreter* ir) {
ir->RunGC();
ThisFiber::Yield();
};
shard_set->pool()->AwaitFiberOnAll(
[cb](auto* pb) { ServerState::tlocal()->AlterInterpreters(cb); });
return cntx->SendOk();
}

// Check if script starts with shebang (#!lua). If present, look for flags parameter and truncate
// it.
io::Result<optional<ScriptMgr::ScriptParams>, GenericError> DeduceParams(string_view* body) {
Expand Down
1 change: 1 addition & 0 deletions src/server/script_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class ScriptMgr {
void ConfigCmd(CmdArgList args, ConnectionContext* cntx);
void ListCmd(ConnectionContext* cntx) const;
void LatencyCmd(ConnectionContext* cntx) const;
void GCCmd(ConnectionContext* cntx) const;

void UpdateScriptCaches(ScriptKey sha, ScriptParams params) const;

Expand Down
4 changes: 4 additions & 0 deletions src/server/server_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ void ServerState::ResetInterpreter() {
interpreter_mgr_.Reset();
}

void ServerState::AlterInterpreters(std::function<void(Interpreter*)> modf) {
interpreter_mgr_.Alter(std::move(modf));
}

ServerState* ServerState::SafeTLocal() {
// https://stackoverflow.com/a/75622732
asm volatile("");
Expand Down
4 changes: 4 additions & 0 deletions src/server/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ class ServerState { // public struct - to allow initialization.

void ResetInterpreter();

// Invoke function on all free interpreters. They are marked atomically as
// used and the function is allowed to suspend.
void AlterInterpreters(std::function<void(Interpreter*)> modf);

// Returns sum of all requests in the last 6 seconds
// (not including the current one).
uint32_t MovingSum6() const {
Expand Down
23 changes: 23 additions & 0 deletions tests/dragonfly/eval_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,3 +339,26 @@ async def run():

tasks = [asyncio.create_task(run()) for _ in range(50)]
await asyncio.gather(*tasks)


@pytest.mark.opt_only
@dfly_args({"proactor_threads": 4, "interpreter_per_thread": 4})
async def test_fill_memory_gc(async_client: aioredis.Redis):
SCRIPT = """
local res = {{}}
for j = 1, 100 do
for i = 1, 10000 do
table.insert(res, tostring(i) .. 'data')
end
end
"""

await asyncio.gather(*(async_client.eval(SCRIPT, 0) for _ in range(5)))

info = await async_client.info("memory")
# if this assert fails, we likely run gc after script invocations, remove this test
assert info["used_memory_lua"] > 50 * 1e6

await async_client.execute_command("SCRIPT GC")
info = await async_client.info("memory")
assert info["used_memory_lua"] < 10 * 1e6

0 comments on commit 82298b8

Please sign in to comment.