Skip to content

Commit

Permalink
feat(mria_lb): add incompatibility reasons to custom compatibility check
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeTupchiy committed Jun 21, 2024
1 parent 2516d1a commit 0b48282
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 47 deletions.
107 changes: 62 additions & 45 deletions src/mria_lb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ do_update(State = #s{core_nodes = OldCoreNodes, node_info = OldNodeInfo}) ->
maybe_report_netsplit(OldCoreNodes, Clusters),
{IsChanged, NewCoreNodes} = find_best_cluster(OldCoreNodes, Clusters),
%% Update shards:
ShardBadness = shard_badness(maps:with(NewCoreNodes, NodeInfo)),
{ShardBadness, IncompatNodesInfo} = shard_badness(maps:with(NewCoreNodes, NodeInfo)),
_ = maybe_report_incompatibility(ShardBadness, IncompatNodesInfo),
maps:map(fun(Shard, {Node, _Badness}) ->
mria_status:notify_core_node_up(Shard, Node)
end,
Expand All @@ -190,6 +191,13 @@ do_update(State = #s{core_nodes = OldCoreNodes, node_info = OldNodeInfo}) ->
ping_new_nodes(NewCoreNodes, DiscoveredReplicants),
State#s{core_nodes = NewCoreNodes, node_info = NodeInfo}.

%% Emit a warning trace point when the shard badness map is empty while the
%% map of incompatible nodes is not, i.e. nodes were rejected and no shard
%% ended up with a preferred core node. In every other case do nothing and
%% return `ok'.
maybe_report_incompatibility(Badness, Rejected) ->
    NothingUsable = Badness =:= #{} andalso Rejected =/= #{},
    case NothingUsable of
        true ->
            ?tp(warning, "No core node in the cluster is compatible with this replicant node",
                #{nodes_info => Rejected});
        false ->
            ok
    end.

%% Find fully connected clusters (i.e. cliques of nodes)
-spec find_clusters(#{node() => node_info()}) -> [[node()]].
find_clusters(NodeInfo) ->
Expand All @@ -199,57 +207,66 @@ find_clusters(NodeInfo) ->
NodeInfo)).

%% Find the preferred core node for each shard:
-spec shard_badness(#{node() => node_info()}) -> #{mria_rlog:shard() => {node(), Badness}}
when Badness :: float().
-spec shard_badness(#{node() => node_info()}) -> {#{mria_rlog:shard() => {node(), Badness}}, #{node() => Reason}}
when Badness :: float(), Reason :: term().
shard_badness(NodeInfo) ->
maps:fold(
fun(Node, LbInfo = #{shard_badness := Shards}, Acc) ->
case verify_node_compatibility(LbInfo) of
true ->
lists:foldl(
fun({Shard, Badness}, Acc1) ->
maps:update_with(Shard,
fun({_OldNode, OldBadness}) when OldBadness > Badness ->
{Node, Badness};
(Old) ->
Old
end,
{Node, Badness},
Acc1)
end,
Acc,
Shards);
false ->
Acc
end
fun(Node, LbInfo = #{shard_badness := Shards}, {Acc, IncompatAcc}) ->
case verify_node_compatibility(LbInfo) of
true ->
Acc1 = lists:foldl(
fun({Shard, Badness}, Acc1) ->
maps:update_with(Shard,
fun({_OldNode, OldBadness}) when OldBadness > Badness ->
{Node, Badness};
(Old) ->
Old
end,
{Node, Badness},
Acc1)
end,
Acc,
Shards),
{Acc1, IncompatAcc};
{false, Reason} ->
{Acc, IncompatAcc#{Node => Reason}}
end
end,
#{},
{#{}, #{}},
NodeInfo).

verify_node_compatibility(LbInfo = #{protocol_version := ProtoVsn}) ->
case mria_config:callback(lb_custom_info_check) of
{ok, CustomCheckFun} ->
ok;
undefined ->
CustomCheckFun = fun(_) -> true end
end,
CustomInfo = maps:get(custom_info, LbInfo, undefined),
MyProtoVersion = mria_rlog:get_protocol_version(),
%% Actual check:
IsCustomCompat = try
Result = CustomCheckFun(CustomInfo),
is_boolean(Result) orelse
error({non_boolean_result, Result}),
Result
catch
%% TODO: this can get spammy:
EC:Err:Stack ->
?tp(error, mria_failed_to_check_upstream_compatibility,
#{lb_info => LbInfo, EC => Err, stacktrace => Stack}),
false
case ProtoVsn =:= MyProtoVersion of
true ->
verify_custom_compatibility(LbInfo);
false ->
{false, "Mria protocol version doesn't match"}
end.

verify_custom_compatibility(LbInfo) ->
CustomCheckFun = case mria_config:callback(lb_custom_info_check) of
{ok, Fun} -> Fun;
undefined -> fun(_) -> true end
end,
ProtoVsn =:= MyProtoVersion andalso
IsCustomCompat.
CustomInfo = maps:get(custom_info, LbInfo, undefined),
try
case CustomCheckFun(CustomInfo) of
true -> true;
%% backward-compatibility for CustomCheckFun that doesn't return
%% {false, Reason}
false -> {false, undefined};
{false, Reason} -> {false, Reason};
Other ->
error({non_boolean_result, Other})
end
catch
%% TODO: this can get spammy:
EC:Err:Stack ->
?tp(error, mria_failed_to_check_upstream_compatibility,
#{lb_info => LbInfo, EC => Err, stacktrace => Stack}),
{false, undefined}
end.

start_timer(LastUpdateTime) ->
%% Leave at least 100 ms between updates to leave some time to
Expand Down Expand Up @@ -482,7 +499,7 @@ find_clusters_test_() ->

shard_badness_test_() ->
Vsn = mria_rlog:get_protocol_version(),
[ ?_assertMatch( #{foo := {n1, 1}, bar := {n2, 2}}
[ ?_assertMatch( {#{foo := {n1, 1}, bar := {n2, 2}}, _}
, shard_badness(#{ n1 => #{shard_badness => [{foo, 1}], protocol_version => Vsn}
, n2 => #{shard_badness => [{foo, 2}, {bar, 2}], protocol_version => Vsn}
})
Expand Down
13 changes: 11 additions & 2 deletions test/mria_lb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,21 @@ t_node_leave_disable_discovery(_Config) ->
end, []).

t_custom_compat_check(_Config) ->
Env = [ {mria, {callback, lb_custom_info_check}, fun(Val) -> Val =:= chosen_one end}
custom_compat_check_test(fun(Val) -> Val =:= chosen_one end, chosen_one).

%% Run custom_compat_check_test/2 with a check callback that answers `true'
%% for `chosen_one' and `{false, Reason}' for any other value.
t_custom_compat_check_with_reason(_Config) ->
    CheckFun = fun(chosen_one) -> true;
                  (_Other) -> {false, "not a chosen one"}
               end,
    custom_compat_check_test(CheckFun, chosen_one).

custom_compat_check_test(CheckFun, CheckFunArg) ->
Env = [ {mria, {callback, lb_custom_info_check}, CheckFun}
| mria_mnesia_test_util:common_env()],
Cluster = mria_ct:cluster([ core
, core
, {core, [{mria, {callback, lb_custom_info},
fun() -> chosen_one end}]}
fun() -> CheckFunArg end}]}
, replicant
], Env),
?check_trace(
Expand Down

0 comments on commit 0b48282

Please sign in to comment.