Skip to content

Commit

Permalink
Merge pull request #10004 from cloudamqp/fix_dup_shovel_3_13
Browse files Browse the repository at this point in the history
Fix handling shovels with old supervisor id format
  • Loading branch information
michaelklishin authored Dec 3, 2023
2 parents d2ab7f2 + d284eaf commit 5ede862
Showing 1 changed file with 57 additions and 26 deletions.
83 changes: 57 additions & 26 deletions deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,30 @@ start_child({VHost, ShovelName} = Name, Def) ->
LockId = rabbit_shovel_locks:lock(Name),
cleanup_specs(),
rabbit_log_shovel:debug("Starting a mirrored supervisor named '~ts' in virtual host '~ts'", [ShovelName, VHost]),
Result = case mirrored_supervisor:start_child(
case child_exists(Name)
orelse mirrored_supervisor:start_child(
?SUPERVISOR,
{id(Name), {rabbit_shovel_dyn_worker_sup, start_link, [Name, obfuscated_uris_parameters(Def)]},
transient, ?WORKER_WAIT, worker, [rabbit_shovel_dyn_worker_sup]}) of
true -> ok;
{ok, _Pid} -> ok;
{error, {already_started, _Pid}} -> ok
end,
%% release the lock if we managed to acquire one
rabbit_shovel_locks:unlock(LockId),
Result.
ok.

%% Obfuscates the URIs in a shovel definition by delegating to
%% rabbit_shovel_parameters:obfuscate_uris_in_definition/1.
%%
%% A list definition is handed to that function as is. A map definition
%% is first converted with to_list/1 and the obfuscated result is turned
%% back into a map with to_map/1, so the caller gets back the same kind
%% of container it passed in.
obfuscated_uris_parameters(Def) when is_list(Def) ->
    rabbit_shovel_parameters:obfuscate_uris_in_definition(Def);
obfuscated_uris_parameters(Def) when is_map(Def) ->
    DefAsList = to_list(Def),
    Obfuscated = rabbit_shovel_parameters:obfuscate_uris_in_definition(DefAsList),
    to_map(Obfuscated).

child_exists(Name) ->
lists:any(fun ({{_, N}, _, _, _}) -> N =:= Name;
%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894.
({N, _, _, _}) -> N =:= Name
Id = id(Name),
%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894.
OldId = old_id(Name),
lists:any(fun ({ChildId, _, _, _}) ->
ChildId =:= Id orelse ChildId =:= OldId
end,
mirrored_supervisor:which_children(?SUPERVISOR)).

Expand All @@ -70,13 +74,34 @@ stop_child({VHost, ShovelName} = Name) ->
case get({shovel_worker_autodelete, Name}) of
true -> ok; %% [1]
_ ->
ok = mirrored_supervisor:terminate_child(?SUPERVISOR, id(Name)),
ok = mirrored_supervisor:delete_child(?SUPERVISOR, id(Name)),
case stop_and_delete_child(id(Name)) of
ok ->
ok;
{error, not_found} ->
case rabbit_khepri:is_enabled() of
true ->
%% Old id format is not supported by and cannot exist in Khepri
ok;
false ->
%% try older format, pre 3.13.0 and 3.12.8.
%% See rabbitmq/rabbitmq-server#9894.
_ = stop_and_delete_child(old_id(Name)),
ok
end
end,
rabbit_shovel_status:remove(Name)
end,
rabbit_shovel_locks:unlock(LockId),
ok.

%% Terminates the child registered under ChildId on ?SUPERVISOR and then
%% deletes its child spec.
%%
%% Returns ok when the child was terminated and deleted, or
%% {error, not_found} when terminate_child reports that no such child
%% exists (in which case no delete is attempted). The delete result is
%% asserted to be ok, so any other outcome crashes the caller.
stop_and_delete_child(ChildId) ->
    case mirrored_supervisor:terminate_child(?SUPERVISOR, ChildId) of
        {error, not_found} = NotFound ->
            NotFound;
        ok ->
            ok = mirrored_supervisor:delete_child(?SUPERVISOR, ChildId),
            ok
    end.

%% [1] An autodeleting worker removes its own parameter, and thus ends
%% up here via the parameter callback. It is a transient worker that
%% is just about to terminate normally - so we don't need to tell the
Expand All @@ -88,41 +113,47 @@ stop_child({VHost, ShovelName} = Name) ->
cleanup_specs() ->
Children = mirrored_supervisor:which_children(?SUPERVISOR),

%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894
OldStyleSpecsSet = sets:from_list([element(1, S) || S <- Children]),
NewStyleSpecsSet = sets:from_list([element(2, element(1, S)) || S <- Children]),
ParamsSet = sets:from_list([ {proplists:get_value(vhost, S), proplists:get_value(name, S)}
|| S <- rabbit_runtime_parameters:list_component(<<"shovel">>) ]),
F = fun(Name, ok) ->
ChildIdSet = sets:from_list([element(1, S) || S <- Children]),
ParamsSet = params_to_child_ids(rabbit_khepri:is_enabled()),
F = fun(ChildId, ok) ->
try
%% The supervisor operation is very unlikely to fail, it's the schema
%% data stores that can make a fuss about a non-existent or non-standard value passed in.
%% For example, an old style Shovel name is an invalid Khepri query path element. MK.
_ = mirrored_supervisor:delete_child(?SUPERVISOR, id(Name))
_ = mirrored_supervisor:delete_child(?SUPERVISOR, ChildId)
catch _:_:_Stacktrace ->
ok
end,
ok
end,
%% Khepri won't handle values in OldStyleSpecsSet in its path well. At the same time,
%% those older elements simply cannot exist in Khepri because having Khepri enabled
%% means a cluster-wide move to 3.13+, so we can conditionally compute what specs we care about. MK.
AllSpecs =
case rabbit_khepri:is_enabled() of
true -> NewStyleSpecsSet;
false -> sets:union(NewStyleSpecsSet, OldStyleSpecsSet)
end,
%% Delete any supervisor children that do not have their respective runtime parameters in the database.
SetToCleanUp = sets:subtract(AllSpecs, ParamsSet),
SetToCleanUp = sets:subtract(ChildIdSet, ParamsSet),
ok = sets:fold(F, ok, SetToCleanUp).

%% Builds the set of supervisor child ids that correspond to the runtime
%% parameters listed for the <<"shovel">> component. Each parameter is a
%% proplist from which the vhost and name values are read.
%%
%% The argument tells whether Khepri is enabled.
params_to_child_ids(true) ->
    %% With Khepri enabled the whole cluster has moved to 3.13+, so ids in
    %% the old format cannot exist; only the current format is produced.
    ShovelParams = rabbit_runtime_parameters:list_component(<<"shovel">>),
    CurrentIds = lists:map(
                   fun(Param) ->
                           id({proplists:get_value(vhost, Param),
                               proplists:get_value(name, Param)})
                   end,
                   ShovelParams),
    sets:from_list(CurrentIds);
params_to_child_ids(false) ->
    %% The supervisor id format differed before 3.13.0 and 3.12.8. During a
    %% mixed version period either format may be in use, so every parameter
    %% contributes both its current and its old style id.
    ShovelParams = rabbit_runtime_parameters:list_component(<<"shovel">>),
    BothFormats = [ChildId
                   || Param <- ShovelParams,
                      Name <- [{proplists:get_value(vhost, Param),
                                proplists:get_value(name, Param)}],
                      ChildId <- [id(Name), old_id(Name)]],
    sets:from_list(BothFormats).

%%----------------------------------------------------------------------------

%% Callback invoked with an empty argument list. Returns the supervisor
%% flags together with an empty list of child specs, i.e. no children are
%% declared up front.
init([]) ->
    %% NOTE(review): presumably read as {Strategy, MaxRestarts, PeriodSeconds}
    %% per the OTP supervisor tuple format - confirm against mirrored_supervisor.
    SupFlags = {one_for_one, 3, 10},
    ChildSpecs = [],
    {ok, {SupFlags, ChildSpecs}}.

id({V, S} = Name) ->
{[V, S], Name};
{[V, S], Name}.

%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894
id(Other) ->
Other.
old_id({_V, _S} = Name) ->
Name.

0 comments on commit 5ede862

Please sign in to comment.