Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

forget_cluster_node: delete all local classic queues when using Khepri store (backport #12303) #12309

Merged
merged 4 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1815,6 +1815,7 @@ internal_delete(Queue, ActingUser, Reason) ->
%% TODO this is used by `rabbit_mnesia:remove_node_if_mnesia_running`
%% Does it make any sense once mnesia is not used/removed?
forget_all_durable(Node) ->
rabbit_log:info("Will remove all classic queues from node ~ts. The node is likely being removed from the cluster.", [Node]),
UpdateFun = fun(Q) ->
forget_node_for_queue(Q)
end,
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ remove_reachable_member(NodeToRemove) ->
NodeToRemove, khepri_cluster, reset, [?RA_CLUSTER_NAME]),
case Ret of
ok ->
rabbit_amqqueue:forget_all_durable(NodeToRemove),
?LOG_DEBUG(
"Node ~s removed from Khepri cluster \"~s\"",
[NodeToRemove, ?RA_CLUSTER_NAME],
Expand All @@ -559,6 +560,7 @@ remove_down_member(NodeToRemove) ->
Ret = ra:remove_member(ServerRef, ServerId, Timeout),
case Ret of
{ok, _, _} ->
rabbit_amqqueue:forget_all_durable(NodeToRemove),
?LOG_DEBUG(
"Node ~s removed from Khepri cluster \"~s\"",
[NodeToRemove, ?RA_CLUSTER_NAME],
Expand Down
27 changes: 26 additions & 1 deletion deps/rabbit/test/cli_forget_cluster_node_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ groups() ->
forget_cluster_node_with_all_last_streams,
forget_cluster_node_with_quorum_queues_and_streams,
forget_cluster_node_with_one_last_quorum_member_and_streams,
forget_cluster_node_with_one_last_stream_and_quorum_queues
forget_cluster_node_with_one_last_stream_and_quorum_queues,
forget_cluster_node_with_one_classic_queue
]}
].

Expand Down Expand Up @@ -354,6 +355,30 @@ forget_cluster_node_with_one_last_stream_and_quorum_queues(Config) ->
?awaitMatch(Members when length(Members) == 2, get_quorum_members(Rabbit, QQ1), 30000),
?awaitMatch(Members when length(Members) == 2, get_quorum_members(Rabbit, QQ2), 30000).

forget_cluster_node_with_one_classic_queue(Config) ->
[Rabbit, Hare, Bunny] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

assert_clustered([Rabbit, Hare, Bunny]),

Ch = rabbit_ct_client_helpers:open_channel(Config, Bunny),
CQ1 = <<"classic-queue-1">>,
declare(Ch, CQ1, [{<<"x-queue-type">>, longstr, <<"classic">>}]),

?awaitMatch([_], rabbit_ct_broker_helpers:rabbitmqctl_list(
Config, Rabbit,
["list_queues", "name", "--no-table-headers"]),
30000),

?assertEqual(ok, rabbit_control_helper:command(stop_app, Bunny)),
?assertEqual(ok, forget_cluster_node(Rabbit, Bunny)),

assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Rabbit, Hare]},
[Rabbit, Hare]),
?awaitMatch([], rabbit_ct_broker_helpers:rabbitmqctl_list(
Config, Rabbit,
["list_queues", "name", "--no-table-headers"]),
30000).

forget_cluster_node(Node, Removee) ->
rabbit_control_helper:command(forget_cluster_node, Node, [atom_to_list(Removee)],
[]).
Expand Down
Loading