Skip to content

Commit

Permalink
Merge pull request #11174 from rabbitmq/deaths-v2
Browse files Browse the repository at this point in the history
Fix dead lettering
  • Loading branch information
kjnilsson authored May 13, 2024
2 parents d180474 + 6b300a2 commit c35a0b8
Show file tree
Hide file tree
Showing 14 changed files with 926 additions and 273 deletions.
8 changes: 7 additions & 1 deletion deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,13 @@ rabbitmq_integration_suite(
additional_beam = [
":test_queue_utils_beam",
],
shard_count = 7,
shard_count = 8,
)

# Integration suite target for message_containers_deaths_v2_SUITE.
# Declared as a "medium" suite and run in a single shard.
rabbitmq_integration_suite(
name = "message_containers_deaths_v2_SUITE",
size = "medium",
shard_count = 1,
)

rabbitmq_integration_suite(
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2219,3 +2219,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbitmq_amqp_client:erlang_app"],
)
erlang_bytecode(
name = "message_containers_deaths_v2_SUITE_beam_files",
testonly = True,
srcs = ["test/message_containers_deaths_v2_SUITE.erl"],
outs = ["test/message_containers_deaths_v2_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
36 changes: 22 additions & 14 deletions deps/rabbit/include/mc.hrl
Original file line number Diff line number Diff line change
@@ -1,17 +1,3 @@
-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
-type death_anns() :: #{first_time := non_neg_integer(), %% the timestamp of the first
last_time := non_neg_integer(), %% the timestamp of the last
ttl => OriginalExpiration :: non_neg_integer()}.
-record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(),
routing_keys = [] :: OriginalRoutingKeys :: [rabbit_types:routing_key()],
count = 0 :: non_neg_integer(),
anns :: death_anns()}).

-record(deaths, {first :: death_key(),
last :: death_key(),
records = #{} :: #{death_key() := #death{}}}).


%% good enough for most use cases
-define(IS_MC(Msg), element(1, Msg) == mc andalso tuple_size(Msg) == 5).

Expand All @@ -26,3 +12,25 @@
-define(ANN_RECEIVED_AT_TIMESTAMP, rts).
-define(ANN_DURABLE, d).
-define(ANN_PRIORITY, p).

-define(FF_MC_DEATHS_V2, message_containers_deaths_v2).

%% Identifies one "kind" of death of a message: the queue it was dead lettered
%% from paired with the dead-letter reason.
-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
%% Annotations kept per death_key(). `first_time' and `last_time' are
%% mandatory (`:='), `ttl' is optional (`=>').
-type death_anns() :: #{%% timestamp of the first time this message
%% was dead lettered from this queue for this reason
first_time := pos_integer(),
%% timestamp of the last time this message
%% was dead lettered from this queue for this reason
last_time := pos_integer(),
ttl => OriginalTtlHeader :: non_neg_integer()}.

%% A single death entry, i.e. the value stored for one death_key().
%% `exchange' and `routing_keys' describe where the message was originally
%% published; `routing_keys' is typed as a non-empty list.
-record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(),
routing_keys :: OriginalRoutingKeys :: [rabbit_types:routing_key(),...],
%% how many times this message was dead lettered from this queue for this reason
count :: pos_integer(),
anns :: death_anns()}).

%% Container for all deaths of a message, keyed by death_key().
%% `first' and `last' name the keys of the first and the most recent death.
-record(deaths, {first :: death_key(), % redundant to mc annotations x-first-death-*
last :: death_key(), % redundant to mc annotations x-last-death-*
records :: #{death_key() := #death{}}
}).
175 changes: 101 additions & 74 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@
convert/3,
protocol_state/1,
prepare/2,
record_death/3,
record_death/4,
is_death_cycle/2,
last_death/1,
death_queue_names/1
]).

Expand Down Expand Up @@ -356,89 +355,103 @@ protocol_state(BasicMsg) ->
mc_compat:protocol_state(BasicMsg).

%% Records that the message was dead lettered from `SourceQueue' for `Reason'.
%%
%% The death is stored under the `deaths' annotation, keyed by
%% {SourceQueue, Reason}. Two representations exist:
%%   * a #deaths{} record whose `records' field is a map, and
%%   * a list of {Key, #death{}} tuples with the most recent death first.
%%
%% If the message already carries deaths, the existing representation is kept
%% and either the matching entry is updated (count incremented, `last_time'
%% refreshed via update_death/2) or a new entry is added.
%% If this is the first death, `Env' selects the representation: the
%% #deaths{} record is used only when `Env' maps ?FF_MC_DEATHS_V2 to `false';
%% in every other case the list representation is used.
%%
%% The <<"x-first-death-*">> annotations are written only on the first death;
%% the <<"x-last-death-*">> annotations are overwritten on every death.
%%
%% Any message that is not a #?MODULE{} record (or whose Reason/SourceQueue
%% fail the guards) is delegated to mc_compat:record_death/4.
-spec record_death(rabbit_dead_letter:reason(),
                   rabbit_misc:resource_name(),
                   state(),
                   environment()) -> state().
record_death(Reason, SourceQueue,
             #?MODULE{annotations = Anns0} = State,
             Env)
  when is_atom(Reason) andalso
       is_binary(SourceQueue) ->
    Key = {SourceQueue, Reason},
    %% Both annotations must be present; a missing one crashes with badmatch.
    #{?ANN_EXCHANGE := Exchange,
      ?ANN_ROUTING_KEYS := RoutingKeys} = Anns0,
    Timestamp = os:system_time(millisecond),
    Ttl = maps:get(ttl, Anns0, undefined),
    %% NOTE(review): rabbit_misc:maps_put_truthy/3 presumably adds `ttl' only
    %% when Ttl is set -- confirm against rabbit_misc.
    DeathAnns = rabbit_misc:maps_put_truthy(
                  ttl, Ttl, #{first_time => Timestamp,
                              last_time => Timestamp}),
    %% Entry used when this {SourceQueue, Reason} has not been seen before.
    NewDeath = #death{exchange = Exchange,
                      routing_keys = RoutingKeys,
                      count = 1,
                      anns = DeathAnns},
    Anns = case Anns0 of
               #{deaths := Deaths0} ->
                   Deaths = case Deaths0 of
                                #deaths{records = Rs0} ->
                                    %% Record representation: update or insert
                                    %% the entry in the records map.
                                    Rs = maps:update_with(
                                           Key,
                                           fun(Death) ->
                                                   update_death(Death, Timestamp)
                                           end,
                                           NewDeath,
                                           Rs0),
                                    Deaths0#deaths{last = Key,
                                                   records = Rs};
                                _ ->
                                    %% Deaths are ordered by recency: the
                                    %% (updated or new) entry moves to the head.
                                    case lists:keytake(Key, 1, Deaths0) of
                                        {value, {Key, D0}, Deaths1} ->
                                            D = update_death(D0, Timestamp),
                                            [{Key, D} | Deaths1];
                                        false ->
                                            [{Key, NewDeath} | Deaths0]
                                    end
                            end,
                   %% `:=' asserts that these keys were set by the first death.
                   Anns0#{<<"x-last-death-reason">> := atom_to_binary(Reason),
                          <<"x-last-death-queue">> := SourceQueue,
                          <<"x-last-death-exchange">> := Exchange,
                          deaths := Deaths};
               _ ->
                   %% First death of this message.
                   Deaths = case Env of
                                #{?FF_MC_DEATHS_V2 := false} ->
                                    #deaths{last = Key,
                                            first = Key,
                                            records = #{Key => NewDeath}};
                                _ ->
                                    [{Key, NewDeath}]
                            end,
                   ReasonBin = atom_to_binary(Reason),
                   Anns0#{<<"x-first-death-reason">> => ReasonBin,
                          <<"x-first-death-queue">> => SourceQueue,
                          <<"x-first-death-exchange">> => Exchange,
                          <<"x-last-death-reason">> => ReasonBin,
                          <<"x-last-death-queue">> => SourceQueue,
                          <<"x-last-death-exchange">> => Exchange,
                          deaths => Deaths}
           end,
    State#?MODULE{annotations = Anns};
record_death(Reason, SourceQueue, BasicMsg, Env) ->
    mc_compat:record_death(Reason, SourceQueue, BasicMsg, Env).

%% Registers one more occurrence of an already known death: bumps its
%% counter by one and moves `last_time' to Timestamp. The `last_time' key
%% must already be present in the death's annotations.
update_death(#death{count = PrevCount, anns = PrevAnns} = Prev, Timestamp) ->
    NewCount = PrevCount + 1,
    NewAnns = maps:update(last_time, Timestamp, PrevAnns),
    Prev#death{count = NewCount, anns = NewAnns}.

%% Tells whether dead lettering the message to TargetQueue would close a
%% dead-letter cycle.
%%
%% Dispatches on the representation of the `deaths' annotation:
%%   * #deaths{} record: the keys of its `records' map are checked with
%%     is_cycle_v1/2;
%%   * anything else stored under `deaths' (the recency-ordered list):
%%     checked with is_cycle_v2/2;
%%   * no `deaths' annotation: never a cycle.
%% The #deaths{} clause must come first because the following clause matches
%% any value stored under `deaths'.
%% Messages that are not #?MODULE{} records are delegated to
%% mc_compat:is_death_cycle/2.
-spec is_death_cycle(rabbit_misc:resource_name(), state()) -> boolean().
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := #deaths{records = Rs}}}) ->
    is_cycle_v1(TargetQueue, maps:keys(Rs));
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := Deaths}}) ->
    is_cycle_v2(TargetQueue, Deaths);
is_death_cycle(_TargetQueue, #?MODULE{}) ->
    false;
is_death_cycle(TargetQueue, BasicMsg) ->
    mc_compat:is_death_cycle(TargetQueue, BasicMsg).

%% Returns the names of the queues this message was dead lettered from.
%%
%%   * #deaths{} record: the queue names are taken from the keys of the
%%     `records' map via proplists:get_keys/1, so each name appears once.
%%   * list representation: one name per list entry, in list order (the list
%%     is kept ordered by recency by record_death/4); a queue that died for
%%     several reasons appears once per reason.
%%   * no `deaths' annotation: the empty list.
%% Messages that are not #?MODULE{} records are delegated to
%% mc_compat:death_queue_names/1.
-spec death_queue_names(state()) -> [rabbit_misc:resource_name()].
death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Rs}}}) ->
    proplists:get_keys(maps:keys(Rs));
death_queue_names(#?MODULE{annotations = #{deaths := Deaths}}) ->
    lists:map(fun({{Queue, _Reason}, _Death}) ->
                      Queue
              end, Deaths);
death_queue_names(#?MODULE{}) ->
    [];
death_queue_names(BasicMsg) ->
    mc_compat:death_queue_names(BasicMsg).

%% Returns the death stored under the `last' key of the #deaths{} record as
%% {Key, Death}, or `undefined' when the annotations have no `deaths' key.
%% Only the #deaths{} record representation is handled by this module; every
%% other argument falls through to mc_compat:last_death/1.
-spec last_death(state()) ->
undefined | {death_key(), #death{}}.
last_death(#?MODULE{annotations = Anns})
when not is_map_key(deaths, Anns) ->
undefined;
last_death(#?MODULE{annotations = #{deaths := #deaths{last = Last,
records = Rs}}}) ->
%% maps:get/2 crashes if `last' does not name an existing record.
{Last, maps:get(Last, Rs)};
last_death(BasicMsg) ->
mc_compat:last_death(BasicMsg).

-spec prepare(read | store, state()) -> state().
prepare(For, #?MODULE{protocol = Proto,
data = Data} = State) ->
Expand All @@ -448,24 +461,38 @@ prepare(For, State) ->

%% INTERNAL

%% Cycle detection for the list representation of deaths, which is ordered
%% by recency (most recent death first).
%%
%% Walks the deaths from the most recent one until it reaches a death whose
%% source queue equals TargetQueue. If no such death exists there is no
%% cycle. Otherwise the cycle only counts when it is "fully automatic":
%% neither that death nor any more recent one may have the reason
%% `rejected', because a client explicitly rejecting the message somewhere
%% in the cycle breaks it.
is_cycle_v2(TargetQueue, Deaths) ->
    is_cycle_v2(TargetQueue, Deaths, true).

%% Third argument: true as long as no `rejected' death has been seen so far.
is_cycle_v2(_TargetQueue, [], _NoneRejected) ->
    false;
is_cycle_v2(TargetQueue, [{{TargetQueue, Reason}, #death{}} | _Older], NoneRejected) ->
    NoneRejected andalso Reason =/= rejected;
is_cycle_v2(TargetQueue, [{{_OtherQueue, Reason}, #death{}} | Older], NoneRejected) ->
    is_cycle_v2(TargetQueue, Older, NoneRejected andalso Reason =/= rejected).

%% Cycle detection for the #deaths{} record representation (v1).
%%
%% The desired v1 behaviour is the following:
%% "If there is a death with a source queue that is the same as the target
%% queue name and there are no newer deaths with the 'rejected' reason then
%% consider this a cycle."
%% However, the correct death order cannot be reliably determined in v1.
%% deaths_v2 fixes this bug.
%%
%% Takes the target queue name and a list of {SourceQueue, Reason} death
%% keys, scanned front to back: a `rejected' key ends the scan with `false',
%% a key for the target queue ends it with `true'.
is_cycle_v1(_Queue, []) ->
    false;
is_cycle_v1(_Queue, [{_Q, rejected} | _]) ->
    %% any rejection breaks the cycle
    false;
is_cycle_v1(Queue, [{Queue, _Reason} | _]) ->
    %% `rejected' was handled by the clause above, so no guard is needed
    true;
is_cycle_v1(Queue, [_ | Rem]) ->
    is_cycle_v1(Queue, Rem).

%% Stamps the annotations with the current OS system time in milliseconds
%% under the ?ANN_RECEIVED_AT_TIMESTAMP key, overwriting any previous value.
set_received_at_timestamp(Anns) ->
    maps:put(?ANN_RECEIVED_AT_TIMESTAMP, os:system_time(millisecond), Anns).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
Loading

0 comments on commit c35a0b8

Please sign in to comment.