diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index fef3decba6d7..5f73f81c500a 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -1523,7 +1523,14 @@ notify_policy_changed(Q) when ?is_amqqueue(Q) -> consumers(Q) when ?amqqueue_is_classic(Q) -> QPid = amqqueue:get_pid(Q), - delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}); + try + delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}) + catch + exit:_ -> + %% The queue process exited during the call. + %% Note that `delegate:invoke/2' catches errors but not exits. + [] + end; consumers(Q) when ?amqqueue_is_quorum(Q) -> QPid = amqqueue:get_pid(Q), case ra:local_query(QPid, fun rabbit_fifo:query_consumers/1) of @@ -1619,17 +1626,23 @@ delete_immediately_by_resource(Resources) -> -spec delete (amqqueue:amqqueue(), 'false', 'false', rabbit_types:username()) -> qlen() | + rabbit_types:error(timeout) | {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}; (amqqueue:amqqueue(), 'true' , 'false', rabbit_types:username()) -> - qlen() | rabbit_types:error('in_use') | + qlen() | + rabbit_types:error('in_use') | + rabbit_types:error(timeout) | {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}; (amqqueue:amqqueue(), 'false', 'true', rabbit_types:username()) -> - qlen() | rabbit_types:error('not_empty') | + qlen() | + rabbit_types:error('not_empty') | + rabbit_types:error(timeout) | {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}; (amqqueue:amqqueue(), 'true' , 'true', rabbit_types:username()) -> qlen() | rabbit_types:error('in_use') | rabbit_types:error('not_empty') | + rabbit_types:error(timeout) | {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. delete(Q, IfUnused, IfEmpty, ActingUser) -> rabbit_queue_type:delete(Q, IfUnused, IfEmpty, ActingUser). diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index a7fea8d18187..b7ed084ac0a3 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -171,11 +171,8 @@ delete(Q0, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_classic(Q0) -> case delete_crashed_internal(Q, ActingUser) of ok -> {ok, 0}; - {error, timeout} -> - {error, protocol_error, - "The operation to delete ~ts from the " - "metadata store timed out", - [rabbit_misc:rs(QName)]} + {error, timeout} = Err -> + Err end end end; diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index a7bb45aac12f..938588da6662 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -383,6 +383,7 @@ declare(Q0, Node) -> boolean(), rabbit_types:username()) -> rabbit_types:ok(non_neg_integer()) | rabbit_types:error(in_use | not_empty) | + rabbit_types:error(timeout) | {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. delete(Q, IfUnused, IfEmpty, ActingUser) -> Mod = amqqueue:get_type(Q), diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 45b97d93eb6e..ed229e7d6ac2 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -823,10 +823,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> _ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], ?RPC_TIMEOUT), {ok, ReadyMsgs}; - {error, timeout} -> - {protocol_error, internal_error, - "The operation to delete ~ts from the metadata store " - "timed out", [rabbit_misc:rs(QName)]} + {error, timeout} = Err -> + Err end; {error, {no_more_servers_to_try, Errs}} -> case lists:all(fun({{error, noproc}, _}) -> true; @@ -849,10 +847,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> case delete_queue_data(Q, ActingUser) of ok -> {ok, ReadyMsgs}; - {error, timeout} -> - {protocol_error, internal_error, - "The operation to delete queue ~ts from the metadata " - "store timed out", [rabbit_misc:rs(QName)]} + {error, timeout} = Err -> + Err end end. diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 12c10c5e4ddc..6eac47fc781e 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -189,8 +189,12 @@ delete_stream(Q, ActingUser) #{name := StreamId} = amqqueue:get_type_state(Q), case process_command({delete_stream, StreamId, #{}}) of {ok, ok, _} -> - _ = rabbit_amqqueue:internal_delete(Q, ActingUser), - {ok, {ok, 0}}; + case rabbit_amqqueue:internal_delete(Q, ActingUser) of + ok -> + {ok, {ok, 0}}; + {error, timeout} = Err -> + Err + end; Err -> Err end. diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index cbdc20daa5a0..a7aa3a5a18cc 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -214,11 +214,14 @@ create_stream(Q0) -> -spec delete(amqqueue:amqqueue(), boolean(), boolean(), rabbit_types:username()) -> rabbit_types:ok(non_neg_integer()) | - rabbit_types:error(in_use | not_empty). + rabbit_types:error(timeout) | + {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. delete(Q, _IfUnused, _IfEmpty, ActingUser) -> case rabbit_stream_coordinator:delete_stream(Q, ActingUser) of {ok, Reply} -> Reply; + {error, timeout} = Err -> + Err; Error -> {protocol_error, internal_error, "Cannot delete ~ts on node '~ts': ~255p ", [rabbit_misc:rs(amqqueue:get_name(Q)), node(), Error]} diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 9cd6887599ca..15a65ff5f986 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -769,7 +769,9 @@ handle_clean_start(_, QoS, State = #state{cfg = #cfg{clean_start = true}}) -> ok -> {ok, SessPresent, State}; {error, access_refused} -> - {error, ?RC_NOT_AUTHORIZED} + {error, ?RC_NOT_AUTHORIZED}; + {error, _Reason} -> + {error, ?RC_IMPLEMENTATION_SPECIFIC_ERROR} end end; handle_clean_start(SessPresent, QoS, @@ -991,7 +993,8 @@ clear_will_msg(#state{cfg = #cfg{vhost = Vhost, QName = #resource{virtual_host = Vhost, kind = queue, name = QNameBin}, case delete_queue(QName, State) of ok -> ok; - {error, access_refused} -> {error, ?RC_NOT_AUTHORIZED} + {error, access_refused} -> {error, ?RC_NOT_AUTHORIZED}; + {error, _Reason} -> {error, ?RC_IMPLEMENTATION_SPECIFIC_ERROR} end. make_will_msg(#mqtt_packet_connect{will_flag = false}) -> @@ -1323,8 +1326,10 @@ ensure_queue(QoS, State) -> case delete_queue(QName, State) of ok -> create_queue(QoS, State); - {error, access_refused} = E -> - E + {error, _} = Err -> + Err; + {protocol_error, _, _, _} = Err -> + {error, Err} end; {error, not_found} -> create_queue(QoS, State) @@ -1829,7 +1834,10 @@ maybe_delete_mqtt_qos0_queue(_) -> ok. -spec delete_queue(rabbit_amqqueue:name(), state()) -> - ok | {error, access_refused}. + ok | + {error, access_refused} | + {error, timeout} | + {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. delete_queue(QName, #state{auth_state = #auth_state{ user = User = #user{username = Username}, @@ -1841,8 +1849,12 @@ delete_queue(QName, fun (Q) -> case check_resource_access(User, QName, configure, AuthzCtx) of ok -> - {ok, _N} = rabbit_queue_type:delete(Q, false, false, Username), - ok; + case rabbit_queue_type:delete(Q, false, false, Username) of + {ok, _} -> + ok; + Err -> + Err + end; Err -> Err end diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl index 298dd0766deb..77e59848bec8 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl @@ -109,17 +109,16 @@ declare(Q0, _Node) -> boolean(), boolean(), rabbit_types:username()) -> - rabbit_types:ok(non_neg_integer()). + rabbit_types:ok(non_neg_integer()) | + rabbit_types:error(timeout). delete(Q, _IfUnused, _IfEmpty, ActingUser) -> QName = amqqueue:get_name(Q), log_delete(QName, amqqueue:get_exclusive_owner(Q)), case rabbit_amqqueue:internal_delete(Q, ActingUser) of ok -> {ok, 0}; - {error, timeout} -> - {protocol_error, internal_error, - "The operation to delete ~ts from the metadata store timed " - "out", [rabbit_misc:rs(QName)]} + {error, timeout} = Err -> + Err end. -spec deliver([{amqqueue:amqqueue(), stateless}],