Skip to content

Commit

Permalink
Move feature flag check outside of mc
Browse files Browse the repository at this point in the history
the `mc` module is ideally meant to be kept pure and portable
 and feature flags have external infrastructure dependencies
as well as impure semantics.

Moving the check of this feature flag into the amqp session
simplifies the code (as no message containers with the new
format will enter the system before the feature flag is enabled).
  • Loading branch information
kjnilsson committed May 10, 2024
1 parent 078f4b3 commit 06378c4
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 25 deletions.
8 changes: 0 additions & 8 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -440,14 +440,6 @@ last_death(BasicMsg) ->
mc_compat:last_death(BasicMsg).

-spec prepare(read | store, state()) -> state().
prepare(store, #?MODULE{protocol = mc_amqp} = State) ->
case rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1) of
true ->
State#?MODULE{data = mc_amqp:prepare(store, State#?MODULE.data)};
false ->
State1 = convert(mc_amqpl, State, #{message_containers_store_amqp_v1 => false}),
State1#?MODULE{data = mc_amqpl:prepare(store, State1#?MODULE.data)}
end;
prepare(For, #?MODULE{protocol = Proto,
data = Data} = State) ->
State#?MODULE{data = Proto:prepare(For, Data)};
Expand Down
23 changes: 19 additions & 4 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1872,19 +1872,21 @@ incoming_link_transfer(
end,
validate_transfer_rcv_settle_mode(RcvSettleMode, Settled),
validate_incoming_message_size(PayloadBin),

Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
{ok, X, RoutingKey, Mc1, PermCache} ->
Mc = rabbit_message_interceptor:intercept(Mc1),
check_user_id(Mc, User),
Mc2 = rabbit_message_interceptor:intercept(Mc1),
check_user_id(Mc2, User),
TopicPermCache = check_write_permitted_on_topic(
X, User, RoutingKey, TopicPermCache0),
messages_received(Settled),
QNames = rabbit_exchange:route(X, Mc, #{return_binding_keys => true}),
rabbit_trace:tap_in(Mc, QNames, ConnName, ChannelNum, Username, Trace),
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
Opts = #{correlation => {HandleInt, DeliveryId}},
Qs0 = rabbit_amqqueue:lookup_many(QNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
Mc = ensure_mc_cluster_compat(Mc2),
case rabbit_queue_type:deliver(Qs, Mc, Opts, QStates0) of
{ok, QStates, Actions} ->
State1 = State0#state{queue_states = QStates,
Expand Down Expand Up @@ -2975,3 +2977,16 @@ format_status(
permission_cache => PermissionCache,
topic_permission_cache => TopicPermissionCache},
maps:update(state, State, Status).

ensure_mc_cluster_compat(Mc) ->
IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1),
case IsEnabled of
true ->
Mc;
false ->
McEnv = #{message_containers_store_amqp_v1 => IsEnabled},
%% other nodes in the cluster may not understand the new internal
%% amqp mc format - in this case we convert to AMQP legacy format
%% for compatibility
mc:convert(mc_amqpl, Mc, McEnv)
end.
10 changes: 6 additions & 4 deletions deps/rabbit/test/amqp_system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ end_per_testcase(Testcase, Config) ->

build_dotnet_test_project(Config) ->
TestProjectDir = filename:join(
[?config(data_dir, Config), "fsharp-tests"]),
[?config(data_dir, Config), "fsharp-tests"]),
Ret = rabbit_ct_helpers:exec(["dotnet", "restore"],
[{cd, TestProjectDir}]),
[{cd, TestProjectDir}]),
case Ret of
{ok, _} ->
rabbit_ct_helpers:set_config(Config,
{dotnet_test_project_dir, TestProjectDir});
rabbit_ct_helpers:set_config(
Config, {dotnet_test_project_dir, TestProjectDir});
_ ->
{skip, "Failed to fetch .NET Core test project dependencies"}
end.
Expand All @@ -119,6 +119,8 @@ roundtrip(Config) ->
{java, "RoundTripTest"}]).

streams(Config) ->
_ = rabbit_ct_broker_helpers:enable_feature_flag(Config,
message_containers_store_amqp_v1),
Ch = rabbit_ct_client_helpers:open_channel(Config),
amqp_channel:call(Ch, #'queue.declare'{queue = <<"stream_q2">>,
durable = true,
Expand Down
10 changes: 1 addition & 9 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
Msg :: mc:state(),
rabbit_queue_type:delivery_options()) ->
{[], rabbit_queue_type:actions()}.
deliver(Qs, Msg0, Options) ->
Msg = case rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1) of
true ->
Msg0;
false ->
%% An old node might not understand our new mc_amqp state.
%% The following line takes care of converting mc_amqp to mc_amqpl.
mc:prepare(store, Msg0)
end,
deliver(Qs, Msg, Options) ->
Evt = {queue_event, ?MODULE,
{?MODULE, _QPid = none, _QMsgId = none, _Redelivered = false, Msg}},
{Pids, Actions} =
Expand Down

0 comments on commit 06378c4

Please sign in to comment.