Skip to content

Commit

Permalink
Merge pull request #9034 from rabbitmq/mqtt-nack
Browse files Browse the repository at this point in the history
Nack rejected messages to MQTT 5.0 client
  • Loading branch information
michaelklishin authored Aug 9, 2023
2 parents 79e3c10 + 2a4301e commit 17c9acf
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 107 deletions.
6 changes: 0 additions & 6 deletions deps/rabbit/src/rabbit_confirms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,3 @@ next_smallest(S, U) when is_map_key(S, U) ->
next_smallest(S, U) ->
%% TODO: this is potentially infinitely recursive if called incorrectly
next_smallest(S+1, U).



-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
50 changes: 6 additions & 44 deletions deps/rabbit/test/rabbit_confirms_SUITE.erl
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
-module(rabbit_confirms_SUITE).

-compile(export_all).

-export([
]).

-include_lib("common_test/include/ct.hrl").
-compile([export_all, nowarn_export_all]).
-include_lib("eunit/include/eunit.hrl").

%%%===================================================================
Expand All @@ -17,40 +12,13 @@ all() ->
{group, tests}
].


all_tests() ->
[
confirm,
reject,
remove_queue
].

groups() ->
[
{tests, [], all_tests()}
].

init_per_suite(Config) ->
Config.

end_per_suite(_Config) ->
ok.

init_per_group(_Group, Config) ->
Config.

end_per_group(_Group, _Config) ->
ok.

init_per_testcase(_TestCase, Config) ->
Config.

end_per_testcase(_TestCase, _Config) ->
ok.

%%%===================================================================
%%% Test cases
%%%===================================================================
{tests, [shuffle],
[confirm,
reject,
remove_queue
]}].

confirm(_Config) ->
XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>),
Expand Down Expand Up @@ -93,7 +61,6 @@ confirm(_Config) ->
?assertEqual(0, rabbit_confirms:size(U7)),
?assertEqual(undefined, rabbit_confirms:smallest(U7)),


U8 = rabbit_confirms:insert(2, [QName], XName, U1),
{[{1, XName}, {2, XName}], _U9} = rabbit_confirms:confirm([1, 2], QName, U8),
ok.
Expand Down Expand Up @@ -126,7 +93,6 @@ reject(_Config) ->
{error, not_found} = rabbit_confirms:reject(2, U5),
?assertEqual(1, rabbit_confirms:size(U5)),
?assertEqual(1, rabbit_confirms:smallest(U5)),

ok.

remove_queue(_Config) ->
Expand All @@ -147,8 +113,4 @@ remove_queue(_Config) ->
U5 = rabbit_confirms:insert(1, [QName], XName, U0),
U6 = rabbit_confirms:insert(2, [QName], XName, U5),
{[{1, XName}, {2, XName}], _U} = rabbit_confirms:remove_queue(QName, U6),

ok.


%% Utility
8 changes: 8 additions & 0 deletions deps/rabbitmq_mqtt/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,14 @@ rabbitmq_suite(
],
)

rabbitmq_suite(
name = "rabbit_mqtt_confirms_SUITE",
size = "small",
deps = [
"//deps/rabbit_common:erlang_app",
],
)

assert_suites()

alias(
Expand Down
8 changes: 8 additions & 0 deletions deps/rabbitmq_mqtt/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,11 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "rabbit_mqtt_confirms_SUITE_beam_files",
testonly = True,
srcs = ["test/rabbit_mqtt_confirms_SUITE.erl"],
outs = ["test/rabbit_mqtt_confirms_SUITE.beam"],
app_name = "rabbitmq_mqtt",
erlc_opts = "//:test_erlc_opts",
)
31 changes: 15 additions & 16 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_confirms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,21 @@ insert(PktId, QNames, State)
-spec confirm([packet_id()], queue_name(), state()) ->
{[packet_id()], state()}.
confirm(PktIds, QName, State0) ->
{L0, State} = lists:foldl(fun(PktId, Acc) ->
confirm_one(PktId, QName, Acc)
end, {[], State0}, PktIds),
L = lists:reverse(L0),
{L, State}.
lists:foldl(fun(PktId, Acc) ->
confirm_one(PktId, QName, Acc)
end, {[], State0}, PktIds).

-spec reject(packet_id(), state()) ->
{ok, state()} | {error, not_found}.
reject(PktId, State0)
when is_integer(PktId) ->
case maps:take(PktId, State0) of
{_, State} ->
{ok, State};
error ->
{error, not_found}
end.
-spec reject([packet_id()], state()) ->
{[packet_id()], state()}.
reject(PktIds, State0) ->
lists:foldl(fun(PktId, Acc = {Rejs, S0}) ->
case maps:take(PktId, S0) of
{_, S} ->
{[PktId | Rejs], S};
error ->
Acc
end
end, {[], State0}, PktIds).

%% idempotent
-spec remove_queue(queue_name(), state()) ->
Expand All @@ -77,7 +76,7 @@ remove_queue(QName, State) ->
(_, _, PktIds) ->
PktIds
end, [], State),
confirm(lists:sort(PktIds), QName, State).
confirm(PktIds, QName, State).

%% INTERNAL

Expand Down
40 changes: 20 additions & 20 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1686,17 +1686,15 @@ process_routing_confirm(#delivery{confirm = true,
U = rabbit_mqtt_confirms:insert(PktId, QNames, U0),
State#state{unacked_client_pubs = U}.

-spec send_puback(list(packet_id()), state()) -> ok.
send_puback(PktIds0, State)
-spec send_puback(packet_id() | list(packet_id()), reason_code(), state()) -> ok.
send_puback(PktIds0, ReasonCode, State)
when is_list(PktIds0) ->
%% Classic queues confirm messages unordered.
%% Let's sort them here assuming most MQTT clients send with an increasing packet identifier.
PktIds = lists:usort(PktIds0),
lists:foreach(fun(Id) ->
send_puback(Id, ?RC_SUCCESS, State)
end, PktIds).

-spec send_puback(packet_id(), reason_code(), state()) -> ok.
send_puback(Id, ReasonCode, State)
end, PktIds);
send_puback(PktId, ReasonCode, State = #state{cfg = #cfg{proto_ver = ProtoVer}}) ->
rabbit_global_counters:messages_confirmed(ProtoVer, 1),
Packet = #mqtt_packet{fixed = #mqtt_packet_fixed{type = ?PUBACK},
Expand Down Expand Up @@ -1971,7 +1969,7 @@ handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason},
QStates = rabbit_queue_type:remove(QRef, QStates1),
State = State0#state{queue_states = QStates,
unacked_client_pubs = U},
send_puback(ConfirmPktIds, State),
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
{ok, State}
end.

Expand Down Expand Up @@ -2001,7 +1999,7 @@ handle_queue_event({queue_event, QName, Evt},
QStates = rabbit_queue_type:remove(QName, QStates0),
State = State1#state{queue_states = QStates,
unacked_client_pubs = U},
send_puback(ConfirmPktIds, State),
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
{ok, State};
{protocol_error, _Type, _Reason, _ReasonArgs} = Error ->
{error, Error, State0}
Expand All @@ -2013,19 +2011,21 @@ handle_queue_actions(Actions, #state{} = State0) ->
deliver_to_client(Msgs, Ack, S);
({settled, QName, PktIds}, S = #state{unacked_client_pubs = U0}) ->
{ConfirmPktIds, U} = rabbit_mqtt_confirms:confirm(PktIds, QName, U0),
send_puback(ConfirmPktIds, S),
S#state{unacked_client_pubs = U};
({rejected, _QName, PktIds}, S = #state{unacked_client_pubs = U0}) ->
%% Negative acks are supported in MQTT v5 only.
%% Therefore, in MQTT v3 and v4 we ignore rejected messages.
U = lists:foldl(
fun(PktId, Acc0) ->
case rabbit_mqtt_confirms:reject(PktId, Acc0) of
{ok, Acc} -> Acc;
{error, not_found} -> Acc0
end
end, U0, PktIds),
send_puback(ConfirmPktIds, ?RC_SUCCESS, S),
S#state{unacked_client_pubs = U};
({rejected, _QName, PktIds}, S0 = #state{unacked_client_pubs = U0,
cfg = #cfg{proto_ver = ProtoVer}}) ->
{RejectPktIds, U} = rabbit_mqtt_confirms:reject(PktIds, U0),
S = S0#state{unacked_client_pubs = U},
%% Negative acks are supported only in MQTT v5. In MQTT v3 and v4 we ignore
%% rejected messages since we can only (but must not) send a positive ack.
case ProtoVer of
?MQTT_PROTO_V5 ->
send_puback(RejectPktIds, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, S);
_ ->
ok
end,
S;
({block, QName}, S = #state{queues_soft_limit_exceeded = QSLE}) ->
S#state{queues_soft_limit_exceeded = sets:add_element(QName, QSLE)};
({unblock, QName}, S = #state{queues_soft_limit_exceeded = QSLE}) ->
Expand Down
109 changes: 109 additions & 0 deletions deps/rabbitmq_mqtt/test/rabbit_mqtt_confirms_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
-module(rabbit_mqtt_confirms_SUITE).

-compile([export_all, nowarn_export_all]).
-include_lib("eunit/include/eunit.hrl").

%%%===================================================================
%%% Common Test callbacks
%%%===================================================================

all() ->
[
{group, tests}
].

groups() ->
[
{tests, [shuffle],
[confirm,
reject,
remove_queue
]}].

-define(MOD, rabbit_mqtt_confirms).

confirm(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
U0 = ?MOD:init(),
?assertEqual(0, ?MOD:size(U0)),

U1 = ?MOD:insert(1, [QName], U0),
?assertEqual(1, ?MOD:size(U1)),
?assert(?MOD:contains(1, U1)),

{[1], U2} = ?MOD:confirm([1], QName, U1),
?assertEqual(0, ?MOD:size(U2)),
?assertNot(?MOD:contains(1, U2)),

U3 = ?MOD:insert(2, [QName], U1),
?assertEqual(2, ?MOD:size(U3)),
?assert(?MOD:contains(1, U3)),
?assert(?MOD:contains(2, U3)),

{[1], U4} = ?MOD:confirm([1], QName, U3),
?assertEqual(1, ?MOD:size(U4)),
?assertNot(?MOD:contains(1, U4)),
?assert(?MOD:contains(2, U4)),

U5 = ?MOD:insert(2, [QName, QName2], U1),
?assertEqual(2, ?MOD:size(U5)),
?assert(?MOD:contains(1, U5)),
?assert(?MOD:contains(2, U5)),

{[1], U6} = ?MOD:confirm([1, 2], QName, U5),
{[2], U7} = ?MOD:confirm([2], QName2, U6),
?assertEqual(0, ?MOD:size(U7)),

U8 = ?MOD:insert(2, [QName], U1),
{Confirmed, _U9} = ?MOD:confirm([1, 2], QName, U8),
?assertEqual([1, 2], lists:sort(Confirmed)),
ok.


reject(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
U0 = ?MOD:init(),
?assertEqual(0, ?MOD:size(U0)),

U1 = ?MOD:insert(1, [QName], U0),
?assert(?MOD:contains(1, U1)),

{[1], U2} = ?MOD:reject([1], U1),
{[], U2} = ?MOD:reject([1], U2),
?assertEqual(0, ?MOD:size(U2)),
?assertNot(?MOD:contains(1, U2)),

U3 = ?MOD:insert(2, [QName, QName2], U1),

{[1], U4} = ?MOD:reject([1], U3),
{[], U4} = ?MOD:reject([1], U4),
?assertEqual(1, ?MOD:size(U4)),

{[2], U5} = ?MOD:reject([2], U3),
{[], U5} = ?MOD:reject([2], U5),
?assertEqual(1, ?MOD:size(U5)),
?assert(?MOD:contains(1, U5)),
ok.

remove_queue(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
U0 = ?MOD:init(),

U1 = ?MOD:insert(1, [QName, QName2], U0),
U2 = ?MOD:insert(2, [QName2], U1),
{[2], U3} = ?MOD:remove_queue(QName2, U2),
?assertEqual(1, ?MOD:size(U3)),
?assert(?MOD:contains(1, U3)),
{[1], U4} = ?MOD:remove_queue(QName, U3),
?assertEqual(0, ?MOD:size(U4)),
?assertNot(?MOD:contains(1, U4)),

U5 = ?MOD:insert(1, [QName], U0),
U6 = ?MOD:insert(2, [QName], U5),
{Confirmed, U7} = ?MOD:remove_queue(QName, U6),
?assertEqual([1, 2], lists:sort(Confirmed)),
?assertEqual(0, ?MOD:size(U7)),
ok.
Loading

0 comments on commit 17c9acf

Please sign in to comment.