Skip to content

Commit

Permalink
Merge pull request #12317 from rabbitmq/md/khepri/mqtt-fixes
Browse files Browse the repository at this point in the history
Handle database timeouts in MQTT queue deletion
  • Loading branch information
michaelklishin authored Sep 16, 2024
2 parents e144c58 + a65ceb6 commit 4805e31
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 31 deletions.
19 changes: 16 additions & 3 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,14 @@ notify_policy_changed(Q) when ?is_amqqueue(Q) ->

consumers(Q) when ?amqqueue_is_classic(Q) ->
QPid = amqqueue:get_pid(Q),
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]});
try
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]})
catch
exit:_ ->
%% The queue process exited during the call.
%% Note that `delegate:invoke/2' catches errors but not exits.
[]
end;
consumers(Q) when ?amqqueue_is_quorum(Q) ->
QPid = amqqueue:get_pid(Q),
case ra:local_query(QPid, fun rabbit_fifo:query_consumers/1) of
Expand Down Expand Up @@ -1619,17 +1626,23 @@ delete_immediately_by_resource(Resources) ->
-spec delete
(amqqueue:amqqueue(), 'false', 'false', rabbit_types:username()) ->
qlen() |
rabbit_types:error(timeout) |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()};
(amqqueue:amqqueue(), 'true' , 'false', rabbit_types:username()) ->
qlen() | rabbit_types:error('in_use') |
qlen() |
rabbit_types:error('in_use') |
rabbit_types:error(timeout) |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()};
(amqqueue:amqqueue(), 'false', 'true', rabbit_types:username()) ->
qlen() | rabbit_types:error('not_empty') |
qlen() |
rabbit_types:error('not_empty') |
rabbit_types:error(timeout) |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()};
(amqqueue:amqqueue(), 'true' , 'true', rabbit_types:username()) ->
qlen() |
rabbit_types:error('in_use') |
rabbit_types:error('not_empty') |
rabbit_types:error(timeout) |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
delete(Q, IfUnused, IfEmpty, ActingUser) ->
rabbit_queue_type:delete(Q, IfUnused, IfEmpty, ActingUser).
Expand Down
7 changes: 2 additions & 5 deletions deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,8 @@ delete(Q0, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_classic(Q0) ->
case delete_crashed_internal(Q, ActingUser) of
ok ->
{ok, 0};
{error, timeout} ->
{error, protocol_error,
"The operation to delete ~ts from the "
"metadata store timed out",
[rabbit_misc:rs(QName)]}
{error, timeout} = Err ->
Err
end
end
end;
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ declare(Q0, Node) ->
boolean(), rabbit_types:username()) ->
rabbit_types:ok(non_neg_integer()) |
rabbit_types:error(in_use | not_empty) |
rabbit_types:error(timeout) |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
delete(Q, IfUnused, IfEmpty, ActingUser) ->
Mod = amqqueue:get_type(Q),
Expand Down
12 changes: 4 additions & 8 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -823,10 +823,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
{ok, ReadyMsgs};
{error, timeout} ->
{protocol_error, internal_error,
"The operation to delete ~ts from the metadata store "
"timed out", [rabbit_misc:rs(QName)]}
{error, timeout} = Err ->
Err
end;
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
Expand All @@ -849,10 +847,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
case delete_queue_data(Q, ActingUser) of
ok ->
{ok, ReadyMsgs};
{error, timeout} ->
{protocol_error, internal_error,
"The operation to delete queue ~ts from the metadata "
"store timed out", [rabbit_misc:rs(QName)]}
{error, timeout} = Err ->
Err
end
end.

Expand Down
8 changes: 6 additions & 2 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,12 @@ delete_stream(Q, ActingUser)
#{name := StreamId} = amqqueue:get_type_state(Q),
case process_command({delete_stream, StreamId, #{}}) of
{ok, ok, _} ->
_ = rabbit_amqqueue:internal_delete(Q, ActingUser),
{ok, {ok, 0}};
case rabbit_amqqueue:internal_delete(Q, ActingUser) of
ok ->
{ok, {ok, 0}};
{error, timeout} = Err ->
Err
end;
Err ->
Err
end.
Expand Down
5 changes: 4 additions & 1 deletion deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,14 @@ create_stream(Q0) ->
-spec delete(amqqueue:amqqueue(), boolean(),
boolean(), rabbit_types:username()) ->
rabbit_types:ok(non_neg_integer()) |
rabbit_types:error(in_use | not_empty).
rabbit_types:error(timeout) |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
case rabbit_stream_coordinator:delete_stream(Q, ActingUser) of
{ok, Reply} ->
Reply;
{error, timeout} = Err ->
Err;
Error ->
{protocol_error, internal_error, "Cannot delete ~ts on node '~ts': ~255p ",
[rabbit_misc:rs(amqqueue:get_name(Q)), node(), Error]}
Expand Down
26 changes: 19 additions & 7 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,9 @@ handle_clean_start(_, QoS, State = #state{cfg = #cfg{clean_start = true}}) ->
ok ->
{ok, SessPresent, State};
{error, access_refused} ->
{error, ?RC_NOT_AUTHORIZED}
{error, ?RC_NOT_AUTHORIZED};
{error, _Reason} ->
{error, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}
end
end;
handle_clean_start(SessPresent, QoS,
Expand Down Expand Up @@ -991,7 +993,8 @@ clear_will_msg(#state{cfg = #cfg{vhost = Vhost,
QName = #resource{virtual_host = Vhost, kind = queue, name = QNameBin},
case delete_queue(QName, State) of
ok -> ok;
{error, access_refused} -> {error, ?RC_NOT_AUTHORIZED}
{error, access_refused} -> {error, ?RC_NOT_AUTHORIZED};
{error, _Reason} -> {error, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}
end.

make_will_msg(#mqtt_packet_connect{will_flag = false}) ->
Expand Down Expand Up @@ -1323,8 +1326,10 @@ ensure_queue(QoS, State) ->
case delete_queue(QName, State) of
ok ->
create_queue(QoS, State);
{error, access_refused} = E ->
E
{error, _} = Err ->
Err;
{protocol_error, _, _, _} = Err ->
{error, Err}
end;
{error, not_found} ->
create_queue(QoS, State)
Expand Down Expand Up @@ -1829,7 +1834,10 @@ maybe_delete_mqtt_qos0_queue(_) ->
ok.

-spec delete_queue(rabbit_amqqueue:name(), state()) ->
ok | {error, access_refused}.
ok |
{error, access_refused} |
{error, timeout} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
delete_queue(QName,
#state{auth_state = #auth_state{
user = User = #user{username = Username},
Expand All @@ -1841,8 +1849,12 @@ delete_queue(QName,
fun (Q) ->
case check_resource_access(User, QName, configure, AuthzCtx) of
ok ->
{ok, _N} = rabbit_queue_type:delete(Q, false, false, Username),
ok;
case rabbit_queue_type:delete(Q, false, false, Username) of
{ok, _} ->
ok;
Err ->
Err
end;
Err ->
Err
end
Expand Down
9 changes: 4 additions & 5 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,16 @@ declare(Q0, _Node) ->
boolean(),
boolean(),
rabbit_types:username()) ->
rabbit_types:ok(non_neg_integer()).
rabbit_types:ok(non_neg_integer()) |
rabbit_types:error(timeout).
delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
QName = amqqueue:get_name(Q),
log_delete(QName, amqqueue:get_exclusive_owner(Q)),
case rabbit_amqqueue:internal_delete(Q, ActingUser) of
ok ->
{ok, 0};
{error, timeout} ->
{protocol_error, internal_error,
"The operation to delete ~ts from the metadata store timed "
"out", [rabbit_misc:rs(QName)]}
{error, timeout} = Err ->
Err
end.

-spec deliver([{amqqueue:amqqueue(), stateless}],
Expand Down

0 comments on commit 4805e31

Please sign in to comment.