Skip to content

Commit

Permalink
feat(mria_membership): notify monitoring processes when mria is down
Browse files Browse the repository at this point in the history
The event will be sent in the following cases:
1. A replicant node detects that another (either core or replicant) node is down.
2. A core node detects that a replicant node is down.

Core nodes still track each other only at Mnesia level (by subscribing to Mnesia system events).

An exception to the above is when Mria is down on a local node.
In this case mria_membership will notify its monitoring processes before terminating.
  • Loading branch information
SergeTupchiy committed Jan 12, 2024
1 parent d260ede commit c38cf1a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 4 deletions.
22 changes: 19 additions & 3 deletions src/mria_membership.erl
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.

handle_info({'DOWN', _MRef, process, DownPid, _Reason},
handle_info({'DOWN', _MRef, process, DownPid, Reason},
State = #state{monitors = Monitors}) ->
Left = [M || M = {{_, Pid}, _} <- Monitors, Pid =/= DownPid],
case DownPid of
Expand All @@ -433,11 +433,15 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason},
[#member{status = leaving}] ->
ets:delete(?TAB, Node);
[Member] ->
%% Even if the reason is not shutdown, mark the node as stopped,
%% so that mria_lb will soon ping it as a new node and, as a result,
%% the replicant node will be added as a member on both nodes.
insert(Member#member{mnesia = stopped});
[] -> ignore
end,
maybe_notify_mria_down(Reason, Node, State),
?tp(mria_membership_proc_down,
#{registered_name => ?MODULE, node => Node});
#{registered_name => ?MODULE, node => Node, reason => Reason});
_ -> ignore
end,
{noreply, State#state{monitors = Left}};
Expand All @@ -446,7 +450,8 @@ handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.

%% Termination callback of the membership process.
%% Before finishing, it hands the termination reason and the local node name
%% to maybe_notify_mria_down/3, which sends `{mria, down, node()}` to the
%% monitoring processes only when the reason is `shutdown`; any other reason
%% produces no notification. Afterwards the ?terminate_tp trace point is
%% emitted and `ok` is returned.
terminate(_Reason, _State) ->
terminate(Reason, State) ->
maybe_notify_mria_down(Reason, node(), State),
?terminate_tp,
ok.

Expand All @@ -457,6 +462,17 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------

%% Tell the monitoring processes that Mria went down on `Node`, but only when
%% the reported reason is exactly `shutdown`. Always returns `ok`.
%%
%% Every other reason is deliberately ignored:
%%  * after an occasional crash mria_membership will soon be restarted, so
%%    alerting all the monitors is probably not worth it;
%%  * for `noconnection`, `{node, down, Node}` is expected to be sent out soon,
%%    since mria_node_monitor tracks all connected nodes (replicants are
%%    covered), so that case can be skipped as well.
maybe_notify_mria_down(Reason, Node, State) when Reason =:= shutdown ->
    _ = notify({mria, down, Node}, State),
    ok;
maybe_notify_mria_down(_Other, _Node, _State) ->
    ok.

make_new_local_member() ->
IsMnesiaRunning = case lists:member(node(), mria_mnesia:running_nodes()) of
true -> running;
Expand Down
17 changes: 16 additions & 1 deletion test/mria_membership_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ t_core_member_is_stopped_core_observes(_) ->
try
{[N0, N1] = Cores, Replicants} = start_core_replicant_cluster(Cluster),
assert_membership(Cores, Replicants),
test_member_is_stopped_replicant_observes(mria_membership_mnesia_down, N1, N0, members)
test_member_is_stopped_node_observes(mria_membership_mnesia_down, N1, N0, members)
after
mria_ct:teardown_cluster(Cluster)
end,
Expand Down Expand Up @@ -338,12 +338,18 @@ t_member_node_down(_) ->
try
{Cores, [N2, _N3] = Replicants} = start_core_replicant_cluster(Cluster),
assert_membership(Cores, Replicants),
ok = erpc:call(N, mria_membership, monitor, [membership, self(), true]),
?wait_async_action(
mria_ct:teardown_cluster([NodeSpec]),
#{ ?snk_kind := mria_membership_insert
, member := #member{node = N, status = down}
, ?snk_meta := #{node := N2}
}),
receive
{membership, {mria, down, N}} -> ok
after 5000 ->
ct:fail("expected_membership_event_not_received")
end,
?assertEqual(1, length(erpc:call(N2, mria_membership, running_core_nodelist, [])))
after
mria_ct:teardown_cluster(Cluster1)
Expand Down Expand Up @@ -453,6 +459,15 @@ test_node_leaves( LeaveKind, JoinKind, LeaveNode, ObserveNode, Seed
?assertEqual(ExpectAfterJoin, erpc:call(ObserveNode, mria_membership, AssertF, [])).

%% Test helper: subscribes the calling test process to membership events on
%% ObserveNode, runs the generic "member is stopped" scenario, and then
%% requires that a `{mria, down, StopNode}` membership event reaches the test
%% process within 5 seconds; otherwise the test case is failed.
test_member_is_stopped_replicant_observes(WaitKind, StopNode, ObserveNode, AssertF) ->
    Subscriber = self(),
    ok = erpc:call(ObserveNode, mria_membership, monitor, [membership, Subscriber, true]),
    test_member_is_stopped_node_observes(WaitKind, StopNode, ObserveNode, AssertF),
    %% Already bound, so the receive below matches only this exact message.
    ExpectedEvent = {membership, {mria, down, StopNode}},
    receive
        ExpectedEvent ->
            ok
    after 5000 ->
        ct:fail("expected_membership_event_not_received")
    end.

test_member_is_stopped_node_observes(WaitKind, StopNode, ObserveNode, AssertF) ->
wait_action(WaitKind, StopNode, ObserveNode, mria, stop, []),
%% No leave announce, StopNode must not be deleted from membership table
?assertEqual( [stopped]
Expand Down

0 comments on commit c38cf1a

Please sign in to comment.