Skip to content

Commit

Permalink
Merge pull request #12641 from rabbitmq/rabbitmq-server-12640-for-v4.0.x
Browse files Browse the repository at this point in the history
Backport of #12640 to v4.0.x
  • Loading branch information
michaelklishin authored Nov 4, 2024
2 parents ae05ff2 + 7078a02 commit 215f218
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 22 deletions.
2 changes: 2 additions & 0 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,8 @@ overview(#?STATE{consumers = Cons,
Conf = #{name => Cfg#cfg.name,
resource => Cfg#cfg.resource,
dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler,
dead_letter_handler => Cfg#cfg.dead_letter_handler,
overflow_strategy => Cfg#cfg.overflow_strategy,
max_length => Cfg#cfg.max_length,
max_bytes => Cfg#cfg.max_bytes,
consumer_strategy => Cfg#cfg.consumer_strategy,
Expand Down
53 changes: 42 additions & 11 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,8 @@ declare_queue_error(Error, Queue, Leader, ActingUser) ->
ra_machine(Q) ->
{module, rabbit_fifo, ra_machine_config(Q)}.

ra_machine_config(Q) when ?is_amqqueue(Q) ->
gather_policy_config(Q, IsQueueDeclaration) ->
QName = amqqueue:get_name(Q),
{Name, _} = amqqueue:get_pid(Q),
%% take the minimum value of the policy and the queue arg if present
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
OverflowBin = args_policy_lookup(<<"overflow">>, fun policy_has_precedence/2, Q),
Expand All @@ -327,28 +326,42 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
DeliveryLimit = case args_policy_lookup(<<"delivery-limit">>,
fun resolve_delivery_limit/2, Q) of
undefined ->
rabbit_log:info("~ts: delivery_limit not set, defaulting to ~b",
[rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]),
case IsQueueDeclaration of
true ->
rabbit_log:info(
"~ts: delivery_limit not set, defaulting to ~b",
[rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]);
false ->
ok
end,
?DEFAULT_DELIVERY_LIMIT;
DL ->
DL
end,
Expires = args_policy_lookup(<<"expires">>, fun min/2, Q),
MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q),
#{name => Name,
queue_resource => QName,
dead_letter_handler => dead_letter_handler(Q, Overflow),
become_leader_handler => {?MODULE, become_leader, [QName]},
DeadLetterHandler = dead_letter_handler(Q, Overflow),
#{dead_letter_handler => DeadLetterHandler,
max_length => MaxLength,
max_bytes => MaxBytes,
single_active_consumer_on => single_active_consumer_on(Q),
delivery_limit => DeliveryLimit,
overflow_strategy => Overflow,
created => erlang:system_time(millisecond),
expires => Expires,
msg_ttl => MsgTTL
}.

ra_machine_config(Q) when ?is_amqqueue(Q) ->
PolicyConfig = gather_policy_config(Q, true),
QName = amqqueue:get_name(Q),
{Name, _} = amqqueue:get_pid(Q),
PolicyConfig#{
name => Name,
queue_resource => QName,
become_leader_handler => {?MODULE, become_leader, [QName]},
single_active_consumer_on => single_active_consumer_on(Q),
created => erlang:system_time(millisecond)
}.

resolve_delivery_limit(PolVal, ArgVal)
when PolVal < 0 orelse ArgVal < 0 ->
max(PolVal, ArgVal);
Expand Down Expand Up @@ -624,7 +637,9 @@ handle_tick(QName,
ok;
_ ->
ok
end
end,
maybe_apply_policies(Q, Overview),
ok
catch
_:Err ->
rabbit_log:debug("~ts: handle tick failed with ~p",
Expand Down Expand Up @@ -708,6 +723,21 @@ system_recover(quorum_queues) ->
ok
end.

maybe_apply_policies(Q, #{config := CurrentConfig}) ->
NewPolicyConfig = gather_policy_config(Q, false),

RelevantKeys = maps:keys(NewPolicyConfig),
CurrentPolicyConfig = maps:with(RelevantKeys, CurrentConfig),

ShouldUpdate = NewPolicyConfig =/= CurrentPolicyConfig,
case ShouldUpdate of
true ->
rabbit_log:debug("Re-applying policies to ~ts", [rabbit_misc:rs(amqqueue:get_name(Q))]),
policy_changed(Q),
ok;
false -> ok
end.

-spec recover(binary(), [amqqueue:amqqueue()]) ->
{[amqqueue:amqqueue()], [amqqueue:amqqueue()]}.
recover(_Vhost, Queues) ->
Expand Down Expand Up @@ -2064,3 +2094,4 @@ file_handle_other_reservation() ->

file_handle_release_reservation() ->
ok.

204 changes: 193 additions & 11 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ groups() ->
force_shrink_member_to_current_member,
force_all_queues_shrink_member_to_current_member,
force_vhost_queues_shrink_member_to_current_member,
policy_repair,
gh_12635
]
++ all_tests()},
Expand Down Expand Up @@ -1303,20 +1304,175 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
?assertEqual(3, length(Nodes0))
end || Q <- QQs, VHost <- VHosts].


% Tests that, if the process of a QQ is dead in the moment of declaring a policy
% that affects such queue, when the process is made available again, the policy
% will eventually get applied. (https://github.com/rabbitmq/rabbitmq-server/issues/7863)
policy_repair(Config) ->
[Server0, _Server1, _Server2] = Servers =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),

QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
RaName = ra_name(QQ),
ExpectedMaxLength1 = 10,
Priority1 = 1,
ok = rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_policy,
set,
[
<<"/">>,
<<QQ/binary, "_1">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength1}, {<<"overflow">>, <<"reject-publish">>}],
Priority1,
<<"quorum_queues">>,
<<"acting-user">>
]),

% Wait for the policy to apply
QueryFun = fun rabbit_fifo:overview/1,
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength1}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength1 + some messages but after consuming all messages only
% MaxLength1 are retrieved.
% Checking twice to ensure consistency
publish_confirm_many(Ch, QQ, ExpectedMaxLength1 + 1),
% +1 because QQs let one pass
wait_for_messages_ready(Servers, RaName, ExpectedMaxLength1 + 1),
fail = publish_confirm(Ch, QQ),
fail = publish_confirm(Ch, QQ),
consume_all(Ch, QQ),

% Set higher priority policy, allowing more messages
ExpectedMaxLength2 = 20,
Priority2 = 2,
ok = rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_policy,
set,
[
<<"/">>,
<<QQ/binary, "_2">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength2}, {<<"overflow">>, <<"reject-publish">>}],
Priority2,
<<"quorum_queues">>,
<<"acting-user">>
]),

% Wait for the policy to apply
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength2}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength2 + some messages but after consuming all messages only
% MaxLength2 are retrieved.
% Checking twice to ensure consistency.
% + 1 because QQs let one pass
publish_confirm_many(Ch, QQ, ExpectedMaxLength2 + 1),
wait_for_messages_ready(Servers, RaName, ExpectedMaxLength2 + 1),
fail = publish_confirm(Ch, QQ),
fail = publish_confirm(Ch, QQ),
consume_all(Ch, QQ),

% Ensure the queue process is unavailable
lists:foreach(fun(Srv) -> ensure_qq_proc_dead(Config, Srv, RaName) end, Servers),

% Add policy with higher priority, allowing even more messages.
ExpectedMaxLength3 = 30,
Priority3 = 3,
ok = rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_policy,
set,
[
<<"/">>,
<<QQ/binary, "_3">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength3}, {<<"overflow">>, <<"reject-publish">>}],
Priority3,
<<"quorum_queues">>,
<<"acting-user">>
]),

% Restart the queue process.
{ok, Queue} =
rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_amqqueue,
lookup,
[{resource, <<"/">>, queue, QQ}]),
lists:foreach(
fun(Srv) ->
rabbit_ct_broker_helpers:rpc(
Config,
Srv,
rabbit_quorum_queue,
recover,
[foo, [Queue]]
)
end,
Servers),

% Wait for the queue to be available again.
lists:foreach(fun(Srv) ->
rabbit_ct_helpers:await_condition(
fun () ->
is_pid(
rabbit_ct_broker_helpers:rpc(
Config,
Srv,
erlang,
whereis,
[RaName]))
end)
end,
Servers),

% Wait for the policy to apply
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength3}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength3 + some messages but after consuming all messages only
% MaxLength3 are retrieved.
% Checking twice to ensure consistency.
% + 1 because QQs let one pass
publish_confirm_many(Ch, QQ, ExpectedMaxLength3 + 1),
wait_for_messages_ready(Servers, RaName, ExpectedMaxLength3 + 1),
fail = publish_confirm(Ch, QQ),
fail = publish_confirm(Ch, QQ),
consume_all(Ch, QQ).


gh_12635(Config) ->
% https://github.com/rabbitmq/rabbitmq-server/issues/12635
[Server0, _Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,
[rabbit, quorum_min_checkpoint_interval, 1]),
[rabbit, quorum_min_checkpoint_interval, 1]),

Ch0 = rabbit_ct_client_helpers:open_channel(Config, Server0),
#'confirm.select_ok'{} = amqp_channel:call(Ch0, #'confirm.select'{}),
QQ = ?config(queue_name, Config),
RaName = ra_name(QQ),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch0, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
declare(Ch0, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

%% stop member to simulate slow or down member
ok = rpc:call(Server2, ra, stop_server, [quorum_queues, {RaName, Server2}]),
Expand All @@ -1327,10 +1483,10 @@ gh_12635(Config) ->
%% force a checkpoint on leader
ok = rpc:call(Server0, ra, cast_aux_command, [{RaName, Server0}, force_checkpoint]),
rabbit_ct_helpers:await_condition(
fun () ->
{ok, #{log := Log}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
undefined =/= maps:get(latest_checkpoint_index, Log)
end),
fun () ->
{ok, #{log := Log}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
undefined =/= maps:get(latest_checkpoint_index, Log)
end),

%% publish 1 more message
publish_confirm(Ch0, QQ),
Expand All @@ -1346,10 +1502,10 @@ gh_12635(Config) ->
#'queue.purge_ok'{} = amqp_channel:call(Ch0, #'queue.purge'{queue = QQ}),

rabbit_ct_helpers:await_condition(
fun () ->
{ok, #{log := Log}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
undefined =/= maps:get(snapshot_index, Log)
end),
fun () ->
{ok, #{log := Log}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
undefined =/= maps:get(snapshot_index, Log)
end),
%% restart the down member
ok = rpc:call(Server2, ra, restart_server, [quorum_queues, {RaName, Server2}]),
Pid2 = rpc:call(Server2, erlang, whereis, [RaName]),
Expand All @@ -1359,11 +1515,12 @@ gh_12635(Config) ->
{'DOWN',Ref, process,_, _} ->
ct:fail("unexpected DOWN")
after 500 ->
ok
ok
end,
flush(1),
ok.


priority_queue_fifo(Config) ->
%% testing: if hi priority messages are published before lo priority
%% messages they are always consumed first (fifo)
Expand Down Expand Up @@ -4397,3 +4554,28 @@ lists_interleave([Item | Items], List)
{Left, Right} = lists:split(2, List),
Left ++ [Item | lists_interleave(Items, Right)].

publish_confirm_many(Ch, Queue, Count) ->
lists:foreach(fun(_) -> publish_confirm(Ch, Queue) end, lists:seq(1, Count)).

consume_all(Ch, QQ) ->
Consume = fun C(Acc) ->
case amqp_channel:call(Ch, #'basic.get'{queue = QQ}) of
{#'basic.get_ok'{}, Msg} ->
C([Msg | Acc]);
_ ->
Acc
end
end,
Consume([]).

ensure_qq_proc_dead(Config, Server, RaName) ->
case rabbit_ct_broker_helpers:rpc(Config, Server, erlang, whereis, [RaName]) of
undefined ->
ok;
Pid ->
rabbit_ct_broker_helpers:rpc(Config, Server, erlang, exit, [Pid, kill]),
%% Give some time for the supervisor to restart the process
timer:sleep(500),
ensure_qq_proc_dead(Config, Server, RaName)
end.

0 comments on commit 215f218

Please sign in to comment.