Skip to content

Commit

Permalink
Merge pull request #4274 from esl/optimize-node-cleanup-for-last
Browse files Browse the repository at this point in the history
Optimize node cleanup for mod_last
  • Loading branch information
chrzaszcz authored May 16, 2024
2 parents e0b6504 + 3ca7a60 commit 3f40892
Show file tree
Hide file tree
Showing 13 changed files with 461 additions and 42 deletions.
38 changes: 37 additions & 1 deletion big_tests/tests/last_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
-include_lib("escalus/include/escalus.hrl").
-include_lib("escalus/include/escalus_xmlns.hrl").
-include_lib("exml/include/exml.hrl").
-include_lib("eunit/include/eunit.hrl").

-import(config_parser_helper, [mod_config_with_auto_backend/2]).

Expand All @@ -37,7 +38,7 @@ groups() ->
valid_test_cases() -> [online_user_query,
last_online_user,
last_offline_user,
last_server].
last_server, sessions_cleanup].

invalid_test_cases() -> [user_not_subscribed_receives_error].

Expand Down Expand Up @@ -169,6 +170,36 @@ user_not_subscribed_receives_error(Config) ->
ok
end).

sessions_cleanup(Config) ->
N = distributed_helper:mim(),
HostType = domain_helper:host_type(),
Server = domain_helper:domain(),
CreateUser = fun(Name) ->
SID = {erlang:system_time(microsecond), spawn(fun() -> ok end)},
JID = mongoose_helper:make_jid(Name, Server, <<"res">>),
Priority = 0,
Info = #{},
distributed_helper:rpc(N, ejabberd_sm, open_session, [HostType, SID, JID, Priority, Info])
end,
Names = [<<"user", (list_to_binary((integer_to_list(X))))/binary>> || X <- lists:seq(1, 345)],
measure("create users", fun() ->
lists:foreach(CreateUser, Names)
end),
%% Check that user3 is properly updated
%% User should be registered if we want to use mod_last_api
{ok, _} = distributed_helper:rpc(N, mongoose_account_api, register_user, [<<"user3">>, Server, <<"secret123">>]),
Jid3 = mongoose_helper:make_jid(<<"user3">>, Server, <<>>),
{ok, _} = distributed_helper:rpc(N, mod_last_api, set_last, [Jid3, 1714000000, <<"old status">>]),
{ok, #{timestamp := 1714000000}} = distributed_helper:rpc(N, mod_last_api, get_last, [Jid3]),
measure("node cleanup", fun() ->
distributed_helper:rpc(N#{timeout => timer:minutes(1)}, mongoose_hooks, node_cleanup, [node()])
end),
{ok, #{timestamp := TS, status := Status} = Data} = distributed_helper:rpc(N, mod_last_api, get_last, [Jid3]),
?assertNotEqual(TS, 1714000000, Data),
?assertEqual(Status, <<>>, Data),
distributed_helper:rpc(N, mongoose_metrics, update, [HostType, sessionCount, -345]),
{ok, _} = distributed_helper:rpc(N, mongoose_account_api, unregister_user, [<<"user3">>, Server]).


%%-----------------------------------------------------------------
%% Helpers
Expand All @@ -193,3 +224,8 @@ answer_last_activity(IQ = #xmlel{name = <<"iq">>}) ->

required_modules() ->
[{mod_last, mod_config_with_auto_backend(mod_last, #{iqdisc => one_queue})}].

measure(Text, F) ->
{Time, _} = timer:tc(F),
ct:pal("Time ~ts = ~p", [Text, Time]),
ok.
80 changes: 77 additions & 3 deletions big_tests/tests/rdbms_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ rdbms_queries_cases() ->
test_failed_transaction_with_execute_wrapped,
test_failed_wrapper_transaction,
test_incremental_upsert,
arguments_from_two_tables].
arguments_from_two_tables,
test_upsert_many1,
test_upsert_many2,
test_upsert_many1_replaces_existing,
test_upsert_many2_replaces_existing].

suite() ->
escalus:suite().
Expand Down Expand Up @@ -602,6 +606,63 @@ do_test_incremental_upsert(Config) ->
SelectResult = sql_query(Config, <<"SELECT timestamp FROM inbox">>),
?assertEqual({selected, [{<<"43">>}]}, selected_to_binary(SelectResult)).

test_upsert_many1(Config) ->
erase_last(Config),
sql_prepare_upsert_many(Config, 1, upsert_many_last1, last,
[<<"server">>, <<"username">>, <<"seconds">>, <<"state">>],
[<<"seconds">>, <<"state">>],
[<<"server">>, <<"username">>]),
Insert1 = [<<"localhost">>, <<"kate">>, 0, <<>>],
Update = [0, <<>>],
%% Records keys must be unique (i.e. we cannot insert alice twice)
{updated, 1} = sql_execute_upsert_many(Config, upsert_many_last1, Insert1, Update).

test_upsert_many2(Config) ->
erase_last(Config),
sql_prepare_upsert_many(Config, 2, upsert_many_last2, last,
[<<"server">>, <<"username">>, <<"seconds">>, <<"state">>],
[<<"seconds">>, <<"state">>],
[<<"server">>, <<"username">>]),
Insert1 = [<<"localhost">>, <<"alice">>, 0, <<>>],
Insert2 = [<<"localhost">>, <<"bob">>, 0, <<>>],
Update = [0, <<>>],
%% Records keys must be unique (i.e. we cannot insert alice twice)
{updated, 2} = sql_execute_upsert_many(Config, upsert_many_last2, Insert1 ++ Insert2, Update).

test_upsert_many1_replaces_existing(Config) ->
erase_last(Config),
sql_prepare_upsert_many(Config, 1, upsert_many_last1, last,
[<<"server">>, <<"username">>, <<"seconds">>, <<"state">>],
[<<"seconds">>, <<"state">>],
[<<"server">>, <<"username">>]),
Insert1 = [<<"localhost">>, <<"kate">>, 0, <<>>],
Update1 = [0, <<>>],
Insert2 = [<<"localhost">>, <<"kate">>, 10, <<>>],
Update2 = [10, <<>>],
%% Replace returns wrong numbers with MySQL (2 instead of 1, 4 instead of 2)
{updated, _} = sql_execute_upsert_many(Config, upsert_many_last1, Insert1, Update1),
{updated, _} = sql_execute_upsert_many(Config, upsert_many_last1, Insert2, Update2),
SelectResult = sql_query(Config, <<"SELECT seconds FROM last">>),
?assertEqual({selected, [{<<"10">>}]}, selected_to_binary(SelectResult)).

test_upsert_many2_replaces_existing(Config) ->
erase_last(Config),
sql_prepare_upsert_many(Config, 2, upsert_many_last2, last,
[<<"server">>, <<"username">>, <<"seconds">>, <<"state">>],
[<<"seconds">>, <<"state">>],
[<<"server">>, <<"username">>]),
Insert1 = [<<"localhost">>, <<"alice">>, 0, <<>>],
Insert3 = [<<"localhost">>, <<"alice">>, 10, <<>>],
Insert2 = [<<"localhost">>, <<"bob">>, 0, <<>>],
Insert4 = [<<"localhost">>, <<"bob">>, 10, <<>>],
Update1 = [0, <<>>],
Update3 = [10, <<>>],
%% Records keys must be unique (i.e. we cannot insert alice twice)
{updated, _} = sql_execute_upsert_many(Config, upsert_many_last2, Insert1 ++ Insert2, Update1),
{updated, _} = sql_execute_upsert_many(Config, upsert_many_last2, Insert3 ++ Insert4, Update3),
SelectResult = sql_query(Config, <<"SELECT seconds FROM last">>),
?assertEqual({selected, [{<<"10">>}, {<<"10">>}]}, selected_to_binary(SelectResult)).

%%--------------------------------------------------------------------
%% Text searching
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -635,6 +696,9 @@ sql_prepare(_Config, Name, Table, Fields, Query) ->
sql_prepare_upsert(_Config, Name, Table, Insert, Update, Unique, Incr) ->
escalus_ejabberd:rpc(rdbms_queries, prepare_upsert, [host_type(), Name, Table, Insert, Update, Unique, Incr]).

sql_prepare_upsert_many(_Config, RecordCount, Name, Table, Insert, Update, Unique) ->
escalus_ejabberd:rpc(rdbms_queries, prepare_upsert_many, [host_type(), RecordCount, Name, Table, Insert, Update, Unique]).

sql_execute(Config, Name, Parameters) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(mongoose_rdbms, execute, ScopeAndTag ++ [Name, Parameters]).
Expand Down Expand Up @@ -670,6 +734,10 @@ sql_execute_upsert(Config, Name, Insert, Update, Unique) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(rdbms_queries, execute_upsert, ScopeAndTag ++ [Name, Insert, Update, Unique]).

sql_execute_upsert_many(Config, Name, Insert, Update) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(rdbms_queries, execute_upsert_many, ScopeAndTag ++ [Name, Insert, Update]).

sql_query_request(Config, Query) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(mongoose_rdbms, sql_query_request, ScopeAndTag ++ [Query]).
Expand Down Expand Up @@ -725,6 +793,9 @@ decode_boolean(_Config, Value) ->
erase_table(Config) ->
{updated, _} = sql_query(Config, <<"DELETE FROM test_types">>).

erase_last(Config) ->
{updated, _} = sql_query(Config, <<"DELETE FROM last">>).

erase_users(Config) ->
{updated, _} = sql_query(Config, <<"DELETE FROM users">>),
{updated, _} = sql_query(Config, <<"DELETE FROM last">>).
Expand Down Expand Up @@ -765,11 +836,14 @@ integer_to_binary_or_null(X) -> integer_to_binary(X).
%% Helper function to transform values to an uniform format.
%% Single tuple, single element case.
%% In ODBC int32 is integer, but int64 is binary.
selected_to_binary({selected, [{Value}]}) when is_integer(Value) ->
{selected, [{integer_to_binary(Value)}]};
selected_to_binary({selected, Rows}) ->
{selected, [row_to_binary(Row) || Row <- Rows]};
selected_to_binary(Other) ->
Other.

row_to_binary(Row) ->
list_to_tuple([value_to_binary(Value) || Value <- tuple_to_list(Row)]).

selected_to_sorted({selected, Rows}) ->
{selected, lists:sort(Rows)};
selected_to_sorted(Other) ->
Expand Down
39 changes: 36 additions & 3 deletions src/ejabberd_sm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@
is_offline/1,
get_user_present_pids/2,
sync/0,
run_session_cleanup_hook/1,
session_cleanup/1,
sessions_cleanup/1,
terminate_session/2,
sm_backend/0
]).
Expand Down Expand Up @@ -354,8 +355,8 @@ unregister_iq_handler(Host, XMLNS) ->
ejabberd_sm ! {unregister_iq_handler, Host, XMLNS},
ok.

-spec run_session_cleanup_hook(#session{}) -> mongoose_acc:t().
run_session_cleanup_hook(#session{usr = {U, S, R}, sid = SID}) ->
-spec session_cleanup(#session{}) -> mongoose_acc:t().
session_cleanup(#session{usr = {U, S, R}, sid = SID}) ->
{ok, HostType} = mongoose_domain_api:get_domain_host_type(S),
Acc = mongoose_acc:new(
#{location => ?LOCATION,
Expand All @@ -364,6 +365,38 @@ run_session_cleanup_hook(#session{usr = {U, S, R}, sid = SID}) ->
element => undefined}),
mongoose_hooks:session_cleanup(S, Acc, U, R, SID).

-spec sessions_cleanup([#session{}]) -> ok.
sessions_cleanup(Sessions) ->
SerSess = [{Server, Session} || Session = #session{usr = {_, Server, _}} <- Sessions],
Servers = lists:usort([Server || {Server, _Session} <- SerSess]),
Map = maps:from_list([{Server, server_to_host_type(Server)} || Server <- Servers]),
HTSession = [{maps:get(Server, Map), Session} || {Server, Session} <- SerSess],
HT2Session = group_sessions(lists:sort(HTSession)),
[mongoose_hooks:sessions_cleanup(HostType, HTSessions)
|| {HostType, HTSessions} <- HT2Session, HostType =/= undefined],
ok.

%% Group sessions by HostType.
%% Sessions should be sorted.
group_sessions([{HostType, Session} | Sessions]) ->
{Acc, Sessions2} = group_sessions(HostType, [Session], Sessions),
[{HostType, Acc} | group_sessions(Sessions2)];
group_sessions([]) ->
[].

group_sessions(HostType, Acc, [{HostType, Session} | Sessions]) ->
group_sessions(HostType, [Session | Acc], Sessions);
group_sessions(_HostType, Acc, Sessions) ->
{lists:reverse(Acc), Sessions}.

server_to_host_type(Server) ->
case mongoose_domain_api:get_domain_host_type(Server) of
{ok, HostType} ->
HostType;
_ ->
undefined
end.

-spec terminate_session(jid:jid() | pid(), binary()) -> ok | no_session.
terminate_session(#jid{} = Jid, Reason) ->
case get_session_pid(Jid) of
Expand Down
9 changes: 5 additions & 4 deletions src/ejabberd_sm_cets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ cleanup(Node) ->
%% This is a full table scan, but cleanup is rare.
Tuples = ets:select(?TABLE, [{R, [Guard], ['$_']}]),
cets:delete_many(?TABLE, [Key || {Key, _, _} <- Tuples]),
lists:foreach(fun(Tuple) ->
Session = tuple_to_session(Tuple),
ejabberd_sm:run_session_cleanup_hook(Session)
end, Tuples).
Sessions = tuples_to_sessions(Tuples),
ejabberd_sm:sessions_cleanup(Sessions),
lists:foreach(fun(Session) ->
ejabberd_sm:session_cleanup(Session)
end, Sessions).

-spec total_count() -> integer().
total_count() ->
Expand Down
3 changes: 2 additions & 1 deletion src/ejabberd_sm_mnesia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ cleanup(Node) ->
[{#session{sid = {'_', '$1'}, _ = '_'},
[{'==', {node, '$1'}, Node}],
['$_']}]),
ejabberd_sm:sessions_cleanup(Es),
lists:foreach(fun(#session{sid = SID} = Session) ->
mnesia:delete({session, SID}),
ejabberd_sm:run_session_cleanup_hook(Session)
ejabberd_sm:session_cleanup(Session)
end, Es)
end,
mnesia:async_dirty(F).
Expand Down
21 changes: 14 additions & 7 deletions src/ejabberd_sm_redis.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,19 @@ get_sessions(User, Server, Resource) ->
Session :: ejabberd_sm:session()) -> ok | {error, term()}.
set_session(User, Server, Resource, Session) ->
OldSessions = get_sessions(User, Server, Resource),
Node = sid_to_node(Session#session.sid),
case lists:keysearch(Session#session.sid, #session.sid, OldSessions) of
{value, OldSession} ->
BOldSession = term_to_binary(OldSession),
BSession = term_to_binary(Session),
mongoose_redis:cmds([["SADD", n(node()), hash(User, Server, Resource, Session#session.sid)],
mongoose_redis:cmds([["SADD", n(Node), hash(User, Server, Resource, Session#session.sid)],
["SREM", hash(User, Server), BOldSession],
["SREM", hash(User, Server, Resource), BOldSession],
["SADD", hash(User, Server), BSession],
["SADD", hash(User, Server, Resource), BSession]]);
false ->
BSession = term_to_binary(Session),
mongoose_redis:cmds([["SADD", n(node()), hash(User, Server, Resource, Session#session.sid)],
mongoose_redis:cmds([["SADD", n(Node), hash(User, Server, Resource, Session#session.sid)],
["SADD", hash(User, Server), BSession],
["SADD", hash(User, Server, Resource), BSession]])
end.
Expand All @@ -103,7 +104,7 @@ delete_session(SID, User, Server, Resource) ->
BSession = term_to_binary(Session),
mongoose_redis:cmds([["SREM", hash(User, Server), BSession],
["SREM", hash(User, Server, Resource), BSession],
["SREM", n(node()), hash(User, Server, Resource, SID)]]);
["SREM", n(sid_to_node(SID)), hash(User, Server, Resource, SID)]]);
false ->
ok
end.
Expand All @@ -116,7 +117,7 @@ cleanup(Node) ->
maybe_initial_cleanup(Node, Initial) ->
Hashes = mongoose_redis:cmd(["SMEMBERS", n(Node)]),
mongoose_redis:cmd(["DEL", n(Node)]),
lists:foreach(fun(H) ->
Sessions = lists:map(fun(H) ->
[_, U, S, R | SIDEncoded] = re:split(H, ":"),
%% Add possible removed ":" from encoded SID
SID = binary_to_term(mongoose_bin:join(SIDEncoded, <<":">>)),
Expand All @@ -125,10 +126,12 @@ maybe_initial_cleanup(Node, Initial) ->
true ->
ok;
false ->
ejabberd_sm:run_session_cleanup_hook(#session{usr = {U, S, R},
sid = SID})
Session = #session{usr = {U, S, R}, sid = SID},
ejabberd_sm:session_cleanup(Session),
Session
end
end, Hashes).
end, Hashes),
ejabberd_sm:sessions_cleanup(Sessions).

-spec total_count() -> integer().
total_count() ->
Expand Down Expand Up @@ -160,3 +163,7 @@ hash(Val1, Val2, Val3, Val4) ->
-spec n(atom()) -> iolist().
n(Node) ->
["n:", atom_to_list(Node)].

sid_to_node(SID) ->
{_, Pid} = SID,
node(Pid).
6 changes: 6 additions & 0 deletions src/hooks/mongoose_hooks.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
remove_user/3,
resend_offline_messages_hook/2,
session_cleanup/5,
sessions_cleanup/2,
set_vcard/3,
unacknowledged_message/2,
filter_unacknowledged_messages/3,
Expand Down Expand Up @@ -369,6 +370,11 @@ session_cleanup(Server, Acc, User, Resource, SID) ->
HostType = mongoose_acc:host_type(Acc),
run_hook_for_host_type(session_cleanup, HostType, Acc, Params).

-spec sessions_cleanup(mongooseim:host_type(), [ejabberd_sm:session()]) -> map().
sessions_cleanup(HostType, Sessions) ->
Params = #{sessions => Sessions},
run_hook_for_host_type(sessions_cleanup, HostType, #{host_type => HostType}, Params).

%%% @doc The `set_vcard' hook is called when the caller wants to set the VCard.
-spec set_vcard(HostType, UserJID, VCard) -> Result when
HostType :: mongooseim:host_type(),
Expand Down
Loading

0 comments on commit 3f40892

Please sign in to comment.