From f8ddfe601596f9c3daea3b6c10e754b72f6699a0 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 14 May 2024 13:31:53 +0200 Subject: [PATCH] Remove BCC from x-death routing-keys This commit is a follow up of https://github.com/rabbitmq/rabbitmq-server/pull/11174 which broke the following Java client test: ``` ./mvnw verify -P '!setup-test-cluster' -Drabbitmqctl.bin=DOCKER:rabbitmq -Dit.test=DeadLetterExchange#deadLetterNewRK ``` The desired documented behaviour is the following: > routing-keys: the routing keys (including CC keys but excluding BCC ones) the message was published with This behaviour should be respected also for messages dead lettered into a stream. Therefore, instead of first including the BCC keys in the `#death.routing_keys` field and removing it again in mc_amqpl before sending the routing-keys to the client as done in v3.13.2 in https://github.com/rabbitmq/rabbitmq-server/blob/dc25ef53292eb0b34588ab8eaae61082b966b784/deps/rabbit/src/mc_amqpl.erl#L527 we instead omit directly the BCC keys from `#death.routing_keys` when recording a death event. This commit records the BCC keys in their own mc `bcc` annotation in `mc_amqpl:init/1`. (cherry picked from commit 90a40107b464257baa944d9b788de7b23be8934d) --- deps/rabbit/src/mc.erl | 12 ++++++++-- deps/rabbit/src/mc_amqpl.erl | 22 ++++++++++++----- deps/rabbit/test/dead_lettering_SUITE.erl | 29 ++++++++++++++++------- 3 files changed, 47 insertions(+), 16 deletions(-) diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 3dc0c140d820..022d373d4fb4 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -348,14 +348,22 @@ record_death(Reason, SourceQueue, is_binary(SourceQueue) -> Key = {SourceQueue, Reason}, #{?ANN_EXCHANGE := Exchange, - ?ANN_ROUTING_KEYS := RoutingKeys} = Anns0, + ?ANN_ROUTING_KEYS := RKeys0} = Anns0, + %% The routing keys that we record in the death history and will + %% report to the client should include CC, but exclude BCC. + RKeys = case Anns0 of + #{bcc := BccKeys} -> + RKeys0 -- BccKeys; + _ -> + RKeys0 + end, Timestamp = os:system_time(millisecond), Ttl = maps:get(ttl, Anns0, undefined), DeathAnns = rabbit_misc:maps_put_truthy( ttl, Ttl, #{first_time => Timestamp, last_time => Timestamp}), NewDeath = #death{exchange = Exchange, - routing_keys = RoutingKeys, + routing_keys = RKeys, count = 1, anns = DeathAnns}, Anns = case Anns0 of diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index b1ad8eb17bc2..7462edae7143 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -49,10 +49,11 @@ %% mc implementation init(#content{} = Content0) -> - Content = rabbit_binary_parser:ensure_content_decoded(Content0), + Content1 = rabbit_binary_parser:ensure_content_decoded(Content0), %% project essential properties into annotations - Anns = essential_properties(Content), - {strip_header(Content, ?DELETED_HEADER), Anns}. + Anns = essential_properties(Content1), + Content = strip_header(Content1, ?DELETED_HEADER), + {Content, Anns}. convert_from(mc_amqp, Sections, _Env) -> {H, MAnn, Prop, AProp, BodyRev} = @@ -480,7 +481,7 @@ message(#resource{name = ExchangeNameBin}, RoutingKey, Error; HeaderRoutes -> {ok, mc:init(?MODULE, - rabbit_basic:strip_bcc_header(Content), + Content, Anns#{?ANN_ROUTING_KEYS => [RoutingKey | HeaderRoutes], ?ANN_EXCHANGE => ExchangeNameBin})} end; @@ -734,7 +735,8 @@ message_id(undefined, _HKey, H) -> essential_properties(#content{} = C) -> #'P_basic'{delivery_mode = Mode, priority = Priority, - timestamp = TimestampRaw} = Props = C#content.properties, + timestamp = TimestampRaw, + headers = Headers} = Props = C#content.properties, {ok, MsgTTL} = rabbit_basic:parse_expiration(Props), Timestamp = case TimestampRaw of undefined -> @@ -744,6 +746,12 @@ essential_properties(#content{} = C) -> TimestampRaw * 1000 end, Durable = Mode == 2, + BccKeys = case rabbit_basic:header(<<"BCC">>, Headers) of + {<<"BCC">>, array, Routes} -> + [Route || {longstr, Route} <- Routes]; + _ -> + undefined + end, maps_put_truthy( ?ANN_PRIORITY, Priority, maps_put_truthy( @@ -752,7 +760,9 @@ essential_properties(#content{} = C) -> ?ANN_TIMESTAMP, Timestamp, maps_put_falsy( ?ANN_DURABLE, Durable, - #{})))). + maps_put_truthy( + bcc, BccKeys, + #{}))))). %% headers that are added as annotations during conversions is_internal_header(<<"x-basic-", _/binary>>) -> diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index a477a7d6d431..10622b9fd622 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -1391,17 +1391,18 @@ dead_letter_headers_BCC(Config) -> routing_key = DLXQName}), P1 = <<"msg1">>, - BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}]}, - publish(Ch, QName, [P1], [BCCHeader]), + CCHeader = {<<"CC">>, array, [{longstr, <<"cc 1">>}, {longstr, <<"cc 2">>}]}, + BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}, {longstr, <<"bcc 2">>}]}, + publish(Ch, QName, [P1], [CCHeader, BCCHeader]), %% Message is published to both queues because of BCC header and DLX queue bound to both %% exchanges wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1, props = #'P_basic'{headers = Headers1}}} = - amqp_channel:call(Ch, #'basic.get'{queue = QName}), + amqp_channel:call(Ch, #'basic.get'{queue = QName}), {#'basic.get_ok'{}, #amqp_msg{payload = P1, props = #'P_basic'{headers = Headers2}}} = - amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), + amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), %% We check the headers to ensure no dead lettering has happened ?assertEqual(undefined, header_lookup(Headers1, <<"x-death">>)), ?assertEqual(undefined, header_lookup(Headers2, <<"x-death">>)), @@ -1413,10 +1414,15 @@ dead_letter_headers_BCC(Config) -> wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]), {#'basic.get_ok'{}, #amqp_msg{payload = P1, props = #'P_basic'{headers = Headers3}}} = - amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), + amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), consume_empty(Ch, QName), ?assertEqual(undefined, rabbit_misc:table_lookup(Headers3, <<"BCC">>)), - ?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)). + {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers3, <<"x-death">>), + {array, RKeys0} = rabbit_misc:table_lookup(Death, <<"routing-keys">>), + RKeys = [RKey || {longstr, RKey} <- RKeys0], + %% routing-keys in the death history should include CC but exclude BCC keys + ?assertEqual(lists:sort([QName, <<"cc 1">>, <<"cc 2">>]), + lists:sort(RKeys)). %% Three top-level headers are added for the very first dead-lettering event. %% They are @@ -1681,7 +1687,11 @@ stream(Config) -> #'basic.publish'{routing_key = Q1}, #amqp_msg{payload = Payload, props = #'P_basic'{expiration = <<"0">>, - headers = [{<<"CC">>, array, [{longstr, <<"other key">>}]}]} + headers = [{<<"CC">>, array, [{longstr, <<"cc 1">>}, + {longstr, <<"cc 2">>}]}, + {<<"BCC">>, array, [{longstr, <<"bcc 1">>}, + {longstr, <<"bcc 2">>}]} + ]} }), #'basic.qos_ok'{} = amqp_channel:call(Ch1, #'basic.qos'{prefetch_count = 1}), @@ -1722,7 +1732,10 @@ stream(Config) -> ?assertEqual({longstr, Reason}, rabbit_misc:table_lookup(Death1, <<"reason">>)), ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Death1, <<"exchange">>)), ?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)), - ?assertEqual({array, [{longstr, Q1}, {longstr, <<"other key">>}]}, + %% routing-keys in the death history should include CC but exclude BCC keys + ?assertEqual({array, [{longstr, Q1}, + {longstr, <<"cc 1">>}, + {longstr, <<"cc 2">>}]}, rabbit_misc:table_lookup(Death1, <<"routing-keys">>)), ?assertEqual({longstr, <<"0">>}, rabbit_misc:table_lookup(Death1, <<"original-expiration">>)), {timestamp, T1} = rabbit_misc:table_lookup(Death1, <<"time">>),