Skip to content

Commit

Permalink
Try alternative process name if cluster could not be formed
Browse files Browse the repository at this point in the history
  • Loading branch information
alexnk committed Jul 30, 2024
1 parent 71dc1f1 commit 58b8c9d
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 9 deletions.
41 changes: 32 additions & 9 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -224,27 +224,48 @@ declare(Q, _Node) when ?amqqueue_is_quorum(Q) ->

start_cluster(Q) ->
QName = amqqueue:get_name(Q),
Durable = amqqueue:is_durable(Q),
AutoDelete = amqqueue:is_auto_delete(Q),
Arguments = amqqueue:get_arguments(Q),
Opts = amqqueue:get_options(Q),
Arguments = amqqueue:get_arguments(Q),
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
QuorumSize = get_default_quorum_initial_group_size(Arguments),
{LeaderNode, FollowerNodes} =
rabbit_queue_location:select_leader_and_followers(Q, QuorumSize),
RaName = case qname_to_internal_name(QName) of
{ok, A} ->
A;
{error, {too_long, N}} ->
rabbit_data_coercion:to_atom(ra:new_uid(N))
end,
{LeaderNode, FollowerNodes} =
rabbit_queue_location:select_leader_and_followers(Q, QuorumSize),
rabbit_log:debug("Will start up to ~w replicas for quorum ~ts with leader on node '~ts'",
[QuorumSize, rabbit_misc:rs(QName), LeaderNode]),
case internal_declare(Q, RaName, LeaderNode, FollowerNodes) of
{error, cluster_not_formed, Queue} ->
_ = rabbit_amqqueue:internal_delete(Queue, ActingUser),
AlternativeRaName = rabbit_data_coercion:to_atom(ra:new_uid(rabbit_queue_type_util:name_concat(QName))),
rabbit_log:debug("Could not form cluster ~ts, trying alternative name ~ts",
[RaName, AlternativeRaName]),
case internal_declare(Q, AlternativeRaName, LeaderNode, FollowerNodes) of
{ok, NewQ} ->
{new, NewQ};
{error, cluster_not_formed, NewQ} -> declare_queue_error(cluster_not_formed, NewQ, LeaderNode, ActingUser)
end;
{ok, NewQ} ->
{new, NewQ}
end.

internal_declare(Q, RaName, LeaderNode, FollowerNodes) ->
QName = amqqueue:get_name(Q),
Durable = amqqueue:is_durable(Q),
AutoDelete = amqqueue:is_auto_delete(Q),
Arguments = amqqueue:get_arguments(Q),
Opts = amqqueue:get_options(Q),
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),

LeaderId = {RaName, LeaderNode},
NewQ0 = amqqueue:set_pid(Q, LeaderId),
NewQ1 = amqqueue:set_type_state(NewQ0,
#{nodes => [LeaderNode | FollowerNodes]}),

rabbit_log:debug("Will start up to ~w replicas for quorum ~ts with leader on node '~ts'",
[QuorumSize, rabbit_misc:rs(QName), LeaderNode]),
case rabbit_amqqueue:internal_declare(NewQ1, false) of
{created, NewQ} ->
RaConfs = [make_ra_conf(NewQ, ServerId)
Expand Down Expand Up @@ -275,7 +296,9 @@ start_cluster(Q) ->
{arguments, Arguments},
{user_who_performed_action,
ActingUser}]),
{new, NewQ};
{ok, NewQ};
{error, cluster_not_formed} ->
{error, cluster_not_formed, NewQ};
{error, Error} ->
declare_queue_error(Error, NewQ, LeaderNode, ActingUser)
catch
Expand All @@ -285,7 +308,7 @@ start_cluster(Q) ->
{existing, _} = Ex ->
Ex
end.

declare_queue_error(Error, Queue, Leader, ActingUser) ->
_ = rabbit_amqqueue:internal_delete(Queue, ActingUser),
{protocol_error, internal_error,
Expand Down
33 changes: 33 additions & 0 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ all_tests() ->
consume_invalid_arg_2,
start_queue,
long_name,
conflicting_name,
stop_queue,
restart_queue,
restart_all_types,
Expand Down Expand Up @@ -564,6 +565,38 @@ long_name(Config) ->
[{<<"x-queue-type">>, longstr, <<"quorum">>}])),
ok.

conflicting_name(Config) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
User = ?config(rmq_username, Config),
VHost1 = <<"foo">>,
VHost2 = <<"foo_bar">>,
QName1 = <<"bar_baz">>,
QName2 = <<"baz">>,

ok = rabbit_ct_broker_helpers:add_vhost(Config, Node, VHost1, User),
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost1),

ok = rabbit_ct_broker_helpers:add_vhost(Config, Node, VHost2, User),
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost2),

Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node,
VHost1),
{ok, Ch1} = amqp_connection:open_channel(Conn1),

?assertEqual({'queue.declare_ok', QName1, 0, 0},
declare(Ch1, QName1,
[{<<"x-queue-type">>, longstr, <<"quorum">>}])),

Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node,
VHost2),
{ok, Ch2} = amqp_connection:open_channel(Conn2),

?assertEqual({'queue.declare_ok', QName2, 0, 0},
declare(Ch2, QName2,
[{<<"x-queue-type">>, longstr, <<"quorum">>}])),

ok.

start_queue_concurrent(Config) ->
Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
LQ = ?config(queue_name, Config),
Expand Down

0 comments on commit 58b8c9d

Please sign in to comment.