Skip to content

Commit

Permalink
Merge pull request #11222 from SimonUnge/move_vhost_limit_check
Browse files Browse the repository at this point in the history
Enforce/honor per-vhost queue limit for all protocols
  • Loading branch information
michaelklishin authored May 21, 2024
2 parents 6047583 + 4a6c009 commit f124bca
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 11 deletions.
10 changes: 0 additions & 10 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1013,15 +1013,6 @@ check_msg_size(Content, GCThreshold) ->
Fmt, [Size, MaxMessageSize])
end.

check_vhost_queue_limit(#resource{name = QueueName}, VHost) ->
case rabbit_vhost_limit:is_over_queue_limit(VHost) of
false -> ok;
{true, Limit} -> rabbit_misc:precondition_failed("cannot declare queue '~ts': "
"queue limit in vhost '~ts' (~tp) is reached",
[QueueName, VHost, Limit])

end.

qbin_to_resource(QueueNameBin, VHostPath) ->
name_to_resource(queue, QueueNameBin, VHostPath).

Expand Down Expand Up @@ -2471,7 +2462,6 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{ok, QueueName, MessageCount, ConsumerCount};
{error, not_found} ->
%% enforce the limit for newly declared queues only
check_vhost_queue_limit(QueueName, VHostPath),
DlxKey = <<"x-dead-letter-exchange">>,
case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of
undefined ->
Expand Down
30 changes: 29 additions & 1 deletion deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
%%

-module(rabbit_queue_type).
-feature(maybe_expr, enable).

-behaviour(rabbit_registry_class).

Expand Down Expand Up @@ -307,7 +308,12 @@ is_compatible(Type, Durable, Exclusive, AutoDelete) ->
declare(Q0, Node) ->
Q = rabbit_queue_decorator:set(rabbit_policy:set(Q0)),
Mod = amqqueue:get_type(Q),
Mod:declare(Q, Node).
case check_queue_limits(Q) of
ok ->
Mod:declare(Q, Node);
Error ->
Error
end.

-spec delete(amqqueue:amqqueue(), boolean(),
boolean(), rabbit_types:username()) ->
Expand Down Expand Up @@ -765,3 +771,25 @@ known_queue_type_names() ->
{QueueTypes, _} = lists:unzip(Registered),
QTypeBins = lists:map(fun(X) -> atom_to_binary(X) end, QueueTypes),
?KNOWN_QUEUE_TYPES ++ QTypeBins.

-spec check_queue_limits(amqqueue:amqqueue()) ->
ok |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
check_queue_limits(Q) ->
maybe
%% Prepare for more checks
ok ?= check_vhost_queue_limit(Q)
end.

check_vhost_queue_limit(Q) ->
#resource{name = QueueName} = amqqueue:get_name(Q),
VHost = amqqueue:get_vhost(Q),
case rabbit_vhost_limit:is_over_queue_limit(VHost) of
false ->
ok;
{true, Limit} ->
{protocol_error, precondition_failed,
"cannot declare queue '~ts': "
"queue limit in vhost '~ts' (~tp) is reached",
[QueueName, VHost, Limit]}
end.

0 comments on commit f124bca

Please sign in to comment.