Skip to content

Commit

Permalink
fix: check that Mnesia supports match_delete/2 before attempting to c…
Browse files Browse the repository at this point in the history
…all it

The check is done on a local node, assuming that all nodes in the cluster run the same OTP/Mnesia.
  • Loading branch information
SergeTupchiy committed Dec 28, 2023
1 parent 32c4d70 commit 8f063b0
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 3 deletions.
49 changes: 47 additions & 2 deletions src/mria.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
, sync_dirty/2
, clear_table/1
, match_delete/2
, match_delete/3

, dirty_write/2
, dirty_write/1
Expand All @@ -76,6 +77,10 @@
, wait_for_tables/1
]).

%% Internal export
-export([ match_delete_fallback_fun/2
]).

-type info_key() :: members | running_nodes | stopped_nodes | partitions | rlog.

-type infos() :: #{members := list(member()),
Expand Down Expand Up @@ -429,9 +434,49 @@ sync_dirty(Shard, Fun) ->
clear_table(Table) ->
call_backend_rw_dirty(?FUNCTION_NAME, Table, []).

-spec match_delete(mria:table(), ets:match_pattern()) -> t_result(ok).
-spec match_delete(Table, Pattern) -> t_result(Res) | Res | Error when
Table :: mria:table(),
Pattern :: ets:match_pattern(),
Res :: ok,
Error :: {error, unsupported_otp_version}.
match_delete(Table, Pattern) ->
call_backend_rw_dirty(?FUNCTION_NAME, Table, [Pattern]).
match_delete(Table, Pattern, error).

-spec match_delete(Table, Pattern, Fallback) -> Res | Error | no_return() when
Table :: mria:table(),
Pattern :: ets:match_pattern(),
Fallback :: error | async_dirty | sync_dirty | transaction,
Res :: t_result(ok),
Error :: {error, unsupported_otp_version}.
match_delete(Table, Pattern, Fallback) ->
%% Assuming that all nodes run the same OTP/Mnesia release.
%% Rolling updates are already handled gracefully,
%% due to the fact that mnesia_tm on remote nodes can process
%% match_delete op even if mnesia:match_delete/2 is not implemented.
case erlang:function_exported(mnesia, match_delete, 2) of
true ->
call_backend_rw_dirty(?FUNCTION_NAME, Table, [Pattern]);
false ->
case Fallback of
error ->
{error, unsupported_otp_version};
transaction ->
Shard = mria_config:shard_rlookup(Table),
transaction(Shard, fun ?MODULE:match_delete_fallback_fun/2, [Table, Pattern]);
Dirty when Dirty =:= async_dirty;
Dirty =:= sync_dirty ->
Shard = mria_config:shard_rlookup(Table),
call_backend_rw(Shard, mnesia, Dirty,
[fun ?MODULE:match_delete_fallback_fun/2, [Table, Pattern]])
end
end.

%% Fallback function if mnesia:match_delete/2 is not implemented
match_delete_fallback_fun(Tab, Pattern) ->
lists:foreach(
fun(Obj) -> mnesia:delete_object(Tab, Obj, write) end,
mnesia:match_object(Tab, Pattern, write)
).

-spec dirty_write(tuple()) -> ok.
dirty_write(Record) ->
Expand Down
5 changes: 5 additions & 0 deletions src/mria_replica_importer_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ import_op_dirty(Op, Acc) ->
mnesia:clear_table(Tab),
Acc;
{clear_table, Tab, Pattern} ->
%% If this op is received, we assume that this node also has
%% `mnesia:match_delete/2.
%% As mria protocol has been bumped, during rolling updates
%% new replicants must connect only to new cores,
%% so that both should have this new function.
mnesia:match_delete(Tab, Pattern),
Acc
end.
Expand Down
70 changes: 69 additions & 1 deletion test/mria_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,74 @@ t_rlog_match_delete(_) ->
end,
common_checks()).

t_rlog_match_delete_unsupported(_) ->
Cluster = mria_ct:cluster([core, replicant], mria_mnesia_test_util:common_env()),
?check_trace(
#{timetrap => 30000},
try
[N1, N2] = mria_ct:start_cluster(mria, Cluster),
ok = mock_mnesia_match_delete(N1),
ok = mock_mnesia_match_delete(N2),
Pat = {dummy_pattern, '_', '_'},
Exp = {error, unsupported_otp_version},
?assertEqual(Exp, rpc:call(N1, mria, match_delete, [dummy_tab, Pat])),
?assertEqual(Exp, rpc:call(N2, mria, match_delete, [dummy_tab, Pat])),
unmock_mnesia_match_delete(N1),
unmock_mnesia_match_delete(N2)
after
mria_ct:teardown_cluster(Cluster)
end,
common_checks()).

t_rlog_match_delete_fallback(_) ->
Cluster = mria_ct:cluster([core, replicant], mria_mnesia_test_util:common_env()),
?check_trace(
#{timetrap => 30000},
try
[N1, N2] = Nodes = mria_ct:start_cluster(mria, Cluster),
ok = mock_mnesia_match_delete(N1),
ok = mock_mnesia_match_delete(N2),

mria_mnesia_test_util:wait_tables(Nodes),
rpc:call(N1, mria_transaction_gen, create_data, []),
mria_mnesia_test_util:stabilize(1000),
mria_mnesia_test_util:compare_table_contents(test_tab, Nodes),
{atomic, Recs} = rpc:call(N1, mria_transaction_gen, ro_read_all_keys, []),

Pattern = {test_tab, {<<"match_delete">>, '_'}, '_'},
do_match_delete_test(N1, Nodes, Recs, Pattern, transaction, {atomic, ok}),
do_match_delete_test(N1, Nodes, Recs, Pattern, async_dirty, ok),
do_match_delete_test(N1, Nodes, Recs, Pattern, sync_dirty, ok),

Pattern1 = {test_tab, '_', <<"match_delete">>},
do_match_delete_test(N2, Nodes, Recs, Pattern1, transaction, {atomic, ok}),
do_match_delete_test(N2, Nodes, Recs, Pattern1, async_dirty, ok),
do_match_delete_test(N2, Nodes, Recs, Pattern1, sync_dirty, ok),

unmock_mnesia_match_delete(N1),
unmock_mnesia_match_delete(N2)
after
mria_ct:teardown_cluster(Cluster)
end,
common_checks()).

mock_mnesia_match_delete(Node) ->
?assert(rpc:call(Node, erlang, function_exported, [mnesia, match_delete, 2])),
ok = rpc:call(Node, meck, new, [mnesia, [no_link, no_history, unstick, passthrough]]),
ok = rpc:call(Node, meck, delete, [mnesia, match_delete, 2, true]),
?assertNot(rpc:call(Node, erlang, function_exported, [mnesia, match_delete, 2])),
ok.

unmock_mnesia_match_delete(Node) ->
ok = rpc:call(Node, meck, unload, [mnesia]),
_ = rpc:call(Node, mnesia, module_info, []),
?assert(rpc:call(Node, erlang, function_exported, [mnesia, match_delete, 2])),
ok.

do_match_delete_test(Node, Nodes, Recs, Pattern) ->
do_match_delete_test(Node, Nodes, Recs, Pattern, error, {atomic, ok}).

do_match_delete_test(Node, Nodes, Recs, Pattern, Fallback, Expected) ->
WriteFun = fun() ->
lists:foreach(
fun(Seq) ->
Expand All @@ -579,7 +646,8 @@ do_match_delete_test(Node, Nodes, Recs, Pattern) ->
mria_mnesia_test_util:stabilize(1000),
{atomic, Recs1} = rpc:call(Node, mria_transaction_gen, ro_read_all_keys, []),
?assertNotEqual(lists:sort(Recs), lists:sort(Recs1)),
?assertMatch({atomic, ok}, rpc:call(Node, mria, match_delete, [test_tab, Pattern])),
?assertMatch(Expected,
rpc:call(Node, mria, match_delete, [test_tab, Pattern, Fallback])),
mria_mnesia_test_util:stabilize(1000),
mria_mnesia_test_util:compare_table_contents(test_tab, Nodes),
{atomic, Recs2} = rpc:call(Node, mria_transaction_gen, ro_read_all_keys, []),
Expand Down

0 comments on commit 8f063b0

Please sign in to comment.