Skip to content

Commit

Permalink
Fix link flow control in classic queues
Browse files Browse the repository at this point in the history
This commit fixes
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
followed by
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2

Prior to this commit (and on RabbitMQ 3.x), consuming would halt after around
8 - 10,000 messages.

The bug was that in-flight messages from the classic queue process to the
session process were not taken into account when topping up credit to
the classic queue process.

The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.

Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.

Also note that quorum queues will use serial number arithmetic for
delivery-count in credit API v2.

Furthermore, the double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This simplifies a lot.

This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.

Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.

Even if that feature flag is enabled later on, a link that was attached while
the flag was disabled will keep using credit API v1 until detached (or the node is shut down).

Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.

This approach is safe and simple.

The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:

1. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.

2. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be held simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?
  • Loading branch information
ansd committed Dec 3, 2023
1 parent df66a7d commit deaf78f
Show file tree
Hide file tree
Showing 15 changed files with 675 additions and 384 deletions.
2 changes: 1 addition & 1 deletion MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ bazel_dep(

bazel_dep(
name = "rabbitmq_osiris",
version = "1.7.0-rc.1",
version = "1.7.0",
repo_name = "osiris",
)

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck prop
PLT_APPS += mnesia

dep_syslog = git https://github.com/schlagert/syslog 4.0.0
dep_osiris = git https://github.com/rabbitmq/osiris v1.7.0-rc.1
dep_osiris = git https://github.com/rabbitmq/osiris v1.7.0
dep_systemd = hex 0.6.1

dep_seshat = git https://github.com/rabbitmq/seshat v0.6.1
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def all_beam_files(name = "all_beam_files"):
app_name = "rabbit",
dest = "ebin",
erlc_opts = "//:erlc_opts",
deps = ["//deps/rabbit_common:erlang_app"],
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbit_common:erlang_app"],
)
erlang_bytecode(
name = "other_beam",
Expand Down Expand Up @@ -286,7 +286,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
app_name = "rabbit",
dest = "test",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app"],
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbit_common:erlang_app"],
)
erlang_bytecode(
name = "test_other_beam",
Expand Down
115 changes: 57 additions & 58 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1337,11 +1337,10 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;

handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
Mode, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser},
ModeOrPrefetch, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser},
_From, State = #q{consumers = Consumers,
active_consumer = Holder,
single_active_consumer_on = SingleActiveConsumerOn}) ->
{PrefetchCount, _} = ParsedCreditMode = rabbit_queue_consumers:parse_credit_mode(Mode, Args),
ConsumerRegistration = case SingleActiveConsumerOn of
true ->
case ExclusiveConsume of
Expand All @@ -1350,30 +1349,27 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
false ->
Consumers1 = rabbit_queue_consumers:add(
ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive,
ParsedCreditMode, Args,
ActingUser, Consumers),

case Holder of
none ->
NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1),
{state, State#q{consumers = Consumers1,
has_had_consumers = true,
active_consumer = NewConsumer}};
_ ->
{state, State#q{consumers = Consumers1,
has_had_consumers = true}}
end
LimiterPid, LimiterActive, ModeOrPrefetch,
Args, ActingUser, Consumers),
case Holder of
none ->
NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1),
{state, State#q{consumers = Consumers1,
has_had_consumers = true,
active_consumer = NewConsumer}};
_ ->
{state, State#q{consumers = Consumers1,
has_had_consumers = true}}
end
end;
false ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use -> {error, reply({error, exclusive_consume_unavailable}, State)};
ok ->
Consumers1 = rabbit_queue_consumers:add(
ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive,
ParsedCreditMode, Args,
ActingUser, Consumers),
LimiterPid, LimiterActive, ModeOrPrefetch,
Args, ActingUser, Consumers),
ExclusiveConsumer =
if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
Expand All @@ -1400,7 +1396,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
{false, _} ->
{true, up}
end,
rabbit_core_metrics:consumer_created(
PrefetchCount = rabbit_queue_consumers:parse_prefetch_count(ModeOrPrefetch),
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
PrefetchCount, ConsumerIsActive, ActivityStatus, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
Expand Down Expand Up @@ -1631,52 +1628,54 @@ handle_cast(update_mirroring, State = #q{q = Q,
State1 = State#q{mirroring_policy_version = NewVersion},
noreply(update_mirroring(Policy, State1))
end;
handle_cast({credit, ChPid, CTag, Credit, Drain}, State) ->
%% Feature flag credit_api_v2 is disabled,
%% i.e. this function clause should be deleted once that feature flag becomes required.
handle_cast({credit, ChPid, CTag, Credit, Drain, true, #{}}, State);

handle_cast({credit, ChPid, CTag, Credit, Drain, Reply, LinkStateProperties},
#q{consumers = Consumers0,
q = Q,
handle_cast({credit, SessionPid, CTag, Credit, Drain},
#q{q = Q,
backing_queue = BQ,
backing_queue_state = BQS0} = State0) ->
backing_queue_state = BQS0} = State) ->
%% Credit API v1.
%% Delete this function clause when feature flag credit_api_v2 becomes required.
%% Behave like non-native AMQP 1.0: Send send_credit_reply before deliveries.
rabbit_classic_queue:send_credit_reply_credit_api_v1(
SessionPid, amqqueue:get_name(Q), BQ:len(BQS0)),
handle_cast({credit, SessionPid, CTag, credit_api_v1, Credit, Drain, false}, State);
handle_cast({credit, SessionPid, CTag, DeliveryCountRcv, Credit, Drain, Echo},
#q{consumers = Consumers0,
q = Q} = State0) ->
QName = amqqueue:get_name(Q),
Vsn = rabbit_classic_queue:credit_api_vsn(),
case Vsn of
v1 ->
%% Behave like non-native AMQP 1.0:
%% Send send_credit_reply before deliveries.
rabbit_classic_queue:send_credit_reply(
ChPid, QName, CTag, Credit, BQ:len(BQS0),
false, LinkStateProperties);
v2 ->
%% Send credit_reply after deliveries.
ok
end,

State = #q{backing_queue_state = PostBQS,
consumers = Consumers2}
= case rabbit_queue_consumers:set_credit(Credit, ChPid, CTag, Consumers0) of
unchanged ->
State0;
{unblocked, Consumers1} ->
State1 = State0#q{consumers = Consumers1},
run_message_queue(true, State1)
end,

case rabbit_queue_consumers:get_credit(ChPid, CTag) of
PostCred
when is_integer(PostCred) andalso Drain andalso PostCred > 0 ->
unchanged = rabbit_queue_consumers:set_credit(0, ChPid, CTag, Consumers2),
backing_queue = BQ} = case rabbit_queue_consumers:process_credit(
DeliveryCountRcv, Credit, SessionPid, CTag, Consumers0) of
unchanged ->
State0;
{unblocked, Consumers1} ->
State1 = State0#q{consumers = Consumers1},
run_message_queue(true, State1)
end,
case rabbit_queue_consumers:get_link_state(SessionPid, CTag) of
{credit_api_v1, PostCred}
when Drain andalso
is_integer(PostCred) andalso PostCred > 0 ->
%% credit API v1
rabbit_queue_consumers:drained(credit_api_v1, SessionPid, CTag),
rabbit_classic_queue:send_drained_credit_api_v1(SessionPid, QName, CTag, PostCred);
{PostDeliveryCountSnd, PostCred}
when is_integer(PostDeliveryCountSnd) andalso
Drain andalso
is_integer(PostCred) andalso PostCred > 0 ->
%% credit API v2
AdvancedDeliveryCount = serial_number:add(PostDeliveryCountSnd, PostCred),
rabbit_queue_consumers:drained(AdvancedDeliveryCount, SessionPid, CTag),
Avail = BQ:len(PostBQS),
rabbit_classic_queue:send_credit_reply(
ChPid, QName, CTag, PostCred, Avail, Drain, LinkStateProperties);
PostCred
when is_integer(PostCred) andalso Vsn =:= v2 andalso Reply ->
SessionPid, QName, CTag, AdvancedDeliveryCount, 0, Avail, Drain);
{PostDeliveryCountSnd, PostCred}
when is_integer(PostDeliveryCountSnd) andalso
Echo ->
%% credit API v2
Avail = BQ:len(PostBQS),
rabbit_classic_queue:send_credit_reply(
ChPid, QName, CTag, PostCred, Avail, Drain, LinkStateProperties);
SessionPid, QName, CTag, PostDeliveryCountSnd, PostCred, Avail, Drain);
_ ->
ok
end,
Expand Down
81 changes: 39 additions & 42 deletions deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
handle_event/3,
deliver/3,
settle/5,
credit_v1/5,
credit/7,
dequeue/5,
info/2,
Expand All @@ -58,7 +59,8 @@
-export([confirm_to_sender/3,
send_rejection/3,
deliver_to_consumer/5,
credit_api_vsn/0,
send_credit_reply_credit_api_v1/3,
send_drained_credit_api_v1/4,
send_credit_reply/7]).

-spec is_enabled() -> boolean().
Expand Down Expand Up @@ -237,20 +239,17 @@ consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) ->
channel_pid := ChPid,
limiter_pid := LimiterPid,
limiter_active := LimiterActive,
mode := Mode0,
mode := Mode,
consumer_tag := ConsumerTag,
exclusive_consume := ExclusiveConsume,
args := Args0,
ok_msg := OkMsg,
acting_user := ActingUser} = Spec,
{Mode, Args} = case credit_api_vsn() of
v2 -> {Mode0, Args0};
v1 -> consumer_spec_v2_to_v1(Mode0, Args0)
end,
{ModeOrPrefetch, Args} = consume_backwards_compat(Mode, Args0),
case delegate:invoke(QPid,
{gen_server2, call,
[{basic_consume, NoAck, ChPid, LimiterPid,
LimiterActive, Mode, ConsumerTag,
LimiterActive, ModeOrPrefetch, ConsumerTag,
ExclusiveConsume, Args, OkMsg, ActingUser},
infinity]}) of
ok ->
Expand All @@ -261,9 +260,18 @@ consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) ->
Err
end.

consumer_spec_v2_to_v1({simple_prefetch, PrefetchCount}, Args) ->
{PrefetchCount, Args};
consumer_spec_v2_to_v1(credited, Args) ->
%% Delete this function when feature flag credit_api_v2 becomes required.
consume_backwards_compat({simple_prefetch, PrefetchCount} = Mode, Args) ->
case rabbit_feature_flags:is_enabled(credit_api_v2) of
true -> {Mode, Args};
false -> {PrefetchCount, Args}
end;
consume_backwards_compat({credited, InitialDeliveryCount} = Mode, Args)
when is_integer(InitialDeliveryCount) ->
%% credit API v2
{Mode, Args};
consume_backwards_compat({credited, credit_api_v1}, Args) ->
%% credit API v1
{_PrefetchCount = 0,
[{<<"x-credit">>, table, [{<<"credit">>, long, 0},
{<<"drain">>, bool, false}]} | Args]}.
Expand Down Expand Up @@ -293,15 +301,13 @@ settle(_QName, Op, _CTag, MsgIds, State) ->
[{reject, Op == requeue, MsgIds, ChPid}]}),
{State, []}.

credit(_QName, CTag, Credit, Drain, Reply, Properties, #?STATE{pid = QPid} = State) ->
ChPid = self(),
Request = case credit_api_vsn() of
v2 ->
%% TODO use a map for more flexiblity in the future?
{credit, ChPid, CTag, Credit, Drain, Reply, Properties};
v1 ->
{credit, ChPid, CTag, Credit, Drain}
end,
credit_v1(_QName, Ctag, LinkCreditSnd, Drain, #?STATE{pid = QPid} = State) ->
Request = {credit, self(), Ctag, LinkCreditSnd, Drain},
delegate:invoke_no_result(QPid, {gen_server2, cast, [Request]}),
{State, []}.

credit(_QName, Ctag, DeliveryCountRcv, LinkCreditRcv, Drain, Echo, #?STATE{pid = QPid} = State) ->
Request = {credit, self(), Ctag, DeliveryCountRcv, LinkCreditRcv, Drain, Echo},
delegate:invoke_no_result(QPid, {gen_server2, cast, [Request]}),
{State, []}.

Expand Down Expand Up @@ -374,8 +380,7 @@ handle_event(_QName, Action, State)
handle_event(_QName, {send_drained, {Ctag, Credit}}, State) ->
%% This function clause should be deleted when feature flag
%% credit_api_v2 becomes required.
Action = {credit_reply, Ctag, Credit, _Available = 0,
_Drain = true, _Properties = #{}},
Action = {credit_reply_v1, Ctag, Credit, _Available = 0, _Drain = true},
{ok, State, [Action]}.

settlement_action(_Type, _QRef, [], Acc) ->
Expand Down Expand Up @@ -642,27 +647,19 @@ deliver_to_consumer(Pid, QName, CTag, AckRequired, Message) ->
Evt = {deliver, CTag, AckRequired, [Message]},
send_queue_event(Pid, QName, Evt).

send_credit_reply(Pid, QName, Ctag, Credit, Available, Drain, Properties) ->
case credit_api_vsn() of
v2 ->
Evt = {credit_reply, Ctag, Credit, Available, Drain, Properties},
send_queue_event(Pid, QName, Evt);
v1 ->
case Drain of
true ->
Evt = {send_drained, {Ctag, Credit}},
send_queue_event(Pid, QName, Evt);
false ->
Evt = {send_credit_reply, Available},
send_queue_event(Pid, QName, Evt)
end
end.
%% Delete this function when feature flag credit_api_v2 becomes required.
send_credit_reply_credit_api_v1(Pid, QName, Available) ->
Evt = {send_credit_reply, Available},
send_queue_event(Pid, QName, Evt).

%% Delete this function when feature flag credit_api_v2 becomes required.
send_drained_credit_api_v1(Pid, QName, Ctag, Credit) ->
Evt = {send_drained, {Ctag, Credit}},
send_queue_event(Pid, QName, Evt).

send_credit_reply(Pid, QName, Ctag, DeliveryCount, Credit, Available, Drain) ->
Evt = {credit_reply, Ctag, DeliveryCount, Credit, Available, Drain},
send_queue_event(Pid, QName, Evt).

send_queue_event(Pid, QName, Event) ->
gen_server:cast(Pid, {queue_event, QName, Event}).

credit_api_vsn() ->
case rabbit_feature_flags:is_enabled(credit_api_v2) of
true -> v2;
false -> v1
end.
Loading

0 comments on commit deaf78f

Please sign in to comment.