From 1be524a5af72b5a63f74ff9c0663116026f75b7d Mon Sep 17 00:00:00 2001 From: alexnk Date: Mon, 29 Jul 2024 09:53:17 +0300 Subject: [PATCH] Try alternative process name if cluster could not be formed --- deps/rabbit/src/rabbit_quorum_queue.erl | 41 +++++++++++++++++++------ deps/rabbit/test/quorum_queue_SUITE.erl | 33 ++++++++++++++++++++ 2 files changed, 65 insertions(+), 9 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index a6020b0e02b5..af86f63e8bba 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -230,27 +230,48 @@ declare(Q, _Node) when ?amqqueue_is_quorum(Q) -> start_cluster(Q) -> QName = amqqueue:get_name(Q), - Durable = amqqueue:is_durable(Q), - AutoDelete = amqqueue:is_auto_delete(Q), - Arguments = amqqueue:get_arguments(Q), Opts = amqqueue:get_options(Q), + Arguments = amqqueue:get_arguments(Q), ActingUser = maps:get(user, Opts, ?UNKNOWN_USER), QuorumSize = get_default_quorum_initial_group_size(Arguments), + {LeaderNode, FollowerNodes} = + rabbit_queue_location:select_leader_and_followers(Q, QuorumSize), RaName = case qname_to_internal_name(QName) of {ok, A} -> A; {error, {too_long, N}} -> rabbit_data_coercion:to_atom(ra:new_uid(N)) end, - {LeaderNode, FollowerNodes} = - rabbit_queue_location:select_leader_and_followers(Q, QuorumSize), + rabbit_log:debug("Will start up to ~w replicas for quorum ~ts with leader on node '~ts'", + [QuorumSize, rabbit_misc:rs(QName), LeaderNode]), + case internal_declare(Q, RaName, LeaderNode, FollowerNodes) of + {error, cluster_not_formed, Queue} -> + _ = rabbit_amqqueue:internal_delete(Queue, ActingUser), + AlternativeRaName = rabbit_data_coercion:to_atom(ra:new_uid(rabbit_queue_type_util:name_concat(QName))), + rabbit_log:debug("Could not form cluster ~ts, trying alternative name ~ts", + [RaName, AlternativeRaName]), + case internal_declare(Q, AlternativeRaName, LeaderNode, FollowerNodes) of + {ok, NewQ} -> + {new, NewQ}; + {error, cluster_not_formed, NewQ} -> declare_queue_error(cluster_not_formed, NewQ, LeaderNode, ActingUser) + end; + {ok, NewQ} -> + {new, NewQ} + end. + +internal_declare(Q, RaName, LeaderNode, FollowerNodes) -> + QName = amqqueue:get_name(Q), + Durable = amqqueue:is_durable(Q), + AutoDelete = amqqueue:is_auto_delete(Q), + Arguments = amqqueue:get_arguments(Q), + Opts = amqqueue:get_options(Q), + ActingUser = maps:get(user, Opts, ?UNKNOWN_USER), + LeaderId = {RaName, LeaderNode}, NewQ0 = amqqueue:set_pid(Q, LeaderId), NewQ1 = amqqueue:set_type_state(NewQ0, #{nodes => [LeaderNode | FollowerNodes]}), - rabbit_log:debug("Will start up to ~w replicas for quorum ~ts with leader on node '~ts'", - [QuorumSize, rabbit_misc:rs(QName), LeaderNode]), case rabbit_amqqueue:internal_declare(NewQ1, false) of {created, NewQ} -> RaConfs = [make_ra_conf(NewQ, ServerId) @@ -281,7 +302,9 @@ start_cluster(Q) -> {arguments, Arguments}, {user_who_performed_action, ActingUser}]), - {new, NewQ}; + {ok, NewQ}; + {error, cluster_not_formed} -> + {error, cluster_not_formed, NewQ}; {error, Error} -> declare_queue_error(Error, NewQ, LeaderNode, ActingUser) catch @@ -291,7 +314,7 @@ start_cluster(Q) -> {existing, _} = Ex -> Ex end. - + declare_queue_error(Error, Queue, Leader, ActingUser) -> _ = rabbit_amqqueue:internal_delete(Queue, ActingUser), {protocol_error, internal_error, diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 15b75fac4a69..375869c7a047 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -123,6 +123,7 @@ all_tests() -> consume_invalid_arg_2, start_queue, long_name, + conflicting_name, stop_queue, restart_queue, restart_all_types, @@ -564,6 +565,38 @@ long_name(Config) -> [{<<"x-queue-type">>, longstr, <<"quorum">>}])), ok. +conflicting_name(Config) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + User = ?config(rmq_username, Config), + VHost1 = <<"foo">>, + VHost2 = <<"foo_bar">>, + QName1 = <<"bar_baz">>, + QName2 = <<"baz">>, + + ok = rabbit_ct_broker_helpers:add_vhost(Config, Node, VHost1, User), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost1), + + ok = rabbit_ct_broker_helpers:add_vhost(Config, Node, VHost2, User), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost2), + + Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node, + VHost1), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + + ?assertEqual({'queue.declare_ok', QName1, 0, 0}, + declare(Ch1, QName1, + [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node, + VHost2), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + + ?assertEqual({'queue.declare_ok', QName2, 0, 0}, + declare(Ch2, QName2, + [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + ok. + start_queue_concurrent(Config) -> Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), LQ = ?config(queue_name, Config),