Skip to content

Commit

Permalink
Move feature flag check outside of mc
Browse files Browse the repository at this point in the history
the `mc` module is ideally meant to be kept pure and portable
 and feature flags have external infrastructure dependencies
as well as impure semantics.

Moving the check of this feature flag into the amqp session
simplifies the code (as no message containers with the new
format will enter the system before the feature flag is enabled).
  • Loading branch information
kjnilsson committed May 10, 2024
1 parent 078f4b3 commit a03ee26
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 27 deletions.
8 changes: 0 additions & 8 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -440,14 +440,6 @@ last_death(BasicMsg) ->
mc_compat:last_death(BasicMsg).

-spec prepare(read | store, state()) -> state().
prepare(store, #?MODULE{protocol = mc_amqp} = State) ->
case rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1) of
true ->
State#?MODULE{data = mc_amqp:prepare(store, State#?MODULE.data)};
false ->
State1 = convert(mc_amqpl, State, #{message_containers_store_amqp_v1 => false}),
State1#?MODULE{data = mc_amqpl:prepare(store, State1#?MODULE.data)}
end;
prepare(For, #?MODULE{protocol = Proto,
data = Data} = State) ->
State#?MODULE{data = Proto:prepare(For, Data)};
Expand Down
20 changes: 16 additions & 4 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1872,19 +1872,21 @@ incoming_link_transfer(
end,
validate_transfer_rcv_settle_mode(RcvSettleMode, Settled),
validate_incoming_message_size(PayloadBin),

Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
{ok, X, RoutingKey, Mc1, PermCache} ->
Mc = rabbit_message_interceptor:intercept(Mc1),
check_user_id(Mc, User),
Mc2 = rabbit_message_interceptor:intercept(Mc1),
check_user_id(Mc2, User),
TopicPermCache = check_write_permitted_on_topic(
X, User, RoutingKey, TopicPermCache0),
messages_received(Settled),
QNames = rabbit_exchange:route(X, Mc, #{return_binding_keys => true}),
rabbit_trace:tap_in(Mc, QNames, ConnName, ChannelNum, Username, Trace),
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
Opts = #{correlation => {HandleInt, DeliveryId}},
Qs0 = rabbit_amqqueue:lookup_many(QNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
Mc = ensure_mc_cluster_compat(Mc2),
case rabbit_queue_type:deliver(Qs, Mc, Opts, QStates0) of
{ok, QStates, Actions} ->
State1 = State0#state{queue_states = QStates,
Expand Down Expand Up @@ -2975,3 +2977,13 @@ format_status(
permission_cache => PermissionCache,
topic_permission_cache => TopicPermissionCache},
maps:update(state, State, Status).

ensure_mc_cluster_compat(Mc) ->
McEnv = #{message_containers_store_amqp_v1 =>
rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1)},
case rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1) of
true ->
Mc;
false ->
mc:convert(mc_amqpl, Mc, McEnv)
end.
13 changes: 8 additions & 5 deletions deps/rabbit/test/amqp_system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ end_per_testcase(Testcase, Config) ->

build_dotnet_test_project(Config) ->
TestProjectDir = filename:join(
[?config(data_dir, Config), "fsharp-tests"]),
[?config(data_dir, Config), "fsharp-tests"]),
Ret = rabbit_ct_helpers:exec(["dotnet", "restore"],
[{cd, TestProjectDir}]),
[{cd, TestProjectDir}]),
case Ret of
{ok, _} ->
rabbit_ct_helpers:set_config(Config,
{dotnet_test_project_dir, TestProjectDir});
_ ->
rabbit_ct_helpers:set_config(
Config, {dotnet_test_project_dir, TestProjectDir});
Err ->
ct:pal("Error cd to ~s ~p", [TestProjectDir, Err]),
{skip, "Failed to fetch .NET Core test project dependencies"}
end.

Expand All @@ -119,6 +120,8 @@ roundtrip(Config) ->
{java, "RoundTripTest"}]).

streams(Config) ->
_ = rabbit_ct_broker_helpers:enable_feature_flag(Config,
message_containers_store_amqp_v1),
Ch = rabbit_ct_client_helpers:open_channel(Config),
amqp_channel:call(Ch, #'queue.declare'{queue = <<"stream_q2">>,
durable = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,10 @@ module Test =
receiver.SetCredit(100, true)
let rtd = receiver.Receive()
assertNotNull rtd
assertEqual 3 rtd.MessageAnnotations.Map.Count
assertTrue (rtd.MessageAnnotations.Map.ContainsKey(Symbol "x-stream-offset"))
assertTrue (rtd.MessageAnnotations.Map.ContainsKey(Symbol "x-exchange"))
assertTrue (rtd.MessageAnnotations.Map.ContainsKey(Symbol "x-routing-key"))
assertEqual 3 rtd.MessageAnnotations.Map.Count

assertEqual body rtd.Body
assertEqual rtd.Properties.CorrelationId corr
Expand Down
10 changes: 1 addition & 9 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
Msg :: mc:state(),
rabbit_queue_type:delivery_options()) ->
{[], rabbit_queue_type:actions()}.
deliver(Qs, Msg0, Options) ->
Msg = case rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1) of
true ->
Msg0;
false ->
%% An old node might not understand our new mc_amqp state.
%% The following line takes care of converting mc_amqp to mc_amqpl.
mc:prepare(store, Msg0)
end,
deliver(Qs, Msg, Options) ->
Evt = {queue_event, ?MODULE,
{?MODULE, _QPid = none, _QMsgId = none, _Redelivered = false, Msg}},
{Pids, Actions} =
Expand Down

0 comments on commit a03ee26

Please sign in to comment.